Verified Commit e92c9bb7 authored by Björn Ludwig

feat(dataset): turn dataset provider into class and fix normalization

parent 30a5cf99
@@ -2,8 +2,8 @@
 __all__ = [
     "ExtractionDataType",
-    "provide_zema_samples",
     "LOCAL_ZEMA_DATASET_PATH",
+    "ZeMASamples",
     "ZEMA_DATASET_HASH",
     "ZEMA_DATASET_URL",
     "ZEMA_QUANTITIES",
@@ -12,6 +12,7 @@ __all__ = [
 import operator
 import os
 import pickle
+from dataclasses import dataclass
 from enum import Enum
 from functools import reduce
 from os.path import dirname, exists
@@ -22,9 +23,8 @@
 import h5py
 import numpy as np
 from h5py import Dataset
 from numpy._typing import NDArray
-from pooch import retrieve
-from zema_emc_annotated.data_types import UncertainArray
+from zema_emc_annotated.data_types import RealMatrix, RealVector, UncertainArray

 LOCAL_ZEMA_DATASET_PATH = Path(dirname(__file__), "datasets")
 ZEMA_DATASET_HASH = (
@@ -47,19 +47,18 @@ class ExtractionDataType(Enum):
     Attributes
     ----------
-    UNCERTAINTIES : str
-        with value ``qudt:standardUncertainty``
     VALUES : str
         with value ``qudt:value``
+    UNCERTAINTIES : str
+        with value ``qudt:standardUncertainty``
     """

-    UNCERTAINTIES = "qudt:standardUncertainty"
     VALUES = "qudt:value"
+    UNCERTAINTIES = "qudt:standardUncertainty"


-def provide_zema_samples(
-    n_samples: int = 1, size_scaler: int = 1, normalize: bool = False
-) -> UncertainArray:
+@dataclass
+class ZeMASamples:
     """Extracts requested number of samples of values with associated uncertainties

     The underlying dataset is the annotated "Sensor data set of one electromechanical
@@ -68,101 +67,184 @@ def provide_zema_samples(
     Parameters
     ----------
     n_samples : int, optional
-        number of samples each containing size_scaler readings from each of the eleven
-        sensors with associated uncertainties, defaults to 1
+        number of samples each containing size_scaler readings from each of the
+        eleven sensors with associated uncertainties, defaults to 1
     size_scaler : int, optional
         number of sensor readings from each of the individual sensors per sample,
         defaults to 1
     normalize : bool, optional
         if ``True``, then data is centered around zero and scaled to unit std,
         defaults to False
-    Returns
-    -------
-    UncertainArray
-        The collection of samples of values with associated uncertainties, will be of
-        shape (n_samples, 11 x size_scaler)
+
+    Attributes
+    ----------
+    uncertain_values : UncertainArray
+        The collection of samples of values with associated uncertainties,
+        will be of shape (n_samples, 11 x size_scaler)
     """

-    def _normalize_if_requested(data: Dataset) -> NDArray[np.double]:
-        _potentially_normalized_data = data[np.s_[1 : size_scaler + 1, :n_samples]]
-        if normalize:
-            _potentially_normalized_data -= np.mean(data[:, :n_samples], axis=0)
-            _potentially_normalized_data /= np.std(data[:, :n_samples], axis=0)
-        return _potentially_normalized_data.transpose()
-
-    def _append_to_extraction(
-        append_to: NDArray[np.double], appendix: NDArray[np.double]
-    ) -> NDArray[np.double]:
-        return np.append(append_to, appendix, axis=1)
-
-    if cached_data := _check_and_load_cache(n_samples):
-        return cached_data
-    dataset_full_path = retrieve(
-        url=ZEMA_DATASET_URL,
-        known_hash=ZEMA_DATASET_HASH,
-        path=LOCAL_ZEMA_DATASET_PATH,
-        progressbar=True,
-    )
-    assert exists(dataset_full_path)
-    uncertainties = np.empty((n_samples, 0))
-    values = np.empty((n_samples, 0))
-    relevant_datasets = (
-        ["ZeMA_DAQ", quantity, datatype.value]
-        for quantity in ZEMA_QUANTITIES
-        for datatype in ExtractionDataType
-    )
-    with h5py.File(dataset_full_path, "r") as h5f:
-        for dataset_descriptor in relevant_datasets:
-            dataset = cast(Dataset, reduce(operator.getitem, dataset_descriptor, h5f))
-            if ExtractionDataType.UNCERTAINTIES.value in dataset.name:
-                extracted_data = uncertainties
-                print(f"    Extract uncertainties from {dataset.name}")
-            elif ExtractionDataType.VALUES.value in dataset.name:
-                extracted_data = values
-                print(f"    Extract values from {dataset.name}")
-            else:
-                raise RuntimeError(
-                    "Somehow there is unexpected data in the dataset to be processed. "
-                    f"Did not expect to find {dataset.name}"
-                )
-            if dataset.shape[0] == 3:
-                for sensor in dataset:
-                    extracted_data = _append_to_extraction(
-                        extracted_data, _normalize_if_requested(sensor)
-                    )
-            else:
-                extracted_data = _append_to_extraction(
-                    extracted_data, _normalize_if_requested(dataset)
-                )
-            if ExtractionDataType.UNCERTAINTIES.value in dataset.name:
-                uncertainties = extracted_data
-                print("    Uncertainties extracted")
-            elif ExtractionDataType.VALUES.value in dataset.name:
-                values = extracted_data
-                print("    Values extracted")
-    uncertain_values = UncertainArray(np.array(values), np.array(uncertainties))
-    _store_cache(uncertain_values)
-    return uncertain_values
-
-
-def _check_and_load_cache(n_samples: int) -> UncertainArray | None:
-    """Checks if corresponding file for n_samples exists and loads it with pickle"""
-    if os.path.exists(cache_path := _cache_path(n_samples)):
-        with open(cache_path, "rb") as cache_file:
-            return cast(UncertainArray, pickle.load(cache_file))
-    return None
-
-
-def _cache_path(n_samples: int) -> Path:
-    """Local file system path for a cache file containing n ZeMA samples
-
-    The result does not guarantee, that the file at the specified location exists,
-    but can be used to check for existence or creation.
-    """
-    return LOCAL_ZEMA_DATASET_PATH.joinpath(f"{str(n_samples)}_samples.pickle")
-
-
-def _store_cache(uncertain_values: UncertainArray) -> None:
-    """Dumps provided uncertain tenor to corresponding pickle file"""
-    with open(_cache_path(len(uncertain_values.values)), "wb") as cache_file:
-        pickle.dump(uncertain_values, cache_file)
+    uncertain_values: UncertainArray
+
+    def __init__(
+        self, n_samples: int = 1, size_scaler: int = 1, normalize: bool = False
+    ):
+        self.normalize = normalize
+        self.n_samples = n_samples
+        self.size_scaler = size_scaler
+        # if cached_data := _check_and_load_cache(n_samples, size_scaler):
+        #     return cached_data
+        dataset_full_path = (
+            "/home/bjorn/code/zema_emc_annotated/src/zema_emc_annotated/"
+            "datasets/394da54b1fc044fc498d60367c4e292d-axis11_2kHz_ZeMA_PTB_SI.h5"
+        )
+        # retrieve(
+        #     url=ZEMA_DATASET_URL,
+        #     known_hash=ZEMA_DATASET_HASH,
+        #     path=LOCAL_ZEMA_DATASET_PATH,
+        #     progressbar=True,
+        # )
+        assert exists(dataset_full_path)
+        self._uncertainties = np.empty((n_samples, 0))
+        self._values = np.empty((n_samples, 0))
+        relevant_datasets = (
+            ["ZeMA_DAQ", quantity, datatype.value]
+            for quantity in ZEMA_QUANTITIES
+            for datatype in ExtractionDataType
+        )
+        self._treating_uncertainties: bool = False
+        self._treating_values: bool = False
+        self._normalization_divisors: dict[str, NDArray[np.double] | float] = {}
+        with h5py.File(dataset_full_path, "r") as h5f:
+            for dataset_descriptor in relevant_datasets:
+                self._current_dataset: Dataset = cast(
+                    Dataset, reduce(operator.getitem, dataset_descriptor, h5f)
+                )
+                if ExtractionDataType.VALUES.value in self._current_dataset.name:
+                    self._treating_values = True
+                    print(f"    Extract values from {self._current_dataset.name}")
+                elif (
+                    ExtractionDataType.UNCERTAINTIES.value in self._current_dataset.name
+                ):
+                    self._treating_values = False
+                    print(
+                        f"    Extract uncertainties from {self._current_dataset.name}"
+                    )
+                else:
+                    raise RuntimeError(
+                        "Somehow there is unexpected data in the dataset to be"
+                        f"processed. Did not expect to find "
+                        f"{self._current_dataset.name}"
+                    )
+                if self._current_dataset.shape[0] == 3:
+                    for idx, sensor in enumerate(self._current_dataset):
+                        self._normalize_if_requested_and_append(
+                            sensor, self._extract_sub_dataset_name(idx)
+                        )
+                else:
+                    self._normalize_if_requested_and_append(
+                        self._current_dataset,
+                        self._strip_data_type_from_dataset_descriptor(),
+                    )
+                if self._treating_values:
+                    print("    Values extracted")
+                else:
+                    print("    Uncertainties extracted")
+        self._store_cache(
+            uncertain_values := UncertainArray(self._values, self._uncertainties)
+        )
+        self.uncertain_values = uncertain_values
+
+    def _normalize_if_requested_and_append(
+        self, data: Dataset, dataset_descriptor: str
+    ) -> None:
+        """Normalize the provided data and append according to current state"""
+        _potentially_normalized_data = data[
+            np.s_[1 : self.size_scaler + 1, : self.n_samples]
+        ]
+        if self._treating_values:
+            if self.normalize:
+                _potentially_normalized_data -= np.mean(
+                    data[:, : self.n_samples], axis=0
+                )
+                data_std = np.std(data[:, : self.n_samples], axis=0)
+                data_std[data_std == 0] = 1.0
+                self._normalization_divisors[dataset_descriptor] = data_std
+                _potentially_normalized_data /= self._normalization_divisors[
+                    dataset_descriptor
+                ]
+            self._values = np.append(
+                self._values, _potentially_normalized_data.transpose(), axis=1
+            )
+        else:
+            if self.normalize:
+                _potentially_normalized_data /= self._normalization_divisors[
+                    dataset_descriptor
+                ]
+            self._uncertainties = np.append(
+                self._uncertainties, _potentially_normalized_data.transpose(), axis=1
+            )
+
+    def _extract_sub_dataset_name(self, idx: int) -> str:
+        return str(
+            self._strip_data_type_from_dataset_descriptor()
+            + self._current_dataset.attrs["si:label"]
+            .split(",")[idx]
+            .strip("[")
+            .strip("]")
+            .replace(" ", "")
+            .replace('"', "")
+            .replace("uncertainty", "")
+        ).replace("\n", "")
+
+    def _strip_data_type_from_dataset_descriptor(self) -> str:
+        return str(
+            self._current_dataset.name.replace(
+                ExtractionDataType.UNCERTAINTIES.value, ""
+            ).replace(ExtractionDataType.VALUES.value, "")
+        )
+
+    @property
+    def values(self) -> RealVector:
+        """The values of the stored :class:`UncertainArray` object"""
+        return self.uncertain_values.values
+
+    @property
+    def uncertainties(self) -> RealMatrix | RealVector:
+        """The uncertainties of the stored :class:`UncertainArray` object"""
+        return self.uncertain_values.uncertainties
+
+    @staticmethod
+    def _check_and_load_cache(
+        n_samples: int, size_scaler: int
+    ) -> UncertainArray | None:
+        """Checks if corresponding file for n_samples exists and loads it with pickle"""
+        if os.path.exists(
+            cache_path := ZeMASamples._cache_path(n_samples, size_scaler)
+        ):
+            with open(cache_path, "rb") as cache_file:
+                return cast(UncertainArray, pickle.load(cache_file))
+        return None
+
+    @staticmethod
+    def _cache_path(n_samples: int, size_scaler: int) -> Path:
+        """Local file system path for a cache file containing n ZeMA samples
+
+        The result does not guarantee, that the file at the specified location exists,
+        but can be used to check for existence or creation.
+        """
+        return LOCAL_ZEMA_DATASET_PATH.joinpath(
+            f"{str(n_samples)}_samples_with_{str(size_scaler)}_values_per_sensor.pickle"
+        )
+
+    @staticmethod
+    def _store_cache(uncertain_values: UncertainArray) -> None:
+        """Dumps provided uncertain tenor to corresponding pickle file"""
+        with open(
+            ZeMASamples._cache_path(
+                uncertain_values.values.shape[0],
+                int(uncertain_values.values.shape[1] / 11),
+            ),
+            "wb",
+        ) as cache_file:
+            pickle.dump(uncertain_values, cache_file)
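
For orientation, a minimal usage sketch of the class this commit introduces. The import path zema_emc_annotated.dataset is an assumption based on the repository layout shown in the hardcoded dataset path above; the constructor parameters and the resulting shapes follow the docstring in the diff.

    # Hypothetical usage sketch; the module path is assumed, not confirmed by the diff.
    from zema_emc_annotated.dataset import ZeMASamples

    # One constructor call replaces the former provide_zema_samples() function:
    # 10 samples, 100 readings per sensor, normalized per sensor.
    samples = ZeMASamples(n_samples=10, size_scaler=100, normalize=True)

    # Values and uncertainties are exposed via properties on the stored
    # UncertainArray, each of shape (n_samples, 11 * size_scaler), here (10, 1100).
    print(samples.values.shape)
    print(samples.uncertainties.shape)

Regarding the "fix normalization" part of the commit message: per the diff, the per-sensor standard deviations of the values are now stored as normalization divisors (with zero divisors replaced by 1.0 to avoid division by zero), and the associated uncertainties are scaled by those same divisors instead of being re-centered and re-scaled independently.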