Commit 4033e9af authored by Manuel Marschall's avatar Manuel Marschall
Browse files

inter commit

parent 355ddd5f
"""
A generative model class to wrap VAEs.
"""
from abc import abstractmethod
from math import sqrt
from typing import Optional

from numpy import ndarray as array_type

from onnx_vae import ONNX_VAE_DET, ONNX_VAE_STO
from tf_vae import TF_VAE
class GenerativeModel():
    """Abstract base class for Generative models.

    Wraps a (tensorflow) VAE and exposes its decoder through ``__call__``.
    """

    @abstractmethod
    def __init__(self, dim_x: int, dim_z: int, channels: Optional[int] = 1) -> None:
        """Constructor method.

        Args:
            dim_x (int): original space X dimension
            dim_z (int): latent space Z dimension
            channels (Optional[int], optional): number of channels of space X dimension. Defaults to 1.

        Raises:
            ValueError: In case, dim Z > dim X
        """
        if dim_x < dim_z:
            raise ValueError("Is this really a generative model if dim Z > dim X?")
        self.dim_x = dim_x
        # X is interpreted as a square image: (height, width, channels).
        # NOTE(review): assumes dim_x is a perfect square — TODO confirm for non-MNIST uses.
        self.image_dim_x = (int(sqrt(dim_x)), int(sqrt(dim_x)), channels)
        self.dim_z = dim_z
        # Concrete subclasses attach the wrapped VAE via their `from_vae` factories.
        self.vae = None  # type: Optional[TF_VAE]

    def __call__(self, z_0: array_type) -> array_type:
        """Call of decoder method of generative model VAE.

        Args:
            z_0 (array_type): latent variable

        Raises:
            NotImplementedError: This can only be called for VAE models atm.

        Returns:
            array_type: decoder result in X for given latent variable in Z
        """
        if self.vae is None:
            raise NotImplementedError("This function is not implemented without knowledge "
                                      "about the generative model")
        return self.vae.decoder(z_0)
class DeterministicGenerator(GenerativeModel):
    """Deterministic Generator as generative model without variational component"""

    def __init__(self, dim_x: int, dim_z: int, channels: Optional[int] = 1) -> None:
        """Base constructor.

        Args:
            dim_x (int): dimension in X
            dim_z (int): dimension in Z
            channels (Optional[int], optional): number of channels. Defaults to 1.
        """
        super().__init__(dim_x, dim_z, channels)
        self.class_type = "DeterministicGenerator"

    @staticmethod
    def from_vae(vae: TF_VAE,
                 dim_x: int,
                 dim_z: int) -> GenerativeModel:
        """static method to generate a Deterministic generator from a VAE

        Args:
            vae (TF_VAE): Tensorflow Variational auto-encoder
            dim_x (int): dimension in X. e.g. 28*28*1 for MNIST images
            dim_z (int): dimension in Z

        Returns:
            GenerativeModel: wrapper class for generative models
        """
        retval = DeterministicGenerator(dim_x, dim_z, 1)
        retval.vae = vae
        return retval
class ProbabilisticGenerator(GenerativeModel):
    """Probabilistic / Variational generative models"""

    def __init__(self, dim_x: int, dim_z: int, channels: Optional[int] = 1) -> None:
        """Base constructor.

        Args:
            dim_x (int): dimension in X
            dim_z (int): dimension in Z
            channels (Optional[int], optional): channels of space X images. Defaults to 1.
        """
        super().__init__(dim_x, dim_z, channels)
        self.class_type = "ProbabilisticGenerator"

    @staticmethod
    def from_vae(vae: TF_VAE,
                 dim_x: int,
                 dim_z: int) -> GenerativeModel:
        """static method to generate a probabilistic VAE from a tensorflow VAE

        Args:
            vae (TF_VAE): Tensorflow variational auto-encoder
            dim_x (int): dimension in X
            dim_z (int): dimension in Z

        Returns:
            GenerativeModel: Probabilistic generative model
        """
        retval = ProbabilisticGenerator(dim_x, dim_z, 1)
        retval.vae = vae
        return retval
if __name__ == "__main__":
    # Smoke test: load both ONNX VAEs, convert to tensorflow, wrap them and
    # check the decoder output shapes for a batch of 10 latent samples.
    import numpy as np

    PATH_DET = "onnx_models/deterministic/"
    PATH_PROB = "onnx_models/stochastic_good/"
    PATH_ADD = "probabilistic_vae_comparison/"

    # Deterministic VAE -> deterministic generator
    onnx_vae1 = ONNX_VAE_DET(PATH_ADD + PATH_DET + "encoder.onnx",
                             PATH_ADD + PATH_DET + "decoder.onnx")
    tf_vae_det = onnx_vae1.to_tensorflow()
    det_generator = DeterministicGenerator.from_vae(tf_vae_det, int(28*28), 20)

    # Stochastic VAE -> probabilistic generator (decoder returns mean and std)
    onnx_vae2 = ONNX_VAE_STO(PATH_ADD + PATH_PROB + "good_probVAE_encoder.onnx",
                             PATH_ADD + PATH_PROB + "good_probVAE_decoder.onnx")
    tf_vae_good = onnx_vae2.to_tensorflow()
    prop_generator = ProbabilisticGenerator.from_vae(tf_vae_good, int(28*28), 20)

    z = np.random.randn(10, 20)
    x = det_generator(z)
    assert x.shape == (10, 28, 28)
    m, s = prop_generator(z)
    assert m.shape == (10, 28, 28)
    assert s.shape == (10, 28, 28)
    print("Successfully tested: Generative models")
from numpy import ndarray as array_type
"""Wrapper class for a linear inverse problem (LIP)"""
from typing import Optional
from numpy import ndarray as array_type
from numpy.linalg import norm
from numpy.testing._private.utils import assert_almost_equal
from utils import reconstruction_problem
class LinearInverseProblem():
    """Abstract class for a LIP: recover x from y = A x + noise."""

    def __init__(self,
                 operator: array_type,
                 data: array_type,
                 sigma: float,
                 ground_truth: Optional[array_type] = None,
                 ) -> None:
        """Base constructor

        Args:
            operator (array_type): linear operator A
            data (array_type): given data or rhs y
            sigma (float): measurement noise standard deviation sigma
            ground_truth (Optional[array_type], optional): true value x. Defaults to None.
        """
        assert len(operator.shape) == 2                        # A is a matrix
        assert len(data.shape) == 1                            # y is a vector
        if ground_truth is not None:
            assert operator.shape[1] == ground_truth.shape[0]  # A times x is defined
        assert operator.shape[0] == data.shape[0]              # A times x is of dimension of y
        # NOTE(review): the attribute assignments below were hidden by a diff hunk;
        # reconstructed from the attribute usage in the self-test — confirm against history.
        self.operator = operator
        self.data = data
        self.ground_truth = ground_truth
        self.sigma = sigma

    def apply_operator(self, item: array_type) -> array_type:
        """operator application A*item

        Args:
            item (array_type): given item to apply operator to

        Returns:
            array_type: result
        """
        return self.operator.dot(item)

    def res(self, item: array_type) -> float:
        """Computation of L2 residual squared
        s^-2 || Ax - y ||_2^2

        Args:
            item (array_type): given item

        Returns:
            float: L2 residual squared
        """
        return float(self.sigma**(-2) * norm(self.apply_operator(item) - self.data)**2)

    @staticmethod
    def mnist_recon_problem(blur_sigma: int = 4,
                            noise_sigma: float = 0.1):
        """Example LIP from MNIST dataset

        Args:
            blur_sigma (int, optional): number of pixel in blurring operator. Defaults to 4.
            noise_sigma (float, optional): noise of measurements, sigma. Defaults to 0.1.

        Returns:
            LinearInverseProblem: constructed LIP
        """
        # local import: keeps the class importable without the utils module
        from utils import reconstruction_problem
        operator, ground_truth, data = reconstruction_problem(sigma=blur_sigma)
        return LinearInverseProblem(operator.real, data.reshape(-1), noise_sigma,
                                    ground_truth.numpy().reshape(-1))
if __name__ == "__main__":
    # Self-test: build a reconstruction problem, check operator application,
    # residual computation, and the `mnist_recon_problem` factory.
    import numpy as np

    A, x, y = reconstruction_problem()
    lip = LinearInverseProblem(A, y.ravel(), 0.1, x.numpy().ravel())

    random_x = np.random.randn(28*28)
    assert_almost_equal(np.dot(A, random_x), lip.apply_operator(random_x),
                        err_msg="Operator application failed")
    # sigma = 0.1  ->  sigma^-2 = 100
    assert_almost_equal(100 * np.linalg.norm(np.dot(A, random_x) - y.reshape(-1))**2,
                        lip.res(random_x),
                        err_msg="Residual computation failed")

    lip2 = LinearInverseProblem.mnist_recon_problem()
    assert_almost_equal(lip.operator, lip2.operator)
    assert_almost_equal(lip.data, lip2.data)
    assert_almost_equal(lip.sigma, lip2.sigma)
    assert_almost_equal(lip.ground_truth, lip2.ground_truth)
    # fixed typo in the status message ("Sucessfully")
    print("Successfully tested: LinearInverseProblem")
"""Simple linear inverse problem solver"""
from numpy import ndarray as array_type
from numpy.linalg import solve
from numpy.linalg import norm
from numpy import eye as npeye
from numpy import logspace as nplogspace
from numpy import argmin as npargmin
......@@ -9,52 +11,102 @@ from utils import build_neighbour_matrix
class LIPSolver:
    """Abstract base class"""

    def __init__(self,
                 lip: LinearInverseProblem,
                 genm: GenerativeModel) -> None:
        """Linear inverse problem solver with a generative model and a linear inverse problem

        Args:
            lip (LinearInverseProblem): linear inverse problem
            genm (GenerativeModel): generative model
        """
        self.lip = lip
        self.genm = genm

    def solve(self) -> array_type:
        """The most basic linear inverse problem solver
        (A'A) x = A' y --> invert to solve

        Returns:
            array_type: solution of dual problem
        """
        return solve(self.lip.operator.T.dot(self.lip.operator),
                     self.lip.operator.T.dot(self.lip.data))
class LIPThikonovSolver(LIPSolver):
    """Tikhonov regularized solver (class name keeps the existing "Thikonov" spelling)."""

    def __init__(self,
                 lip: LinearInverseProblem,
                 genm: GenerativeModel,
                 llambda: float) -> None:
        """Linear inverse problem solver with a generative model and a Tikhonov regularizer

        Args:
            lip (LinearInverseProblem): linear inverse problem
            genm (GenerativeModel): generative model
            llambda (float): Langrange multiplier or regularizer parameter
        """
        super().__init__(lip, genm)
        self.llambda = llambda

    def solve(self, *args, **kwargs) -> array_type:
        """Solve method
        (A'A + lambda I) x = A' y --> invert to solve

        Returns:
            array_type: result of regularized inverse problem
        """
        # pylint: disable=unused-argument
        return solve(self.lip.operator.T.dot(self.lip.operator) + self.llambda*npeye(self.genm.dim_x),
                     self.lip.operator.T.dot(self.lip.data))

    def solve_oracle(self,
                     logspace_min: int = -6,
                     logspace_max: int = 2,
                     logspace_num: int = 1000) -> array_type:
        """Solve the problem `logspace_num` times for `lambda` in [`logspace_min`, `logspace_max`]
        and find the best solution given the ground truth.

        NOTE(review): requires `self.lip.ground_truth` to be set — verify callers.

        Args:
            logspace_min (int, optional): minimal exponent. Defaults to -6.
            logspace_max (int, optional): maximal exponent. Defaults to 2.
            logspace_num (int, optional): number of solves. Defaults to 1000.

        Returns:
            array_type: best inversion result
        """
        lambda_list = nplogspace(logspace_min, logspace_max, num=logspace_num)
        x_list = []
        res_list = []
        for ell in lambda_list:
            self.llambda = ell
            _x = self.solve()
            x_list.append(_x)
            # relative error w.r.t. the known ground truth
            res_list.append(norm(_x - self.lip.ground_truth)/norm(self.lip.ground_truth))
        best = npargmin(res_list)
        # leave the solver parameterized with the winning lambda, not the last one tried
        self.llambda = lambda_list[best]
        return x_list[best]
class LIPGMRFSolver(LIPThikonovSolver):
    """Thikonov regularized solver using a GMRF as regularization matrix"""

    def __init__(self,
                 lip: LinearInverseProblem,
                 genm: GenerativeModel,
                 llambda: float) -> None:
        """Base constructor to create the neighbour matrix (GMRF)

        Args:
            lip (LinearInverseProblem): linear inverse problem
            genm (GenerativeModel): generative model
            llambda (float): regularization parameter
        """
        super().__init__(lip, genm, llambda)
        # dense (dim_x, dim_x) neighbour matrix used as regularizer in `solve`
        self.gmrf = build_neighbour_matrix(genm.dim_x, genm.dim_x).toarray()

    def solve(self, *args, **kwargs) -> array_type:
        """Solve method
        (A'A + lambda GMRF) x = A' y --> invert to solve

        Returns:
            array_type: result of regularized inverse problem
        """
        # pylint: disable=unused-argument
        return solve(self.lip.operator.T.dot(self.lip.operator) + self.llambda*self.gmrf,
                     self.lip.operator.T.dot(self.lip.data))
# %%
import tensorflow as tf
from utils import (load_vae_models, plot_recon, savefig, plot_z_coverage, plot_convergence)
import math
import numpy as np
import matplotlib.pyplot as plt
from utils import (load_vae_models, plot_recon, savefig, plot_z_coverage, plot_convergence)
# tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
# tf.config.run_functions_eagerly(True)
vae = None
......@@ -692,7 +692,7 @@ def compare_with_matlab_vae():
from scipy.io import loadmat
data = loadmat(path)
matlab_z = data["z_images"]
from utils import load_mnist
_, eval_dataset = load_mnist(shuffle=False)
_, vae_good, _ = load_vae_models()
......@@ -712,7 +712,7 @@ def compare_with_matlab_vae():
errs = []
for lia in range(len(z_list)):
errs.append(np.linalg.norm(z_list[lia] - matlab_z[:, lia])/np.linalg.norm(matlab_z[:, lia]))
print(f"max error: {np.max(errs)}")
print(f"min error: {np.min(errs)}")
print(f"avg error: {np.mean(errs)}")
......
from typing import Any
from numpy.lib.arraysetops import isin
from typing import Any, Union
import tensorflow as tf
from numpy import ndarray as array_like
from scipy.optimize import minimize
from tensorflow.python.types.core import Value
from lip_solver import LIPSolver
from linear_inverse_problem import LinearInverseProblem
from generative_model import GenerativeModel, ProbabilisticGenerator, DeterministicGenerator
......@@ -23,11 +25,12 @@ class VAELIPSolver(LIPSolver):
gm: GenerativeModel,
export_path: str) -> None:
super().__init__(lip, gm)
assert gm.vae is not None
self.path = export_path
self.path_suffix = "generic/"
def get_initial_value(self, iteration: int = 10) -> Any:
z0 = tf.zeros([1, 20, 1, 1], dtype=tf.float64)
z0 = tf.zeros([1, self.genm.dim_z, 1, 1], dtype=tf.float64)
A = self.lip.operator
sigma = self.lip.sigma
......@@ -36,58 +39,63 @@ class VAELIPSolver(LIPSolver):
if counter >= iteration:
break
counter += 1
if isinstance(self.gm, ProbabilisticGenerator):
g, gamma = self.gm(z0)
if isinstance(self.genm, ProbabilisticGenerator):
g, gamma = self.genm(z0)
g = tf.reshape(g, [-1])
gamma = tf.reshape(gamma, [-1])
elif isinstance(self.gm, DeterministicGenerator):
g = self.gm(z0)
elif isinstance(self.genm, DeterministicGenerator):
g = self.genm(z0)
g = tf.reshape(g, [-1])
gamma = tf.ones(self.gm.dim_x, dtype=tf.double)
gamma = tf.ones(self.genm.dim_x, dtype=tf.double)
else:
raise ValueError(f"Unknown generative model type: {type(self.gm)}")
raise ValueError(f"Unknown generative model type: {type(self.genm)}")
# A'A/s^2 + 1/Gamma
lhs = tf.matmul(tf.transpose(A), A)/(sigma*sigma) + tf.linalg.diag(1/gamma)
# A' y/s^2 + 1/Gamma g(z)
rhs = matvecmul(tf.transpose(A), tf.reshape(self.lip.data, [-1]))/(sigma*sigma) + \
matvecmul(tf.linalg.diag(1/gamma), g)
# solution of s^-2|| Ax - y ||_2^2 + ||x - g(z)||_G^2
x0 = matvecsolve(lhs, rhs)
z0 = tf.reshape(self.gm.vae.encoder(tf.reshape(x0, [28, 28]))[:, :20], [20])
z0 = tf.reshape(self.genm.vae.encoder(tf.reshape(x0, [self.genm.image_dim_x[0],
self.genm.image_dim_x[1]]))[:, :self.genm.dim_z], [-1])
return z0
class VAELatpushSolver(VAELIPSolver):
def __init__(self, lip: LinearInverseProblem, gm: GenerativeModel, export_path: str) -> None:
super().__init__(lip, gm, export_path)
def __init__(self, lip: LinearInverseProblem, genm: GenerativeModel, export_path: str) -> None:
super().__init__(lip, genm, export_path)
self.path_suffix = "latpush/"
def solve(self,
x0_iteration: int = 1,
num_iteration: int = 10000,
loss_tolerance: float = 1e-1) -> dict:
oracle_z0 = tf.squeeze(self.gm.vae.encoder(tf.reshape(self.lip.ground_truth, [28, 28]))[0, :self.gm.dim_z]).numpy()
def solve_tf(self,
x0_iteration: int = 1,
num_iteration: int = 10000,
loss_tolerance: float = 1e-1) -> dict:
oracle_z0 = tf.squeeze(self.genm.vae.encoder(tf.reshape(self.lip.ground_truth, [self.genm.image_dim_x[0],
self.genm.image_dim_x[1]]))[0, :self.genm.dim_z]).numpy()
z0 = tf.Variable(self.get_initial_value(iteration=x0_iteration))
# z0 = tf.Variable(tf.zeros(20))
y = tf.reshape(self.lip.data, [-1])
loss_tolerance = 1e-1
mse_list = []
psnr_list = []
ssim_list = []
loss_list = []
def neg_log_post():
if isinstance(self.gm, ProbabilisticGenerator):
g, _ = self.gm(z0)
elif isinstance(self.gm, DeterministicGenerator):
g = self.gm(z0)
if isinstance(self.genm, ProbabilisticGenerator):
g, _ = self.genm(z0)
elif isinstance(self.genm, DeterministicGenerator):
g = self.genm(z0)
else:
raise ValueError(f"Unknown generative model type: {type(self.gm)}")
raise ValueError(f"Unknown generative model type: {type(self.genm)}")
g = tf.reshape(g, [-1])
s2 = (1/(self.lip.sigma*self.lip.sigma))
loss1 = s2*tf.square(tf.linalg.norm(matvecmul(self.lip.operator, g) - y))
# s^-2 || Ag(z) - y ||_2^2
inner = matvecmul(self.lip.operator, g) - y
loss1 = s2*tf.reduce_sum(inner*inner)
# pylint: disable=unexpected-keyword-arg, redundant-keyword-arg, no-value-for-parameter
# ||z||_2^2
loss2 = tf.cast(tf.reduce_sum(z0*z0), dtype=tf.double)
retval = (loss1 + loss2)
return retval
......@@ -96,23 +104,26 @@ class VAELatpushSolver(VAELIPSolver):
for lia in range(num_iteration):
opt.minimize(neg_log_post, var_list=[z0])
if lia % 100 == 0 or lia == 0:
if isinstance(self.gm, ProbabilisticGenerator):
x0 = tf.reshape(self.gm(z0)[0], [-1])
elif isinstance(self.gm, DeterministicGenerator):
x0 = tf.reshape(self.gm(z0), [-1])
if isinstance(self.genm, ProbabilisticGenerator):
x0 = tf.reshape(self.genm(z0)[0], [-1])
elif isinstance(self.genm, DeterministicGenerator):
x0 = tf.reshape(self.genm(z0), [-1])
else:
raise ValueError(f"Unknown generative model type: {type(self.gm)}")
fig, mse, psnr, ssim = plot_recon(tf.reshape(x0, [28, 28]),
tf.cast(tf.reshape(self.lip.ground_truth, [28, 28]), dtype=tf.double),
tf.reshape(y, [28, 28]), return_stats=True)
raise ValueError(f"Unknown generative model type: {type(self.genm)}")
# pylint: disable=unexpected-keyword-arg, redundant-keyword-arg, no-value-for-parameter
fig, mse, psnr, ssim = plot_recon(tf.reshape(x0, [self.genm.image_dim_x[0], self.genm.image_dim_x[1]]),
tf.cast(tf.reshape(self.lip.ground_truth, [self.genm.image_dim_x[0],
self.genm.image_dim_x[1]]),
dtype=tf.double),
tf.reshape(y, [self.genm.image_dim_x[0], self.genm.image_dim_x[1]]),
return_stats=True)
mse_list.append(mse)
psnr_list.append(psnr)
ssim_list.append(ssim)
loss_list.append(neg_log_post())
print(f"Iteration {lia}/{num_iteration}, loss: {loss_list[-1]}")
savefig(fig, self.path + self.path_suffix, f"image_{lia}.png")
fig = plot_z_coverage(self.gm.vae, x0, oracle_z0)
fig = plot_z_coverage(self.genm.vae, x0, oracle_z0)
savefig(fig, self.path + self.path_suffix, f"z0values_{lia}.png")
if lia > 100 and tf.abs(loss_list[-2] - loss_list[-1]) < loss_tolerance:
break
......@@ -120,6 +131,96 @@ class VAELatpushSolver(VAELIPSolver):
fig = plot_convergence(ll, title=label)
savefig(fig, self.path + self.path_suffix, f"{label}_conv.png")
retval = {
"zopt": z0.numpy(),
"MSE": mse_list,
"PSNR": psnr_list,
"SSIM": ssim_list,
"LOSS": loss_list
}
return retval
def solve_scipy(self,
x0_iteration: int = 1,
num_iteration: int = 10000,
loss_tolerance: float = 1e-1) -> dict:
oracle_z0 = tf.squeeze(self.genm.vae.encoder(tf.reshape(self.lip.ground_truth, [self.genm.image_dim_x[0],
self.genm.image_dim_x[1]]))[0, :self.genm.dim_z]).numpy()
z0 = self.get_initial_value(iteration=x0_iteration).numpy()
# z0 = tf.Variable(tf.zeros(20))
y = tf.reshape(self.lip.data, [-1]).numpy()
mse_list = []
psnr_list = []
ssim_list = []