Commit b00f37fa authored by Jörg Martin

Initial commit

Showing 1362 additions and 0 deletions
from math import pi
import numpy as np
import torch
import torch.nn as nn
from EIVGeneral import repetition
class EIVDropout(nn.Module):
"""
A Dropout layer with dropout probability `p` (default 0.5) that repeats the
same Bernoulli mask L times along the batch dimension, instead of drawing a
different one for each batch member, where L is the output of
`repetition_map()`. When evaluating `forward(x)`, the batch dimension of `x`
(the first one) is asserted to be a multiple of `L = repetition_map()`. The
`repetition_map` defaults to the constant map to 1, in which case
`EIVDropout` is equivalent to `torch.nn.Dropout`.
:param p: A float between 0 and 1. Defaults to 0.5.
:param repetition_map: Map that takes no
argument and returns an integer. Defaults to the constant map to 1.
"""
def __init__(self, p=0.5, repetition_map=lambda: 1):
super().__init__()
self.p = p
self.repetition_map = repetition_map
self._train = True
def train(self, training=True):
if training:
self._train = True
else:
self._train = False
def eval(self):
self.train(training=False)
def forward(self, x):
if not self._train:
return x
else:
device = self.study_device(x)
L = self.repetition_map()
input_shape = x.shape
assert input_shape[0] % L == 0
mask_shape = list(input_shape)
mask_shape[0] = int(input_shape[0] / L)
mask = torch.bernoulli(torch.ones(mask_shape) * (1-self.p))\
/ (1-self.p)
repeated_mask = repetition.repeat_tensors(mask,
number_of_draws=L)[0]
assert x.shape == repeated_mask.shape
return x * repeated_mask.to(device)
@staticmethod
def study_device(x):
if x.is_cuda:
return torch.device('cuda:' + str(x.get_device()))
else:
return torch.device('cpu')
class EIVVariationalDropout(nn.Module):
"""
A variational dropout layer (of type A, as in Kingma et al.) with dropout
parameter alpha (initialized from `initial_alpha`, default 0.5) that repeats
the same Gaussian multiplicative mask L times along the batch dimension,
instead of drawing a different one for each batch member, where L is the
output of `repetition_map()`. When evaluating `forward(x)`, the batch
dimension of `x` (the first one) is asserted to be a multiple of
`L = repetition_map()`. The `repetition_map` defaults to the constant map to
1, in which case `EIVVariationalDropout` is equivalent to classical
variational dropout.
:param initial_alpha: A positive float, will be taken as initial value for
alpha (the variance of the Gaussian dropout mask).
:param repetition_map: Map that takes no
argument and returns an integer. Defaults to the constant map to 1.
"""
c_1 = 1.16145124
c_2 = -1.50204118
c_3 = 0.58629921
def __init__(self, initial_alpha=0.5, repetition_map=lambda: 1):
super().__init__()
self.initial_alpha = initial_alpha
self.repetition_map = repetition_map
initial_alpha_par = self.invert_softplus(self.initial_alpha)
self.alpha_par = nn.Parameter(torch.tensor(initial_alpha_par))
self._train = True
def train(self, training=True):
if training:
self._train = True
else:
self._train = False
def eval(self):
self.train(training=False)
@staticmethod
def invert_softplus(softplus_value):
"""
Inverts the Softplus function
:param softplus_value: A float
"""
return np.log(np.exp(softplus_value)-1)
def alpha(self):
return nn.Softplus()(self.alpha_par)
def forward(self, x):
if not self._train:
return x
else:
device = self.study_device(x)
L = self.repetition_map()
input_shape = x.shape
assert input_shape[0] % L == 0
mask_shape = list(input_shape)
mask_shape[0] = int(input_shape[0] / L)
mask = 1.0 + torch.sqrt(self.alpha()) * \
torch.randn(mask_shape).to(device)
repeated_mask = repetition.repeat_tensors(mask,
number_of_draws=L)[0]
assert x.shape == repeated_mask.shape
return x * repeated_mask.to(device)
@staticmethod
def study_device(x):
if x.is_cuda:
return torch.device('cuda:' + str(x.get_device()))
else:
return torch.device('cpu')
def variational_dropout_regularizer(self):
"""
Taken from Kingma et al.; identical, up to a constant, to the negative
KL divergence between the variational distribution and the improper prior.
"""
alpha = self.alpha()
return 0.5 * torch.log(alpha)\
+ self.c_1 * alpha + self.c_2 * alpha**2 + self.c_3 * alpha**3
class EIVInput(nn.Module):
"""
Input class for an Error-in-Variables model based on variational inference.
For regularization the method `EIVInput.neg_x_evidence` is included. The
underlying model is x = zeta + epsilon, where zeta denotes the true (but
unknown) input variable and epsilon denotes normal noise with standard
deviation std_x.
:param init_std_x: The initial value of sigma_x (std of
noise on input), will be made a learnable parameter. Defaults to 1.0
:param precision_prior_zeta: The precision of the prior on zeta (the true,
but unknown input). Defaults to 0 (= flat prior)
:param external_std_x: If not None, should be a map that returns (without
arguments) std_x (i.e. the same as the output of self.get_std_x). If None (the
default), std_x will be parametrized via a Softplus of a torch.nn.Parameter.
:param std_x_scale: Will be used for scaling std_x internally. Defaults
to 1.0.
"""
def __init__(self, init_std_x = 1.0, precision_prior_zeta=0.0,
external_std_x=None, std_x_scale=1.0):
super(EIVInput, self).__init__()
self.init_std_x = init_std_x
self.precision_prior_zeta = precision_prior_zeta
self.external_std_x = external_std_x
InverseSoftplus = lambda sigma: torch.log(torch.exp(sigma) - 1 )
self.std_x_scale = std_x_scale
if external_std_x is None:
self.std_x_par = nn.Parameter(
InverseSoftplus(
torch.tensor([init_std_x]))/self.std_x_scale)
self.noise = True
# self.fixed_z will be used as "noise" if self.noise is False
self.fixed_z = 0.0
def get_std_x(self):
"""
Returns the standard deviation of the distribution of x given zeta.
"""
if self.external_std_x is None:
return nn.Softplus()(self.std_x_par * self.std_x_scale)
else:
return self.external_std_x()
def forward(self, x):
std_x = self.get_std_x()
if std_x.item() > 0:
std_zeta = 1/(1/std_x**2 + self.precision_prior_zeta)**0.5
mean_zeta = std_zeta**2/std_x**2 * x
else:
assert std_x.item() == 0
std_zeta = 0.0
mean_zeta = x
if self.noise:
z = torch.randn_like(x)
return mean_zeta + std_zeta * z
else:
return mean_zeta + std_zeta * self.fixed_z
def neg_x_evidence(self, x, remove_divergent_term=True):
"""
Returns the negative logarithm of the x-evidence (the prior predictive
density of x) under the EIV model; used as a regularizer.
:param x: A torch.tensor
:param remove_divergent_term: If True (the default), the constant term that
diverges for a flat prior will be removed.
"""
if not remove_divergent_term:
if self.precision_prior_zeta == 0:
raise ValueError('Divergent term infinite in EIV model')
else:
divergent_term = 0.5 * torch.log(
torch.tensor(self.precision_prior_zeta))
else:
divergent_term = 0
batch_size = x.shape[0]
regularization = 0.5/batch_size\
* self.precision_prior_zeta/(
self.precision_prior_zeta * self.get_std_x()**2 + 1) \
* torch.sum(x**2)
regularization += 0.5 * torch.log(2 * pi *
(self.get_std_x()**2 * self.precision_prior_zeta + 1) )
regularization += divergent_term
return regularization
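# Minimal usage sketch with toy shapes: with L = 3 repetitions, EIVDropout
# multiplies each of the three consecutive copies of a batch member (as
# produced by repetition.repeat_tensors) by the same Bernoulli mask.
example_dropout = EIVDropout(p=0.5, repetition_map=lambda: 3)
example_input = repetition.repeat_tensors(torch.ones(4, 10), number_of_draws=3)[0]
example_output = example_dropout(example_input)  # shape (12, 10)
assert torch.allclose(example_output[0], example_output[2])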
import torch
import torch.nn as nn
from EIVArchitectures.Layers import EIVInput, EIVDropout, EIVVariationalDropout
from EIVGeneral.repetition import repeat_tensors, reshape_to_chunks
class FNNEIV(nn.Module):
"""
A fully connected net with Error-in-Variables input and Bernoulli dropout
layers.
:param p: dropout rate, defaults to 0.5
:param init_std_y: Initial estimated standard deviation for y.
:param precision_prior_zeta: precision of the prior for zeta.
Defaults to 0.0 (=improper prior)
:param deming: Coupling factor between std_y and std_x (as in Deming
regression), that is std_x = deming * std_y.
:param h: A list specifying the number of neurons in each layer.
**Note**:
To change the deming factor afterwards, use the method `change_deming`
"""
def __init__(self, p = 0.5, init_std_y=1.0, precision_prior_zeta=0.0,
deming=1.0, h=[1, 50,100,50, 1]):
super().__init__()
# part before Bernoulli dropout
self.init_std_y = init_std_y
InverseSoftplus = lambda sigma: torch.log(torch.exp(sigma) - 1 )
self.std_y_par = nn.Parameter(
InverseSoftplus(torch.tensor([init_std_y])))
self._repetition = 1
self.main = nn.Sequential(
EIVInput(precision_prior_zeta=precision_prior_zeta,
external_std_x=self.get_std_x),
nn.Linear(h[0], h[1]),
nn.LeakyReLU(1e-2),
EIVDropout(p=p, repetition_map=self._repetition_map),
nn.Linear(h[1],h[2]),
nn.LeakyReLU(1e-2),
EIVDropout(p=p, repetition_map=self._repetition_map),
nn.Linear(h[2],h[3]),
nn.LeakyReLU(1e-2),
EIVDropout(p=p, repetition_map=self._repetition_map),
nn.Linear(h[3],h[4]))
self.p = p
self._deming = deming
# needed for switch_noise_off()
self._stored_deming = deming
self.noise_is_on = True
def change_deming(self, deming):
"""
Update deming factor to `deming`
:param deming: A positive float
"""
print('Updating deming from %.3f to %.3f' % (self._deming, deming))
self._deming = deming
self._stored_deming = deming
def noise_off(self):
self._deming = 0.0
self.noise_is_on = False
def noise_on(self):
self._deming = self._stored_deming
self.noise_is_on = True
def sigma(self, y):
scalar_sigma = self.get_std_y()
return scalar_sigma.repeat(y.shape)
def get_std_x(self):
return self._deming * self.get_std_y()
def get_std_y(self):
return nn.Softplus()(self.std_y_par)
def _repetition_map(self):
return self._repetition
def forward(self, x, repetition=1):
self._repetition = repetition
mu = self.main(x)
sigma = self.sigma(mu)
self._repetition = 1
return mu, sigma
def regularizer(self, x, lamb):
"""
Regularization for EIV net: prior KL term,
from "Bernoulli Dropout" by Gal et al., plus the regularizer term
from the EIV model (which is constant if precision_prior_zeta is 0).
:param x: A torch.tensor, the input
:param lamb: float, prefactor for regularization
"""
regularization = 0
p = torch.tensor(self.p)
last_Dropout_layer = None
for i, layer in enumerate(self.main):
if type(layer) is not EIVDropout and last_Dropout_layer != i-1:
for par in layer.parameters():
regularization += lamb*(par**2).sum().view((-1,))
elif type(layer) is EIVDropout:
next_layer = self.main[i+1]
assert type(next_layer) is nn.Linear
regularization += lamb*(next_layer.bias**2).sum().view((-1,))
regularization += lamb/(1-p) \
* (next_layer.weight**2).sum().view((1,))
last_Dropout_layer = i
else:
pass
# entropy actually not needed here, added for completeness
entropy = -1 * (p * torch.log(p) + (1-p) * torch.log(1-p))
regularization += entropy
if self._deming > 0:
# add EIV regularization term
# (constant if precision_prior_zeta is 0)
regularization += self.main[0].neg_x_evidence(x)
return regularization
def predict(self, x, number_of_draws=100, remove_graph=True,
take_average_of_prediction=True):
"""
Average over `number_of_draws` forward passes. If
`take_average_of_prediction` is False, the averaging is skipped and
all forward passes are returned.
**Note**: This method touches neither the input noise nor the dropout
mode. The corresponding setting is left to the user!
:param x: A torch.tensor, the input
:param number_of_draws: Number of draws to obtain from x
:param remove_graph: If True (default) the output will
be detached to save memory
:param take_average_of_prediction: If False, no averaging will be
applied to the prediction and the second dimension of the first output
will count the number_of_draws.
"""
x, = repeat_tensors(x, number_of_draws=number_of_draws)
pred, sigma = self.forward(x)
if remove_graph:
pred, sigma = pred.detach(), sigma.detach()
pred, sigma = reshape_to_chunks(pred, sigma,
number_of_draws=number_of_draws)
# reduce along the draws (actually redundant for sigma)
if take_average_of_prediction:
pred, sigma = torch.mean(pred, dim=1), torch.mean(sigma, dim=1)
else:
sigma = torch.mean(sigma, dim=1)
return pred, sigma
class FNNBer(nn.Module):
"""
A fully connected net with Bernoulli dropout layers.
:param p: dropout rate, defaults to 0.5
:param init_std_y: Initial estimated standard deviation for y.
:param h: A list specifying the number of neurons in each layer.
"""
def __init__(self, p=0.5, init_std_y=1.0, h=[1, 50,100,50, 1]):
super().__init__()
# part before Bernoulli dropout
self.init_std_y = init_std_y
InverseSoftplus = lambda sigma: torch.log(torch.exp(sigma) - 1 )
self.std_y_par = nn.Parameter(
InverseSoftplus(torch.tensor([init_std_y])))
self.main = nn.Sequential(
nn.Linear(h[0], h[1]),
nn.LeakyReLU(1e-2),
nn.Dropout(p=p),
nn.Linear(h[1],h[2]),
nn.LeakyReLU(1e-2),
nn.Dropout(p=p),
nn.Linear(h[2],h[3]),
nn.LeakyReLU(1e-2),
nn.Dropout(p=p),
nn.Linear(h[3],h[4]))
self.p = p
def sigma(self, y):
scalar_sigma = self.get_std_y()
return scalar_sigma.repeat(y.shape)
def get_std_y(self):
return nn.Softplus()(self.std_y_par)
def forward(self, x):
mu = self.main(x)
sigma = self.sigma(mu)
return mu, sigma
def regularizer(self, x, lamb):
"""
Regularization (prior KL term), from "Bernoulli Dropout" by Gal et al.
:param x: A torch.tensor, the input
:param lamb: float, prefactor for regularization
"""
regularization = 0
p = torch.tensor(self.p)
last_Dropout_layer = None
for i, layer in enumerate(self.main):
if type(layer) is not nn.Dropout and last_Dropout_layer != i-1:
for par in layer.parameters():
regularization += lamb*(par**2).sum().view((-1,))
elif type(layer) is nn.Dropout:
next_layer = self.main[i+1]
assert type(next_layer) is nn.Linear
regularization += lamb*(next_layer.bias**2).sum().view((-1,))
regularization += lamb/(1-p) \
* (next_layer.weight**2).sum().view((1,))
last_Dropout_layer = i
else:
pass
# entropy actually not needed here, added for completeness
entropy = -1 * (p * torch.log(p) + (1-p) * torch.log(1-p))
regularization += entropy
return regularization
def predict(self, x, number_of_draws=100, remove_graph=True,
take_average_of_prediction=True):
"""
Average over `number_of_draws` forward passes. If
`take_average_of_prediction` is False, the averaging is skipped and
all forward passes are returned.
**Note**: This method does not touch the dropout mode.
The corresponding setting is left to the user (analogous to
the corresponding method of FNNEIV).
:param x: A torch.tensor, the input
:param number_of_draws: Number of draws to obtain from x
:param remove_graph: If True (default) the output will
be detached to save memory
:param take_average_of_prediction: If False, no averaging will be
applied to the prediction and the second dimension of the first output
will count the number_of_draws.
"""
x, = repeat_tensors(x, number_of_draws=number_of_draws)
pred, sigma = self.forward(x)
if remove_graph:
pred, sigma = pred.detach(), sigma.detach()
pred, sigma = reshape_to_chunks(pred, sigma,
number_of_draws=number_of_draws)
# reduce along the draws (actually redundant for sigma)
if take_average_of_prediction:
pred, sigma = torch.mean(pred, dim=1), torch.mean(sigma, dim=1)
else:
sigma = torch.mean(sigma, dim=1)
return pred, sigma
class FNN_VD_EIV(nn.Module):
"""
A fully connected net with Error-in-Variables input and variational dropout
layers.
:param initial_alpha: initial value for the alpha of each VD layer
:param init_std_y: Initial estimated standard deviation for y.
:param precision_prior_zeta: precision of the prior for zeta.
Defaults to 0.0 (=improper prior)
:param deming: Coupling factor between std_y and std_x (as in Deming
regression), that is std_x = deming * std_y.
:param h: A list specifying the number of neurons in each layer.
**Note**:
To change the deming factor afterwards, use the method `change_deming`
"""
def __init__(self, initial_alpha = 0.5, init_std_y=1.0, precision_prior_zeta=0.0,
deming=1.0, h=[1, 50,100,50, 1]):
super().__init__()
# parametrization of std_y
self.init_std_y = init_std_y
InverseSoftplus = lambda sigma: torch.log(torch.exp(sigma) - 1 )
self.std_y_par = nn.Parameter(
InverseSoftplus(torch.tensor([init_std_y])))
self._repetition = 1
self.main = nn.Sequential(
EIVInput(precision_prior_zeta=precision_prior_zeta,
external_std_x=self.get_std_x),
nn.Linear(h[0], h[1]),
nn.LeakyReLU(1e-2),
EIVVariationalDropout(initial_alpha=initial_alpha, repetition_map=self._repetition_map),
nn.Linear(h[1],h[2]),
nn.LeakyReLU(1e-2),
EIVVariationalDropout(initial_alpha=initial_alpha, repetition_map=self._repetition_map),
nn.Linear(h[2],h[3]),
nn.LeakyReLU(1e-2),
EIVVariationalDropout(initial_alpha=initial_alpha, repetition_map=self._repetition_map),
nn.Linear(h[3],h[4]))
self.initial_alpha = initial_alpha
self._deming = deming
# needed for switch_noise_off()
self._stored_deming = deming
self.noise_is_on = True
def change_deming(self, deming):
"""
Update deming factor to `deming`
:param deming: A positive float
"""
print('Updating deming from %.3f to %.3f' % (self._deming, deming))
self._deming = deming
self._stored_deming = deming
def noise_off(self):
self._deming = 0.0
self.noise_is_on = False
def noise_on(self):
self._deming = self._stored_deming
self.noise_is_on = True
def sigma(self, y):
scalar_sigma = self.get_std_y()
return scalar_sigma.repeat(y.shape)
def get_std_x(self):
return self._deming * self.get_std_y()
def get_std_y(self):
return nn.Softplus()(self.std_y_par)
def _repetition_map(self):
return self._repetition
def forward(self, x, repetition=1):
self._repetition = repetition
mu = self.main(x)
sigma = self.sigma(mu)
self._repetition = 1
return mu, sigma
def regularizer(self, x, lamb):
"""
Regularization for the EIV net: L2 terms for weights and biases, the
variational dropout KL term (Kingma et al.), plus the regularizer term
from the EIV model (which is constant if precision_prior_zeta is 0).
:param x: A torch.tensor, the input
:param lamb: A list of two floats: the prefactor for the weight and bias
L2 regularization (first entry) and for the variational dropout KL term
(second entry)
"""
regularization = 0
last_Dropout_layer = None
assert type(lamb) is list
lamb_bias, lamb_kl = lamb
for i, layer in enumerate(self.main):
if type(layer) is not EIVVariationalDropout and last_Dropout_layer != i-1:
for par in layer.parameters():
regularization += lamb_bias*(par**2).sum().view((-1,))
elif type(layer) is EIVVariationalDropout:
next_layer = self.main[i+1]
assert type(next_layer) is nn.Linear
regularization += lamb_bias *(next_layer.bias**2).sum().view((-1,))
regularization += lamb_kl *\
layer.variational_dropout_regularizer()
last_Dropout_layer = i
else:
pass
if self._deming > 0:
# add EIV regularization term
# (constant if precision_prior_zeta is 0)
regularization += self.main[0].neg_x_evidence(x)
return regularization
def predict(self, x, number_of_draws=100, remove_graph=True,
take_average_of_prediction=True):
"""
Average over `number_of_draws` forward passes. If
`take_average_of_prediction` is False, the averaging is skipped and
all forward passes are returned.
**Note**: This method touches neither the input noise nor the dropout
mode. The corresponding setting is left to the user!
:param x: A torch.tensor, the input
:param number_of_draws: Number of draws to obtain from x
:param remove_graph: If True (default) the output will
be detached to save memory
:param take_average_of_prediction: If False, no averaging will be
applied to the prediction and the second dimension of the first output
will count the number_of_draws.
"""
x, = repeat_tensors(x, number_of_draws=number_of_draws)
pred, sigma = self.forward(x)
if remove_graph:
pred, sigma = pred.detach(), sigma.detach()
pred, sigma = reshape_to_chunks(pred, sigma,
number_of_draws=number_of_draws)
# reduce along the draws (actually redundant for sigma)
if take_average_of_prediction:
pred, sigma = torch.mean(pred, dim=1), torch.mean(sigma, dim=1)
else:
sigma = torch.mean(sigma, dim=1)
return pred, sigma
class FNN_VD_Ber(nn.Module):
"""
A fully connected net with variational dropout layers.
:param initial_alpha: initial value for the alpha of each VD layer
:param init_std_y: Initial estimated standard deviation for y.
:param h: A list specifying the number of neurons in each layer.
"""
def __init__(self, initial_alpha=0.5, init_std_y=1.0, h=[1, 50,100,50, 1]):
super().__init__()
# parametrization of std_y
self.init_std_y = init_std_y
InverseSoftplus = lambda sigma: torch.log(torch.exp(sigma) - 1 )
self.std_y_par = nn.Parameter(
InverseSoftplus(torch.tensor([init_std_y])))
self.main = nn.Sequential(
nn.Linear(h[0], h[1]),
nn.LeakyReLU(1e-2),
EIVVariationalDropout(initial_alpha=initial_alpha),
nn.Linear(h[1],h[2]),
nn.LeakyReLU(1e-2),
EIVVariationalDropout(initial_alpha=initial_alpha),
nn.Linear(h[2],h[3]),
nn.LeakyReLU(1e-2),
EIVVariationalDropout(initial_alpha=initial_alpha),
nn.Linear(h[3],h[4]))
self.initial_alpha = initial_alpha
def sigma(self, y):
scalar_sigma = self.get_std_y()
return scalar_sigma.repeat(y.shape)
def get_std_y(self):
return nn.Softplus()(self.std_y_par)
def forward(self, x):
mu = self.main(x)
sigma = self.sigma(mu)
return mu, sigma
def regularizer(self, x, lamb):
"""
Regularization: L2 terms for weights and biases plus the variational
dropout KL term (Kingma et al.).
:param x: A torch.tensor, the input
:param lamb: A list of two floats: the prefactor for the weight and bias
L2 regularization (first entry) and for the variational dropout KL term
(second entry)
"""
regularization = 0
last_Dropout_layer = None
assert type(lamb) is list
lamb_bias, lamb_kl = lamb
for i, layer in enumerate(self.main):
if type(layer) is not EIVVariationalDropout and last_Dropout_layer != i-1:
for par in layer.parameters():
regularization += lamb_bias * (par**2).sum().view((-1,))
elif type(layer) is EIVVariationalDropout:
next_layer = self.main[i+1]
assert type(next_layer) is nn.Linear
regularization += lamb_bias *\
(next_layer.bias**2).sum().view((-1,))
regularization += lamb_kl *\
layer.variational_dropout_regularizer()
last_Dropout_layer = i
else:
pass
return regularization
def predict(self, x, number_of_draws=100, remove_graph=True,
take_average_of_prediction=True):
"""
Average over `number_of_draws` forward passes. If
`take_average_of_prediction` is False, the averaging is skipped and
all forward passes are returned.
**Note**: This method does not touch the dropout mode.
The corresponding setting is left to the user (analogous to
the corresponding method of FNNEIV).
:param x: A torch.tensor, the input
:param number_of_draws: Number of draws to obtain from x
:param remove_graph: If True (default) the output will
be detached to save memory
:param take_average_of_prediction: If False, no averaging will be
applied to the prediction and the second dimension of the first output
will count the number_of_draws.
"""
x, = repeat_tensors(x, number_of_draws=number_of_draws)
pred, sigma = self.forward(x)
if remove_graph:
pred, sigma = pred.detach(), sigma.detach()
pred, sigma = reshape_to_chunks(pred, sigma,
number_of_draws=number_of_draws)
# reduce along the draws (actually redundant for sigma)
if take_average_of_prediction:
pred, sigma = torch.mean(pred, dim=1), torch.mean(sigma, dim=1)
else:
sigma = torch.mean(sigma, dim=1)
return pred, sigma
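# Usage sketch with toy shapes (input and output dimension 1): one forward
# pass of FNNEIV and an averaged prediction over several draws of the input
# noise via `predict`.
torch.manual_seed(0)
example_net = FNNEIV(p=0.2, init_std_y=0.5, deming=1.0, h=[1, 50, 100, 50, 1])
example_input = torch.randn(8, 1)
example_mu, example_sigma = example_net(example_input)  # both of shape (8, 1)
example_pred, example_pred_sigma = example_net.predict(example_input,
number_of_draws=20)
# example_pred and example_pred_sigma have shape (8, 1): means over 20 draws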
import torch
from EIVTrainingRoutines.train_and_store import open_stored_training
def create_strings(template, iterator, before=(), after=()):
"""
Returns a list of strings created by inserting, for each element `i` of
`iterator`, the values `(*before, i, *after)` into the format string
`template`.
:param template: A string with %-style placeholders
:param iterator: Iterator (list, generator, range, ...)
:param before: A tuple or list, defaults to ()
:param after: A tuple or list, defaults to ()
"""
return_list = []
for i in iterator:
return_list.append(template % (*before, i, *after))
return return_list
class Ensemble():
"""
Takes the strings from the list saved_files and uses them to load
networks of architecture_class with **kwargs into a collection.
A list of all members is given by self.members.
:param saved_files: A list of strings
:param architecture_class: A class, inherited from nn.Module
:param device: torch.device
:param **kwargs: Will be given to architecture_class upon creation of
each member
"""
def __init__(self, saved_files, architecture_class, device, **kwargs):
self.saved_files = saved_files
self.architecture_class = architecture_class
self.device = device
self.kwargs = kwargs
self.members = []
#
self.load_members()
def load_members(self):
"""
Loads nets from saved_files, sets them to eval mode and stores them
into self.members
"""
for filename in self.saved_files:
net = self.architecture_class(**self.kwargs)
open_stored_training(saved_file=filename,
net=net,
device=self.device)
net.eval()
self.members.append(net)
def __getitem__(self, i):
return self.members[i]
def __call__(self, x):
"""
Evaluates net(x) for each net in self.members and returns the result
as a list.
"""
out = []
for net in self.members:
out.append(net(x))
return out
def mean_and_std(self, x, detach = True):
out = []
for net in self.members:
pred = net(x)
if type(pred) is list or type(pred) is tuple:
pred = pred[0]
if detach:
pred = pred.detach()
out.append(pred)
out = torch.stack(out, dim=0)
return torch.mean(out, dim=0), torch.std(out, dim=0)
def mean(self, x, detach = True):
return self.mean_and_std(x, detach=detach)[0]
def std(self, x, detach = True):
return self.mean_and_std(x, detach=detach)[1]
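# Usage sketch. The file names produced by create_strings and the module path
# EIVArchitectures.Networks are assumptions made for illustration; the listed
# files are expected to have been written by train_and_store for the FNNBer
# architecture defined earlier in this commit.
from EIVArchitectures.Networks import FNNBer
saved_files = create_strings('fnn_ber_%i.pkl', range(5))
ensemble = Ensemble(saved_files, FNNBer, device=torch.device('cpu'), p=0.2)
ensemble_input = torch.randn(10, 1)
ensemble_mean, ensemble_std = ensemble.mean_and_std(ensemble_input)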
import torch
def repeat_tensors(*args, number_of_draws=1):
"""
For each tensor in `args`, repeat each slice along the batch dimension
(the first one) `number_of_draws` times, via `repeat_interleave`.
This is useful in EIV modelling for handling multiple draws.
:param *args: torch.tensor elements
:param number_of_draws: An integer >= 1
"""
repeated_args = []
for arg in args:
repeated_arg = arg.repeat_interleave(number_of_draws, dim=0)
repeated_args.append(repeated_arg)
return repeated_args
def reshape_to_chunks(*args, number_of_draws=1):
"""
For each element of *args, split it into chunks of size number_of_draws
along the batch dimension and stack them. Applied to the output of
repeat_tensors, this yields tensors whose first dimension indexes the
batch and whose second dimension indexes the number_of_draws.
:param *args: torch.tensor elements
:param number_of_draws: An integer, the chunk size, defaults to 1
"""
reshaped_args = []
for arg in args:
arg = torch.split(arg, number_of_draws, dim=0)
reshaped_args.append(torch.stack(arg, dim=0))
return reshaped_args
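# Shape sketch: repeat_tensors followed by reshape_to_chunks yields a
# (batch, number_of_draws, ...) layout, which is how multiple draws per data
# point are grouped elsewhere in this commit.
example_batch = torch.arange(6.).reshape(3, 2)
repeated_batch, = repeat_tensors(example_batch, number_of_draws=4)  # (12, 2)
chunked_batch, = reshape_to_chunks(repeated_batch, number_of_draws=4)  # (3, 4, 2)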
from math import pi
import numpy as np
import torch
from EIVGeneral.repetition import repeat_tensors, reshape_to_chunks
def nll_reg_loss(net, x, y, reg):
"""
Returns the neg log likelihood with an additional regularization term.
*Note that `reg` will not be divided by the data size (and by 2),
this should be done beforehand.*
:param net: A torch.nn.Module.
:param x: A torch.tensor, the input.
:param y: A torch.tensor, the output.
:param reg: A non-negative float, the regularization.
"""
out, std_y = net(x)
neg_log_likelihood = torch.mean(0.5* torch.log(2*pi*std_y**2) \
+ ((out-y)**2)/(2*std_y**2))
regularization = net.regularizer(x, lamb=reg)
return neg_log_likelihood + regularization
def nll_eiv_no_jensen(net, x, y, reg, number_of_draws=5):
"""
Negative log likelihood criterion for an Error-in-Variables model (EIV),
where `torch.logsumexp` is applied to partitions of size `number_of_draws`
of `mu` and `sigma` along the batch dimension (that is, the first one).
**Note**: This function is supposed to be used in combination
with `repeat_tensors`, using the same argument `number_of_draws`.
*Note that `reg` will not be divided by the data size (and by 2),
this should be done beforehand.*
:param net: A torch.nn.Module whose forward pass returns (mu, sigma)
:param x: A torch.tensor, the input
:param y: A torch.tensor, the ground truth
:param reg: A non-negative float, the regularization
:param number_of_draws: Integer, supposed to be larger than 2
"""
regularization = net.regularizer(x, lamb=reg)
# repeat_tensors
x, y = repeat_tensors(x, y, number_of_draws=number_of_draws)
pred, sigma = net(x, repetition=number_of_draws)
# split into chunks of size number_of_draws along batch dimension
pred, sigma, y = reshape_to_chunks(pred, sigma,
y, number_of_draws=number_of_draws)
# apply logsumexp to chunks and average the results
nll = -1 * (torch.logsumexp(-1 * sigma.log()
-((y-pred)**2)/(2*sigma**2), dim=1)
- np.log(number_of_draws)).mean()
return nll + regularization
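# Worked toy check of what the logsumexp in `nll_eiv_no_jensen` computes for a
# single data point: minus the log of the Monte Carlo average of the Gaussian
# likelihood over the draws (with the constant 0.5*log(2*pi) omitted, as in
# the loss above).
toy_y = torch.tensor([0.3])
toy_pred = torch.tensor([0.10, 0.20, 0.40, 0.50, 0.25])  # 5 draws for one point
toy_sigma = torch.full_like(toy_pred, 0.5)
direct = -1 * torch.log(torch.mean(torch.exp(-toy_sigma.log()
- (toy_y - toy_pred)**2 / (2 * toy_sigma**2))))
via_logsumexp = -1 * (torch.logsumexp(-toy_sigma.log()
- (toy_y - toy_pred)**2 / (2 * toy_sigma**2), dim=0) - np.log(5))
assert torch.allclose(direct, via_logsumexp)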
import numpy as np
import torch
class TrainEpoch():
"""
A simple implementation of the training during one epoch.
The training is implemented in the __call__ method that returns collected
samples of the train loss and test loss, averaged between
two "report points", and the std_x and std_y returned by `std_x_map` and
`std_y_map` (which should be **floats**).
If `verbose` is `True` (the default) these values will be printed at
"report points".
:param train_dataloader: A torch.utils.data.DataLoader (for train data)
:param test_dataloader: A torch.utils.data.DataLoader (for test data)
:param criterion: A map that takes net, x, y and reg as input and returns
a single-element torch.tensor
:param std_y_map: Map without arguments that returns a float.
:param std_x_map: Map without arguments that returns a float.
:param lr: The learning rate (a positive float), defaults to 1e-3
:param reg: The regularization, defaults to 1.0
:param report_point: Positive integer, distance between two report points.
Defaults to 100.
:param verbose: If True (the default), train loss, test loss, std_x and
std_y are printed at each report point.
:param device: Do training on this device (defaults to 'cpu')
"""
def __init__(self, train_dataloader, test_dataloader,
criterion, std_y_map, std_x_map=lambda: 0.0, lr=1e-3,
reg=1.0, report_point=100, verbose=True, device='cpu'):
self.train_dataloader = train_dataloader
self.test_dataloader = test_dataloader
self.initial_lr = lr
self.criterion = criterion
self.std_x_map = std_x_map
self.std_y_map = std_y_map
self.reg = reg
self.report_point = report_point
self.verbose = verbose
self.device = device
#
self.lr_generator = iter(self.next_lr())
self.lr = None
def next_lr(self):
while True:
yield self.initial_lr
def pre_epoch_update(self, net, epoch):
"""
Overwrite this method to update the optimizer, the learning rate or
similar in a different manner.
*Note*: This method is expected to define at least `self.optimizer`.
:param net: The net to be used for the optimizer
:param epoch: The current epoch, an integer.
"""
old_lr = self.lr
self.lr = next(self.lr_generator)
if old_lr != self.lr:
self.optimizer = torch.optim.Adam(net.parameters(), lr=self.lr)
def post_epoch_update(self, net, epoch):
"""
Overwrite for inheritance to update the net (e.g.
its deming factor for EIV) after each epoch
:param net: The current net, a torch.nn.Module.
:param epoch: The current epoch, an integer.
"""
pass
def extra_report(self, net, step):
"""
Overwrite for reporting on state of net
at each report point
"""
pass
def __call__(self, net, epoch):
"""
:param net: A torch.nn.Module
:param epoch: The current epoch, a non-negative integer.
Will only be used for formatting of the printing.
"""
self.pre_epoch_update(net, epoch)
train_loss, test_loss = [], []
stored_train_loss, stored_test_loss = [], []
stored_std_x, stored_std_y = [], []
if self.verbose:
print('>>>> Epoch %i' % (epoch,))
stored_train_loss_to_average = []
stored_test_loss_to_average = []
for i, (x,y) in enumerate(self.train_dataloader):
# optimize on train data
x, y = x.to(self.device), y.to(self.device)
loss = self.criterion(net, x, y, self.reg)
loss.backward()
self.optimizer.step()
net.zero_grad()
stored_train_loss_to_average.append(loss.detach().cpu().item())
if i % self.report_point == 0:
# evaluate on test_data
for j, (x,y) in enumerate(self.test_dataloader):
x,y = x.to(self.device), y.to(self.device)
loss = self.criterion(net, x, y,
self.reg).detach().cpu().item()
stored_test_loss_to_average.append(loss)
std_x = self.std_x_map()
std_y = self.std_y_map()
# store values
stored_train_loss.append(
np.mean(stored_train_loss_to_average))
stored_test_loss.append(
np.mean(stored_test_loss_to_average))
stored_std_x.append(std_x)
stored_std_y.append(std_y)
if i>0 and self.verbose:
print('Step %i,'\
' train_loss: %.2f,'\
' test_loss: %.2f,'\
' std_x: %.2f,'
' std_y: %.2f' % (i,
stored_train_loss[-1],
stored_test_loss[-1],
std_x,
std_y
))
self.extra_report(net, i)
stored_train_loss_to_average = []
stored_test_loss_to_average = []
self.post_epoch_update(net, epoch)
# convert to tensors
stored_train_loss = torch.tensor(stored_train_loss,
dtype=torch.float32)
stored_test_loss = torch.tensor(stored_test_loss,
dtype=torch.float32)
stored_std_x = torch.tensor(stored_std_x, dtype=torch.float32)
stored_std_y = torch.tensor(stored_std_y, dtype=torch.float32)
return stored_train_loss,\
stored_test_loss,\
stored_std_x,\
stored_std_y
def train_and_store(net, epoch_map, number_of_epochs, save_file, **kwargs):
"""
Calls `epoch_map` with `net` and the current epoch number,
`number_of_epochs` times, and stores a list of
its outputs as a pickled file under `save_file`.
**Note**: The output of `epoch_map` is supposed to
consist of 4 specific arguments, see below.
:param net: A torch.nn.Module
:param epoch_map: A map taking net and an integer (the epoch number) as an
input and returning 4 torch.tensors containing the evolution of
train_loss, test_loss, std_x, std_y during one epoch.
:param number_of_epochs: An integer, the number of epochs to train.
:param save_file: A string containing the path to the file to be pickled.
:param kwargs: keywords and their values will
also be stored after the training. Use lists to ensure they are updated.
"""
std_x_collection = []
std_y_collection = []
train_loss_collection = []
test_loss_collection = []
# Training
for epoch in range(number_of_epochs):
train_loss, test_loss, std_x, std_y = epoch_map(net, epoch)
train_loss_collection.append(train_loss)
test_loss_collection.append(test_loss)
std_x_collection.append(std_x)
std_y_collection.append(std_y)
# Saving
state_dict = net.state_dict()
to_save = {
'train_loss' : train_loss_collection,
'test_loss' : test_loss_collection,
'std_x' : std_x_collection,
'std_y' : std_y_collection,
'state_dict' : state_dict
}
for key, value in kwargs.items():
to_save[key] = value
with open(save_file, 'wb') as file_handle:
torch.save(to_save, file_handle)
def open_stored_training(saved_file, net=None, join_epochs = True,
extra_keys = None, device=torch.device('cpu')):
"""
Counterpart to `train_and_store`: opens `saved_file` and returns the stored
losses, std_x, std_y and state_dict (see below).
:param saved_file: A pickle file generated with train_and_store
:param net: The corresponding torch.nn.Module
:param join_epochs: If True (default), all epochs will be concatenated.
:param extra_keys: None (default) or a list of strings. If the latter, is
used to extract extra entries of the stored dictionary to be returned
as a list.
:param device: The device to be used for loading the state_dict.
"""
with open(saved_file, 'rb') as file_handle:
loaded_dict = torch.load(file_handle, map_location=device)
train_loss = loaded_dict['train_loss']
test_loss = loaded_dict['test_loss']
std_x = loaded_dict['std_x']
std_y = loaded_dict['std_y']
state_dict = loaded_dict['state_dict']
if net is not None:
net.load_state_dict(state_dict)
if join_epochs:
train_loss = torch.cat(train_loss, dim=0)
test_loss = torch.cat(test_loss, dim=0)
std_x = torch.cat(std_x, dim=0)
std_y = torch.cat(std_y, dim=0)
if extra_keys is None:
return train_loss, test_loss, std_x, std_y, state_dict
else:
extra_list = [loaded_dict[key] for key in extra_keys]
return train_loss, test_loss, std_x, std_y, state_dict, extra_list
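# Minimal end-to-end sketch with toy data. The module path
# EIVArchitectures.Networks for the network classes defined earlier in this
# commit and the file name 'toy_fnnber.pkl' are assumptions made for
# illustration; the criterion mirrors nll_reg_loss.
from math import pi
from torch.utils.data import TensorDataset, DataLoader
from EIVArchitectures.Networks import FNNBer
toy_x = torch.randn(200, 1)
toy_y = toy_x**3 + 0.1 * torch.randn(200, 1)
train_loader = DataLoader(TensorDataset(toy_x[:160], toy_y[:160]), batch_size=16)
test_loader = DataLoader(TensorDataset(toy_x[160:], toy_y[160:]), batch_size=16)
toy_net = FNNBer(p=0.2)
def toy_criterion(net, x, y, reg):
    mu, sigma = net(x)
    nll = torch.mean(0.5 * torch.log(2 * pi * sigma**2)
                     + (mu - y)**2 / (2 * sigma**2))
    return nll + net.regularizer(x, lamb=reg)
epoch_map = TrainEpoch(train_loader, test_loader, toy_criterion,
std_y_map=lambda: toy_net.get_std_y().item(),
reg=1e-5, report_point=5, verbose=False)
train_and_store(toy_net, epoch_map, number_of_epochs=2,
save_file='toy_fnnber.pkl')
train_loss, test_loss, std_x, std_y, state_dict = open_stored_training(
'toy_fnnber.pkl', net=FNNBer(p=0.2))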
from distutils.core import setup
setup(name='eiv_article_software',
version='0.1',
author='Anonymous',
packages=[ 'EIVTrainingRoutines', 'EIVArchitectures', 'EIVGeneral'])
import pandas as pd
import numpy as np
import torch
import torch.nn
from torch.utils.data import Dataset, DataLoader
class CSVData(Dataset):
"""
A dataset to load csv ("comma separated values") data.
To change for a different delimiter than `,` use the argument `delimiter`.
:param data_path: Path to CSV File
:param load_into_memory: Boolean. If True (default), whole dataset
will be loaded into memory.
:param y_columns: If not None should be a numpy array (or similar) and
list the columns which will be returned as second item of __getitem__
:param delimiter: Will be used for reading CSV, defaults to ','
:param header: Which row to take as header, defaults to None (=no header)
:param chunksize: This argument is only used once and only if
`load_into_memory` is false to scan through the dataset.
Defaults to 10,000.
"""
def __init__(self, data_path, load_into_memory=True,
y_columns=None, delimiter=",", header=None, chunksize=10000):
self.data_path = data_path
self.load_into_memory = load_into_memory
self.y_indices = y_columns
self.delimiter = delimiter
self.header = header
if self.load_into_memory:
# load dataset as array and determine length of data
self.array_dataset = np.array(pd.read_csv(self.data_path,
delimiter=delimiter,
header=self.header))
self.len = self.array_dataset.shape[0]
else:
# determine only self.len using chunksize
with pd.read_csv(self.data_path,
delimiter=delimiter,
header=self.header,
chunksize=chunksize) as reader:
for i, chunk in enumerate(reader):
pass
self.len = i*chunksize+chunk.shape[0]
def __getitem__(self, i):
assert i < self.len
while i<0:
i = i + self.len
if self.load_into_memory:
ith_row = self.array_dataset[i,:]
else:
ith_row = np.array(pd.read_csv(self.data_path,
delimiter=self.delimiter,
header=self.header,
skiprows=max(0,i),
nrows=1))[0,:]
if self.y_indices is None:
return torch.tensor(ith_row, dtype=torch.float32)
else:
y_index_array = np.array(self.y_indices)
total_number_of_columns = len(ith_row)
x_index_array = np.setdiff1d(
np.arange(0,total_number_of_columns),
y_index_array
)
x = torch.tensor(ith_row[x_index_array], dtype=torch.float32)
y = torch.tensor(ith_row[y_index_array], dtype=torch.float32)
return x,y
def __len__(self):
return self.len
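# Usage sketch with a hypothetical file 'data.csv' containing four columns,
# the last of which is the target: CSVData plugs into a standard DataLoader.
csv_data = CSVData('data.csv', y_columns=np.array([3]))
csv_loader = DataLoader(csv_data, batch_size=32, shuffle=True)
x_batch, y_batch = next(iter(csv_loader))
# x_batch has shape (32, 3) and y_batch has shape (32, 1)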
import torch
import numpy as np
from torch.utils.data import TensorDataset
from sklearn.datasets import load_boston
## Load data in a numpy array
# load data in bunch object
data_bunch = load_boston()
# get input data
x = data_bunch['data']
# cut out 'B' column
cut_x = np.concatenate((x[...,0:-2],x[...,-1][...,None]), axis=1)
x = cut_x
# get output data
y = data_bunch['target']
# normalize data
y_mean = np.mean(y)
y_std = np.std(y)
y = (y-y_mean)/y_std
x_mean = np.mean(x, axis=0, keepdims=True)
x_std = np.std(x, axis=0, keepdims=True)
x = (x-x_mean)/x_std
# randomly split for training and testing
length_data = y.shape[0]
test_percentage = 0.2
length_test_data = int(length_data * test_percentage)
full_indices = np.arange(0, length_data)
np.random.seed(0)
test_indices = np.random.choice(full_indices,
size=length_test_data, replace=False)
train_indices = np.setdiff1d(full_indices, test_indices)
train_x, train_y = [torch.tensor(t[train_indices,...],
dtype=torch.float32) for t in (x,y)]
test_x, test_y = [torch.tensor(t[test_indices,...],
dtype=torch.float32) for t in (x,y)]
# create datasets
train_data = TensorDataset(train_x, train_y.view((-1,1)))
test_data = TensorDataset(test_x, test_y.view((-1,1)))
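# The datasets above can be consumed by the training routines in this commit
# via DataLoaders, for instance (batch sizes chosen arbitrarily):
from torch.utils.data import DataLoader
train_dataloader = DataLoader(train_data, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=32, shuffle=False)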
import os
import torch
import numpy as np
import csv
from scipy import signal
# function to model
a=0.25
func = lambda x: (1 - (x/a)**2) * np.exp(-0.5*(x/a)**2)
# number of training points
n_train = 300
def normalize(*args,tuple_for_normalization):
"""
Returns a list of the args normalized,
where each arg is expected to be an (x, y) tuple.
The mean and std will be taken from `tuple_for_normalization` and
are returned as a second return value (mean, std).
"""
return_list = []
assert len(tuple_for_normalization) == 2
mean =[np.mean(a) for a in tuple_for_normalization]
std = [np.std(a) for a in tuple_for_normalization]
for xy in args:
x,y = xy
assert len(xy) == 2
normalized_x = (x-mean[0])/std[0]
normalized_y = (y-mean[1])/std[1]
return_list.append((normalized_x, normalized_y))
return return_list, (mean, std)
def get_data(std_x, std_y, seed = 1, n_train=n_train,
n_test=200, n_val=200, post_normalize=False):
"""
Returns Mexican hat data in 1 dimension, perturbed
by input and output noise with standard deviation `std_x` and `std_y`.
:param std_x: A positive float
:param std_y: A positive float
:param seed: The seed for random number generation, defaults to 1.
:param n_train: Number of training points, defaults to 300
:param n_test: Number of test points, defaults to 200
:param n_val: Number of validation points, defaults to 200
:param post_normalize: If True the data will be normalized
by the mean and std of the noisy train data before being returned
:returns: train_data_pure, train_data,
test_data_pure, test_data, val_data_pure, val_data,
(normalized_func, mean, std)
"""
# Fix a seed
np.random.seed(seed)
def gen_data(zeta, std_x=std_x, std_y=std_y, func=func):
print(' -- Generating Mexican hat data with std_x = %.2f and std_y = %.2f --' % (std_x, std_y,))
true_y = func(zeta)
x = zeta + std_x * np.random.normal(size=zeta.shape)
y = true_y + std_y * np.random.normal(size=zeta.shape)
return (zeta, true_y), (x,y)
# Generate zeta
train_zeta = np.random.uniform(low=-1.0, high=1.0, size=n_train)
test_zeta = np.random.uniform(low=-1.0, high=1.0, size=n_test)
val_zeta = np.random.uniform(low=-1.0, high=1.0, size=n_val)
# Generate data
train_data_pure, train_data = gen_data(train_zeta)
test_data_pure, test_data = gen_data(test_zeta)
val_data_pure, val_data = gen_data(val_zeta)
if post_normalize:
data_list, (mean, std) = normalize(train_data_pure, train_data,
test_data_pure, test_data,
val_data_pure, val_data,
tuple_for_normalization=train_data)
else:
data_list = [train_data_pure, train_data,
test_data_pure, test_data,
val_data_pure, val_data]
mean=[0, 0]; std= [1, 1]
train_data_pure, train_data,\
test_data_pure,test_data,\
val_data_pure,val_data = \
[(torch.tensor(a[0], dtype=torch.float32)[:,None],
torch.tensor(a[1], dtype=torch.float32)[:,None])
for a in data_list]
def normalized_func(x):
normalized_x = (x-mean[0])/std[0]
y = func(x)
normalized_y = (y-mean[1])/std[1]
return normalized_x, normalized_y
return train_data_pure, train_data,\
test_data_pure,test_data,\
val_data_pure,val_data,(normalized_func, mean, std)
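# Usage sketch (std_x, std_y and the batch size are chosen arbitrarily):
# draw a noisy Mexican hat training set and wrap it for the dataloaders used
# by the training routines in this commit.
from torch.utils.data import TensorDataset, DataLoader
_, mh_train_data, _, mh_test_data, _, _, (mh_func, mh_mean, mh_std) = \
get_data(std_x=0.05, std_y=0.1, post_normalize=True)
mh_train_loader = DataLoader(TensorDataset(*mh_train_data), batch_size=32,
shuffle=True)
mh_x, mh_y = next(iter(mh_train_loader))
# mh_x and mh_y both have shape (32, 1)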
import numpy as np
import torch
from get_multinomial_function import draw_polynomial
def get_data(std_x, std_y, dim=10, degree=5,
seed = 1, n_train=10000,
n_test=200, n_val=200, n_offset=50000):
"""
Returns modulated multinomial data in dimension `dim`, perturbed
by input and output noise with standard deviation `std_x` and `std_y`.
:param std_x: A positive float
:param std_y: A positive float
:param dim: The input dimension, default to 10.
:param degree: The degree of the multinomial, defaults to 5.
:param seed: The seed for random number generation, defaults to 1.
:param n_train: Number of training points, defaults to 1e4
:param n_test: Number of test points, defaults to 200
:param n_val: Number of validation points, defaults to 200
:param n_offset: Number of points for scaling purposes, defaults to 5e4.
:returns: train_data_pure, train_data,
test_data_pure, test_data, val_data_pure, val_data, func
"""
# draw the polynomial to use
pol = draw_polynomial(dim=dim, degree=degree, number_of_terms=dim*2, seed=seed)
# modulated polynomial, will be scaled below by offset
def unscaled_func(x, freq=5):
return pol(x) * torch.exp(-torch.sin(freq*torch.sum(x**2, dim=1)))
# compute offset to scale function
np.random.seed(seed)
offset_input = torch.tensor(np.random.uniform(low=-1.0,
high=1.0, size=(n_offset, dim)), dtype=torch.float32)
offset_values = unscaled_func(offset_input)
offset_mean = torch.mean(offset_values).item()
offset_std = torch.std(offset_values).item()
# actual function that will be used.
def func(x):
return (unscaled_func(x)-offset_mean)/offset_std
# map to generate arrays using func
def gen_data(zeta, std_x, std_y):
true_y = func(zeta)[...,None]
x = zeta + std_x * torch.randn_like(zeta)
y = true_y + std_y * torch.randn_like(true_y)
return (zeta, true_y), (x,y)
# Generate zeta
np.random.seed(seed)
train_zeta = torch.tensor(np.random.uniform(low=-1.0,
high=1.0, size=(n_train, dim)), dtype=torch.float32)
test_zeta = torch.tensor(np.random.uniform(low=-1.0,
high=1.0, size=(n_test, dim)), dtype=torch.float32)
val_zeta = torch.tensor(np.random.uniform(low=-1.0,
high=1.0, size=(n_val, dim)), dtype=torch.float32)
# Generate data
train_data_pure, train_data = gen_data(zeta=train_zeta, std_x=std_x, std_y=std_y)
test_data_pure, test_data = gen_data(zeta=test_zeta, std_x=std_x, std_y=std_y)
val_data_pure, val_data = gen_data(val_zeta, std_x=std_x, std_y=std_y)
return train_data_pure, train_data,\
test_data_pure, test_data,\
val_data_pure, val_data, func
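# Usage sketch (std_x, std_y, n_train and the batch size are chosen
# arbitrarily), in analogy to the Mexican hat data above:
from torch.utils.data import TensorDataset, DataLoader
_, poly_train_data, _, poly_test_data, _, _, poly_func = \
get_data(std_x=0.05, std_y=0.1, n_train=2000)
poly_train_loader = DataLoader(TensorDataset(*poly_train_data), batch_size=64,
shuffle=True)
poly_x, poly_y = next(iter(poly_train_loader))
# poly_x has shape (64, 10), poly_y has shape (64, 1)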