diff --git a/README.rst b/README.rst index ab7f2eddf9f1f25ff5ef65e31599f0b9399c6bce..4a5ac2b8629363a555cf76fccd88eef6af577570 100644 --- a/README.rst +++ b/README.rst @@ -7,53 +7,5 @@ requirements ============ * couchdb (long-term memory, ltm) -* mongodb (short-term memory, stm) -* rabbidmq (message broker, msg) +* PyQt5 -opensuse specials -================= - -$> zypper in rabbitmq-server -$> zypper in rabbitmq-server-plugins -$> rabbitmq-plugins enable rabbitmq_management - -/etc/hosts: -hostname 127.0.0.1 - -$> systemctl enable rabbitmq-server.service -$> systemctl start rabbitmq-server.service - - -todo rabbitmq -============= -* proper shutdown - -todo config -=========== - -* config.json entries for exchanges and queues together with - description of their functionality - - -build api -========= - -Idea is: -* make exchange, container etc. databases -* collections with mp_def id - -exchange and insert_one -======================= -How to organize a fast write_to_exchange? - -Idea is: -Don't search and replace if an entry already exist. -Use ``insert_one`` regardless of already existing documents. Simply -read out the last written document by ``db.coll.find_one()[n-1]``. - -But: -This may become slow if N goes up (e.g. on setting pressure -processes N will be several 1000) - -Solution: -``delete_many()`` *before* ``insert_one()`` diff --git a/anselm.py b/anselm.py index 14722755cbba7020087bb964cba472262c4bcbe0..a94813b549b961fb5da01bec97b027c34bcc0573 100644 --- a/anselm.py +++ b/anselm.py @@ -3,93 +3,162 @@ import json import argparse from threading import Thread from anselm.system import System # pylint: disable=E0611 -from anselm.long_term_memory import LongTermMemory # pylint: disable=E0611 -from anselm.short_term_memory import ShortTermMemory # pylint: disable=E0611 +from anselm.db import DB # pylint: disable=E0611 from anselm.worker import Worker # pylint: disable=E0611 +from PyQt5.QtWidgets import QWidget, QDesktopWidget, QApplication, QPushButton, QComboBox, QGridLayout -class Anselm(System): - """ - https://chase-seibert.github.io/blog/2014/03/21/python-multilevel-argparse.html +import sys - always talk to short-term-memory, if there is somthing not in stm try to remember - """ +class Anselm(System): + state = {} + def __init__(self): super().__init__() - - self.ltm = LongTermMemory() - self.stm = ShortTermMemory() + + self.db = DB() self.worker = Worker() + self.current_grid_line = 1 + self.initUI() + + def initUI(self): + self.win = QWidget() + self.win.resize(250, 150) + self.win.setWindowTitle('Anselm') + self.grid = QGridLayout() + + add_device_bttn = QPushButton("add device", self.win) + add_device_bttn.clicked.connect(self.add_device_line) + + self.add_widget_to_grid(add_device_bttn ,1, 2) + + self.draw_grid() + + def add_device_line(self): + self.current_grid_line +=1 + line = self.current_grid_line + line_key = self.get_line_key(line) - parser = argparse.ArgumentParser( - description='check systems', - usage='''anselm <command> [<args>]''') + self.state[line_key] = {} + + run_bttn = self.make_run_bttn(line = line) + auxobj_combo = self.make_auxobj_combo(line = line) + run_kind_combo = self.make_run_kind_combo(line = line) - parser.add_argument('command', help='Subcommand to run') - args = parser.parse_args(sys.argv[1:2]) + self.add_widget_to_grid(run_bttn, line, 1) + self.add_widget_to_grid(run_kind_combo, line, 2) + self.add_widget_to_grid(auxobj_combo, line, 3) + self.draw_grid() - if not hasattr(self, args.command): - parser.print_help() - exit(1) + def draw_grid(self): + self.win.setLayout(self.grid) + self.win.show() - if len(args.command) > self.max_arg_len: - print("command too long") - exit(1) + def make_run_bttn(self, line): + run_device_bttn = QPushButton("run", self.win) + run_device_bttn.clicked.connect(lambda: self.run_device(line)) - getattr(self, args.command)() + return run_device_bttn + + def make_run_kind_combo(self, line): + + run_kinds = ["single", "loop"] + combo = self.make_combo(run_kinds, first_item = None) + combo.currentIndexChanged.connect(lambda: self.run_kind_selected(combo, line)) + return combo - def clear_stm(self): - self.stm.clear_stm() + def make_auxobj_combo(self, line): + + aux_obj_ids = self.db.get_auxobj_ids() + + self.log.debug("found following auxobj ids {}".format(aux_obj_ids)) + + combo = self.make_combo(aux_obj_ids) + combo.currentIndexChanged.connect(lambda: self.auxobj_selected(combo, line)) + + return combo - def build_auxobj_db_for(self): - """ - usage: + def make_task_combo(self, doc_id, line): + + task_names = self.db.get_task_names(doc_id = doc_id) + + self.log.debug("found following tasknames {}".format(task_names)) + + combo = self.make_combo(task_names) + combo.currentIndexChanged.connect(lambda: self.task_selected(combo, line)) - > python anselm provide_excahnge_for calid - - """ - parser = argparse.ArgumentParser(description="builds the api for the mp given by id") + return combo + + def run_kind_selected(self, combo, line): - parser.add_argument('id') - arg = parser.parse_args(sys.argv[2:3]) + run_kind = combo.currentText() + line_key = self.get_line_key(line) + self.state[line_key]['run_kind'] = run_kind - if len(arg.id) < self.max_arg_len: - doc = self.ltm.get_auxobj(arg.id) - if doc: - self.stm.build_auxobj_db(doc) + def task_selected(self, combo, line): - def list_tasks_for(self): - parser = argparse.ArgumentParser(description="list the tasks for given by id") + line_key = self.get_line_key(line) - parser.add_argument('id') + doc_id = self.state[line_key]['doc_id'] + task_name = combo.currentText() + self.state[line_key]['task_name'] = task_name - arg = parser.parse_args(sys.argv[2:3]) - id = arg.id - if len(id) < self.max_arg_len: - self.stm.get_tasknames(id) + self.log.info("task with name {} selected at line {}".format(task_name, line)) + self.log.debug("state dict: {}".format(self.state)) + + task = self.db.get_task(doc_id, task_name) + self.state[line_key]['task'] = task + self.log.debug("task: {}".format(task)) - def run_task(self): - parser = argparse.ArgumentParser(description="builds the api for the mp given by id") + def auxobj_selected(self, combo, line): + doc_id = combo.currentText() + line_key = self.get_line_key(line) - parser.add_argument('id') - parser.add_argument('taskname') + self.state[line_key]['doc_id'] = doc_id - arg = parser.parse_args(sys.argv[2:4]) - id = arg.id - taskname = arg.taskname + self.log.debug("select {} at line {}".format(doc_id, line)) - if len(id) < self.max_arg_len and len(taskname) < self.max_arg_len: - task = self.stm.get_task(id, taskname) - if task: - Thread(target=self.worker.run, args=(task, )).start() - self.log.info("start thread for task: {}".format(task['TaskName'])) - else: - self.log.error("task not found") + auxobj_combo = self.make_task_combo(doc_id = doc_id, line = line) + self.add_widget_to_grid(widget=auxobj_combo, line=line, col=4) + self.draw_grid() + + def add_widget_to_grid(self, widget, line, col): + + #old_widget_item = self.grid.itemAtPosition (line, col) + #old_widget = old_widget_item.widget() + self.grid.addWidget(widget, line, col) + + def make_combo(self, item_list, first_item='select'): + combo = QComboBox(self.win) + if first_item: + combo.addItem(first_item) + for item in item_list: + combo.addItem(item) + return combo + def get_line_key(self, line): + return 'line_{}'.format(line) + + def run_device(self, line): + line_key = self.get_line_key(line) + task = None + + self.log.info("start device at line {}".format(line)) + if line_key in self.state: + if 'task' in self.state[line_key]: + task = self.state[line_key]['task'] + else: + self.log.error("no task selected at line {}".format(line)) + if task: + Thread(target=self.worker.run, args=(task, )).start() + - if __name__ == '__main__': - Anselm() + + app = QApplication(sys.argv) + ex = Anselm() + sys.exit(app.exec_()) + diff --git a/anselm/config.json b/anselm/config.json index 875948285771ab1f3ae0f7f52db2fa3f7f0d9d71..4de9ba0ffd4f2bb2f6499173e99f6d3c0d7fdb98 100644 --- a/anselm/config.json +++ b/anselm/config.json @@ -1,19 +1,15 @@ { - "mongodb": { - "host": "localhost", - "port": 27017 - }, "couchdb": { "host": "localhost", "port": 5984, "database": "vl_db", "view": { - "mpd": "dbmp/mpdocs" + "auxobj": "share/AuxObject" } }, "relay":{ "host": "i75419.berlin.ptb.de", "port": 55555 }, - "loglevel": "INFO" + "loglevel": "DEBUG" } diff --git a/anselm/db.py b/anselm/db.py new file mode 100644 index 0000000000000000000000000000000000000000..42590dd1cb0cfad3bfaa5c15d8f1fd7ea2037c86 --- /dev/null +++ b/anselm/db.py @@ -0,0 +1,110 @@ +from anselm.system import System +import couchdb +import json + + +class DB(System): + + def __init__(self): + super().__init__() + self.init_db() + + self.log.info("database ") + + def init_db(self): + db_dict = self.config['couchdb'] + port = db_dict['port'] + host = db_dict['host'] + url = 'http://{}:{}/'.format(host, port) + + self.db_dict = db_dict + self.db_srv = couchdb.Server(url) + self.db = self.db_srv[self.db_dict['database']] + self.log.info("long-term memory system ok") + + def store_doc(self, doc): + id = doc['_id'] + dbdoc = self.db[id] + if dbdoc: + doc['_rev'] = dbdoc['_rev'] + else: + doc.pop('_rev', None) + + self.db.save(doc) + + + def get_auxobj_ids(self): + view = self.db_dict['view']['auxobj'] + + return [doc['id'] for doc in self.db.view(view)] + + def get_red_doc(self, doc_id): + doc = self.db[doc_id] + red_doc = None + if doc: + if 'AuxObject' in doc: + red_doc = doc['AuxObject'] + + if 'CalibrationObject' in doc: + red_doc = doc['CalibrationObject'] + else: + self.log.error("no doc with id {}".format(doc_id)) + + if red_doc: + return red_doc + else: + return None + + def get_task_names(self, doc_id): + doc = self.get_red_doc(doc_id) + if doc and 'Task' in doc: + return [task['TaskName'] for task in doc['Task']] + else: + return [] + + def get_task(self, doc_id, task_name): + doc = self.get_red_doc(doc_id) + if doc and 'Task' in doc: + tasks = doc['Task'] + for task in tasks: + if task['TaskName'] == task_name: + break + + if 'Defaults' in doc: + defaults = doc['Defaults'] + task = self.replace_defaults(task=task, defaults=defaults) + + return task + else: + self.log.error("no doc with id {}".format(doc_id)) + return [] + + + + def get_auxobj(self, id): + doc = self.db[id] + if doc: + return doc + else: + self.log.error("document with id: {} does not exist".format(id)) + return None + + def replace_defaults(self, task, defaults): + strtask = json.dumps(task) + if isinstance(defaults, dict): + for key, val in defaults.items(): + if isinstance(val, int) or isinstance(val, float): + val = '{}'.format(val) + val = val.replace('\n', '\\n') + val = val.replace('\r', '\\r') + + strtask = strtask.replace(key, val) + else: + self.log.error("defaults is not a dict") + + try: + task = json.loads(strtask) + except: + self.log.error("replacing defaults fails for") + + return task diff --git a/anselm/long_term_memory.py b/anselm/long_term_memory.py deleted file mode 100644 index 14319ab25e8a2af74f11b35716b5bf22aff5f496..0000000000000000000000000000000000000000 --- a/anselm/long_term_memory.py +++ /dev/null @@ -1,50 +0,0 @@ -from anselm.system import System -import couchdb -import json - - -class LongTermMemory(System): - - def __init__(self): - super().__init__() - self.init_log() - self.init_ltm() - - self.log.info("long-term memory system start consuming") - - def init_ltm(self): - ltm_dict = self.config['couchdb'] - port = ltm_dict['port'] - host = ltm_dict['host'] - url = 'http://{}:{}/'.format(host, port) - - self.ltm_dict = ltm_dict - self.ltm = couchdb.Server(url) - self.ltm_db = self.ltm[self.ltm_dict['database']] - self.log.info("long-term memory system ok") - - def store_doc(self, doc): - id = doc['_id'] - dbdoc = self.ltm_db[id] - if dbdoc: - doc['_rev'] = dbdoc['_rev'] - else: - doc.pop('_rev', None) - - self.ltm_db.save(doc) - - def get_mps(self): - view = self.ltm_dict['view']['mpd'] - for mp in self.ltm_db.view(view): - if mp.id and mp.key == "mpdoc": - self.ltm_db[mp.id] - - - def get_auxobj(self, id): - doc = self.ltm_db[id] - if doc: - return doc - else: - self.log.error("document with id: {} does not exist".format(id)) - return None - diff --git a/anselm/short_term_memory.py b/anselm/short_term_memory.py deleted file mode 100644 index 93c60cd82c9763201d23ce91eac1cf8b3588c0b1..0000000000000000000000000000000000000000 --- a/anselm/short_term_memory.py +++ /dev/null @@ -1,95 +0,0 @@ -import sys -from anselm.system import System -from pymongo import MongoClient -import json - -class ShortTermMemory(System): - - def __init__(self): - super().__init__() - - self.log.info("start long-term memory init function") - - stm_dict = self.config['mongodb'] - self.stm_dict = stm_dict - self.stm = MongoClient(stm_dict['host'], stm_dict['port']) - - - def get_task(self, id, taskname): - task = self.stm[id]['tasks'].find({"TaskName": taskname}) - n = task.count() - if n > 0: - return task[n-1] - - def get_tasknames(self, id): - tasks = self.stm[id]['tasks'].find() - for task in tasks: - print(task['TaskName']) - - - - def clear_stm(self): - """ - Clears the stm by droping all databasese. - """ - n=0 - for database in self.stm.database_names(): - n=n+1 - self.stm.drop_database(database) - self.log.info("drop databes {}".format(database)) - - self.log.info("amount of droped databases: {}".format(n)) - - - - def build_auxobj_db(self, doc): - - if '_id' in doc: - id = doc['_id'] - - db = self.stm[id] - db_coll_org = db['org'] - db_coll_org = doc - - db_coll_task = db['tasks'] - - if 'AuxObject' in doc: - doc = doc['AuxObject'] - - if 'Defaults' in doc: - defaults = doc['Defaults'] - else: - self.log.warning("no defaults in AuxObject with id: {}".format(id)) - - if 'Task' in doc: - tasks = doc['Task'] - else: - self.log.error("no task in AuxObject with id: {}".format(id)) - - if 'tasks' in locals() and 'defaults' in locals(): - for _, task in enumerate(tasks): - task = self.replace_defaults(task, defaults) - task['_id'] = "{}@{}".format(task['TaskName'], id) - - db_coll_task.insert_one(task) - - - def replace_defaults(self, task, defaults): - strtask = json.dumps(task) - if isinstance(defaults, dict): - for key, val in defaults.items(): - if isinstance(val, int) or isinstance(val, float): - val = '{}'.format(val) - val = val.replace('\n', '\\n') - val = val.replace('\r', '\\r') - - strtask = strtask.replace(key, val) - else: - self.log.error("defaults is not a dict") - - try: - task = json.loads(strtask) - except: - self.log.error("replacing defaults fails for") - - return task diff --git a/anselm/system.py b/anselm/system.py index da3c6977ae94736c5cefe7f761f01cecfc468759..4664d28265e0d0fd32ba31b0e529719bf3ca7fab 100644 --- a/anselm/system.py +++ b/anselm/system.py @@ -11,7 +11,7 @@ class System: """ max_arg_len = 40 log_fmt = '%(asctime)s,%(msecs)03d %(hostname)s %(filename)s:%(lineno)s %(levelname)s %(message)s' - + state = {} def __init__(self): """ Gets the configuration out of the file: ``config.json``. @@ -20,9 +20,8 @@ class System: # open and parse config file with open('anselm/config.json') as json_config_file: self.config = json.load(json_config_file) - - self.init_log() - + self.init_log() + def init_log(self): self.log = logging.getLogger() coloredlogs.install( diff --git a/anselm/worker.py b/anselm/worker.py index 05a50895233dc0296d3972b78ef1522ca3a3bb9b..10fa4250f4ebcd02d14599f840ef3770308f2ea7 100644 --- a/anselm/worker.py +++ b/anselm/worker.py @@ -1,7 +1,6 @@ import requests import json from anselm.system import System -from anselm.short_term_memory import ShortTermMemory class Worker(System): @@ -14,7 +13,6 @@ class Worker(System): self.relay_url = "http://{}:{}".format(relay_dict['host'], relay_dict['port']) self.headers = {'content-type': 'application/json'} - self.stm = ShortTermMemory() def run(self, task): acc = task['Action'] @@ -28,7 +26,9 @@ class Worker(System): if 'Result' in res: print(res['Result']) - + print(self.state) + print("dddddddddddddddddd") + if 'ToExchange' in res: print(res['ToExchange']) \ No newline at end of file