Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
C
Cocalibration Demonstrator Backend
Manage
Activity
Members
Labels
Plan
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
M4D
Cocalibration Demonstrator Backend
Commits
af7e5184
Commit
af7e5184
authored
1 year ago
by
Maximilian Gruber
Browse files
Options
Downloads
Patches
Plain Diff
code formatting
parent
8bdd876e
Branches
Branches containing commit
No related tags found
1 merge request
!1
Visualize report data
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
app/cocal_methods.py
+53
-52
53 additions, 52 deletions
app/cocal_methods.py
with
53 additions
and
52 deletions
app/cocal_methods.py
+
53
−
52
View file @
af7e5184
import
os
# ffmpeg is required under windows
#ffmpeg_path = os.path.abspath(
#
ffmpeg_path = os.path.abspath(
# r"C:\Users\gruber04\programs\ffmpeg-master-latest-win64-gpl\bin"
#)
#os.environ["PATH"] += os.pathsep + ffmpeg_path
# )
# os.environ["PATH"] += os.pathsep + ffmpeg_path
import
base64
import
datetime
import
json
import
os
import
time
from
pathlib
import
Path
from
queue
import
Queue
...
...
@@ -24,8 +22,8 @@ import scipy.signal as sig
import
xmlschema
from
pydub
import
AudioSegment
from
PyDynamic.misc.tools
import
complex_2_real_imag
,
real_imag_2_complex
from
PyDynamic.uncertainty.propagate_DFT
import
DFT_deconv
,
DFT_multiply
from
PyDynamic.uncertainty.propagate_MonteCarlo
import
UMC_generic
from
PyDynamic.uncertainty.propagate_DFT
import
DFT_multiply
,
DFT_deconv
from
scipy.interpolate
import
interp1d
from
sounddevice
import
InputStream
,
play
,
query_devices
from
soundfile
import
SoundFile
...
...
@@ -100,12 +98,12 @@ class CocalMethods:
# set stream arguments
input_stream_args
=
{
"
device
"
:
dev
[
"
index
"
],
"
blocksize
"
:
int
(
fs
),
"
samplerate
"
:
fs
,
"
channels
"
:
1
,
"
dtype
"
:
"
int16
"
,
"
callback
"
:
self
.
create_callback
(
q
),
"
device
"
:
dev
[
"
index
"
],
"
blocksize
"
:
int
(
fs
),
"
samplerate
"
:
fs
,
"
channels
"
:
1
,
"
dtype
"
:
"
int16
"
,
"
callback
"
:
self
.
create_callback
(
q
),
}
input_stream_args
.
update
(
custom_args
)
...
...
@@ -210,7 +208,6 @@ class CocalMethods:
return
output_devices
def
get_relevant_audio
(
self
,
kind
=
"
input
"
,
verbose
=
False
):
if
kind
==
"
input
"
:
available_devices
=
self
.
get_available_audio_inputs
()
wanted_devices
=
self
.
audio_config
[
"
audio_inputs
"
]
...
...
@@ -227,15 +224,15 @@ class CocalMethods:
filter_matches
=
len
(
w_dev
[
"
filter
"
])
for
filter_key
,
filter_val
in
w_dev
[
"
filter
"
].
items
():
filter_matches
=
filter_matches
and
a_dev
[
filter_key
]
==
filter_val
if
filter_matches
:
dev
=
{
"
device
"
:
a_dev
,
"
args
"
:
w_dev
[
"
args
"
]}
dev
=
{
"
device
"
:
a_dev
,
"
args
"
:
w_dev
[
"
args
"
]}
relevant_devices
.
append
(
dev
)
break
# chose default if still empty list
if
not
relevant_devices
:
default_device
=
{
"
device
"
:
None
,
"
args
"
:
{}}
default_device
=
{
"
device
"
:
None
,
"
args
"
:
{}}
if
kind
==
"
input
"
:
default_device
[
"
device
"
]
=
self
.
get_default_audio_input
()
elif
kind
==
"
output
"
:
...
...
@@ -251,10 +248,9 @@ class CocalMethods:
def
base64_of_file
(
self
,
filepath
):
return
base64
.
b64encode
(
open
(
filepath
,
"
rb
"
).
read
()).
decode
()
def
convert_array_to_xml
(
self
,
a
):
# define some xml templates
def
convert_array_to_xml
(
self
,
a
):
# define some xml templates
matrix_template
=
"""
<si:covarianceMatrix xmlns:si=
"
https://ptb.de/si
"
>{COLUMNS}
</si:covarianceMatrix>
...
...
@@ -271,37 +267,35 @@ class CocalMethods:
</si:covariance>
"""
columns
=
[]
for
col
in
a
.
T
:
entries
=
[]
for
val
in
col
:
entries
.
append
(
entry_template
.
format
(
VALUE
=
val
,
UNIT
=
"
\\
one
"
))
entries_string
=
"
"
.
join
(
entries
)
columns
.
append
(
column_template
.
format
(
ENTRIES
=
entries_string
))
columns_string
=
"
\n
"
.
join
(
columns
)
matrix
=
matrix_template
.
format
(
COLUMNS
=
columns_string
)
return
ET
.
fromstring
(
matrix
)
return
ET
.
fromstring
(
matrix
)
def
generate_filter_mathml
(
self
):
# prepare content (Note: discards covariance between b and a)
Uba
=
self
.
transfer_behavior
[
"
IIR
"
][
"
Uba
"
]
b
=
self
.
transfer_behavior
[
"
IIR
"
][
"
b
"
]
Ub
=
Uba
[:
len
(
b
),
:
len
(
b
)]
Ub
=
Uba
[:
len
(
b
),
:
len
(
b
)]
a
=
self
.
transfer_behavior
[
"
IIR
"
][
"
a
"
]
Ua
=
np
.
pad
(
Uba
[
len
(
b
):,
len
(
b
):],
((
1
,
0
),
(
1
,
0
)),
mode
=
"
constant
"
)
Ua
=
np
.
pad
(
Uba
[
len
(
b
)
:,
len
(
b
)
:],
((
1
,
0
),
(
1
,
0
)),
mode
=
"
constant
"
)
# write content into template
placeholder_dict
=
{
"
PLACEHOLDER_NUM_LIST_VALUES
"
:
"
"
.
join
([
str
(
item
)
for
item
in
b
]),
"
PLACEHOLDER_NUM_LIST_UNITS
"
:
"
"
.
join
([
"
\\
one
"
for
item
in
b
]),
"
PLACEHOLDER_NUM_COVARIANCE_MATRIX
"
:
self
.
convert_array_to_xml
(
Ub
),
"
PLACEHOLDER_DEN_LIST_VALUES
"
:
"
"
.
join
([
str
(
item
)
for
item
in
a
]),
"
PLACEHOLDER_DEN_LIST_UNITS
"
:
"
"
.
join
([
"
\\
one
"
for
item
in
a
])
,
"
PLACEHOLDER_DEN_LIST_UNITS
"
:
"
"
.
join
([
"
\\
one
"
for
item
in
a
]),
"
PLACEHOLDER_DEN_COVARIANCE_MATRIX
"
:
self
.
convert_array_to_xml
(
Ub
),
}
...
...
@@ -311,11 +305,12 @@ class CocalMethods:
filter_tree
=
ET
.
parse
(
filter_template
)
# replace placeholders with substitute from dict
filter_tree
=
self
.
replace_placeholders_with_substitutes
(
filter_tree
,
placeholder_dict
)
filter_tree
=
self
.
replace_placeholders_with_substitutes
(
filter_tree
,
placeholder_dict
)
return
filter_tree
def
generate_dcc
(
self
,
perform_dcc_validation
=
False
):
if
self
.
transfer_behavior
:
placeholder_dict
=
{
...
...
@@ -345,7 +340,11 @@ class CocalMethods:
# write DCC to file
ET
.
indent
(
tree
)
tree
.
write
(
self
.
dcc_path
,
encoding
=
"
UTF-8
"
,
pretty_print
=
True
,
)
tree
.
write
(
self
.
dcc_path
,
encoding
=
"
UTF-8
"
,
pretty_print
=
True
,
)
# validate against schema (requires internet connection)
if
perform_dcc_validation
:
...
...
@@ -374,7 +373,9 @@ class CocalMethods:
elem
.
text
=
""
elem
.
insert
(
0
,
substitute
.
getroot
())
else
:
print
(
f
"
Can not handle replacement type for
{
placeholder
}
of type
{
type
(
substitute
)
}
.
"
)
print
(
f
"
Can not handle replacement type for
{
placeholder
}
of type
{
type
(
substitute
)
}
.
"
)
return
tree
...
...
@@ -489,17 +490,16 @@ class CocalMethods:
# plt.savefig(self.result_image_path)
plt
.
show
()
def
perform_dummy_computations
(
self
):
self
.
start_date
=
datetime
.
date
.
today
().
isoformat
()
self
.
transfer_behavior
=
{
"
IIR
"
:
{
"
a
"
:
np
.
ones
((
2
,)),
"
b
"
:
np
.
ones
((
1
,)),
"
a
"
:
np
.
ones
((
2
,)),
"
b
"
:
np
.
ones
((
1
,)),
"
Uba
"
:
np
.
eye
(
2
),
}
}
}
self
.
end_date
=
datetime
.
date
.
today
().
isoformat
()
def
perform_computations
(
self
):
...
...
@@ -568,7 +568,7 @@ class CocalMethods:
ax
[
i
+
1
].
plot
(
ref_time
,
ref_signal
,
label
=
f
"
REF
{
i
}
"
)
ax
[
i
+
1
].
legend
()
plt
.
savefig
(
self
.
session_dir
.
joinpath
(
"
raw_signals.png
"
))
#plt.show()
#
plt.show()
def
align_and_fuse_multiple_references
(
self
):
# time_b, signal_b, signal_b_unc = self.align_and_trim_signals(
...
...
@@ -586,13 +586,15 @@ class CocalMethods:
self
.
ref_time
=
self
.
ref_times
[
0
]
def
align_and_trim_dut_to_ref
(
self
):
self
.
dut_time
,
self
.
dut_signal
,
self
.
dut_signal_unc
=
self
.
align_and_trim_signals
(
self
.
ref_time
,
self
.
ref_signal
,
self
.
ref_signal_unc
,
self
.
dut_time
,
self
.
dut_signal
,
np
.
zeros_like
(
self
.
dut_signal
),
self
.
dut_time
,
self
.
dut_signal
,
self
.
dut_signal_unc
=
(
self
.
align_and_trim_signals
(
self
.
ref_time
,
self
.
ref_signal
,
self
.
ref_signal_unc
,
self
.
dut_time
,
self
.
dut_signal
,
np
.
zeros_like
(
self
.
dut_signal
),
)
)
def
align_and_trim_signals
(
...
...
@@ -652,7 +654,7 @@ class CocalMethods:
else
:
block_offsets
.
append
(
0.0
)
#if self.generate_plots
#
if self.generate_plots
# plt.plot(block_starts, block_offsets)
# plt.show()
...
...
@@ -687,7 +689,7 @@ class CocalMethods:
ax
[
1
].
plot
(
time_a
,
signal_a_unc
,
label
=
"
A unc
"
)
ax
[
1
].
plot
(
time_b
,
signal_b_unc
,
label
=
"
B unc
"
)
plt
.
savefig
(
self
.
session_dir
.
joinpath
(
"
aligned_signals.png
"
))
#plt.show()
#
plt.show()
return
time_b
,
signal_b
,
signal_b_unc
...
...
@@ -777,7 +779,7 @@ class CocalMethods:
ax
[
0
].
legend
(
loc
=
"
upper right
"
)
plt
.
savefig
(
self
.
session_dir
.
joinpath
(
"
frequency_diagrams.png
"
))
#plt.show()
#
plt.show()
def
estimate_filter
(
self
):
# fit a transfer behavior against the signals
...
...
@@ -800,7 +802,7 @@ class CocalMethods:
theta0
=
np
.
r_
[
b0
,
a0
[
1
:]]
# only fit against reduced frequency range
mask
=
np
.
logical_and
(
self
.
ref_frequency
<
5000.0
,
self
.
ref_frequency
>=
0.0
)
mask
=
np
.
logical_and
(
self
.
ref_frequency
<
1
5000.0
,
self
.
ref_frequency
>=
0.0
)
mask_ri
=
np
.
r_
[
mask
,
mask
]
# function to draw samples for Monte Carlo
...
...
@@ -849,7 +851,6 @@ class CocalMethods:
self
.
transfer_behavior
=
{
"
IIR
"
:
{
"
a
"
:
a
,
"
b
"
:
b
,
"
Uba
"
:
ba_cov
}}
def
hull_correlation_offset
(
self
,
signal_a
,
signal_b
):
# estimate delay between two signals by comparing their hulls
# direction correlation of signals is not meaningful because of the many oscillations
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment