Commit b7edb10c authored by Manuel Marschall

initial commit of the current paper code

parent 4033e9af
*.mat
*.npz
experiments/*
experiments_no17/*
logs/*
onnx_models/*
paper/*
train_vae_test_no17/*
\ No newline at end of file
from typing import Dict
import tensorflow as tf
import json
import os
from numpy import sqrt as npsqrt
from lutils import (plot_recon, savefig, get_error_metrics, to_image_shape)
from generative_model import (GenerativeModel, ProbabilisticGenerator, DeterministicGenerator, ConvexProbabilisticGenerator)
from linear_inverse_problem import LinearInverseProblem
from vae_laplace_solver import VAELaplaceSolver
from vae_latpush_solver import (VAELatpushSolver, VAEEBLatpushSolver)
from lip_solver import (LIPSolver, LIPThikonovSolver, LIPGMRFSolver)
from vae_ebqmin_solver import VAEEBQminLatpushSolver
from vae_lip_solver import VAELIPSolver
from vae_latpush_linear_solver import VAELatpushLinearSolver


def do_plot_recon(xh, lip, genm, path):
    fig = plot_recon(to_image_shape(xh, genm), to_image_shape(lip.ground_truth, genm),
                     to_image_shape(lip.data, genm))
    savefig(fig, path, "result.png")


def prepare_experiment_retval(xh, loc_lip, genm, path, variance=None):
    mse, psnr, ssim = get_error_metrics(to_image_shape(xh, genm), to_image_shape(loc_lip.ground_truth, genm))
    if loc_lip.print_metrics:
        print(f"RMSE/PSNR/SSIM: {npsqrt(mse):.2f}/{psnr:.2f}/{ssim:.2f}")
    if loc_lip.plot_recon:
        do_plot_recon(xh, loc_lip, genm, path)
    retval = {
        "mse": mse,
        "psnr": psnr,
        "ssim": ssim,
        "xh": xh.tolist(),
        "y": loc_lip.data.tolist(),
        "sigma": loc_lip.sigma,
        "x": loc_lip.ground_truth.tolist()
    }
    if variance is not None:
        retval["variance"] = variance.tolist()
    if loc_lip.save_result:
        if not os.path.exists(path):
            os.makedirs(path)
        with open(path + "results.json", "w") as fp:
            json.dump(retval, fp)
    return retval


def laplace_experiment(loc_lip: LinearInverseProblem,
                       genm: GenerativeModel,
                       exp_str: str):
    solver = VAELaplaceSolver(loc_lip, genm, exp_str + "/")
    xh, variance = solver.solve()
    xh = xh.numpy()
    variance = variance.numpy()
    return prepare_experiment_retval(xh, loc_lip, genm, solver.path + solver.path_suffix, variance=variance)


def thikonov_experiment(loc_lip: LinearInverseProblem,
                        genm: GenerativeModel,
                        exp_str: str,
                        logspace_min: int = -6,
                        logspace_max: int = 2,
                        logspace_num: int = 100):
    solver = LIPThikonovSolver(loc_lip)
    xh = solver.solve_oracle(logspace_min=logspace_min, logspace_max=logspace_max, logspace_num=logspace_num)
    return prepare_experiment_retval(xh, loc_lip, genm, exp_str + "/L2/")


def gmrf_experiment(loc_lip: LinearInverseProblem,
                    genm: GenerativeModel,
                    exp_str: str,
                    logspace_min: int = -6,
                    logspace_max: int = 2,
                    logspace_num: int = 100):
    solver = LIPGMRFSolver(loc_lip)
    xh = solver.solve_oracle(logspace_min=logspace_min, logspace_max=logspace_max, logspace_num=logspace_num)
    return prepare_experiment_retval(xh, loc_lip, genm, exp_str + "/GMRF/")


def mnist_regularized_experiment(loc_lip: LinearInverseProblem,
                                 genm: GenerativeModel,
                                 exp_str: str):
    solver = VAELIPSolver(loc_lip, genm, exp_str + "/mnist_")
    xh = solver.solve_mnist_regularized()
    return prepare_experiment_retval(xh, loc_lip, genm, solver.path + solver.path_suffix)


def generic_experiment(loc_lip: LinearInverseProblem,
                       genm: GenerativeModel,
                       exp_str: str):
    solver = VAELIPSolver(loc_lip, genm, exp_str + "/")
    xh = solver.solve_naive().numpy()
    return prepare_experiment_retval(xh, loc_lip, genm, solver.path + solver.path_suffix)


def generic_encoder_experiment(loc_lip: LinearInverseProblem,
                               genm: GenerativeModel,
                               exp_str: str):
    solver = VAELIPSolver(loc_lip, genm, exp_str + "/encoder_")
    z0 = solver.get_initial_value()
    if isinstance(genm, (ProbabilisticGenerator, ConvexProbabilisticGenerator)):
        xh, _ = genm(z0)
        xh = tf.reshape(xh, [-1])
    elif isinstance(genm, DeterministicGenerator):
        xh = genm(z0)
        xh = tf.reshape(xh, [-1])
    else:
        raise ValueError(f"Unknown generative model type: {type(genm)}")
    return prepare_experiment_retval(xh.numpy(), loc_lip, genm, solver.path + solver.path_suffix)


def latpush_experiment(loc_lip: LinearInverseProblem,
                       genm: GenerativeModel,
                       exp_str: str):
    solver = VAELatpushSolver(loc_lip, genm, exp_str + "/")
    res = solver.solve_tfp()
    # res = solver.solve_scipy()
    if isinstance(genm, (ProbabilisticGenerator, ConvexProbabilisticGenerator)):
        xh, _ = genm(res["zopt"])
        xh = tf.reshape(xh, [-1])
    elif isinstance(genm, DeterministicGenerator):
        xh = genm(res["zopt"])
        xh = tf.reshape(xh, [-1])
    else:
        raise ValueError(f"unknown generative model type: {type(genm)}")
    retval = prepare_experiment_retval(xh.numpy(), loc_lip, genm, solver.path + solver.path_suffix)
    retval["solver_res"] = res
    return retval
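
# note (added for clarity): the "solver_res" entry above holds raw optimizer
# output that is not JSON-serializable; callers pop it again before dumping
# results, see run_experiments below.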


def latpush_linear_experiment(loc_lip: LinearInverseProblem,
                              genm: GenerativeModel,
                              exp_str: str):
    solver = VAELatpushLinearSolver(loc_lip, genm, exp_str + "/")
    xh = solver.solve()
    return prepare_experiment_retval(xh.numpy(), loc_lip, genm, solver.path + solver.path_suffix)


def eb_latpush_experiment(loc_lip: LinearInverseProblem,
                          genm: GenerativeModel,
                          exp_str: str,
                          res: dict):
    solver = VAEEBLatpushSolver(loc_lip, genm, exp_str + "/")
    xh = solver.solve_from_latpush_result(res["zopt"])
    retval = prepare_experiment_retval(xh, loc_lip, genm, solver.path + solver.path_suffix)
    return retval


def eb_qmin_experiment(loc_lip: LinearInverseProblem,
                       genm: GenerativeModel,
                       exp_str: str):
    solver = VAEEBQminLatpushSolver(loc_lip, genm, exp_str + "/")
    res = solver.solve_scipy()
    xh = solver.do_EB_step(res["zopt"])
    return prepare_experiment_retval(xh, loc_lip, genm, solver.path + solver.path_suffix)


def run_experiment_collection(loc_lip: LinearInverseProblem, genm: GenerativeModel,
                              exp_str: str, experiment_names):
    methods = {"laplace": laplace_experiment,
               "thikonov": thikonov_experiment,
               "gmrf": gmrf_experiment,
               "mnist_regu": mnist_regularized_experiment,
               "generic": generic_experiment,
               "generic_encoder": generic_encoder_experiment,
               "latpush": latpush_experiment,
               "eb_latpush": eb_latpush_experiment,
               "latpush_linear": latpush_linear_experiment
               }
    retval = {}  # type: Dict[str, Dict]
    for experiment in experiment_names:
        if experiment not in methods:
            raise ValueError(f"unknown experiment: {experiment}")
        if experiment == "eb_latpush":
            assert "latpush" in retval
            # print("EB latpush experiment")
            retval[experiment] = methods[experiment](loc_lip, genm, exp_str, retval["latpush"]["solver_res"])
            continue
        # print(f"{experiment} experiment")
        retval[experiment] = methods[experiment](loc_lip, genm, exp_str)
    if "latpush" in retval:
        retval["latpush"].pop("solver_res")
    return retval
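
# A minimal usage sketch for run_experiment_collection (hypothetical call,
# not part of the original file):
#
#     lip = LinearInverseProblem.mnist_recon_problem(noise_sigma=0.01, blur_sigma=5)
#     res = run_experiment_collection(lip, prob_generator, "test_run",
#                                     ["thikonov", "latpush", "eb_latpush"])
#
# "eb_latpush" must appear after "latpush" in the list, since it reuses the
# latent optimum stored in the latpush "solver_res" entry.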


def run_experiments(loc_lip: LinearInverseProblem, genm: GenerativeModel,
                    exp_str: str):
    print("LatPush solution")
    latpush_res = latpush_experiment(loc_lip, genm, exp_str)
    print("Latpush Linear experiment")
    latpush_linear_res = latpush_linear_experiment(loc_lip, genm, exp_str)
    print("Mnist Regularized solution")
    mnist_regu_res = mnist_regularized_experiment(loc_lip, genm, exp_str)
    print("Laplace solution")
    lap_res = laplace_experiment(loc_lip, genm, exp_str)
    print("Generic VAE solution")
    generic_res = generic_experiment(loc_lip, genm, exp_str)
    print("Generic VAE encoder push solution")
    generic_encoder_res = generic_encoder_experiment(loc_lip, genm, exp_str)
    print("L2 Oracle solution")
    thik_res = thikonov_experiment(loc_lip, genm, exp_str)
    print("GMRF Oracle solution")
    gmrf_res = gmrf_experiment(loc_lip, genm, exp_str)
    print("EB Latpush solution")
    eblatpush_res = eb_latpush_experiment(loc_lip, genm, exp_str, latpush_res["solver_res"])
    latpush_res.pop("solver_res")  # needs to be done since the result is not serializable
    # print("EBQmin solution")
    # ebqmin_res = eb_qmin_experiment(loc_lip, genm, exp_str)
    return {
        "mnist_regu": mnist_regu_res,
        "generic": generic_res,
        "generic_encoder": generic_encoder_res,
        "laplace": lap_res,
        "thikonov": thik_res,
        "gmrf": gmrf_res,
        "latpush": latpush_res,
        "latpush_linear": latpush_linear_res,
        "eb_latpush": eblatpush_res,
        # "eb_qmin": ebqmin_res
    }


if __name__ == "__main__":
    from onnx_vae import ONNX_VAE_STO, ONNX_VAE_DET
    PATH_DET = "deterministic/"
    PATH_PROB = "stochastic_good/"
    PATH_ADD = "onnx_models/"
    onnx_vae1 = ONNX_VAE_DET(PATH_ADD + PATH_DET + "encoder.onnx",
                             PATH_ADD + PATH_DET + "decoder.onnx")
    tf_vae_det = onnx_vae1.to_tensorflow()
    det_generator = DeterministicGenerator.from_vae(tf_vae_det, int(28*28), 20)
    onnx_vae2 = ONNX_VAE_STO(PATH_ADD + PATH_PROB + "good_probVAE_encoder.onnx",
                             PATH_ADD + PATH_PROB + "good_probVAE_decoder.onnx")
    tf_vae_good = onnx_vae2.to_tensorflow()
    prob_generator = ProbabilisticGenerator.from_vae(tf_vae_good, int(28*28), 20)
    loc_lip = LinearInverseProblem.mnist_recon_problem(noise_sigma=0.01, blur_sigma=5)
    # from scipy.io import savemat
    # savemat("lip_example_data.mat", {"A": loc_lip.operator, "y": loc_lip.data, "x": loc_lip.ground_truth,
    #         "sigma": loc_lip.sigma})
    exp_res = run_experiments(loc_lip, prob_generator, "test_run")
    # with open("test_run/result.json", "w") as fp:
    #     json.dump(exp_res, fp)
This diff is collapsed.
@@ -28,6 +28,7 @@ class GenerativeModel():
        self.dim_x = dim_x
        self.image_dim_x = (int(sqrt(dim_x)), int(sqrt(dim_x)), channels)
        self.dim_z = dim_z
        self.channels = channels
        self.vae = None  # type: Optional[TF_VAE]

    def __call__(self, z_0: array_type) -> array_type:
@@ -47,6 +48,21 @@ class GenerativeModel():
                         "about the generative model")
        return self.vae.decoder(z_0)

    def J_z0_already_variable(self, z0):
        return self.vae.J_z0_already_variable(z0)

    def encoder(self, x):
        """
        Wraps the encoder function of a variational auto-encoder. Not a clean design, but it works for now.

        Arguments:
            x {array_like} -- given image to encode

        Returns:
            array_like -- encoded latent variable
        """
        return self.vae.encoder(x)


class DeterministicGenerator(GenerativeModel):
    """Deterministic generator as a generative model without a variational component"""
@@ -81,6 +97,7 @@ class DeterministicGenerator(GenerativeModel):
        return retval


class ProbabilisticGenerator(GenerativeModel):
    """Probabilistic / variational generative models"""

    def __init__(self, dim_x: int, dim_z: int, channels: Optional[int] = 1) -> None:
@@ -113,16 +130,88 @@ class ProbabilisticGenerator(GenerativeModel):
        return retval


class ConvexProbabilisticGenerator():
    """Convex combination of probabilistic / variational generative models"""

    def __init__(self, genm1: GenerativeModel, genm2: GenerativeModel, alpha) -> None:
        """Base constructor of a convex combination of two generative models

        generator = alpha*generator1 + (1-alpha)*generator2

        Args:
            genm1 (GenerativeModel): first generative model
            genm2 (GenerativeModel): second generative model
            alpha (float): combination weight in [0, 1]
        """
        assert 0 <= alpha <= 1
        assert genm1.dim_x == genm2.dim_x
        assert genm1.dim_z == genm2.dim_z
        assert genm1.channels == genm2.channels
        self.alpha = alpha
        self.genm1 = genm1
        self.genm2 = genm2
        self.dim_x = genm1.dim_x
        self.image_dim_x = (int(sqrt(genm1.dim_x)), int(sqrt(genm1.dim_x)), genm1.channels)
        self.dim_z = genm1.dim_z

    def __call__(self, z_0: array_type) -> array_type:
        """Call the decoder of both generative model VAEs and convexly combine the results

        Args:
            z_0 (array_type): latent variable

        Raises:
            NotImplementedError: this can only be called for VAE models at the moment

        Returns:
            array_type: decoder result in X for the given latent variable in Z
        """
        if self.alpha == 0:
            return self.genm2(z_0)
        if self.alpha == 1:
            return self.genm1(z_0)
        mean1, cov1 = self.genm1(z_0)
        mean2, cov2 = self.genm2(z_0)
        # means combine linearly; covariances of independent Gaussians pick up
        # the squared combination weights
        new_mean = self.alpha*mean1 + (1-self.alpha)*mean2
        new_cov = self.alpha**2*cov1 + (1-self.alpha)**2*cov2
        return new_mean, new_cov

    def encoder(self, x):
        """
        Wraps the encoder function of a variational auto-encoder. Not a clean design, but it works for now.

        Arguments:
            x {array_like} -- given image to encode

        Returns:
            array_like -- encoded latent variable
        """
        # split each encoder output into its mean and (co)variance halves
        z1 = self.genm1.encoder(x)
        mean1, cov1 = z1[:, :self.dim_z], z1[:, self.dim_z:]
        z2 = self.genm2.encoder(x)
        mean2, cov2 = z2[:, :self.dim_z], z2[:, self.dim_z:]
        new_mean = self.alpha*mean1 + (1-self.alpha)*mean2
        new_cov = self.alpha**2*cov1 + (1-self.alpha)**2*cov2
        import numpy as np
        retval = np.zeros([new_mean.shape[0], int(2*self.dim_z)])
        retval[:, :self.dim_z] = new_mean.numpy()
        retval[:, self.dim_z:] = new_cov.numpy()
        return retval

    def J_z0_already_variable(self, z0):
        jac1 = self.genm1.J_z0_already_variable(z0)
        jac2 = self.genm2.J_z0_already_variable(z0)
        return self.alpha*jac1 + (1-self.alpha)*jac2
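
# A worked toy example of the combination rule above (illustrative numbers,
# not part of the original file): for alpha = 0.5 and two scalar Gaussian
# decoder outputs N(0, 1) and N(2, 1),
#     new_mean = 0.5*0 + 0.5*2    = 1.0
#     new_cov  = 0.25*1 + 0.25*1  = 0.5
# i.e. the means interpolate linearly while the variance of the average shrinks.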


if __name__ == "__main__":
    PATH_DET = "onnx_models/deterministic/"
    PATH_PROB = "onnx_models/stochastic_good/"
    PATH_ADD = "probabilistic_vae_comparison/"
    PATH_DET = "deterministic/"
    PATH_PROB = "stochastic_good/"
    PATH_ADD = "onnx_models/"
    onnx_vae1 = ONNX_VAE_DET(PATH_ADD + PATH_DET + "encoder.onnx",
                             PATH_ADD + PATH_DET + "decoder.onnx")
    tf_vae_det = onnx_vae1.to_tensorflow()
    det_generator = DeterministicGenerator.from_vae(tf_vae_det, int(28*28), 20)
    # onnx_vae1 = ONNX_VAE_DET(PATH_ADD + PATH_DET + "encoder.onnx",
    #                          PATH_ADD + PATH_DET + "decoder.onnx")
    # tf_vae_det = onnx_vae1.to_tensorflow()
    # det_generator = DeterministicGenerator.from_vae(tf_vae_det, int(28*28), 20)
    onnx_vae2 = ONNX_VAE_STO(PATH_ADD + PATH_PROB + "good_probVAE_encoder.onnx",
                             PATH_ADD + PATH_PROB + "good_probVAE_decoder.onnx")
@@ -131,8 +220,8 @@ if __name__ == "__main__":
    import numpy as np
    z = np.random.randn(10, 20)
    x = det_generator(z)
    assert x.shape == (10, 28, 28)
    # x = det_generator(z)
    # assert x.shape == (10, 28, 28)
    m, s = prop_generator(z)
    assert m.shape == (10, 28, 28)
......
@@ -3,7 +3,7 @@ from typing import Optional
from numpy import ndarray as array_type
from numpy.linalg import norm
from numpy.testing._private.utils import assert_almost_equal
from utils import reconstruction_problem
from lutils import reconstruction_problem
class LinearInverseProblem():
@@ -13,6 +13,9 @@ class LinearInverseProblem():
                 data: array_type,
                 sigma: float,
                 ground_truth: Optional[array_type] = None,
                 print_metrics: bool = False,
                 plot_recon: bool = False,
                 save_result: bool = False
                 ) -> None:
        """Base constructor
@@ -21,6 +24,9 @@ class LinearInverseProblem():
            data (array_type): given data or right-hand side y
            sigma (float): measurement noise standard deviation sigma
            ground_truth (Optional[array_type], optional): true value x. Defaults to None.
            print_metrics (bool, optional): flag to print metrics after optimization. Defaults to False.
            plot_recon (bool, optional): flag to plot the reconstruction result. Defaults to False.
            save_result (bool, optional): flag to store the result as a JSON file. Defaults to False.
        """
        assert len(operator.shape) == 2  # A is a matrix
@@ -34,6 +40,9 @@ class LinearInverseProblem():
        self.ground_truth = ground_truth
        self.data = data
        self.sigma = sigma
        self.print_metrics = print_metrics
        self.plot_recon = plot_recon
        self.save_result = save_result

    def apply_operator(self, item: array_type) -> array_type:
        """operator application A*item
@@ -71,13 +80,17 @@ class LinearInverseProblem():
            LinearInverseProblem: constructed LIP
        """
        operator, ground_truth, data = reconstruction_problem(sigma=blur_sigma)
        return LinearInverseProblem(operator.real, data.reshape(-1), noise_sigma, ground_truth.numpy().reshape(-1))
        import numpy as np
        data = data + np.random.normal(loc=0, scale=noise_sigma, size=data.shape)
        return LinearInverseProblem(operator.real, data.reshape(-1), noise_sigma,
                                    np.array(ground_truth.numpy().reshape(-1), dtype=np.float64),
                                    True, True, True)


if __name__ == "__main__":
    A, x, y = reconstruction_problem()
    lip = LinearInverseProblem(A, y.ravel(), 0.1, x.numpy().ravel())
    lip = LinearInverseProblem(A, y.ravel(), 0.01, x.numpy().ravel())
    import numpy as np
    random_x = np.random.randn(28*28)
......
"""Simple linear inverse problem solver"""
from typing import Optional
from math import sqrt
from numpy import ndarray as array_type
from numpy.linalg import solve
from numpy.linalg import norm
@@ -6,23 +8,19 @@ from numpy import eye as npeye
from numpy import logspace as nplogspace
from numpy import argmin as npargmin
from linear_inverse_problem import LinearInverseProblem
from generative_model import GenerativeModel
from utils import build_neighbour_matrix
from lutils import build_neighbour_matrix
class LIPSolver:
    """Abstract base class"""

    def __init__(self,
                 lip: LinearInverseProblem,
                 genm: GenerativeModel) -> None:
                 lip: LinearInverseProblem) -> None:
        """Linear inverse problem solver with a generative model and a linear inverse problem

        Args:
            lip (LinearInverseProblem): linear inverse problem
            genm (GenerativeModel): generative model
        """
        self.lip = lip
        self.genm = genm

    def solve(self) -> array_type:
        """The most basic linear inverse problem solver
@@ -37,16 +35,14 @@ class LIPThikonovSolver(LIPSolver):
    """Tikhonov regularized solver"""

    def __init__(self,
                 lip: LinearInverseProblem,
                 genm: GenerativeModel,
                 llambda: float) -> None:
                 llambda: Optional[float] = None) -> None:
        """Linear inverse problem solver with a generative model and a Tikhonov regularizer to solve the problem

        Args:
            lip (LinearInverseProblem): linear inverse problem
            genm (GenerativeModel): generative model
            llambda (float): Lagrange multiplier or regularization parameter
            llambda (Optional[float]): Lagrange multiplier or regularization parameter
        """
        super().__init__(lip, genm)
        super().__init__(lip)
        self.llambda = llambda
    def solve(self, *args, **kwargs) -> array_type:
@@ -56,13 +52,14 @@ class LIPThikonovSolver(LIPSolver):
            array_type: result of the regularized inverse problem
        """
        # pylint: disable=unused-argument
        return solve(self.lip.operator.T.dot(self.lip.operator) + self.llambda*npeye(self.genm.dim_x),
        assert self.llambda is not None
        return solve(self.lip.operator.T.dot(self.lip.operator) + self.llambda*npeye(self.lip.operator.shape[1]),
                     self.lip.operator.T.dot(self.lip.data))
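
    # (note added for clarity) solve() returns the closed-form minimizer of
    #     ||A x - y||^2 + llambda * ||x||^2,
    # i.e. the solution of the regularized normal equations
    #     (A^T A + llambda * I) x = A^T y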

    def solve_oracle(self,
                     logspace_min: int = -6,
                     logspace_max: int = 2,
                     logspace_num: int = 1000) -> array_type:
                     logspace_num: int = 100) -> array_type:
        """Solve the problem `logspace_num` times for `lambda` on a logarithmic grid from
        10**`logspace_min` to 10**`logspace_max` and pick the best solution given the ground truth.
@@ -89,17 +86,16 @@ class LIPGMRFSolver(LIPThikonovSolver):
    """Tikhonov regularized solver using a GMRF as regularization matrix"""

    def __init__(self,
                 lip: LinearInverseProblem,
                 genm: GenerativeModel,
                 llambda: float) -> None:
                 llambda: Optional[float] = None) -> None:
        """Base constructor to create the neighbour matrix (GMRF)

        Args:
            lip (LinearInverseProblem): linear inverse problem
            genm (GenerativeModel): generative model
            llambda (float): regularization parameter
            llambda (Optional[float]): regularization parameter
        """
        super().__init__(lip, genm, llambda)
        self.gmrf = build_neighbour_matrix(genm.dim_x, genm.dim_x).toarray()
        super().__init__(lip, llambda)
        self.gmrf = build_neighbour_matrix(int(sqrt(self.lip.operator.shape[1])),
                                           int(sqrt(self.lip.operator.shape[1]))).toarray()

    def solve(self, *args, **kwargs) -> array_type:
        """Solve method
@@ -110,3 +106,30 @@ class LIPGMRFSolver(LIPThikonovSolver):
        # pylint: disable=unused-argument
        return solve(self.lip.operator.T.dot(self.lip.operator) + self.llambda*self.gmrf,
                     self.lip.operator.T.dot(self.lip.data))
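
    # (note added for clarity) the same closed form with the GMRF precision
    # matrix replacing the identity:
    #     (A^T A + llambda * GMRF) x = A^T y
    # build_neighbour_matrix presumably assembles pixel-neighbour couplings on
    # the sqrt(n) x sqrt(n) image grid.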


def adan_plus(grad, hess, x0, y0):
    # Adaptive regularized Newton iteration: the damping lambdak = sqrt(Hk*||grad||)
    # grows with an on-line estimate Hk of how fast the Hessian changes.
    from numpy.linalg import norm
    from numpy import sqrt
    from numpy import eye
    from numpy.linalg import solve

    def _Mk(gradk, gradk_1, hessk_1, xk, xk_1):
        # curvature-change estimate ||g_k - g_{k-1} - H_{k-1}(x_k - x_{k-1})|| / ||x_k - x_{k-1}||^2
        return norm(gradk - gradk_1 - hessk_1 @ (xk - xk_1))/(norm(xk - xk_1)**2)

    grad_xk_1 = grad(x0)
    Hk_1 = _Mk(grad_xk_1, grad(y0), hess(x0), x0, y0)
    xk = x0
    xk_1 = y0
    for lia in range(1000):  # fixed iteration budget, no convergence check
        grad_xk = grad(xk)
        hess_xk = hess(xk)
        Mk = _Mk(grad_xk, grad_xk_1, hess_xk, xk, xk_1)
        Hk = max(Mk, 0.5*Hk_1)
        lambdak = sqrt(Hk*norm(grad_xk))
        # damped Newton step: solve (hess + lambdak*I) dx = grad
        xk_new = xk - solve((hess_xk + lambdak*eye(xk.shape[0])), grad_xk)
        grad_xk_1 = grad_xk
        Hk_1 = Hk
        xk_1 = xk
        xk = xk_new
        print(f"iteration: {lia}. diff steps: {norm(xk - xk_1)}")
    return xk
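
# A minimal usage sketch (hypothetical, not part of the original file), on a
# convex quadratic f(x) = 0.5*x^T Q x - b^T x with grad(x) = Q x - b and
# hess(x) = Q; the iterate should approach numpy.linalg.solve(Q, b):
#
#     import numpy as np
#     Q = np.diag([1.0, 10.0])
#     b = np.ones(2)
#     x_opt = adan_plus(lambda x: Q @ x - b, lambda x: Q,
#                       x0=np.zeros(2), y0=np.ones(2))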
This diff is collapsed.
@@ -106,12 +106,12 @@ class ONNX_VAE_BAD(ONNX_VAE_STO):
if __name__ == '__main__':
    path = "probabilistic_vae_comparison/onnx_models/deterministic/"
    path = "onnx_models/stochastic_good/good_probVAE_"
    z = np.random.randn(10, 20)
    vae_det = ONNX_VAE_DET(path + "encoder.onnx", path + "decoder.onnx")
    x = vae_det.decoder(z)
    assert(x.shape == (10, 28, 28))
    # vae_det = ONNX_VAE_DET(path + "encoder.onnx", path + "decoder.onnx")
    # x = vae_det.decoder(z)
    # assert(x.shape == (10, 28, 28))
    vae_sto = ONNX_VAE_STO(path + "encoder.onnx", path + "decoder.onnx")
    m, s = vae_sto.decoder(z)
@@ -120,9 +120,9 @@ if __name__ == '__main__':
    tf_vae = vae_sto.to_tensorflow()
    z0 = tf.Variable(tf.random.normal([10, 20, 1, 1]))
    z0 = tf.Variable(tf.random.normal([20]))