Skip to content
Snippets Groups Projects
Verified Commit 0f7345fe authored by Björn Ludwig's avatar Björn Ludwig
Browse files

refactor(dataset): meet pylint, mypy and flake8 quality criteria again

parent 925bce66
Branches
No related tags found
No related merge requests found
...@@ -12,7 +12,6 @@ __all__ = [ ...@@ -12,7 +12,6 @@ __all__ = [
import operator import operator
import os import os
import pickle import pickle
from dataclasses import dataclass
from enum import Enum from enum import Enum
from functools import reduce from functools import reduce
from os.path import dirname, exists from os.path import dirname, exists
...@@ -23,6 +22,7 @@ import h5py ...@@ -23,6 +22,7 @@ import h5py
import numpy as np import numpy as np
from h5py import Dataset from h5py import Dataset
from numpy._typing import NDArray from numpy._typing import NDArray
from pooch import retrieve
from zema_emc_annotated.data_types import RealMatrix, RealVector, UncertainArray from zema_emc_annotated.data_types import RealMatrix, RealVector, UncertainArray
...@@ -57,7 +57,6 @@ class ExtractionDataType(Enum): ...@@ -57,7 +57,6 @@ class ExtractionDataType(Enum):
UNCERTAINTIES = "qudt:standardUncertainty" UNCERTAINTIES = "qudt:standardUncertainty"
@dataclass
class ZeMASamples: class ZeMASamples:
"""Extracts requested number of samples of values with associated uncertainties """Extracts requested number of samples of values with associated uncertainties
...@@ -90,32 +89,31 @@ class ZeMASamples: ...@@ -90,32 +89,31 @@ class ZeMASamples:
def __init__( def __init__(
self, n_samples: int = 1, size_scaler: int = 1, normalize: bool = False self, n_samples: int = 1, size_scaler: int = 1, normalize: bool = False
): ):
self.normalize = normalize
self.n_samples = n_samples self.n_samples = n_samples
self.size_scaler = size_scaler self.size_scaler = size_scaler
# if cached_data := _check_and_load_cache(n_samples, size_scaler): if cached_data := self._check_and_load_cache(normalize):
# return cached_data self.uncertain_values = cached_data
dataset_full_path = ( else:
"/home/bjorn/code/zema_emc_annotated/src/zema_emc_annotated/" self._uncertainties = np.empty((n_samples, 0))
"datasets/394da54b1fc044fc498d60367c4e292d-axis11_2kHz_ZeMA_PTB_SI.h5" self._values = np.empty((n_samples, 0))
self.uncertain_values = self._extract_data(normalize)
self._store_cache(normalize)
del self._uncertainties
del self._values
def _extract_data(self, normalize: bool) -> UncertainArray:
dataset_full_path = retrieve(
url=ZEMA_DATASET_URL,
known_hash=ZEMA_DATASET_HASH,
path=LOCAL_ZEMA_DATASET_PATH,
progressbar=True,
) )
# retrieve(
# url=ZEMA_DATASET_URL,
# known_hash=ZEMA_DATASET_HASH,
# path=LOCAL_ZEMA_DATASET_PATH,
# progressbar=True,
# )
assert exists(dataset_full_path) assert exists(dataset_full_path)
self._uncertainties = np.empty((n_samples, 0))
self._values = np.empty((n_samples, 0))
relevant_datasets = ( relevant_datasets = (
["ZeMA_DAQ", quantity, datatype.value] ["ZeMA_DAQ", quantity, datatype.value]
for quantity in ZEMA_QUANTITIES for quantity in ZEMA_QUANTITIES
for datatype in ExtractionDataType for datatype in ExtractionDataType
) )
self._treating_uncertainties: bool = False
self._treating_values: bool = False
self._normalization_divisors: dict[str, NDArray[np.double] | float] = {} self._normalization_divisors: dict[str, NDArray[np.double] | float] = {}
with h5py.File(dataset_full_path, "r") as h5f: with h5py.File(dataset_full_path, "r") as h5f:
for dataset_descriptor in relevant_datasets: for dataset_descriptor in relevant_datasets:
...@@ -123,14 +121,15 @@ class ZeMASamples: ...@@ -123,14 +121,15 @@ class ZeMASamples:
Dataset, reduce(operator.getitem, dataset_descriptor, h5f) Dataset, reduce(operator.getitem, dataset_descriptor, h5f)
) )
if ExtractionDataType.VALUES.value in self._current_dataset.name: if ExtractionDataType.VALUES.value in self._current_dataset.name:
self._treating_values = True treating_values = True
print(f" Extract values from {self._current_dataset.name}") print(f" Extract values from {self._current_dataset.name}")
elif ( elif (
ExtractionDataType.UNCERTAINTIES.value in self._current_dataset.name ExtractionDataType.UNCERTAINTIES.value in self._current_dataset.name
): ):
self._treating_values = False treating_values = False
print( print(
f" Extract uncertainties from {self._current_dataset.name}" f" Extract uncertainties from "
f"{self._current_dataset.name}"
) )
else: else:
raise RuntimeError( raise RuntimeError(
...@@ -140,52 +139,74 @@ class ZeMASamples: ...@@ -140,52 +139,74 @@ class ZeMASamples:
) )
if self._current_dataset.shape[0] == 3: if self._current_dataset.shape[0] == 3:
for idx, sensor in enumerate(self._current_dataset): for idx, sensor in enumerate(self._current_dataset):
self._normalize_if_requested_and_append( if treating_values:
sensor, self._extract_sub_dataset_name(idx) self._normalize_values_if_requested_and_append(
) sensor,
self._extract_sub_dataset_name(idx),
normalize,
)
else:
self._normalize_uncertainties_if_requested_and_append(
sensor,
self._extract_sub_dataset_name(idx),
normalize,
)
else: else:
self._normalize_if_requested_and_append( if treating_values:
self._current_dataset, self._normalize_values_if_requested_and_append(
self._strip_data_type_from_dataset_descriptor(), self._current_dataset,
) self._strip_data_type_from_dataset_descriptor(),
if self._treating_values: normalize,
)
else:
self._normalize_uncertainties_if_requested_and_append(
self._current_dataset,
self._strip_data_type_from_dataset_descriptor(),
normalize,
)
if treating_values:
print(" Values extracted") print(" Values extracted")
else: else:
print(" Uncertainties extracted") print(" Uncertainties extracted")
self._store_cache( return UncertainArray(self._values, self._uncertainties)
uncertain_values := UncertainArray(self._values, self._uncertainties)
)
self.uncertain_values = uncertain_values
def _normalize_if_requested_and_append( def _normalize_values_if_requested_and_append(
self, data: Dataset, dataset_descriptor: str self, values: Dataset, dataset_descriptor: str, normalize: bool
) -> None: ) -> None:
"""Normalize the provided data and append according to current state""" """Normalize the provided values and append according to current state"""
_potentially_normalized_data = data[ _potentially_normalized_values = values[
np.s_[1 : self.size_scaler + 1, : self.n_samples] np.s_[: self.size_scaler, : self.n_samples]
] ]
if self._treating_values: if normalize:
if self.normalize: _potentially_normalized_values -= np.mean(
_potentially_normalized_data -= np.mean( values[:, : self.n_samples], axis=0
data[:, : self.n_samples], axis=0
)
data_std = np.std(data[:, : self.n_samples], axis=0)
data_std[data_std == 0] = 1.0
self._normalization_divisors[dataset_descriptor] = data_std
_potentially_normalized_data /= self._normalization_divisors[
dataset_descriptor
]
self._values = np.append(
self._values, _potentially_normalized_data.transpose(), axis=1
)
else:
if self.normalize:
_potentially_normalized_data /= self._normalization_divisors[
dataset_descriptor
]
self._uncertainties = np.append(
self._uncertainties, _potentially_normalized_data.transpose(), axis=1
) )
data_std = np.std(values[:, : self.n_samples], axis=0)
data_std[data_std == 0] = 1.0
self._normalization_divisors[dataset_descriptor] = data_std
_potentially_normalized_values /= self._normalization_divisors[
dataset_descriptor
]
self._values = np.append(
self._values, _potentially_normalized_values.transpose(), axis=1
)
def _normalize_uncertainties_if_requested_and_append(
self, uncertainties: Dataset, dataset_descriptor: str, normalize: bool
) -> None:
"""Normalize the provided uncertainties and append according to current state"""
_potentially_normalized_uncertainties = uncertainties[
np.s_[: self.size_scaler, : self.n_samples]
]
if normalize:
_potentially_normalized_uncertainties /= self._normalization_divisors[
dataset_descriptor
]
self._uncertainties = np.append(
self._uncertainties,
_potentially_normalized_uncertainties.transpose(),
axis=1,
)
def _extract_sub_dataset_name(self, idx: int) -> str: def _extract_sub_dataset_name(self, idx: int) -> str:
return str( return str(
...@@ -216,37 +237,26 @@ class ZeMASamples: ...@@ -216,37 +237,26 @@ class ZeMASamples:
"""The uncertainties of the stored :class:`UncertainArray` object""" """The uncertainties of the stored :class:`UncertainArray` object"""
return self.uncertain_values.uncertainties return self.uncertain_values.uncertainties
@staticmethod def _check_and_load_cache(self, normalize: bool) -> UncertainArray | None:
def _check_and_load_cache(
n_samples: int, size_scaler: int
) -> UncertainArray | None:
"""Checks if corresponding file for n_samples exists and loads it with pickle""" """Checks if corresponding file for n_samples exists and loads it with pickle"""
if os.path.exists( if os.path.exists(cache_path := self._cache_path(normalize)):
cache_path := ZeMASamples._cache_path(n_samples, size_scaler)
):
with open(cache_path, "rb") as cache_file: with open(cache_path, "rb") as cache_file:
return cast(UncertainArray, pickle.load(cache_file)) return cast(UncertainArray, pickle.load(cache_file))
return None return None
@staticmethod def _cache_path(self, normalize: bool) -> Path:
def _cache_path(n_samples: int, size_scaler: int) -> Path:
"""Local file system path for a cache file containing n ZeMA samples """Local file system path for a cache file containing n ZeMA samples
The result does not guarantee, that the file at the specified location exists, The result does not guarantee, that the file at the specified location exists,
but can be used to check for existence or creation. but can be used to check for existence or creation.
""" """
return LOCAL_ZEMA_DATASET_PATH.joinpath( return LOCAL_ZEMA_DATASET_PATH.joinpath(
f"{str(n_samples)}_samples_with_{str(size_scaler)}_values_per_sensor.pickle" f"{str(self.n_samples)}_samples_with"
f"_{str(self.size_scaler)}_values_per_sensor"
f"{'_normalized' if normalize else ''}.pickle"
) )
@staticmethod def _store_cache(self, normalize: bool) -> None:
def _store_cache(uncertain_values: UncertainArray) -> None: """Dumps provided uncertain tensor to corresponding pickle file"""
"""Dumps provided uncertain tenor to corresponding pickle file""" with open(self._cache_path(normalize), "wb") as cache_file:
with open( pickle.dump(self.uncertain_values, cache_file)
ZeMASamples._cache_path(
uncertain_values.values.shape[0],
int(uncertain_values.values.shape[1] / 11),
),
"wb",
) as cache_file:
pickle.dump(uncertain_values, cache_file)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment