diff --git a/src/zema_emc_annotated/dataset.py b/src/zema_emc_annotated/dataset.py
index a43e1583b585ca69b299d2ae417e3007b8f817fd..af8f6995e4ac76f03950740140569aa644905224 100644
--- a/src/zema_emc_annotated/dataset.py
+++ b/src/zema_emc_annotated/dataset.py
@@ -12,7 +12,6 @@ __all__ = [
 import operator
 import os
 import pickle
-from dataclasses import dataclass
 from enum import Enum
 from functools import reduce
 from os.path import dirname, exists
@@ -23,6 +22,7 @@ import h5py
 import numpy as np
 from h5py import Dataset
 from numpy._typing import NDArray
+from pooch import retrieve
 
 from zema_emc_annotated.data_types import RealMatrix, RealVector, UncertainArray
 
@@ -57,7 +57,6 @@ class ExtractionDataType(Enum):
     UNCERTAINTIES = "qudt:standardUncertainty"
 
 
-@dataclass
 class ZeMASamples:
     """Extracts requested number of samples of values with associated uncertainties
 
@@ -90,32 +89,31 @@ class ZeMASamples:
     def __init__(
         self, n_samples: int = 1, size_scaler: int = 1, normalize: bool = False
     ):
-
-        self.normalize = normalize
         self.n_samples = n_samples
         self.size_scaler = size_scaler
-        # if cached_data := _check_and_load_cache(n_samples, size_scaler):
-        #     return cached_data
-        dataset_full_path = (
-            "/home/bjorn/code/zema_emc_annotated/src/zema_emc_annotated/"
-            "datasets/394da54b1fc044fc498d60367c4e292d-axis11_2kHz_ZeMA_PTB_SI.h5"
+        if cached_data := self._check_and_load_cache(normalize):
+            self.uncertain_values = cached_data
+        else:
+            self._uncertainties = np.empty((n_samples, 0))
+            self._values = np.empty((n_samples, 0))
+            self.uncertain_values = self._extract_data(normalize)
+            self._store_cache(normalize)
+            del self._uncertainties
+            del self._values
+
+    def _extract_data(self, normalize: bool) -> UncertainArray:
+        dataset_full_path = retrieve(
+            url=ZEMA_DATASET_URL,
+            known_hash=ZEMA_DATASET_HASH,
+            path=LOCAL_ZEMA_DATASET_PATH,
+            progressbar=True,
         )
-        # retrieve(
-        #     url=ZEMA_DATASET_URL,
-        #     known_hash=ZEMA_DATASET_HASH,
-        #     path=LOCAL_ZEMA_DATASET_PATH,
-        #     progressbar=True,
-        # )
         assert exists(dataset_full_path)
-        self._uncertainties = np.empty((n_samples, 0))
-        self._values = np.empty((n_samples, 0))
         relevant_datasets = (
             ["ZeMA_DAQ", quantity, datatype.value]
             for quantity in ZEMA_QUANTITIES
             for datatype in ExtractionDataType
         )
-        self._treating_uncertainties: bool = False
-        self._treating_values: bool = False
         self._normalization_divisors: dict[str, NDArray[np.double] | float] = {}
         with h5py.File(dataset_full_path, "r") as h5f:
             for dataset_descriptor in relevant_datasets:
@@ -123,14 +121,15 @@ class ZeMASamples:
                     Dataset, reduce(operator.getitem, dataset_descriptor, h5f)
                 )
                 if ExtractionDataType.VALUES.value in self._current_dataset.name:
-                    self._treating_values = True
+                    treating_values = True
                     print(f"    Extract values from {self._current_dataset.name}")
                 elif (
                     ExtractionDataType.UNCERTAINTIES.value in self._current_dataset.name
                 ):
-                    self._treating_values = False
+                    treating_values = False
                     print(
-                        f"    Extract uncertainties from {self._current_dataset.name}"
+                        f"    Extract uncertainties from "
+                        f"{self._current_dataset.name}"
                     )
                 else:
                     raise RuntimeError(
@@ -140,52 +139,74 @@ class ZeMASamples:
                     )
                 if self._current_dataset.shape[0] == 3:
                     for idx, sensor in enumerate(self._current_dataset):
-                        self._normalize_if_requested_and_append(
-                            sensor, self._extract_sub_dataset_name(idx)
-                        )
+                        if treating_values:
+                            self._normalize_values_if_requested_and_append(
+                                sensor,
+                                self._extract_sub_dataset_name(idx),
+                                normalize,
+                            )
+                        else:
+                            self._normalize_uncertainties_if_requested_and_append(
+                                sensor,
+                                self._extract_sub_dataset_name(idx),
+                                normalize,
+                            )
                 else:
-                    self._normalize_if_requested_and_append(
-                        self._current_dataset,
-                        self._strip_data_type_from_dataset_descriptor(),
-                    )
-                if self._treating_values:
+                    if treating_values:
+                        self._normalize_values_if_requested_and_append(
+                            self._current_dataset,
+                            self._strip_data_type_from_dataset_descriptor(),
+                            normalize,
+                        )
+                    else:
+                        self._normalize_uncertainties_if_requested_and_append(
+                            self._current_dataset,
+                            self._strip_data_type_from_dataset_descriptor(),
+                            normalize,
+                        )
+                if treating_values:
                     print("    Values extracted")
                 else:
                     print("    Uncertainties extracted")
-        self._store_cache(
-            uncertain_values := UncertainArray(self._values, self._uncertainties)
-        )
-        self.uncertain_values = uncertain_values
+        return UncertainArray(self._values, self._uncertainties)
 
-    def _normalize_if_requested_and_append(
-        self, data: Dataset, dataset_descriptor: str
+    def _normalize_values_if_requested_and_append(
+        self, values: Dataset, dataset_descriptor: str, normalize: bool
     ) -> None:
-        """Normalize the provided data and append according to current state"""
-        _potentially_normalized_data = data[
-            np.s_[1 : self.size_scaler + 1, : self.n_samples]
+        """Normalize the provided values and append according to current state"""
+        _potentially_normalized_values = values[
+            np.s_[: self.size_scaler, : self.n_samples]
         ]
-        if self._treating_values:
-            if self.normalize:
-                _potentially_normalized_data -= np.mean(
-                    data[:, : self.n_samples], axis=0
-                )
-                data_std = np.std(data[:, : self.n_samples], axis=0)
-                data_std[data_std == 0] = 1.0
-                self._normalization_divisors[dataset_descriptor] = data_std
-                _potentially_normalized_data /= self._normalization_divisors[
-                    dataset_descriptor
-                ]
-            self._values = np.append(
-                self._values, _potentially_normalized_data.transpose(), axis=1
-            )
-        else:
-            if self.normalize:
-                _potentially_normalized_data /= self._normalization_divisors[
-                    dataset_descriptor
-                ]
-            self._uncertainties = np.append(
-                self._uncertainties, _potentially_normalized_data.transpose(), axis=1
+        if normalize:
+            _potentially_normalized_values -= np.mean(
+                values[:, : self.n_samples], axis=0
             )
+            data_std = np.std(values[:, : self.n_samples], axis=0)
+            data_std[data_std == 0] = 1.0
+            self._normalization_divisors[dataset_descriptor] = data_std
+            _potentially_normalized_values /= self._normalization_divisors[
+                dataset_descriptor
+            ]
+        self._values = np.append(
+            self._values, _potentially_normalized_values.transpose(), axis=1
+        )
+
+    def _normalize_uncertainties_if_requested_and_append(
+        self, uncertainties: Dataset, dataset_descriptor: str, normalize: bool
+    ) -> None:
+        """Normalize the provided uncertainties and append according to current state"""
+        _potentially_normalized_uncertainties = uncertainties[
+            np.s_[: self.size_scaler, : self.n_samples]
+        ]
+        if normalize:
+            _potentially_normalized_uncertainties /= self._normalization_divisors[
+                dataset_descriptor
+            ]
+        self._uncertainties = np.append(
+            self._uncertainties,
+            _potentially_normalized_uncertainties.transpose(),
+            axis=1,
+        )
 
     def _extract_sub_dataset_name(self, idx: int) -> str:
         return str(
@@ -216,37 +237,26 @@ class ZeMASamples:
         """The uncertainties of the stored :class:`UncertainArray` object"""
         return self.uncertain_values.uncertainties
 
-    @staticmethod
-    def _check_and_load_cache(
-        n_samples: int, size_scaler: int
-    ) -> UncertainArray | None:
+    def _check_and_load_cache(self, normalize: bool) -> UncertainArray | None:
         """Checks if corresponding file for n_samples exists and loads it with
         pickle"""
-        if os.path.exists(
-            cache_path := ZeMASamples._cache_path(n_samples, size_scaler)
-        ):
+        if os.path.exists(cache_path := self._cache_path(normalize)):
            with open(cache_path, "rb") as cache_file:
                return cast(UncertainArray, pickle.load(cache_file))
        return None
 
-    @staticmethod
-    def _cache_path(n_samples: int, size_scaler: int) -> Path:
+    def _cache_path(self, normalize: bool) -> Path:
         """Local file system path for a cache file containing n ZeMA samples
         The result does not guarantee, that the file at the specified location exists,
         but can be used to check for existence or creation.
         """
         return LOCAL_ZEMA_DATASET_PATH.joinpath(
-            f"{str(n_samples)}_samples_with_{str(size_scaler)}_values_per_sensor.pickle"
+            f"{str(self.n_samples)}_samples_with"
+            f"_{str(self.size_scaler)}_values_per_sensor"
+            f"{'_normalized' if normalize else ''}.pickle"
         )
 
-    @staticmethod
-    def _store_cache(uncertain_values: UncertainArray) -> None:
-        """Dumps provided uncertain tenor to corresponding pickle file"""
-        with open(
-            ZeMASamples._cache_path(
-                uncertain_values.values.shape[0],
-                int(uncertain_values.values.shape[1] / 11),
-            ),
-            "wb",
-        ) as cache_file:
-            pickle.dump(uncertain_values, cache_file)
+    def _store_cache(self, normalize: bool) -> None:
+        """Dumps provided uncertain tensor to corresponding pickle file"""
+        with open(self._cache_path(normalize), "wb") as cache_file:
+            pickle.dump(self.uncertain_values, cache_file)