diff --git a/app/cocal_methods.py b/app/cocal_methods.py index 2c5820ef387e238dbdecc31e1989719fd0544f99..d49c55c0a3c6d327c52e481a3810e4d820b5002b 100644 --- a/app/cocal_methods.py +++ b/app/cocal_methods.py @@ -1,14 +1,12 @@ -import os - # ffmpeg is required under windows -#ffmpeg_path = os.path.abspath( +# ffmpeg_path = os.path.abspath( # r"C:\Users\gruber04\programs\ffmpeg-master-latest-win64-gpl\bin" -#) -#os.environ["PATH"] += os.pathsep + ffmpeg_path - +# ) +# os.environ["PATH"] += os.pathsep + ffmpeg_path import base64 import datetime import json +import os import time from pathlib import Path from queue import Queue @@ -24,6 +22,7 @@ import scipy.signal as sig import xmlschema from pydub import AudioSegment from PyDynamic.misc.tools import complex_2_real_imag, real_imag_2_complex +from PyDynamic.uncertainty.propagate_DFT import DFT_deconv, DFT_multiply from PyDynamic.uncertainty.propagate_MonteCarlo import UMC_generic from scipy.interpolate import interp1d from sounddevice import InputStream, play, query_devices @@ -46,7 +45,7 @@ class CocalMethods: self.audio_config = json.load(open(self.audio_config_path, "r")) self.dut_path = self.session_dir.joinpath("upload.wav") - self.dcc_path = self.session_dir.joinpath("dcc.xml") + self.report_path = self.session_dir.joinpath("report.xml") self.result_image_path = self.session_dir.joinpath("transfer_behaviour.png") ##### TESTING @@ -99,12 +98,12 @@ class CocalMethods: # set stream arguments input_stream_args = { - "device" : dev["index"], - "blocksize" : int(fs), - "samplerate" : fs, - "channels" : 1, - "dtype" : "int16", - "callback" : self.create_callback(q), + "device": dev["index"], + "blocksize": int(fs), + "samplerate": fs, + "channels": 1, + "dtype": "int16", + "callback": self.create_callback(q), } input_stream_args.update(custom_args) @@ -169,12 +168,11 @@ class CocalMethods: break return self.upload_is_ready() - def generate_dummy_dcc(self): - dcc_content = f"<xml>dummy dcc for {self.session.hash}</xml>" + def generate_dummy_report(self): + report_content = f"<xml>dummy report for {self.session.hash}</xml>" - filepath = self.session_dir.joinpath("dcc.xml") - f = open(filepath, "w") - f.write(dcc_content) + f = open(self.report_path, "w") + f.write(report_content) f.close() def get_current_git_hash(self): @@ -209,7 +207,6 @@ class CocalMethods: return output_devices def get_relevant_audio(self, kind="input", verbose=False): - if kind == "input": available_devices = self.get_available_audio_inputs() wanted_devices = self.audio_config["audio_inputs"] @@ -226,15 +223,15 @@ class CocalMethods: filter_matches = len(w_dev["filter"]) for filter_key, filter_val in w_dev["filter"].items(): filter_matches = filter_matches and a_dev[filter_key] == filter_val - + if filter_matches: - dev = {"device" : a_dev, "args" : w_dev["args"]} + dev = {"device": a_dev, "args": w_dev["args"]} relevant_devices.append(dev) break - + # chose default if still empty list if not relevant_devices: - default_device = {"device" : None, "args" : {}} + default_device = {"device": None, "args": {}} if kind == "input": default_device["device"] = self.get_default_audio_input() elif kind == "output": @@ -250,10 +247,9 @@ class CocalMethods: def base64_of_file(self, filepath): return base64.b64encode(open(filepath, "rb").read()).decode() - - def convert_array_to_xml(self, a): - # define some xml templates + def convert_array_to_xml(self, a): + # define some xml templates matrix_template = """ <si:covarianceMatrix xmlns:si="https://ptb.de/si">{COLUMNS} </si:covarianceMatrix> @@ -270,52 +266,51 @@ class CocalMethods: </si:covariance>""" columns = [] - + for col in a.T: entries = [] for val in col: entries.append(entry_template.format(VALUE=val, UNIT="\\one")) - + entries_string = " ".join(entries) columns.append(column_template.format(ENTRIES=entries_string)) - + columns_string = "\n".join(columns) matrix = matrix_template.format(COLUMNS=columns_string) - - return ET.fromstring(matrix) + return ET.fromstring(matrix) def generate_filter_mathml(self): - # prepare content (Note: discards covariance between b and a) Uba = self.transfer_behavior["IIR"]["Uba"] b = self.transfer_behavior["IIR"]["b"] - Ub = Uba[:len(b), :len(b)] + Ub = Uba[: len(b), : len(b)] a = self.transfer_behavior["IIR"]["a"] - Ua = np.pad(Uba[len(b):, len(b):], ((1, 0), (1, 0)), mode="constant") - + Ua = np.pad(Uba[len(b) :, len(b) :], ((1, 0), (1, 0)), mode="constant") + # write content into template placeholder_dict = { "PLACEHOLDER_NUM_LIST_VALUES": " ".join([str(item) for item in b]), "PLACEHOLDER_NUM_LIST_UNITS": " ".join(["\\one" for item in b]), "PLACEHOLDER_NUM_COVARIANCE_MATRIX": self.convert_array_to_xml(Ub), "PLACEHOLDER_DEN_LIST_VALUES": " ".join([str(item) for item in a]), - "PLACEHOLDER_DEN_LIST_UNITS": " ".join(["\\one" for item in a]) , + "PLACEHOLDER_DEN_LIST_UNITS": " ".join(["\\one" for item in a]), "PLACEHOLDER_DEN_COVARIANCE_MATRIX": self.convert_array_to_xml(Ub), } # load template - filter_template_path = os.path.abspath(r"app/dcc/filter_template.xml") + filter_template_path = os.path.abspath(r"app/report/filter_template.xml") filter_template = open(filter_template_path, "r") filter_tree = ET.parse(filter_template) # replace placeholders with substitute from dict - filter_tree = self.replace_placeholders_with_substitutes(filter_tree, placeholder_dict) + filter_tree = self.replace_placeholders_with_substitutes( + filter_tree, placeholder_dict + ) return filter_tree - - def generate_dcc(self, perform_dcc_validation=False): + def generate_report(self, perform_dcc_validation=False): if self.transfer_behavior: placeholder_dict = { "PLACEHOLDER_GIT_HASH": self.get_current_git_hash(), @@ -335,29 +330,33 @@ class CocalMethods: } # load template - dcc_template_path = os.path.abspath(r"app/dcc/dcc_template.xml") - dcc_template = open(dcc_template_path, "r") - tree = ET.parse(dcc_template) + report_template_path = os.path.abspath(r"app/report/report_template.xml") + report_template = open(report_template_path, "r") + tree = ET.parse(report_template) # replace placeholders with substitute from dict tree = self.replace_placeholders_with_substitutes(tree, placeholder_dict) - # write DCC to file + # write report to file ET.indent(tree) - tree.write(self.dcc_path, encoding="UTF-8", pretty_print=True, ) + tree.write( + self.report_path, + encoding="UTF-8", + pretty_print=True, + ) # validate against schema (requires internet connection) if perform_dcc_validation: - dcc_schema_path = os.path.abspath(r"app/dcc/dcc.xsd") + report_schema_path = os.path.abspath(r"app/report/dcc.xsd") try: - schema = xmlschema.XMLSchema(dcc_schema_path) - result = schema.validate(self.dcc_path) + schema = xmlschema.XMLSchema(report_schema_path) + result = schema.validate(self.report_path) print(result) except Exception as e: print(f"Schema did not validate successfully. (Error: {e})") else: - print("Can't generate DCC. No transfer behavior available.") + print("Can't generate report. No transfer behavior available.") def replace_placeholders_with_substitutes(self, tree, placeholder_dict): for placeholder, substitute in placeholder_dict.items(): @@ -373,28 +372,48 @@ class CocalMethods: elem.text = "" elem.insert(0, substitute.getroot()) else: - print(f"Can not handle replacement type for {placeholder} of type {type(substitute)}.") + print( + f"Can not handle replacement type for {placeholder} of type {type(substitute)}." + ) return tree def generate_spectral_domain_visualization(self): + # provide shortcuts for plotting + b = self.transfer_behavior["IIR"]["b"] + a = self.transfer_behavior["IIR"]["a"] + ba_cov = self.transfer_behavior["IIR"]["Uba"] + + # further relevant parameters + time_delta = np.mean(np.diff(self.ref_times[0])) + frame_rate = 1.0 / time_delta + + Nb = len(b) + ba = np.concatenate((b, a[1:])) + mask = np.logical_and(self.ref_frequency < 5000.0, self.ref_frequency >= 0.0) + mask = np.logical_and(self.ref_frequency < 20000.0, self.ref_frequency >= 0.0) # visualize coefficient uncertainties by plotting the spectrum uncertainty via MC - draw_samples = lambda size: np.random.multivariate_normal( - ba, ba_cov, size=size - ) + def draw_samples(size): + return np.random.multivariate_normal(ba, ba_cov, size=size) - w = np.linspace(0, np.pi, 50) - f = w / (2 * np.pi) * frame_rate - w_exp = np.exp(-1j * w) # ??? - evaluate = lambda sample: complex_2_real_imag( - self.freqz_core(sample, Nb, w_exp) - ) + ## reduced frequency resolution + # w = np.linspace(0, np.pi, 50) + # f = w / (2 * np.pi) * frame_rate + # w_exp = np.exp(-1j * w) # ??? + + # full frequency resolution: + f = self.ref_frequency + w = self.ref_frequency / frame_rate * (2 * np.pi) + w_exp = np.exp(-1j * w) + + def evaluate(sample): + return complex_2_real_imag(self.freqz_core(sample, Nb, w_exp)) umc_kwargs = { "draw_samples": draw_samples, "evaluate": evaluate, - "runs": 20, + "runs": 200, "blocksize": 8, "n_cpu": 1, "compute_full_covariance": True, @@ -405,37 +424,81 @@ class CocalMethods: h = real_imag_2_complex(h_ri) h_unc = real_imag_2_complex(np.sqrt(np.diag(h_cov))) + h_empirical = self.dut_spectrum / self.ref_spectrum + + # compensate + h_comp_ri, h_comp_cov = DFT_deconv( + complex_2_real_imag(h_empirical), + h_ri, + np.zeros((2 * len(h_empirical), 2 * len(h_empirical))), + h_cov, + ) + h_comp = real_imag_2_complex(h_comp_ri) + h_comp_unc = real_imag_2_complex(np.sqrt(np.diag(h_comp_cov))) + + # regularize + # regularized_filter = sig.cheby2() + # h_regularized = self.freqz_core() + # h_compensated_and_regularized = DFT_multiply( + # h_compensated, h_regularized, h_regularized_co + # ) + # visualize result - fig, ax = plt.subplots(2, 1) - ax[0].plot(f, np.abs(h)) - ax[0].fill_between( - f, - np.abs(h) - np.abs(h_unc), - np.abs(h) + np.abs(h_unc), + fig, ax = plt.subplots(1, 1, sharex=True, squeeze=False) + ax[0, 0].plot( + f[mask], + np.abs(h[mask]), + label="fitted TF", + color="blue", + ) + ax[0, 0].fill_between( + f[mask], + np.abs(h[mask]) - np.abs(h_unc[mask]), + np.abs(h[mask]) + np.abs(h_unc[mask]), alpha=0.3, - label="unc", + label="unc of fitted TF", + color="blue", ) - ax[0].scatter( + ax[0, 0].scatter( self.ref_frequency[mask], - np.abs(self.dut_spectrum[mask] / self.ref_spectrum[mask]), - label="ref", + np.abs(h_empirical[mask]), + label="empirical TF", + s=2, + color="black", ) - ax[0].legend() - ax[1].plot(f, h_unc) + ax[0, 0].scatter( + self.ref_frequency[mask], + np.abs(h_comp[mask]), + label="compensated TF", + s=1, + color="red", + ) + ax[0, 0].fill_between( + f[mask], + np.abs(h_comp[mask]) - np.abs(h_comp_unc[mask]), + np.abs(h_comp[mask]) + np.abs(h_comp_unc[mask]), + alpha=0.3, + label="unc of compensated TF", + color="red", + ) + + ax[0, 0].plot(f, np.ones_like(f), "--r", label="ideal") + ax[0, 0].legend() + ax[0, 0].set_xscale("log") + ax[0, 0].set_yscale("log") plt.savefig(self.result_image_path) - #plt.show() - + # plt.show() def perform_dummy_computations(self): self.start_date = datetime.date.today().isoformat() self.transfer_behavior = { "IIR": { - "a": np.ones((1,)), - "b": np.ones((1,)), + "a": np.ones((2,)), + "b": np.ones((1,)), "Uba": np.eye(2), - } } + } self.end_date = datetime.date.today().isoformat() def perform_computations(self): @@ -504,7 +567,7 @@ class CocalMethods: ax[i + 1].plot(ref_time, ref_signal, label=f"REF {i}") ax[i + 1].legend() plt.savefig(self.session_dir.joinpath("raw_signals.png")) - #plt.show() + # plt.show() def align_and_fuse_multiple_references(self): # time_b, signal_b, signal_b_unc = self.align_and_trim_signals( @@ -522,13 +585,15 @@ class CocalMethods: self.ref_time = self.ref_times[0] def align_and_trim_dut_to_ref(self): - self.dut_time, self.dut_signal, self.dut_signal_unc = self.align_and_trim_signals( - self.ref_time, - self.ref_signal, - self.ref_signal_unc, - self.dut_time, - self.dut_signal, - np.zeros_like(self.dut_signal), + self.dut_time, self.dut_signal, self.dut_signal_unc = ( + self.align_and_trim_signals( + self.ref_time, + self.ref_signal, + self.ref_signal_unc, + self.dut_time, + self.dut_signal, + np.zeros_like(self.dut_signal), + ) ) def align_and_trim_signals( @@ -588,7 +653,7 @@ class CocalMethods: else: block_offsets.append(0.0) - #if self.generate_plots + # if self.generate_plots # plt.plot(block_starts, block_offsets) # plt.show() @@ -623,7 +688,7 @@ class CocalMethods: ax[1].plot(time_a, signal_a_unc, label="A unc") ax[1].plot(time_b, signal_b_unc, label="B unc") plt.savefig(self.session_dir.joinpath("aligned_signals.png")) - #plt.show() + # plt.show() return time_b, signal_b, signal_b_unc @@ -637,10 +702,14 @@ class CocalMethods: # uncertainty propagation using GUM_DFT not possible, because of ArrayMemoryError: # Unable to allocate 18.3 TiB for an array with shape (1584667, 1584667) and data type float64 # hence reverting to diag-only MC approach - draw_samples = lambda size: np.random.normal( - self.ref_signal, self.ref_signal_unc, size=(size, self.ref_signal.size) - ) - evaluate = lambda x: complex_2_real_imag(np.fft.fft(x)) + def draw_samples(size): + return np.random.normal( + self.ref_signal, self.ref_signal_unc, size=(size, self.ref_signal.size) + ) + + def evaluate(x): + return complex_2_real_imag(np.fft.fft(x)) + umc_kwargs = { "draw_samples": draw_samples, "evaluate": evaluate, @@ -713,7 +782,7 @@ class CocalMethods: ax[0].legend(loc="upper right") plt.savefig(self.session_dir.joinpath("frequency_diagrams.png")) - #plt.show() + # plt.show() def estimate_filter(self): # fit a transfer behavior against the signals @@ -736,32 +805,37 @@ class CocalMethods: theta0 = np.r_[b0, a0[1:]] # only fit against reduced frequency range - mask = np.logical_and(self.ref_frequency < 5000.0, self.ref_frequency >= 0.0) + mask = np.logical_and(self.ref_frequency < 15000.0, self.ref_frequency >= 0.0) mask_ri = np.r_[mask, mask] + # custom weights to reduce influence of higher frequencies + weights = np.abs(np.diff(np.log10(np.abs(self.ref_frequency) + 1), prepend=1)) + # function to draw samples for Monte Carlo - draw_samples = lambda size: real_imag_2_complex( - np.random.normal( - self.ref_spectrum_ri[mask_ri], - self.ref_spectrum_unc_ri[mask_ri], - size=(size, self.ref_spectrum_ri[mask_ri].size), + def draw_samples(size): + return real_imag_2_complex( + np.random.normal( + self.ref_spectrum_ri[mask_ri], + self.ref_spectrum_unc_ri[mask_ri], + size=(size, self.ref_spectrum_ri[mask_ri].size), + ) ) - ) # function to evaluate samples in Monte Carlo w_empirical_exp = ( - np.exp( - -1j * 2 * np.pi * self.ref_frequency[mask] / frame_rate - ), - ) - evaluate = lambda sample: self.fit_filter( - sample, - H_dut=self.dut_spectrum[mask], - theta0=theta0, - Nb=Nb, - w_empirical_exp=w_empirical_exp, + np.exp(-1j * 2 * np.pi * self.ref_frequency[mask] / frame_rate), ) + def evaluate(sample): + return self.fit_filter( + sample, + H_dut=self.dut_spectrum[mask], + theta0=theta0, + Nb=Nb, + w_empirical_exp=w_empirical_exp, + weights=weights[mask], + ) + umc_kwargs = { "draw_samples": draw_samples, "evaluate": evaluate, @@ -784,7 +858,6 @@ class CocalMethods: self.transfer_behavior = {"IIR": {"a": a, "b": b, "Uba": ba_cov}} - def hull_correlation_offset(self, signal_a, signal_b): # estimate delay between two signals by comparing their hulls # direction correlation of signals is not meaningful because of the many oscillations @@ -810,7 +883,7 @@ class CocalMethods: # main calculation extracted from scipy.signal.freqz return npp.polyval(w_exp, b, tensor=False) / npp.polyval(w_exp, a, tensor=False) - def fit_filter(self, sample, H_dut, theta0, Nb, w_empirical_exp): + def fit_filter(self, sample, H_dut, theta0, Nb, w_empirical_exp, weights=None): # function to estimate filter H_ref = sample @@ -820,7 +893,7 @@ class CocalMethods: res = opt.minimize( self.evaluate_filter, x0=theta0, - args=(Nb, H_empirical, w_empirical_exp), + args=(Nb, H_empirical, w_empirical_exp, weights), method="Powell", ) @@ -832,14 +905,20 @@ class CocalMethods: return res.x - def evaluate_filter(self, theta, Nb, H_empirical, w_empirical_exp): + def evaluate_filter(self, theta, Nb, H_empirical, w_empirical_exp, weights=None): # evaluation function used in the numerical optimization inside fit_filter # H_ba = sig.freqz(b, a, w_empirical)[1] # quite slow H_ba = self.freqz_core(theta, Nb, w_empirical_exp) # faster - # return np.linalg.norm(H_ba - H_empirical) # does not use log-scaling - # return np.linalg.norm(np.log(H_ba) - np.log(H_empirical)) - return np.linalg.norm( - np.log(np.abs(H_ba)) - np.log(np.abs(H_empirical)) + # compute difference + # diff = np.linalg.norm(H_ba - H_empirical) # does not use log-scaling + # diff = np.linalg.norm(np.log(H_ba) - np.log(H_empirical)) + diff = np.log(np.abs(H_ba)) - np.log( + np.abs(H_empirical) ) # only amplitude matters, phase discarded + + if weights is None: + return np.linalg.norm(diff) + else: + return np.linalg.norm(diff * weights) diff --git a/app/main.py b/app/main.py index c268b10492d0cf395022c10e282b288c6bc7ca85..cfaad7b02822dcbbe2984184117aee8efec35920 100644 --- a/app/main.py +++ b/app/main.py @@ -1,11 +1,12 @@ import datetime -from pathlib import Path +import logging import uuid +from pathlib import Path from fastapi import BackgroundTasks, Depends, FastAPI, UploadFile from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import FileResponse from fastapi.openapi.docs import get_swagger_ui_html +from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from sqlalchemy.orm import Session @@ -14,7 +15,6 @@ from .database import SessionLocal, engine models.Base.metadata.create_all(bind=engine) -import logging logger = logging.getLogger("uvicorn") app = FastAPI(docs_url=None) @@ -73,7 +73,7 @@ def cocal_task(db: Session = Depends(get_db), hash = hash): cocal.perform_computations() # perform_dummy_computations # generate (dummy) DCC - cocal.generate_dcc() # generate_dummy_dcc + cocal.generate_report() # generate_dummy_report # generate visualization of transfer behaviour cocal.generate_spectral_domain_visualization() @@ -167,7 +167,7 @@ def download_certificate(hash: str, db: Session = Depends(get_db)): session = crud.get_cocal_session_by_hash(db, hash) if session.result_state == "ready": - filepath = Path(".").joinpath("sessions", session.hash, "dcc.xml") + filepath = Path(".").joinpath("sessions", session.hash, "report.xml") return FileResponse(filepath, filename=filepath.name) else: return {"message": "Result not ready."} diff --git a/app/dcc/dcc.xsd b/app/report/dcc.xsd similarity index 100% rename from app/dcc/dcc.xsd rename to app/report/dcc.xsd diff --git a/app/dcc/filter_template.xml b/app/report/filter_template.xml similarity index 100% rename from app/dcc/filter_template.xml rename to app/report/filter_template.xml diff --git a/app/dcc/dcc_template.xml b/app/report/report_template.xml similarity index 100% rename from app/dcc/dcc_template.xml rename to app/report/report_template.xml diff --git a/app/testing.py b/app/testing.py index fdf737f98e0b7d8bcb6fb58e6d2243f1fc5feacd..9282f2669830eaa02341ba7cfe1ddf73b2daa331 100644 --- a/app/testing.py +++ b/app/testing.py @@ -1,30 +1,36 @@ # call as `python -m app/testing` import sys + sys.path.append(".") from pathlib import Path from app import cocal_methods + class session: - hash = "abc" + hash = "ad554ee7119047559a2250e7e9678ee4" + cocal_session = session() cocal = cocal_methods.CocalMethods(cocal_session, generate_plots=True) cocal.testsignal_path = Path("./app/audio/Rosa_Rauschen.mp3") -#cocal.record_and_save_reference() +# cocal.record_and_save_reference() + +# cocal.record_and_save_reference(35) +cocal.ref_paths = [ + "sessions/ad554ee7119047559a2250e7e9678ee4/reference_1.wav", + "sessions/ad554ee7119047559a2250e7e9678ee4/reference_2.wav", +] +cocal.dut_path = "sessions/ad554ee7119047559a2250e7e9678ee4/upload_2.wav" +cocal.perform_computations() -#cocal.record_and_save_reference(35) -#cocal.ref_paths = ["sessions\\abc\\reference_demo_22.wav", "sessions\\abc\\reference_demo_34.wav"] -cocal.ref_paths = ["sessions/ad18bf881c964efba09bca831fa7bd45/reference_1.wav", "sessions/ad18bf881c964efba09bca831fa7bd45/reference_3.wav"] -cocal.dut_path = "sessions/ad18bf881c964efba09bca831fa7bd45/upload.wav" +cocal.generate_report() -#cocal.perform_computations() -cocal.perform_dummy_computations() +cocal.generate_spectral_domain_visualization() -cocal.generate_dcc() print("Fin.") diff --git a/testing_method/validating_dcc_template.py b/testing_method/validating_dcc_template.py index 82c651b24d90f7a64207c28934d1afd0053f4a03..f8c5d2d9f3ec11671edd2483460ccd034fa4d220 100644 --- a/testing_method/validating_dcc_template.py +++ b/testing_method/validating_dcc_template.py @@ -2,10 +2,10 @@ import xmlschema import os -dcc_file = os.path.abspath(r"testing_method\templates\dcc_template.xml") +report_file = os.path.abspath(r"testing_method\templates\report_template.xml") schema_file = os.path.abspath(r"testing_method\templates\dcc.xsd") schema = xmlschema.XMLSchema(schema_file) -result = schema.validate(dcc_file) +result = schema.validate(report_file) print(result) \ No newline at end of file