Verified Commit 30a5cf99 authored by Björn Ludwig

feat(dataset): introduce scaler parameter to retrieve several datapoints from each cycle at once

parent 50e8eef9
@@ -6,20 +6,21 @@ __all__ = [
     "LOCAL_ZEMA_DATASET_PATH",
     "ZEMA_DATASET_HASH",
     "ZEMA_DATASET_URL",
-    "ZEMA_DATATYPES",
     "ZEMA_QUANTITIES",
 ]
 
+import operator
 import os
 import pickle
 from enum import Enum
+from functools import reduce
 from os.path import dirname, exists
 from pathlib import Path
 from typing import cast
 
 import h5py
 import numpy as np
-from h5py import Dataset, File, Group
+from h5py import Dataset
 from numpy._typing import NDArray
 from pooch import retrieve
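The new `operator` and `functools.reduce` imports support the chained-`getitem` lookup that replaces the removed `_hdf5_part` helper further down in this commit. A minimal sketch of the access pattern, using a plain nested dict as a stand-in for the opened h5py file (the real code walks h5py Groups the same way, since they support dict-style item access):

```python
import operator
from functools import reduce

# Stand-in for the opened HDF5 file's group hierarchy (illustration only).
h5f = {"ZeMA_DAQ": {"Acceleration": {"qudt:value": [1.0, 2.0]}}}

dataset_descriptor = ["ZeMA_DAQ", "Acceleration", "qudt:value"]

# Equivalent to h5f["ZeMA_DAQ"]["Acceleration"]["qudt:value"].
dataset = reduce(operator.getitem, dataset_descriptor, h5f)
assert dataset == [1.0, 2.0]
```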
@@ -30,7 +31,6 @@ ZEMA_DATASET_HASH = (
     "sha256:fb0e80de4e8928ae8b859ad9668a1b6ea6310028a6690bb8d4c1abee31cb8833"
 )
 ZEMA_DATASET_URL = "https://zenodo.org/record/5185953/files/axis11_2kHz_ZeMA_PTB_SI.h5"
-ZEMA_DATATYPES = ("qudt:standardUncertainty", "qudt:value")
 ZEMA_QUANTITIES = (
     "Acceleration",
     "Active_Current",
@@ -57,7 +57,9 @@ class ExtractionDataType(Enum):
     VALUES = "qudt:value"
 
 
-def provide_zema_samples(n_samples: int = 1) -> UncertainArray:
+def provide_zema_samples(
+    n_samples: int = 1, size_scaler: int = 1, normalize: bool = False
+) -> UncertainArray:
     """Extracts requested number of samples of values with associated uncertainties
 
     The underlying dataset is the annotated "Sensor data set of one electromechanical
...@@ -65,26 +67,28 @@ def provide_zema_samples(n_samples: int = 1) -> UncertainArray: ...@@ -65,26 +67,28 @@ def provide_zema_samples(n_samples: int = 1) -> UncertainArray:
Parameters Parameters
---------- ----------
n_samples : int n_samples : int, optional
number of samples each containing one reading from each of the eleven sensors number of samples each containing size_scaler readings from each of the eleven
with associated uncertainties sensors with associated uncertainties, defaults to 1
size_scaler : int, optional
number of sensor readings from each of the individual sensors per sample,
defaults to 1
normalize : bool, optional
if ``True``, then data is centered around zero and scaled to unit std,
defaults to False
Returns Returns
------- -------
UncertainArray UncertainArray
The collection of samples of values with associated uncertainties The collection of samples of values with associated uncertainties, will be of
shape (n_samples, 11 x size_scaler)
""" """
def _hdf5_part(hdf5_file: File, keys: list[str]) -> Group | Dataset: def _normalize_if_requested(data: Dataset) -> NDArray[np.double]:
part = hdf5_file _potentially_normalized_data = data[np.s_[1 : size_scaler + 1, :n_samples]]
for key in keys: if normalize:
part = part[key] _potentially_normalized_data -= np.mean(data[:, :n_samples], axis=0)
return part _potentially_normalized_data /= np.std(data[:, :n_samples], axis=0)
return _potentially_normalized_data.transpose()
def _extract_sample_from_dataset(
data_set: Dataset, ns_samples: tuple[slice, int]
) -> NDArray[np.double]:
return np.expand_dims(np.array(data_set[ns_samples]), 1)
def _append_to_extraction( def _append_to_extraction(
append_to: NDArray[np.double], appendix: NDArray[np.double] append_to: NDArray[np.double], appendix: NDArray[np.double]
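The new `_normalize_if_requested` helper is where the `size_scaler` parameter takes effect: it slices `size_scaler` consecutive readings (first axis) out of the first `n_samples` cycles (second axis) and transposes the block so that samples become rows. A small NumPy sketch of that slicing, with an invented toy array standing in for a real sensor dataset:

```python
import numpy as np

# Toy stand-in for one sensor's h5py Dataset: axis 0 holds readings within
# a working cycle, axis 1 holds the cycles.
data = np.arange(20.0).reshape(4, 5)  # 4 readings per cycle, 5 cycles
size_scaler, n_samples = 2, 3

# The same slice as in _normalize_if_requested: readings 1..size_scaler of
# the first n_samples cycles, transposed to shape (n_samples, size_scaler).
block = data[np.s_[1 : size_scaler + 1, :n_samples]].transpose()
assert block.shape == (3, 2)
```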
@@ -102,46 +106,40 @@ def provide_zema_samples(n_samples: int = 1) -> UncertainArray:
     assert exists(dataset_full_path)
     uncertainties = np.empty((n_samples, 0))
     values = np.empty((n_samples, 0))
-    indices = np.s_[0:n_samples, 0]
     relevant_datasets = (
-        ["ZeMA_DAQ", quantity, datatype]
+        ["ZeMA_DAQ", quantity, datatype.value]
         for quantity in ZEMA_QUANTITIES
-        for datatype in ZEMA_DATATYPES
+        for datatype in ExtractionDataType
     )
     with h5py.File(dataset_full_path, "r") as h5f:
-        for dataset in relevant_datasets:
-            if ExtractionDataType.UNCERTAINTIES.value in dataset:
+        for dataset_descriptor in relevant_datasets:
+            dataset = cast(Dataset, reduce(operator.getitem, dataset_descriptor, h5f))
+            if ExtractionDataType.UNCERTAINTIES.value in dataset.name:
                 extracted_data = uncertainties
-                print(f" Extract uncertainties from {dataset}")
-            elif ExtractionDataType.VALUES.value in dataset:
+                print(f" Extract uncertainties from {dataset.name}")
+            elif ExtractionDataType.VALUES.value in dataset.name:
                 extracted_data = values
-                print(f" Extract values from {dataset}")
+                print(f" Extract values from {dataset.name}")
             else:
-                extracted_data = None
-            if extracted_data is not None:
-                if len(_hdf5_part(h5f, dataset).shape) == 3:
-                    for sensor in _hdf5_part(h5f, dataset):
-                        extracted_data = _append_to_extraction(
-                            extracted_data,
-                            _extract_sample_from_dataset(sensor, indices),
-                        )
-                else:
+                raise RuntimeError(
+                    "Somehow there is unexpected data in the dataset to be processed. "
+                    f"Did not expect to find {dataset.name}"
+                )
+            if dataset.shape[0] == 3:
+                for sensor in dataset:
                     extracted_data = _append_to_extraction(
-                        extracted_data,
-                        _extract_sample_from_dataset(
-                            _hdf5_part(h5f, dataset),
-                            indices,
-                        ),
+                        extracted_data, _normalize_if_requested(sensor)
                     )
-                if (
-                    ExtractionDataType.UNCERTAINTIES.value
-                    in _hdf5_part(h5f, dataset).name
-                ):
-                    uncertainties = extracted_data
-                    print(" Uncertainties extracted")
-                elif ExtractionDataType.VALUES.value in _hdf5_part(h5f, dataset).name:
-                    values = extracted_data
-                    print(" Values extracted")
+            else:
+                extracted_data = _append_to_extraction(
+                    extracted_data, _normalize_if_requested(dataset)
+                )
+            if ExtractionDataType.UNCERTAINTIES.value in dataset.name:
+                uncertainties = extracted_data
+                print(" Uncertainties extracted")
+            elif ExtractionDataType.VALUES.value in dataset.name:
+                values = extracted_data
+                print(" Values extracted")
     uncertain_values = UncertainArray(np.array(values), np.array(uncertainties))
     _store_cache(uncertain_values)
     return uncertain_values
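With the new signature, one call yields a flat per-sample feature vector across all eleven sensors. A hypothetical usage sketch (the import path and the `UncertainArray` attribute name are assumptions, not taken from this diff):

```python
# Hypothetical usage; the module path is assumed for illustration.
from zema_dataset import provide_zema_samples

uncertain_values = provide_zema_samples(n_samples=10, size_scaler=5, normalize=True)

# Per the docstring, the result has shape (n_samples, 11 * size_scaler);
# the .values attribute name is an assumption about UncertainArray.
assert uncertain_values.values.shape == (10, 55)
```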