Skip to content
Snippets Groups Projects
Commit 976be479 authored by Wact B. Prot's avatar Wact B. Prot
Browse files

Merge branch 'master' of github.com:wactbprot/anselm

parents 98d6de83 4f8e8ce1
No related branches found
No related tags found
No related merge requests found
...@@ -7,53 +7,5 @@ requirements ...@@ -7,53 +7,5 @@ requirements
============ ============
* couchdb (long-term memory, ltm) * couchdb (long-term memory, ltm)
* mongodb (short-term memory, stm) * PyQt5
* rabbidmq (message broker, msg)
opensuse specials
=================
$> zypper in rabbitmq-server
$> zypper in rabbitmq-server-plugins
$> rabbitmq-plugins enable rabbitmq_management
/etc/hosts:
hostname 127.0.0.1
$> systemctl enable rabbitmq-server.service
$> systemctl start rabbitmq-server.service
todo rabbitmq
=============
* proper shutdown
todo config
===========
* config.json entries for exchanges and queues together with
description of their functionality
build api
=========
Idea is:
* make exchange, container etc. databases
* collections with mp_def id
exchange and insert_one
=======================
How to organize a fast write_to_exchange?
Idea is:
Don't search and replace if an entry already exist.
Use ``insert_one`` regardless of already existing documents. Simply
read out the last written document by ``db.coll.find_one()[n-1]``.
But:
This may become slow if N goes up (e.g. on setting pressure
processes N will be several 1000)
Solution:
``delete_many()`` *before* ``insert_one()``
...@@ -3,93 +3,162 @@ import json ...@@ -3,93 +3,162 @@ import json
import argparse import argparse
from threading import Thread from threading import Thread
from anselm.system import System # pylint: disable=E0611 from anselm.system import System # pylint: disable=E0611
from anselm.long_term_memory import LongTermMemory # pylint: disable=E0611 from anselm.db import DB # pylint: disable=E0611
from anselm.short_term_memory import ShortTermMemory # pylint: disable=E0611
from anselm.worker import Worker # pylint: disable=E0611 from anselm.worker import Worker # pylint: disable=E0611
from PyQt5.QtWidgets import QWidget, QDesktopWidget, QApplication, QPushButton, QComboBox, QGridLayout
class Anselm(System): import sys
"""
https://chase-seibert.github.io/blog/2014/03/21/python-multilevel-argparse.html
always talk to short-term-memory, if there is somthing not in stm try to remember class Anselm(System):
""" state = {}
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.ltm = LongTermMemory() self.db = DB()
self.stm = ShortTermMemory()
self.worker = Worker() self.worker = Worker()
self.current_grid_line = 1
self.initUI()
def initUI(self):
self.win = QWidget()
self.win.resize(250, 150)
self.win.setWindowTitle('Anselm')
self.grid = QGridLayout()
add_device_bttn = QPushButton("add device", self.win)
add_device_bttn.clicked.connect(self.add_device_line)
self.add_widget_to_grid(add_device_bttn ,1, 2)
self.draw_grid()
def add_device_line(self):
self.current_grid_line +=1
line = self.current_grid_line
line_key = self.get_line_key(line)
parser = argparse.ArgumentParser( self.state[line_key] = {}
description='check systems',
usage='''anselm <command> [<args>]''') run_bttn = self.make_run_bttn(line = line)
auxobj_combo = self.make_auxobj_combo(line = line)
run_kind_combo = self.make_run_kind_combo(line = line)
parser.add_argument('command', help='Subcommand to run') self.add_widget_to_grid(run_bttn, line, 1)
args = parser.parse_args(sys.argv[1:2]) self.add_widget_to_grid(run_kind_combo, line, 2)
self.add_widget_to_grid(auxobj_combo, line, 3)
self.draw_grid()
if not hasattr(self, args.command): def draw_grid(self):
parser.print_help() self.win.setLayout(self.grid)
exit(1) self.win.show()
if len(args.command) > self.max_arg_len: def make_run_bttn(self, line):
print("command too long") run_device_bttn = QPushButton("run", self.win)
exit(1) run_device_bttn.clicked.connect(lambda: self.run_device(line))
getattr(self, args.command)() return run_device_bttn
def make_run_kind_combo(self, line):
run_kinds = ["single", "loop"]
combo = self.make_combo(run_kinds, first_item = None)
combo.currentIndexChanged.connect(lambda: self.run_kind_selected(combo, line))
return combo
def clear_stm(self): def make_auxobj_combo(self, line):
self.stm.clear_stm()
aux_obj_ids = self.db.get_auxobj_ids()
self.log.debug("found following auxobj ids {}".format(aux_obj_ids))
combo = self.make_combo(aux_obj_ids)
combo.currentIndexChanged.connect(lambda: self.auxobj_selected(combo, line))
return combo
def build_auxobj_db_for(self): def make_task_combo(self, doc_id, line):
"""
usage: task_names = self.db.get_task_names(doc_id = doc_id)
self.log.debug("found following tasknames {}".format(task_names))
combo = self.make_combo(task_names)
combo.currentIndexChanged.connect(lambda: self.task_selected(combo, line))
> python anselm provide_excahnge_for calid return combo
""" def run_kind_selected(self, combo, line):
parser = argparse.ArgumentParser(description="builds the api for the mp given by id")
parser.add_argument('id') run_kind = combo.currentText()
arg = parser.parse_args(sys.argv[2:3]) line_key = self.get_line_key(line)
self.state[line_key]['run_kind'] = run_kind
if len(arg.id) < self.max_arg_len: def task_selected(self, combo, line):
doc = self.ltm.get_auxobj(arg.id)
if doc:
self.stm.build_auxobj_db(doc)
def list_tasks_for(self): line_key = self.get_line_key(line)
parser = argparse.ArgumentParser(description="list the tasks for given by id")
parser.add_argument('id') doc_id = self.state[line_key]['doc_id']
task_name = combo.currentText()
self.state[line_key]['task_name'] = task_name
arg = parser.parse_args(sys.argv[2:3]) self.log.info("task with name {} selected at line {}".format(task_name, line))
id = arg.id self.log.debug("state dict: {}".format(self.state))
if len(id) < self.max_arg_len:
self.stm.get_tasknames(id) task = self.db.get_task(doc_id, task_name)
self.state[line_key]['task'] = task
self.log.debug("task: {}".format(task))
def run_task(self): def auxobj_selected(self, combo, line):
parser = argparse.ArgumentParser(description="builds the api for the mp given by id") doc_id = combo.currentText()
line_key = self.get_line_key(line)
parser.add_argument('id') self.state[line_key]['doc_id'] = doc_id
parser.add_argument('taskname')
arg = parser.parse_args(sys.argv[2:4]) self.log.debug("select {} at line {}".format(doc_id, line))
id = arg.id
taskname = arg.taskname
if len(id) < self.max_arg_len and len(taskname) < self.max_arg_len: auxobj_combo = self.make_task_combo(doc_id = doc_id, line = line)
task = self.stm.get_task(id, taskname) self.add_widget_to_grid(widget=auxobj_combo, line=line, col=4)
if task: self.draw_grid()
Thread(target=self.worker.run, args=(task, )).start()
self.log.info("start thread for task: {}".format(task['TaskName'])) def add_widget_to_grid(self, widget, line, col):
else:
self.log.error("task not found") #old_widget_item = self.grid.itemAtPosition (line, col)
#old_widget = old_widget_item.widget()
self.grid.addWidget(widget, line, col)
def make_combo(self, item_list, first_item='select'):
combo = QComboBox(self.win)
if first_item:
combo.addItem(first_item)
for item in item_list:
combo.addItem(item)
return combo
def get_line_key(self, line):
return 'line_{}'.format(line)
def run_device(self, line):
line_key = self.get_line_key(line)
task = None
self.log.info("start device at line {}".format(line))
if line_key in self.state:
if 'task' in self.state[line_key]:
task = self.state[line_key]['task']
else:
self.log.error("no task selected at line {}".format(line))
if task:
Thread(target=self.worker.run, args=(task, )).start()
if __name__ == '__main__': if __name__ == '__main__':
Anselm()
app = QApplication(sys.argv)
ex = Anselm()
sys.exit(app.exec_())
{ {
"mongodb": {
"host": "localhost",
"port": 27017
},
"couchdb": { "couchdb": {
"host": "localhost", "host": "localhost",
"port": 5984, "port": 5984,
"database": "vl_db", "database": "vl_db",
"view": { "view": {
"mpd": "dbmp/mpdocs" "auxobj": "share/AuxObject"
} }
}, },
"relay":{ "relay":{
"host": "i75419.berlin.ptb.de", "host": "i75419.berlin.ptb.de",
"port": 55555 "port": 55555
}, },
"loglevel": "INFO" "loglevel": "DEBUG"
} }
from anselm.system import System
import couchdb
import json
class DB(System):
def __init__(self):
super().__init__()
self.init_db()
self.log.info("database ")
def init_db(self):
db_dict = self.config['couchdb']
port = db_dict['port']
host = db_dict['host']
url = 'http://{}:{}/'.format(host, port)
self.db_dict = db_dict
self.db_srv = couchdb.Server(url)
self.db = self.db_srv[self.db_dict['database']]
self.log.info("long-term memory system ok")
def store_doc(self, doc):
id = doc['_id']
dbdoc = self.db[id]
if dbdoc:
doc['_rev'] = dbdoc['_rev']
else:
doc.pop('_rev', None)
self.db.save(doc)
def get_auxobj_ids(self):
view = self.db_dict['view']['auxobj']
return [doc['id'] for doc in self.db.view(view)]
def get_red_doc(self, doc_id):
doc = self.db[doc_id]
red_doc = None
if doc:
if 'AuxObject' in doc:
red_doc = doc['AuxObject']
if 'CalibrationObject' in doc:
red_doc = doc['CalibrationObject']
else:
self.log.error("no doc with id {}".format(doc_id))
if red_doc:
return red_doc
else:
return None
def get_task_names(self, doc_id):
doc = self.get_red_doc(doc_id)
if doc and 'Task' in doc:
return [task['TaskName'] for task in doc['Task']]
else:
return []
def get_task(self, doc_id, task_name):
doc = self.get_red_doc(doc_id)
if doc and 'Task' in doc:
tasks = doc['Task']
for task in tasks:
if task['TaskName'] == task_name:
break
if 'Defaults' in doc:
defaults = doc['Defaults']
task = self.replace_defaults(task=task, defaults=defaults)
return task
else:
self.log.error("no doc with id {}".format(doc_id))
return []
def get_auxobj(self, id):
doc = self.db[id]
if doc:
return doc
else:
self.log.error("document with id: {} does not exist".format(id))
return None
def replace_defaults(self, task, defaults):
strtask = json.dumps(task)
if isinstance(defaults, dict):
for key, val in defaults.items():
if isinstance(val, int) or isinstance(val, float):
val = '{}'.format(val)
val = val.replace('\n', '\\n')
val = val.replace('\r', '\\r')
strtask = strtask.replace(key, val)
else:
self.log.error("defaults is not a dict")
try:
task = json.loads(strtask)
except:
self.log.error("replacing defaults fails for")
return task
from anselm.system import System
import couchdb
import json
class LongTermMemory(System):
def __init__(self):
super().__init__()
self.init_log()
self.init_ltm()
self.log.info("long-term memory system start consuming")
def init_ltm(self):
ltm_dict = self.config['couchdb']
port = ltm_dict['port']
host = ltm_dict['host']
url = 'http://{}:{}/'.format(host, port)
self.ltm_dict = ltm_dict
self.ltm = couchdb.Server(url)
self.ltm_db = self.ltm[self.ltm_dict['database']]
self.log.info("long-term memory system ok")
def store_doc(self, doc):
id = doc['_id']
dbdoc = self.ltm_db[id]
if dbdoc:
doc['_rev'] = dbdoc['_rev']
else:
doc.pop('_rev', None)
self.ltm_db.save(doc)
def get_mps(self):
view = self.ltm_dict['view']['mpd']
for mp in self.ltm_db.view(view):
if mp.id and mp.key == "mpdoc":
self.ltm_db[mp.id]
def get_auxobj(self, id):
doc = self.ltm_db[id]
if doc:
return doc
else:
self.log.error("document with id: {} does not exist".format(id))
return None
import sys
from anselm.system import System
from pymongo import MongoClient
import json
class ShortTermMemory(System):
def __init__(self):
super().__init__()
self.log.info("start long-term memory init function")
stm_dict = self.config['mongodb']
self.stm_dict = stm_dict
self.stm = MongoClient(stm_dict['host'], stm_dict['port'])
def get_task(self, id, taskname):
task = self.stm[id]['tasks'].find({"TaskName": taskname})
n = task.count()
if n > 0:
return task[n-1]
def get_tasknames(self, id):
tasks = self.stm[id]['tasks'].find()
for task in tasks:
print(task['TaskName'])
def clear_stm(self):
"""
Clears the stm by droping all databasese.
"""
n=0
for database in self.stm.database_names():
n=n+1
self.stm.drop_database(database)
self.log.info("drop databes {}".format(database))
self.log.info("amount of droped databases: {}".format(n))
def build_auxobj_db(self, doc):
if '_id' in doc:
id = doc['_id']
db = self.stm[id]
db_coll_org = db['org']
db_coll_org = doc
db_coll_task = db['tasks']
if 'AuxObject' in doc:
doc = doc['AuxObject']
if 'Defaults' in doc:
defaults = doc['Defaults']
else:
self.log.warning("no defaults in AuxObject with id: {}".format(id))
if 'Task' in doc:
tasks = doc['Task']
else:
self.log.error("no task in AuxObject with id: {}".format(id))
if 'tasks' in locals() and 'defaults' in locals():
for _, task in enumerate(tasks):
task = self.replace_defaults(task, defaults)
task['_id'] = "{}@{}".format(task['TaskName'], id)
db_coll_task.insert_one(task)
def replace_defaults(self, task, defaults):
strtask = json.dumps(task)
if isinstance(defaults, dict):
for key, val in defaults.items():
if isinstance(val, int) or isinstance(val, float):
val = '{}'.format(val)
val = val.replace('\n', '\\n')
val = val.replace('\r', '\\r')
strtask = strtask.replace(key, val)
else:
self.log.error("defaults is not a dict")
try:
task = json.loads(strtask)
except:
self.log.error("replacing defaults fails for")
return task
...@@ -11,7 +11,7 @@ class System: ...@@ -11,7 +11,7 @@ class System:
""" """
max_arg_len = 40 max_arg_len = 40
log_fmt = '%(asctime)s,%(msecs)03d %(hostname)s %(filename)s:%(lineno)s %(levelname)s %(message)s' log_fmt = '%(asctime)s,%(msecs)03d %(hostname)s %(filename)s:%(lineno)s %(levelname)s %(message)s'
state = {}
def __init__(self): def __init__(self):
""" """
Gets the configuration out of the file: ``config.json``. Gets the configuration out of the file: ``config.json``.
...@@ -20,9 +20,8 @@ class System: ...@@ -20,9 +20,8 @@ class System:
# open and parse config file # open and parse config file
with open('anselm/config.json') as json_config_file: with open('anselm/config.json') as json_config_file:
self.config = json.load(json_config_file) self.config = json.load(json_config_file)
self.init_log()
self.init_log()
def init_log(self): def init_log(self):
self.log = logging.getLogger() self.log = logging.getLogger()
coloredlogs.install( coloredlogs.install(
......
import requests import requests
import json import json
from anselm.system import System from anselm.system import System
from anselm.short_term_memory import ShortTermMemory
class Worker(System): class Worker(System):
...@@ -14,7 +13,6 @@ class Worker(System): ...@@ -14,7 +13,6 @@ class Worker(System):
self.relay_url = "http://{}:{}".format(relay_dict['host'], relay_dict['port']) self.relay_url = "http://{}:{}".format(relay_dict['host'], relay_dict['port'])
self.headers = {'content-type': 'application/json'} self.headers = {'content-type': 'application/json'}
self.stm = ShortTermMemory()
def run(self, task): def run(self, task):
acc = task['Action'] acc = task['Action']
...@@ -28,7 +26,9 @@ class Worker(System): ...@@ -28,7 +26,9 @@ class Worker(System):
if 'Result' in res: if 'Result' in res:
print(res['Result']) print(res['Result'])
print(self.state)
print("dddddddddddddddddd")
if 'ToExchange' in res: if 'ToExchange' in res:
print(res['ToExchange']) print(res['ToExchange'])
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment