Commit d8c1ccb4 authored by Benedikt Seeger's avatar Benedikt Seeger
Browse files

Added Multitasking for seq_fits using multiprocessing and shared memory

tqdm is used for the progress bar
parent 38ab6ab6
......@@ -14,7 +14,13 @@ from scipy.sparse.linalg import lsqr as sp_lsqr
import matplotlib.pyplot as mp
import warnings
import multiprocessing
from multiprocessing import shared_memory
from tqdm.contrib.concurrent import process_map
# Manager-backed dict used to hand fit settings to the worker functions:
# the seq_* fit functions write the shared-memory block names, dtypes,
# frequency and block size into it, and the multiPrcoess*Fit workers read them.
# NOTE(review): creating the Manager here happens as an import-time side
# effect of this module -- confirm this is safe on platforms that use the
# "spawn" start method, where worker processes re-import the module.
manager = multiprocessing.Manager()
mpdata = manager.dict()
def sampletimes(Fs, T): #
"""
generate a t_i vector with \n
......@@ -115,6 +121,33 @@ def threeparsinefit(y, t, f0):
abc = la.lstsq(a.transpose(), y)
return abc[0][0:3] ## fit vector a*sin+b*cos+c
# multiprocessing wrapper for threeparsinefit
def multiPrcoess3paramFit(i):
    """
    Fit block number i of the shared signal with threeparsinefit.

    Worker function handed to process_map by seq_threeparsinefit. It attaches
    to the two shared-memory blocks named in the module-level mpdata dict,
    wraps them as 1-D arrays, fits samples [i*N, (i+1)*N) and detaches again.

    Keys read from mpdata:
        't_shared_name', 'y_shared_name'  names of the shared-memory blocks
        't_shared_type', 'y_shared_type'  dtypes used to interpret the blocks
        'freq'                            frequency passed to threeparsinefit
        'samples_per_block'               block length N
        'nmumber_of_samples'              total length of t and y

    NOTE(review): the visible caller stores y.dtype under 't_shared_type' and
    t.dtype under 'y_shared_type', so the arrays are only interpreted
    correctly when t and y have the same dtype -- confirm/fix in the caller.

    i       index of the block to fit
    returns the result of threeparsinefit for that block
    """
    N = mpdata['samples_per_block']
    # Every mpdata access goes through the manager proxy, so read shared
    # values once and keep them in locals.
    n_samples = mpdata['nmumber_of_samples']
    block = slice(i * N, (i + 1) * N)
    existing_shm_t = shared_memory.SharedMemory(name=mpdata['t_shared_name'])
    try:
        existing_shm_y = shared_memory.SharedMemory(name=mpdata['y_shared_name'])
        try:
            t = np.ndarray((n_samples,), dtype=mpdata['t_shared_type'],
                           buffer=existing_shm_t.buf)
            y = np.ndarray((n_samples,), dtype=mpdata['y_shared_type'],
                           buffer=existing_shm_y.buf)
            result = threeparsinefit(y[block], t[block], mpdata['freq'])
            # Drop the array references before the buffers are closed.
            del t
            del y
        finally:
            # Detach even if the fit raised, so the worker does not leak
            # its attachment to the shared-memory block.
            existing_shm_y.close()
    finally:
        existing_shm_t.close()
    return result
# sine fit at known frequency and detrending
def threeparsinefit_lin(y, t, f0):
......@@ -182,7 +215,7 @@ def phase_delay(A1, A2, deg=False):
# periodical sinefit at known frequency
def seq_threeparsinefit(y, t, f0, periods=1):
def seq_threeparsinefit(y, t, f0, periods=1,multiTasiking=True):
"""
period-wise sine-fit at a known frequency\n
y vector of sample values \n
......@@ -206,12 +239,41 @@ def seq_threeparsinefit(y, t, f0, periods=1):
M = int(t.size // N) ## number of sections or periods
abc = np.zeros((M, 3))
if multiTasiking==False:
for i in range(int(M)):
ti = t[i * N : (i + 1) * N]
yi = y[i * N : (i + 1) * N]
abc[i, :] = threeparsinefit(yi, ti, f0)
return abc ## matrix of all fit vectors per period
else:
shmY = shared_memory.SharedMemory(create=True, size=y.nbytes)
shmT = shared_memory.SharedMemory(create=True, size=t.nbytes)
# Now create a NumPy array backed by shared memory
#to accesdta use this
tmpY = np.ndarray(y.shape, dtype=y.dtype, buffer=shmY.buf)
tmpT = np.ndarray(t.shape, dtype=t.dtype, buffer=shmT.buf)
tmpY[:] = y[:]
tmpT[:] = t[:]
mpdata['t_shared_name'] = shmT.name
mpdata['y_shared_name'] = shmY.name
mpdata['t_shared_type'] = y.dtype
mpdata['y_shared_type'] = t.dtype
mpdata['freq'] = f0
mpdata['samples_per_block']=N
mpdata['nmumber_of_samples']=y.size
numCoresToBeUsed=int(multiprocessing.cpu_count()-1)
i = np.arange(M)
mpchunksize=10
results = process_map(multiPrcoess3paramFit, i,chunksize=mpchunksize, max_workers=numCoresToBeUsed)
shmT.close()
shmT.unlink() # Free and release the shared memory block at the very end
shmY.close()
shmY.unlink() # Free and release the shared memory block at the very end
for i in np.arange(M):
abc[i,:]=results[i]
return abc
for i in range(int(M)):
ti = t[i * N : (i + 1) * N]
yi = y[i * N : (i + 1) * N]
abc[i, :] = threeparsinefit(yi, ti, f0)
return abc ## matrix of all fit vectors per period
# four parameter sine-fit (with frequency approximation)
......@@ -248,6 +310,34 @@ def fourparsinefit(y, t, f0, tol=1.0e-7, nmax=1000):
return np.hstack((abcd[0:3], w / (2 * np.pi)))
# multiprocessing wrapper for fourparsinefit
def multiPrcoess4paramFit(i):
    """
    Fit block number i of the shared signal with fourparsinefit.

    Worker function handed to process_map by seq_fourparsinefit. It attaches
    to the two shared-memory blocks named in the module-level mpdata dict,
    wraps them as 1-D arrays, fits samples [i*N, (i+1)*N) and detaches again.

    Keys read from mpdata:
        't_shared_name', 'y_shared_name'  names of the shared-memory blocks
        't_shared_type', 'y_shared_type'  dtypes used to interpret the blocks
        'freq'                            start frequency passed to fourparsinefit
        'samples_per_block'               block length N
        'nmumber_of_samples'              total length of t and y

    NOTE(review): the visible caller stores y.dtype under 't_shared_type' and
    t.dtype under 'y_shared_type', so the arrays are only interpreted
    correctly when t and y have the same dtype -- confirm/fix in the caller.
    NOTE(review): fourparsinefit is called without tol/nmax here, so it runs
    with its own defaults, while the serial path of seq_fourparsinefit
    forwards the caller's tol and nmax.

    i       index of the block to fit
    returns the result of fourparsinefit for that block
    """
    N = mpdata['samples_per_block']
    # Every mpdata access goes through the manager proxy, so read shared
    # values once and keep them in locals.
    n_samples = mpdata['nmumber_of_samples']
    block = slice(i * N, (i + 1) * N)
    existing_shm_t = shared_memory.SharedMemory(name=mpdata['t_shared_name'])
    try:
        existing_shm_y = shared_memory.SharedMemory(name=mpdata['y_shared_name'])
        try:
            t = np.ndarray((n_samples,), dtype=mpdata['t_shared_type'],
                           buffer=existing_shm_t.buf)
            y = np.ndarray((n_samples,), dtype=mpdata['y_shared_type'],
                           buffer=existing_shm_y.buf)
            result = fourparsinefit(y[block], t[block], mpdata['freq'])
            # Drop the array references before the buffers are closed.
            del t
            del y
        finally:
            # Detach even if the fit raised, so the worker does not leak
            # its attachment to the shared-memory block.
            existing_shm_y.close()
    finally:
        existing_shm_t.close()
    return result
def calc_fourparsine(abcf, t):
"""
......@@ -282,7 +372,7 @@ endfunction
"""
# periodical sinefit at known frequency
def seq_fourparsinefit(y, t, f0, tol=1.0e-7, nmax=1000, periods=1, debug_plot=False):
def seq_fourparsinefit(y, t, f0, tol=1.0e-7, nmax=1000, periods=1, debug_plot=False,multiTasiking=True):
"""
sliced or period-wise sine-fit at a known frequency\n
y vector of sample values \n
......@@ -308,11 +398,39 @@ def seq_fourparsinefit(y, t, f0, tol=1.0e-7, nmax=1000, periods=1, debug_plot=Fa
M = int(t.size // N) ## number of sections or periods
abcd = np.zeros((M, 4))
for i in range(M):
ti = t[i * N : (i + 1) * N]
yi = y[i * N : (i + 1) * N]
abcd[i, :] = fourparsinefit(yi, ti, f0, tol=tol, nmax=nmax)
if multiTasiking == False:
for i in range(M):
ti = t[i * N : (i + 1) * N]
yi = y[i * N : (i + 1) * N]
abcd[i, :] = fourparsinefit(yi, ti, f0, tol=tol, nmax=nmax)
else:
shmY = shared_memory.SharedMemory(create=True, size=y.nbytes)
shmT = shared_memory.SharedMemory(create=True, size=t.nbytes)
# Now create a NumPy array backed by shared memory
#to accesdta use this
tmpY = np.ndarray(y.shape, dtype=y.dtype, buffer=shmY.buf)
tmpT = np.ndarray(t.shape, dtype=t.dtype, buffer=shmT.buf)
tmpY[:] = y[:]
tmpT[:] = t[:]
mpdata['t_shared_name'] = shmT.name
mpdata['y_shared_name'] = shmY.name
mpdata['t_shared_type'] = y.dtype
mpdata['y_shared_type'] = t.dtype
mpdata['freq'] = f0
mpdata['samples_per_block']=N
mpdata['nmumber_of_samples']=y.size
numCoresToBeUsed=int(multiprocessing.cpu_count()-1)
i = np.arange(M)
mpchunksize=10
results = process_map(multiPrcoess4paramFit, i,chunksize=mpchunksize, max_workers=numCoresToBeUsed)
shmT.close()
shmT.unlink() # Free and release the shared memory block at the very end
shmY.close()
shmY.unlink() # Free and release the shared memory block at the very end
for i in np.arange(M):
abcd[i,:]=results[i]
return abcd
if debug_plot:
mp.ioff()
......@@ -334,7 +452,6 @@ def seq_fourparsinefit(y, t, f0, tol=1.0e-7, nmax=1000, periods=1, debug_plot=Fa
return abcd ## matrix of all fit vectors per period
# fitting a pseudo-random multi-sine signal with 2*Nf+1 parameters
def multi_threeparsinefit(y, t, f0): # f0 vector of frequencies
"""
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment