diff --git a/app/cocal_methods.py b/app/cocal_methods.py index 6ece6159b00d833c51daf1d3566d7baa1a8d7ee7..8f956534cbb03a3e60219f9cb3bb3803fa5e263f 100644 --- a/app/cocal_methods.py +++ b/app/cocal_methods.py @@ -1,14 +1,12 @@ -import os - # ffmpeg is required under windows -#ffmpeg_path = os.path.abspath( +# ffmpeg_path = os.path.abspath( # r"C:\Users\gruber04\programs\ffmpeg-master-latest-win64-gpl\bin" -#) -#os.environ["PATH"] += os.pathsep + ffmpeg_path - +# ) +# os.environ["PATH"] += os.pathsep + ffmpeg_path import base64 import datetime import json +import os import time from pathlib import Path from queue import Queue @@ -24,8 +22,8 @@ import scipy.signal as sig import xmlschema from pydub import AudioSegment from PyDynamic.misc.tools import complex_2_real_imag, real_imag_2_complex +from PyDynamic.uncertainty.propagate_DFT import DFT_deconv, DFT_multiply from PyDynamic.uncertainty.propagate_MonteCarlo import UMC_generic -from PyDynamic.uncertainty.propagate_DFT import DFT_multiply, DFT_deconv from scipy.interpolate import interp1d from sounddevice import InputStream, play, query_devices from soundfile import SoundFile @@ -100,12 +98,12 @@ class CocalMethods: # set stream arguments input_stream_args = { - "device" : dev["index"], - "blocksize" : int(fs), - "samplerate" : fs, - "channels" : 1, - "dtype" : "int16", - "callback" : self.create_callback(q), + "device": dev["index"], + "blocksize": int(fs), + "samplerate": fs, + "channels": 1, + "dtype": "int16", + "callback": self.create_callback(q), } input_stream_args.update(custom_args) @@ -210,7 +208,6 @@ class CocalMethods: return output_devices def get_relevant_audio(self, kind="input", verbose=False): - if kind == "input": available_devices = self.get_available_audio_inputs() wanted_devices = self.audio_config["audio_inputs"] @@ -227,15 +224,15 @@ class CocalMethods: filter_matches = len(w_dev["filter"]) for filter_key, filter_val in w_dev["filter"].items(): filter_matches = filter_matches and a_dev[filter_key] == filter_val - + if filter_matches: - dev = {"device" : a_dev, "args" : w_dev["args"]} + dev = {"device": a_dev, "args": w_dev["args"]} relevant_devices.append(dev) break - + # chose default if still empty list if not relevant_devices: - default_device = {"device" : None, "args" : {}} + default_device = {"device": None, "args": {}} if kind == "input": default_device["device"] = self.get_default_audio_input() elif kind == "output": @@ -251,10 +248,9 @@ class CocalMethods: def base64_of_file(self, filepath): return base64.b64encode(open(filepath, "rb").read()).decode() - - def convert_array_to_xml(self, a): - # define some xml templates + def convert_array_to_xml(self, a): + # define some xml templates matrix_template = """ <si:covarianceMatrix xmlns:si="https://ptb.de/si">{COLUMNS} </si:covarianceMatrix> @@ -271,37 +267,35 @@ class CocalMethods: </si:covariance>""" columns = [] - + for col in a.T: entries = [] for val in col: entries.append(entry_template.format(VALUE=val, UNIT="\\one")) - + entries_string = " ".join(entries) columns.append(column_template.format(ENTRIES=entries_string)) - + columns_string = "\n".join(columns) matrix = matrix_template.format(COLUMNS=columns_string) - - return ET.fromstring(matrix) + return ET.fromstring(matrix) def generate_filter_mathml(self): - # prepare content (Note: discards covariance between b and a) Uba = self.transfer_behavior["IIR"]["Uba"] b = self.transfer_behavior["IIR"]["b"] - Ub = Uba[:len(b), :len(b)] + Ub = Uba[: len(b), : len(b)] a = self.transfer_behavior["IIR"]["a"] - Ua = np.pad(Uba[len(b):, len(b):], ((1, 0), (1, 0)), mode="constant") - + Ua = np.pad(Uba[len(b) :, len(b) :], ((1, 0), (1, 0)), mode="constant") + # write content into template placeholder_dict = { "PLACEHOLDER_NUM_LIST_VALUES": " ".join([str(item) for item in b]), "PLACEHOLDER_NUM_LIST_UNITS": " ".join(["\\one" for item in b]), "PLACEHOLDER_NUM_COVARIANCE_MATRIX": self.convert_array_to_xml(Ub), "PLACEHOLDER_DEN_LIST_VALUES": " ".join([str(item) for item in a]), - "PLACEHOLDER_DEN_LIST_UNITS": " ".join(["\\one" for item in a]) , + "PLACEHOLDER_DEN_LIST_UNITS": " ".join(["\\one" for item in a]), "PLACEHOLDER_DEN_COVARIANCE_MATRIX": self.convert_array_to_xml(Ub), } @@ -311,11 +305,12 @@ class CocalMethods: filter_tree = ET.parse(filter_template) # replace placeholders with substitute from dict - filter_tree = self.replace_placeholders_with_substitutes(filter_tree, placeholder_dict) + filter_tree = self.replace_placeholders_with_substitutes( + filter_tree, placeholder_dict + ) return filter_tree - def generate_dcc(self, perform_dcc_validation=False): if self.transfer_behavior: placeholder_dict = { @@ -345,7 +340,11 @@ class CocalMethods: # write DCC to file ET.indent(tree) - tree.write(self.dcc_path, encoding="UTF-8", pretty_print=True, ) + tree.write( + self.dcc_path, + encoding="UTF-8", + pretty_print=True, + ) # validate against schema (requires internet connection) if perform_dcc_validation: @@ -374,7 +373,9 @@ class CocalMethods: elem.text = "" elem.insert(0, substitute.getroot()) else: - print(f"Can not handle replacement type for {placeholder} of type {type(substitute)}.") + print( + f"Can not handle replacement type for {placeholder} of type {type(substitute)}." + ) return tree @@ -489,17 +490,16 @@ class CocalMethods: # plt.savefig(self.result_image_path) plt.show() - def perform_dummy_computations(self): self.start_date = datetime.date.today().isoformat() self.transfer_behavior = { "IIR": { - "a": np.ones((2,)), - "b": np.ones((1,)), + "a": np.ones((2,)), + "b": np.ones((1,)), "Uba": np.eye(2), - } } + } self.end_date = datetime.date.today().isoformat() def perform_computations(self): @@ -568,7 +568,7 @@ class CocalMethods: ax[i + 1].plot(ref_time, ref_signal, label=f"REF {i}") ax[i + 1].legend() plt.savefig(self.session_dir.joinpath("raw_signals.png")) - #plt.show() + # plt.show() def align_and_fuse_multiple_references(self): # time_b, signal_b, signal_b_unc = self.align_and_trim_signals( @@ -586,13 +586,15 @@ class CocalMethods: self.ref_time = self.ref_times[0] def align_and_trim_dut_to_ref(self): - self.dut_time, self.dut_signal, self.dut_signal_unc = self.align_and_trim_signals( - self.ref_time, - self.ref_signal, - self.ref_signal_unc, - self.dut_time, - self.dut_signal, - np.zeros_like(self.dut_signal), + self.dut_time, self.dut_signal, self.dut_signal_unc = ( + self.align_and_trim_signals( + self.ref_time, + self.ref_signal, + self.ref_signal_unc, + self.dut_time, + self.dut_signal, + np.zeros_like(self.dut_signal), + ) ) def align_and_trim_signals( @@ -652,7 +654,7 @@ class CocalMethods: else: block_offsets.append(0.0) - #if self.generate_plots + # if self.generate_plots # plt.plot(block_starts, block_offsets) # plt.show() @@ -687,7 +689,7 @@ class CocalMethods: ax[1].plot(time_a, signal_a_unc, label="A unc") ax[1].plot(time_b, signal_b_unc, label="B unc") plt.savefig(self.session_dir.joinpath("aligned_signals.png")) - #plt.show() + # plt.show() return time_b, signal_b, signal_b_unc @@ -777,7 +779,7 @@ class CocalMethods: ax[0].legend(loc="upper right") plt.savefig(self.session_dir.joinpath("frequency_diagrams.png")) - #plt.show() + # plt.show() def estimate_filter(self): # fit a transfer behavior against the signals @@ -800,7 +802,7 @@ class CocalMethods: theta0 = np.r_[b0, a0[1:]] # only fit against reduced frequency range - mask = np.logical_and(self.ref_frequency < 5000.0, self.ref_frequency >= 0.0) + mask = np.logical_and(self.ref_frequency < 15000.0, self.ref_frequency >= 0.0) mask_ri = np.r_[mask, mask] # function to draw samples for Monte Carlo @@ -849,7 +851,6 @@ class CocalMethods: self.transfer_behavior = {"IIR": {"a": a, "b": b, "Uba": ba_cov}} - def hull_correlation_offset(self, signal_a, signal_b): # estimate delay between two signals by comparing their hulls # direction correlation of signals is not meaningful because of the many oscillations