Commit 40665cb5 authored by Benedikt

added first version of the dcctools and bokeh app for cal data upload

parent 488f5750
[submodule "sampledata"]
path = sampledata
url = https://gitlab1.ptb.de/DNordmann/sampledata
import numpy as np
import pandas as pd
import bokeh
from bokeh.models.widgets import DataTable, TableColumn
from bokeh.models import ColumnDataSource
from bokeh.layouts import column
from bokeh.plotting import curdoc, figure
from bokeh.io import output_file, show
from bokeh.models import LinearAxis, Range1d
from bokeh.models.widgets import FileInput
import json
from hashlib import md5
from cloudant.client import CouchDB
from requests.adapters import HTTPAdapter
from urllib.parse import urlparse
import os
import re #regular expressions
from datetime import datetime, timezone
from config import cfg
from pccDccTools import dsiVector,dsiMultiVector,DsiASCICOnversion
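# Illustrative sketch of the expected cfg structure (the real values live in
# config.py; the keys are inferred from their usage below, the example values
# are assumptions):
# cfg = {
#     "DB_URL": "http://127.0.0.1:5984",  # CouchDB server URL
#     "DB_UID": "admin",                  # database user name ("" if none)
#     "DB_UPW": "secret",                 # database password
#     "DB_NAME": "caldata",               # name of the calibration database
#     "REMOVE_PROXY": True,               # if present, strip proxy env vars
# }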
class NumpyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
return json.JSONEncoder.default(self, obj)
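# Example: with this encoder a dict holding a NumPy array serializes to plain
# JSON, e.g. json.dumps({"values": np.arange(3)}, cls=NumpyEncoder)
# returns '{"values": [0, 1, 2]}'.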
class CDBAcces:
def __init__(self):
self.session_id = ""
self.db = False # change in self.connectDB
self.cb_disabled = False # enable callbacks by default (only fitting-CB)
        # the main data structure (dict) that holds the database entry
#self.data = amp_data() # the currently used data (from file or selected in table)
self.QueryData = [] # Array of data sets received from Query
def connect_DB(self):
if 'REMOVE_PROXY' in cfg:
for k in os.environ:
if "proxy" in k.lower():
os.environ.pop(k)
        if cfg["DB_UID"] != "":
            # if we have a database username, we have to include it in the DB_URL
            CDB_SERVER_URL = self.combined_URL(cfg["DB_URL"], cfg["DB_UID"], cfg["DB_UPW"])
        else:  # no username, we can take the clean URL
            CDB_SERVER_URL = cfg["DB_URL"]
        # connect to the database
        try:
            print("Trying to connect to %s" % CDB_SERVER_URL)
            couch = CouchDB(
                cfg['DB_UID'],
                cfg['DB_UPW'],
                url=CDB_SERVER_URL,
                connect=True,
                adapter=HTTPAdapter(),
                encoder=NumpyEncoder  # for JSON-serializing np.array
            )
        except Exception:
            raise Exception("No CouchDB server running at %s" % CDB_SERVER_URL)
        print("Successfully connected to CouchDB at %s" % CDB_SERVER_URL)
"""
# check if DB is existing, if not, create it
if not cfg['DB_NAME'] in couch.all_dbs():
self.db = couch.create_database(self.config['DB_NAME'])
if not self.db.exists():
raise Exception(
"Failure in creating db %s on server %s"
% (cfg['DB_NAME'], CDB_SERVER_URL)
)
else:
self.db = couch[cfg['DB_NAME']]
if not self.db.exists():
raise Exception(
"Failure in opening existing db %s on server %s"
% (self.config.cdb_dbname, self.config.cdb_server_url)
)
"""
try:
self.db = couch[cfg['DB_NAME']]
except KeyError as ke:
print(ke)
print("Database "+str(cfg['DB_NAME'])+ " does not exisit please create it ")
# submit the data{} structure to CouchDB data base
# identification is provided by the MD5-hash of the summary data file (base64 encoded string)
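    # Note: submit() relies on GUI attributes (self.data, self.DivWarnExists,
    # self.ForceSubmitBtn, self.UUIDInp) that are expected to be set by the
    # surrounding Bokeh app; they are not initialised in this class.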
def submit(self, forced=False):
        assert (
            len(self.data["SummaryFile"]) > 50
        ), "No valid Summary File"  # make sure we don't have a fake/empty file
        # prepare the hashing
        hasher = md5()
        hasher.update(
            self.data["SummaryFile"].encode("utf-8")
        )  # set the data for hashing
        ID = hasher.hexdigest()  # get the hex digest string
        record = {"_id": ID}  # initialise the record-dict with the MD5-hash-id
        if ID in self.db:  # check whether the document already exists
            doc = self.db[ID]
            print("document with ID=%s exists" % ID)
self.data["_rev"] = doc["_rev"]
self.data["_id"] = ID
# print("data already in data base") # do not submit but complain
self.DivWarnExists.visible = True
self.ForceSubmitBtn.visible = True
if forced: # forced submit button was pressed
# record.update(self.data) # append the data to the (still) blank record
for k, v in self.data.items():
doc[k] = v
doc.save()
# self.db.put(record) # overwrite existing DB-record
self.UUIDInp.value = ID
self.DivWarnExists.visible = False
self.ForceSubmitBtn.visible = False
else: # record does not yet exist
print("document with ID=%s does not yet exist" % ID)
record.update(self.data) # append the data to the (still) blank record dict
record['_id'] = ID # _id was overwritten with "" by the previous record.update(...)
doc = self.db.create_document(record)
doc_exists = ID in self.db
if doc_exists:
print('wrote %s to CouchDB' % ID)
self.UUIDInp.value = ID
else:
print("Failed to write to database")
print(self.db.server)
print(self.db)
# check whether a field-string is a valid date (for csv-data in ParseSummary() )
def is_valid_date(self, field):
try:
dmy = datetime.strptime(field, "%d.%m.%Y")
self.data["Date"] = datetime.strftime(dmy, "%Y-%m-%d")
return True
        except ValueError:
return False
# combine the URL-parts for the data base access
def combined_URL(self, url, name, pw):
parts = urlparse(url)
combined = (
parts.scheme
+ "://"
+ name
+ ":"
+ pw
+ "@"
+ parts.netloc
+ "/"
+ parts.path
)
return combined
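    # Example (illustrative values): url="http://127.0.0.1:5984", name="admin",
    # pw="secret" gives scheme="http" and netloc="127.0.0.1:5984", so the
    # combined URL is "http://admin:secret@127.0.0.1:5984/".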
# Query the data base with the info from the form
def queryDB(self):
if not self.cb_disabled:
# ---- set-up the query-json-string
criteria = []
for k, v in self.data.items():
if k in (
"FrequencyResponse",
"SummaryFile",
"DetailsFile",
"_rev",
"_id",
"SetupSetting",
"DutSetting",
):
pass
elif isinstance(v, str) and (v != ""): # for string-type fields
cond = {"$regex": v}
criteria.append({k: cond})
elif isinstance(v, (float, int)) and v != 0: # for numerical fields
cond = {"$gt": 0.9 * v, "$lt": 1.1 * v}
criteria.append({k: cond})
selector = {"$and": criteria}
docs = self.db.get_query_result(selector) # apply query
if docs: # if data received, build the result table's ColumnDataSource
self.QueryData = []
names = []
dates = []
S_nom = []
Fmin = []
Fmax = []
for d in docs: # this is for the query-result-table in the GUI
names.append(d["SummaryFileName"])
dates.append(d["Date"])
if d['DataType'] == 'Charge-Amp':
S_nom.append(1.0e-9 * d["SNom"])
else:
S_nom.append(d["SNom"])
Fmin.append(d["Fmin"])
Fmax.append(d["Fmax"])
self.QueryData.append(d)
self.TableSource.data = dict(
name=names, date=dates, S_nom=S_nom, f_low=Fmin, f_high=Fmax
)
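    # Illustrative example of the Mango selector built above (field names and
    # values are made up): for self.data = {"Operator": "Nordmann", "SNom": 10.0}
    # the query is
    #   {"$and": [{"Operator": {"$regex": "Nordmann"}},
    #             {"SNom": {"$gt": 9.0, "$lt": 11.0}}]}
    # i.e. strings are matched as regular expressions, numbers within +/-10 %.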
    def createUser(self, name, type="user", roles=None, password='defaultPassword'):
        if roles is None:  # avoid a mutable default argument
            roles = []
        userDocument = {
            '_id': 'org.couchdb.user:' + name,
            'name': name,
            'type': type,
            'roles': roles,
            'password': password
        }
        self.db.create_document(userDocument)
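    # Note (CouchDB convention): documents with the 'org.couchdb.user:' id
    # prefix normally belong in the special _users database, so self.db is
    # assumed to point at _users when createUser is called.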
class SensorCalibrationData:
def __init__(self,xlsFileName):
self.xlsFileName=xlsFileName
self.df = pd.read_excel(xlsFileName, 'ERGEBNISSE', engine='openpyxl', skiprows=[1])
self.dfUnits = pd.read_excel(xlsFileName, 'ERGEBNISSE', engine='openpyxl', nrows=1)
self.dfUnits = self.dfUnits.fillna("None")
        self.dfKeydict = dict(
            zip(['Freq', 'Amp', 'Sensetivity', 'Sens_uncer', 'Mag_device', 'Phase', 'Phase_uncer', 'Phase_Device'],
                self.df.keys()))
self.errorDf = pd.DataFrame()
self.errorDf[self.dfKeydict['Freq']] = self.df[self.dfKeydict['Freq']]
self.errorDf['mag_error']=self.df[self.dfKeydict['Sens_uncer']] / 100 * self.df[self.dfKeydict['Sensetivity']]
self.errorDf['mag_error_low'] = self.df[self.dfKeydict['Sensetivity']] - self.errorDf['mag_error']
self.errorDf['mag_error_high'] = self.df[self.dfKeydict['Sensetivity']] + self.errorDf['mag_error']
self.errorDf['phase_error_low'] = self.df[self.dfKeydict['Phase']] - self.df[self.dfKeydict['Phase_uncer']]
self.errorDf['phase_error_high'] = self.df[self.dfKeydict['Phase']] + self.df[self.dfKeydict['Phase_uncer']]
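        # 'Sens_uncer' is stored in percent in the sheet, so 'mag_error' is the
        # absolute sensitivity uncertainty; the *_low/*_high columns are the
        # error-bar endpoints plotted below. The phase uncertainty is already
        # absolute (in degrees).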
def createDict(self,deviceType='Not Set',serialNo="Not Set",owner="Not Set",date=None,operator="NotSet"):
        if date is None:
date=datetime.fromtimestamp(os.stat(self.xlsFileName).st_mtime).isoformat(timespec='microseconds')
freqResponseDict1D={"Frequency":self.df[self.dfKeydict['Freq']].tolist(),
"ExcitationAmp":self.df[self.dfKeydict['Amp']].tolist(),
"Magnitude": self.df[self.dfKeydict['Sensetivity']].tolist(),
"StdevMag": self.errorDf['mag_error'].tolist(),
"RelStdevMagPerCent": self.df[self.dfKeydict['Sens_uncer']].tolist(),
"PhaseInDeg": self.df[self.dfKeydict['Phase']].tolist(),
"StdevPhaseInDeg": self.df[self.dfKeydict['Phase_uncer']].tolist(),
"Mag_Device":self.df[self.dfKeydict['Mag_device']].tolist(),
"Phase_Device":self.df[self.dfKeydict['Phase_Device']].tolist(),
"units":self.dfUnits.to_dict(orient='index')[0]
}
self.dict={"_id":re.sub(r'\W+', '',str(date))+re.sub(r'\W+', '', deviceType),
"DataType":"Acceleration sensor Transferfunction 1D",
"DeviceType":deviceType,
"SerialNo":serialNo,
"Owner":owner,
"Date":str(date),
"Operator":operator,
"FrequencyResponse":freqResponseDict1D
}
print(self.dict)
return self.dict
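    # Illustrative _id example: date "2022-07-08" with deviceType
    # "Brueel & Kjaer 8305-001" yields "20220708BrueelKjaer8305001",
    # since re.sub(r'\W+', '', ...) strips all non-word characters.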
    def uploadDataAndAttachemntsToCouchDB(self, cdb):
        try:
            self.dict
        except AttributeError:  # a missing instance attribute raises AttributeError, not NameError
            raise RuntimeError("You need to run createDict before uploading to the database")
        self.dbEntry = cdb.create_document(self.dict)
        with open(self.xlsFileName, "rb") as f:
            self.dbEntry.put_attachment('Measurement XLSX',
                                        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
                                        f.read())
    def uploadDUMYDCCXML(self, dumyXMLFileName, dataBaseEntry=None):
        if dataBaseEntry is None:
            try:
                self.dbEntry
            except AttributeError:
                raise RuntimeError("You need to upload the data from this class to the database with uploadDataAndAttachemntsToCouchDB before uploading a dummy XML, or pass the database entry to attach the XML to via the dataBaseEntry keyword")
        else:
            self.dbEntry = dataBaseEntry
        with open(dumyXMLFileName, "rb") as f:
            self.dbEntry.put_attachment('DCC_Dummy',
                                        'application/xml',
                                        f.read())
if __name__ == '__main__':
xlsFileName = '../sampledata/ERGEBNISS_sheets/20220708_8305_SN1864992_Auswertung.xlsx'
callData=SensorCalibrationData(xlsFileName)
def upload_fit_data(attr, old, new):
print("fit data upload succeeded")
print(FI.value)
FI = FileInput(accept=".csv,.json,.txt")
FI.on_change('value', upload_fit_data)
# set output to static HTML file
output_file(filename="custom_filename.html", title="Static HTML file")
# This is important! Save curdoc() to make sure all threads
# see the same document.
doc = curdoc()
# Make the plot
    tfPlot = figure(
height=500,
width=1000,
x_axis_label=str(callData.dfKeydict['Freq']+' in '+callData.dfUnits[callData.dfKeydict['Freq']][0]),
y_axis_label=str(callData.dfKeydict['Sensetivity']+' in '+callData.dfUnits[callData.dfKeydict['Sensetivity']][0]),
y_range=(callData.df[callData.dfKeydict['Sensetivity']].min(), callData.df[callData.dfKeydict['Sensetivity']].max()),
)
tfPlot.circle(
source=callData.df,
x=callData.dfKeydict['Freq'],
y=callData.dfKeydict['Sensetivity'],
)
# Add error bars
tfPlot.segment(
source=callData.errorDf,
x0=callData.dfKeydict['Freq'],
y0='mag_error_low',
x1=callData.dfKeydict['Freq'],
y1='mag_error_high',
line_width=2
)
tfPlot.extra_y_ranges = {"y2": Range1d(start=callData.df[callData.dfKeydict['Phase']].min() - callData.df[callData.dfKeydict['Phase_uncer']].max(),
end=callData.df[callData.dfKeydict['Phase']].max()+callData.df[callData.dfKeydict['Phase_uncer']].max())}
tfPlot.add_layout(LinearAxis(y_range_name="y2",
axis_label=str(callData.dfKeydict['Phase']+' in '+callData.dfUnits[callData.dfKeydict['Phase']][0])), 'right', )
tfPlot.circle(source=callData.df, x=callData.dfKeydict['Freq'], y=callData.dfKeydict['Phase'], color="red", y_range_name="y2")
tfPlot.segment(
source=callData.errorDf,
x0=callData.dfKeydict['Freq'],
y0='phase_error_low',
x1=callData.dfKeydict['Freq'],
y1='phase_error_high',
line_width=2,
color='red',
y_range_name="y2"
)
data_table = DataTable(
columns=[TableColumn(field=Ci, title=Ci) for Ci in callData.df.columns],
source=ColumnDataSource(callData.df),
height=500,
width=1000
)
show(column(FI, tfPlot, data_table))
calDataDict=callData.createDict(deviceType='Brueel & Kjaer 8305-001',serialNo="1502245",owner="Customer Company",date=None,operator="D. Nordmann")
CDBA=CDBAcces()
CDBA.connect_DB()
#caldataDBEntry=callData.uploadDataAndAttachemntsToCouchDB(CDBA.db)
#callData.uploadDUMYDCCXML('../sampledata/dcc-vacuumlab-CDG.xml')
# Check that the document exists in the database
#if userDocument.exists():
print('SUCCESS!!')
import json
import numpy as np
# for interpolation methods
from scipy.interpolate import interp1d
import functools
class NumpyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
return json.JSONEncoder.default(self, obj)
DsiASCICOnversion={ 'Hz' : r'\hertz',
'm/s^2' : r'\metre\second\tothe{-2}',
                    'mV/(m/s^2)' : r'\milli\volt\metre\tothe{-1}\second\tothe{2}',
'%' : r'\percent',
'degree' : r'\degree',
'None' :''
}
revd=dict([reversed(i) for i in DsiASCICOnversion.items()])
DsiASCICOnversion.update(revd)
del revd
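# After merging in the reversed pairs, the dict converts in both directions:
#   DsiASCICOnversion['Hz']      -> '\hertz'
#   DsiASCICOnversion[r'\hertz'] -> 'Hz'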
def getpartialInterpolFunction(kind):
    if kind in ['data', 'linear', 'cubic']:
        return functools.partial(interp1d, kind=kind)
    else:
        raise NotImplementedError("Interpolation type " + str(kind) + " is not supported; use ['data', 'linear', 'cubic']")
class dsiVector:
def __init__(self,values,uncer,quantity,unit,uncerType="absolute"):
#TODO private vars
self.__dict__={
'values':np.array(values),
'unit':unit,
'quantity':quantity,
'originalUncerType':uncerType}
        uncer = np.array(uncer)
        if uncerType == "absolute":
            self.__dict__['uncer'] = uncer
        elif uncerType == 'rel':
            self.__dict__['uncer'] = uncer * self.values
        elif uncerType == 'relPercent':
            self.__dict__['uncer'] = uncer / 100 * self.values
        elif uncerType == 'relPPM':
            self.__dict__['uncer'] = uncer / 1e6 * self.values
        else:
            raise NotImplementedError("uncerType " + str(uncerType) + " is not supported/implemented; use ['absolute', 'rel', 'relPercent', 'relPPM']")
    @classmethod
    def fromdict(cls, d):
        # construct with np.arrays; constructor argument order is (values, uncer, quantity, unit)
        instance = cls(np.array(d['values']),
                       np.array(d['uncer']),
                       d['quantity'],
                       d['unit'],
                       uncerType=d['originalUncerType'])
        # carry over any additional key-value pairs
        additionalkeys = d.keys() - set(['values', 'uncer', 'unit', 'quantity', 'originalUncerType'])
        for key in additionalkeys:
            instance.__dict__[key] = d[key]
        return instance
@classmethod
    def fromjson(cls, jsonstr):
        d = json.loads(jsonstr)
        return cls.fromdict(d)
def jsonDumps(self):
return json.dumps(self.__dict__, cls=NumpyEncoder)
    def __getitem__(self, key):
        if type(key) == int:
return (self.__dict__['values'][key], self.__dict__['uncer'][key])
elif type(key)==str:
try:
return self.__dict__[key]
except KeyError:
if key=='uncer_relPercent':
return self.__dict__['uncer'] / self.__dict__['values'] * 100
elif key=='uncer_relPPM':
return self.__dict__['uncer'] / self.__dict__['values'] * 1e6
elif key=='uncer_rel':
return self.__dict__['uncer'] / self.__dict__['values']
else:
                    raise KeyError(key + ' not supported; try ' + str(list(self.__dict__.keys())) + " or ['uncer_rel', 'uncer_relPercent', 'uncer_relPPM']")
elif type(key)==tuple:
if type(key[0])==str and type(key[1])==int:
try:
return self.__dict__[key[0]][key[1]]
except KeyError:
if key[0] == 'uncer_relPercent':
return self.__dict__['uncer'][key[1]] / self.__dict__['values'][key[1]] * 100
elif key[0] == 'uncer_relPPM':
return self.__dict__['uncer'][key[1]] / self.__dict__['values'][key[1]] * 1e6
elif key[0] == 'uncer_rel':
return self.__dict__['uncer'][key[1]] / self.__dict__['values'][key[1]]
                    else:
                        raise KeyError(str(key[0]) + ' not supported; try ' + str(
                            list(self.__dict__.keys())) + " or ['uncer_rel', 'uncer_relPercent', 'uncer_relPPM']")
            else:
                raise KeyError(str(key) + " not supported; try dsiVector[int], dsiVector[" + str(
                    list(self.__dict__.keys())) + " or 'uncer_rel', 'uncer_relPercent', 'uncer_relPPM'], dsiVector['values', int] or dsiVector['uncer', int]")
def __str__(self):
length=self['values'].size
string='DSI_dict :'+str(self['quantity'])+' in '+str(self['unit'])+' len='+str(length)+' '
if length<8:
for i in range(length):
string=string+str(self[i])+' '
else:
            firstblock = ''
            secondblock = ''
            for i in range(4):
                firstblock = firstblock + str(self[i]) + ' '
                # build the tail in ascending order (last four elements)
                secondblock = secondblock + str(self[-(4 - i)]) + ' '
            string = string + firstblock + '... ' + secondblock
return string
def __repr__(self):
return 'dsiVector @'+hex(id(self)) +' '+self.__str__()
class dsiMultiVector:
def __init__(self,indexVector,valueVectors,interpolationMethode='None'):
        self.__dict__['index'] = indexVector
        for dataVector in valueVectors:
            # assumption: each dsiVector is keyed by its 'quantity' name
            self.__dict__[dataVector['quantity']] = dataVector
        self.__dict__['interpolationMethode'] = interpolationMethode
import numpy as np
from pccDccTools import dsiVector,dsiMultiVector,DsiASCICOnversion
if __name__ == "__main__":
testDSiVector = dsiVector((np.arange(20) + 1) * 0.5, np.ones(20) * 0.1, 'Magnitude', r'\volt', uncerType="relPercent")
print(testDSiVector[10])
print(testDSiVector['unit'])
print(testDSiVector['quantity'])
print(testDSiVector['uncer'])
print(testDSiVector['uncer_relPercent'])
# print(testDSiVector['test'])
print(testDSiVector['values', 10])
print(testDSiVector['uncer_relPercent', 10])
# print(testDSiVector['uncer_relPercent', 1000])
jsonStr = testDSiVector.jsonDumps()
loadedDSIVector = dsiVector.fromjson(jsonStr)
print(loadedDSIVector['values', 10])
    print(testDSiVector)
Subproject commit 5308b1bd2d0e6d915aa353966d84084d056c03a7