Skip to content
Snippets Groups Projects
Commit 4cbd9a38 authored by Finn Hughes's avatar Finn Hughes
Browse files

Upload Python code

parent 0049dfbf
No related branches found
No related tags found
No related merge requests found
"""Check the implementations of MCVE and MCVE informative using MCMC"""
import os
from typing import TypedDict, Any
from multiprocessing import Pool
from time import perf_counter as pfc
import json
import numpy as np
import numpy.typing as npt
import matplotlib.pyplot as plt # type: ignore
import emcee # type: ignore
import corner # type: ignore
from tqdm import tqdm
class T_setting(TypedDict):
    """Scenario configuration consumed by the MC/MCVE/MCMC routines below."""
    n_samples: int           # number of Monte Carlo samples
    n_observations: int      # number of repeated observations
    xbar: float              # sample mean of the observations
    s: float                 # sample standard deviation of the observations
    y0: float                # reference measurand value
    sig0: float              # VE noise standard deviation
    alpha: float             # shape parameter of the inverse-gamma prior
    beta: float              # scale parameter of the inverse-gamma prior
    mcmc_samples: int        # number of MCMC steps per walker
    load_mcmc_samples: bool  # reuse stored samples instead of re-running MCMC
    mcmc_chains: int         # number of MCMC walkers
def slope_function_d1(z: float | npt.NDArray[np.double]) -> float | npt.NDArray[np.double]:
"""Slope function of semi-linear VE function with variability in z
Args:
z (float | np.ndarray): Type B Quantity value
Returns:
float | np.ndarray: Value for slope function
"""
return 1+z
def offset_function_d2(z: float | npt.NDArray[np.double]) -> float | npt.NDArray[np.double]:
"""Offset function for semi-linear VE function with variability in z
Args:
z (float | np.ndarray): Type B quantity value
Returns:
float | np.ndarray: Value for slope function
"""
return 0*z
def sample_z(num_samples: int = 1) -> npt.NDArray[np.double]:
    """Draw from the type-B state-of-knowledge distribution U(5, 10).

    Args:
        num_samples (int, optional): number of samples. Defaults to 1.
    Returns:
        npt.NDArray[np.double]: `num_samples` draws from the state-of-knowledge distribution
    """
    lower, upper = 5, 10
    return np.random.uniform(lower, upper, num_samples)
def ve(y: float | npt.NDArray[np.double],
       z: float | npt.NDArray[np.double],
       epsilon: float | npt.NDArray[np.double]) -> float | npt.NDArray[np.double]:
    """Evaluate the Virtual Experiment (VE) function.

    Args:
        y (float | npt.NDArray[np.double]): measurand value
        z (float | npt.NDArray[np.double]): type B quantity value
        epsilon (float | npt.NDArray[np.double]): random realization of the noise term
    Returns:
        float | npt.NDArray[np.double]: output of the VE
    """
    slope = slope_function_d1(z)
    offset = offset_function_d2(z)
    return slope * y + offset + epsilon
def measurement_model(x: float | npt.NDArray[np.double],
                      z: float | npt.NDArray[np.double]) -> float | npt.NDArray[np.double]:
    """Partial inverse model of the VE.

    Strictly speaking a measurement model defined on quantities rather than
    parameters or values.

    Args:
        x (float | npt.NDArray[np.double]): Type A quantity (value)
        z (float | npt.NDArray[np.double]): Type B quantity (value)
    Returns:
        float | npt.NDArray[np.double]: values of the measurand
    """
    offset = offset_function_d2(z)
    slope = slope_function_d1(z)
    return (x - offset) / slope
def mean_ve(y: float | npt.NDArray[np.double],
            z: float | npt.NDArray[np.double]) -> float | npt.NDArray[np.double]:
    """Mean of the VE, i.e. only its deterministic part (no noise term).

    Args:
        y (float | npt.NDArray[np.double]): measurand value
        z (float | npt.NDArray[np.double]): type B quantity value
    Returns:
        float | npt.NDArray[np.double]: mean of the VE output
    """
    slope = slope_function_d1(z)
    offset = offset_function_d2(z)
    return slope * y + offset
def grad_ve(y: float | npt.NDArray[np.double],  # pylint: disable=unused-argument
            z: float | npt.NDArray[np.double]) -> float | npt.NDArray[np.double]:
    """Partial derivative of the VE with respect to the measurand, evaluated at y.

    The VE is linear in y, so the derivative is simply the slope d1(z) and
    does not depend on y itself.

    Args:
        y (float | npt.NDArray[np.double]): measurand value
        z (float | npt.NDArray[np.double]): type B quantity value
    Returns:
        float | npt.NDArray[np.double]: gradient of the VE evaluated at y
    """
    return slope_function_d1(z)
def get_settings() -> T_setting:
    """Define a scenario to compute measurement uncertainty for.

    Returns:
        T_setting: the quantities required to run the script
    """
    sig0 = 0.5
    alpha = 16 / 2  # shape of the inverse-gamma variance prior
    beta = alpha * sig0 ** 2  # matching scale of the inverse-gamma prior, derived from sig0
    settings: T_setting = {
        "n_samples": 10_000_000,
        "n_observations": 4,
        "xbar": 50,
        "s": 5,
        "y0": 50 / 8.5,
        "sig0": sig0,
        "alpha": alpha,
        "beta": beta,
        "mcmc_samples": 100_000,
        "load_mcmc_samples": False,
        "mcmc_chains": 64,
    }
    return settings
def mc_jcgm101(setting: T_setting) -> float | npt.NDArray[np.double]:
    """JCGM 101 Monte Carlo approach using a noninformative prior.

    Args:
        setting (T_setting): Setting dictionary
    Returns:
        float | npt.NDArray[np.double]: samples from the measurand distribution
    """
    n_mc = setting["n_samples"]
    n_obs = setting["n_observations"]
    # Type A quantity: scaled-and-shifted Student-t with n-1 degrees of freedom
    t_draws = np.random.standard_t(n_obs - 1, size=n_mc)
    x_draws = t_draws * setting["s"] / np.sqrt(n_obs) + setting["xbar"]
    z_draws = sample_z(n_mc)
    return measurement_model(x_draws, z_draws)
def mc_jcgm101_informative(setting: T_setting) -> float | npt.NDArray[np.double]:
    """JCGM 101 approach using an informative inverse-gamma prior for the variance.

    Args:
        setting (T_setting): Setting dictionary
    Returns:
        float | npt.NDArray[np.double]: samples from the measurand distribution
    """
    n = setting["n_observations"]
    s = setting["s"]
    alpha = setting["alpha"]
    beta = setting["beta"]
    # Type A quantity: scaled-and-shifted Student-t with 2*alpha + n - 1 degrees
    # of freedom, the marginal posterior under the inverse-gamma variance prior
    ti_samples = np.random.standard_t(2*alpha + n - 1, size=setting["n_samples"])
    xi = ti_samples * np.sqrt((2*beta + (n-1) * s**2)/(n*(2*alpha + n - 1))) + setting["xbar"]
    zi = sample_z(setting["n_samples"])
    return measurement_model(xi, zi)
def mcve_noninformative_serial(setting: T_setting) -> list[float | npt.NDArray[np.double]]:
    """Serial (loop-based) MCVE approach using a noninformative prior.

    Much slower than the vectorized variant; equivalent to JCGM 100 - GUM supplement 1.

    Args:
        setting (T_setting): Setting dictionary
    Returns:
        list[float | npt.NDArray[np.double]]: list of samples from the measurand
    """
    n_obs = setting["n_observations"]
    y0 = setting["y0"]
    measurand_samples: list[float | npt.NDArray[np.double]] = []
    for _ in tqdm(range(setting["n_samples"])):
        noise = np.random.randn(n_obs) * setting["sig0"]
        z_draw = sample_z()
        x_ve_mean = np.mean(ve(y0, z_draw, noise), axis=0)
        chi2_draw = np.random.chisquare(n_obs - 1)
        mu = mean_ve(y0, z_draw)
        scale = setting["s"]/setting["sig0"] * np.sqrt((n_obs - 1) / chi2_draw)
        y_draw = 1/grad_ve(y0, z_draw) * (scale * (mu - x_ve_mean) + (setting["xbar"] - mu)) + y0
        measurand_samples.append(y_draw)
    return measurand_samples
def mcve_noninformative(setting: T_setting) -> npt.NDArray[np.double]:
    """Vectorized MCVE approach using a noninformative prior.

    Equivalent to JCGM 100 - GUM supplement 1.

    Args:
        setting (T_setting): Setting dictionary
    Returns:
        npt.NDArray[np.double]: samples from the measurand distribution
    """
    n_mc = setting["n_samples"]
    n_obs = setting["n_observations"]
    y0 = setting["y0"]
    # VE noise realizations, one column per Monte Carlo sample
    noise = np.random.randn(n_obs, n_mc) * setting["sig0"]
    z_draws = sample_z(n_mc)
    x_ve_mean = np.mean(ve(y0, z_draws, noise), axis=0)
    chi2_draws = np.random.chisquare(n_obs - 1, n_mc)
    mu = mean_ve(y0, z_draws)
    scale = setting["s"]/setting["sig0"] * np.sqrt((n_obs - 1) / chi2_draws)
    return 1/grad_ve(y0, z_draws) * (scale * (mu - x_ve_mean) + (setting["xbar"] - mu)) + y0
def mcve_informative(setting: T_setting) -> npt.NDArray[np.double]:
    """Vectorized MCVE approach using an informative inverse-gamma prior.

    Args:
        setting (T_setting): Setting dictionary
    Returns:
        npt.NDArray[np.double]: samples from the measurand distribution
    """
    n_mc = setting["n_samples"]
    n_obs = setting["n_observations"]
    y0 = setting["y0"]
    # VE noise realizations, one column per Monte Carlo sample
    noise = np.random.randn(n_obs, n_mc) * setting["sig0"]
    z_draws = sample_z(n_mc)
    x_ve_mean = np.mean(ve(y0, z_draws, noise), axis=0)
    # chi-square with 2*alpha + n - 1 dof from the inverse-gamma variance prior
    chi2_draws = np.random.chisquare(2*setting["alpha"] + n_obs - 1, n_mc)
    mu = mean_ve(y0, z_draws)
    scale = np.sqrt(2*setting["beta"] + (n_obs - 1)*setting["s"]**2)/(setting["sig0"] * np.sqrt(chi2_draws))
    return 1/grad_ve(y0, z_draws) * (scale * (mu - x_ve_mean) + (setting["xbar"] - mu)) + y0
def log_post_noninformative(theta: tuple[float, float],
                            n: int,
                            xbar: float,
                            s: float) -> float | npt.NDArray[np.double]:
    """The marginal posterior using a noninformative (Jeffreys) prior on the variance is given by
    $$
    \\pi(y, z | x) \\propto (1+n/(n-1)(g(y, z)-\\bar{x})^2/(s^2))^{-(n/2)}\\vert\\partial g(y, z)/\\partial y\\vert
    $$
    Args:
        theta (tuple[float, float]): input parameter the MCMC chain is running over (y, z)
        n (int): number of observations
        xbar (float): mean of observations
        s (float): standard deviation of observations
    Returns:
        float | npt.NDArray[np.double]: log of posterior without constant terms
    """
    y_, z_ = theta[0], theta[1]
    llambda = 1e8  # Gaussian prior with large variance for measurand (effectively flat)
    mu_y = 5  # some arbitrary mean for measurand
    if z_ < 5 or z_ > 10:
        # outside the support of the uniform type-B prior U(5, 10)
        return -np.inf
    loss1 = -(n/2) * np.log(1 + (n*(mean_ve(y_, z_)-xbar)**2)/((n-1)*s**2))  # marginal likelihood
    loss2 = - (1/(2*llambda**2)) * (y_-mu_y)**2  # measurand prior
    loss3 = + np.log(grad_ve(y_, z_))  # change of variables term
    loss = loss1 + loss2 + loss3
    return loss
def log_post_informative(theta: tuple[float, float],  # pylint: disable=R0913, R0917
                         n: int,
                         alpha: float,
                         beta: float,
                         xbar: float,
                         s: float) -> float | npt.NDArray[np.double]:
    """The marginal posterior using an informative inverse-gamma prior on the variance is given by
    $$
    \\pi(y, z | x) \\propto (1 + n/(2\\beta+(n-1)s^2) n(g(y, z) - \\bar{x})^2) ^{-(\\alpha + n/2)}
    \\times \\vert \\partial g(y, z)/\\partial y\\vert
    $$
    Args:
        theta (tuple[float, float]): input parameter the MCMC chain is running over (y, z)
        n (int): number of observations
        alpha (float): shape parameter of inverse gamma prior
        beta (float): scale parameter of inverse gamma prior
        xbar (float): mean of observations
        s (float): standard deviation of observations
    Returns:
        float | npt.NDArray[np.double]: log of posterior without constant terms
    """
    y_, z_ = theta[0], theta[1]
    llambda = 1e8  # Gaussian prior with large variance for measurand (effectively flat)
    mu_y = 5  # some arbitrary mean for measurand
    if z_ < 5 or z_ > 10:
        # outside the support of the uniform type-B prior U(5, 10)
        return -np.inf
    loss1 = -(alpha + n/2) * np.log(1 + (n*(mean_ve(y_, z_)-xbar)**2)/(2*beta + (n-1)*s**2))  # marginal likelihood
    loss2 = - (1/(2*llambda**2)) * (y_-mu_y)**2  # measurand prior
    loss3 = + np.log(grad_ve(y_, z_))  # change of variables term
    loss = loss1 + loss2 + loss3
    return loss
def visualize_mcmc_results(sampler: Any, path: str) -> npt.NDArray[np.double]:
    """Plot chain diagnostics and marginals, and store the samples in a JSON file.

    Args:
        sampler (Any): fitted sampler from the emcee package
        path (str): prefix for the output plot and sample files
    Returns:
        npt.NDArray[np.double]: flattened (burn-in discarded) samples of the joint posterior
    """
    chain: npt.NDArray[np.double] = sampler.get_chain()
    n_params = chain.shape[2]
    labels = ["y", "z", "sigma"]
    # trace plot: one panel per parameter, all walkers overlaid
    fig, axes = plt.subplots(n_params, figsize=(10, 7), sharex=True)  # type: ignore
    for dim, ax in enumerate(axes):  # type: ignore
        ax.plot(chain[:, :, dim], "k", alpha=0.3)  # type: ignore
        ax.set_xlim(0, len(chain))  # type: ignore
        ax.set_ylabel(labels[dim])  # type: ignore
        ax.yaxis.set_label_coords(-0.1, 0.5)  # type: ignore
    axes[-1].set_xlabel("step number")  # type: ignore
    fig.savefig(f"{path}_chain.png")  # type: ignore
    # corner plot over all samples, then again with burn-in discarded
    fig = corner.corner(sampler.get_chain(flat=True), labels=labels)  # type: ignore
    fig.savefig(f"{path}_corner.png")  # type: ignore
    flat_samples = sampler.get_chain(flat=True, discard=1000)
    fig = corner.corner(flat_samples, labels=labels)  # type: ignore
    fig.savefig(f"{path}_corner_discarded.png")  # type: ignore
    # persist the measurand (first column) samples for later reuse
    with open(f"{path}_samples.json", "w", encoding="utf-8") as filepath:
        json.dump({"samples": flat_samples[:, 0].tolist()}, filepath)
    return flat_samples
def mcmc_jcgm101(setting: T_setting) -> npt.NDArray[np.double]:
    """Run an ensemble MCMC sampler on the posterior using a noninformative
    prior for the variance.

    Args:
        setting (T_setting): Setting
    Returns:
        npt.NDArray[np.double]: Samples from measurand (possibly correlated)
    """
    # optionally reuse samples stored by a previous run
    if setting["load_mcmc_samples"]:
        with open("mcmc_samples.json", "r", encoding="utf-8") as filepath:
            return np.array(json.load(filepath)["samples"])
    # 64 walkers, tightly clustered initial positions for the 2 parameters (y, z)
    pos = np.random.normal(loc=[5, 8], scale=[0.01, 0.01], size=(64, 2))
    nwalkers, ndim = pos.shape
    # limit BLAS threading so the (single-process) pool is not oversubscribed
    os.environ["OMP_NUM_THREADS"] = "1"
    start = pfc()
    with Pool(1) as pool:
        sampler = emcee.EnsembleSampler(nwalkers, ndim, log_post_noninformative,
                                        args=(setting["n_observations"], setting["xbar"], setting["s"]),
                                        pool=pool)
        sampler.run_mcmc(pos, setting["mcmc_samples"], progress=True)  # type: ignore
    print(f"MCMC runtime: {pfc() - start}")
    samples = visualize_mcmc_results(sampler, "mcmc")
    # first column is the measurand y
    return samples[:, 0]
def mcmc_informative(setting: T_setting) -> npt.NDArray[np.double]:
    """Run an ensemble MCMC sampler on the posterior using an informative
    inverse-gamma prior for the variance.

    Args:
        setting (T_setting): Setting
    Returns:
        npt.NDArray[np.double]: Samples from measurand (possibly correlated)
    """
    # optionally reuse samples stored by a previous run
    if setting["load_mcmc_samples"]:
        with open("mcmc_info_samples.json", "r", encoding="utf-8") as filepath:
            return np.array(json.load(filepath)["samples"])
    # 64 walkers, tightly clustered initial positions for the 2 parameters (y, z)
    pos = np.random.normal(loc=[5, 8], scale=[0.01, 0.01], size=(64, 2))
    nwalkers, ndim = pos.shape
    # limit BLAS threading so the (single-process) pool is not oversubscribed
    os.environ["OMP_NUM_THREADS"] = "1"
    start = pfc()
    with Pool(1) as pool:
        sampler = emcee.EnsembleSampler(nwalkers, ndim, log_post_informative,
                                        args=(setting["n_observations"],
                                              setting["alpha"],
                                              setting["beta"],
                                              setting["xbar"],
                                              setting["s"]),
                                        pool=pool)
        sampler.run_mcmc(pos, setting["mcmc_samples"], progress=True)  # type: ignore
    print(f"MCMC runtime: {pfc() - start}")
    samples = visualize_mcmc_results(sampler, "mcmc_info")
    # first column is the measurand y
    return samples[:, 0]
def get_hist_info(samples: npt.NDArray[np.double],
                  bins: int = 100) -> tuple[npt.NDArray[np.double], npt.NDArray[np.double]]:
    """Turn a sample set into (x, y) coordinates of a histogram-based PDF estimate.

    The histogram range is clipped to mean +/- 4 standard deviations.

    Args:
        samples (npt.NDArray[np.double]): samples to retrieve the PDF from
        bins (int, optional): number of bins to enforce. Defaults to 100.
    Returns:
        tuple[npt.NDArray[np.double], npt.NDArray[np.double]]: bin centers and densities
    """
    center = float(np.mean(samples))
    spread = float(np.std(samples))
    lo, hi = center - 4*spread, center + 4*spread
    density, edges = np.histogram(samples, bins=bins, density=True, range=(lo, hi))
    bin_centers = 0.5*(edges[1:] + edges[:-1])
    return bin_centers, density
def main() -> None:
    """Main runner: draw measurand samples with every method, plot the
    estimated densities, and print mean/standard deviation summaries."""
    setting = get_settings()
    # draw samples with each method; order kept to preserve the RNG stream
    y_mcmc_nonin = mcmc_jcgm101(setting)
    y_mcmc_info = mcmc_informative(setting)
    y_mcve_nonin = np.array(mcve_noninformative(setting))
    y_mcve_info = np.array(mcve_informative(setting))
    y_jcgm_nonin = np.array(mc_jcgm101(setting))
    y_jcgm_info = np.array(mc_jcgm101_informative(setting))
    # axis limits derived from the noninformative MCVE result: mean +/- 4 std
    vmin = np.mean(y_mcve_nonin) - 4*np.std(y_mcve_nonin)
    vmax = np.mean(y_mcve_nonin) + 4*np.std(y_mcve_nonin)
    fig = plt.figure()  # type: ignore
    density_curves = [
        (y_jcgm_nonin, "JCGM 101 - nonInfo"),
        (y_mcve_nonin, "MCVE - nonInfo"),
        (y_mcmc_nonin, "MCMC - nonInfo"),
        (y_mcmc_info, "MCMC - Info"),
        (y_jcgm_info, "JCGM 101 - Info"),
        (y_mcve_info, "MCVE - Info"),
    ]
    for curve_samples, label in density_curves:
        x, v = get_hist_info(curve_samples, 100)
        plt.plot(x, v, label=label)  # type: ignore
    plt.xlim(vmin, vmax)  # type: ignore
    plt.legend()  # type: ignore
    fig.savefig("density.png")  # type: ignore
    summaries = [
        ("JCGM 101", y_jcgm_nonin),
        ("MCVE", y_mcve_nonin),
        ("MCMC noninfo", y_mcmc_nonin),
        ("JCGM 101-Informative", y_jcgm_info),
        ("MCVE informative", y_mcve_info),
        ("MCMC informative", y_mcmc_info),
    ]
    for name, summary_samples in summaries:
        print(name)
        print(f" {np.mean(summary_samples):.3f}, {np.std(summary_samples):.3f} ")
# Script entry point: run the full comparison when executed directly.
if __name__ == "__main__":
    main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment