diff --git a/anselm.py b/anselm.py index aeaff3606030d15b091f5f88d4773e84e87780c8..7a8a858e92b65d6d4ee89e6d19716a4d89f08d50 100644 --- a/anselm.py +++ b/anselm.py @@ -18,9 +18,10 @@ class Anselm(System): host = msg_dict['host'] self.msg_param = pika.ConnectionParameters(host=host) - self.init_ltm_msg_prod() + self.init_ctrl_msg_prod() self.init_stm_msg_prod() - + self.init_ltm_msg_prod() + parser = argparse.ArgumentParser( description='check systems', usage='''anselm <command> [<args>]''') @@ -38,50 +39,21 @@ class Anselm(System): getattr(self, args.command)() - def start(self): - """ - """ - - ## dont work, see tagline - self.stm_pub(body_dict={ - 'do':'start', + def ini_mps(self): + self.ltm_pub(body_dict={ + 'do':'get_mps', 'payload':{} }) self.ltm_conn.close() def clear_stm(self): - """ - """ - self.stm_pub(body_dict={ - 'do':'clear_all', + 'do':'clear_stm', 'payload':{}}) self.stm_conn.close() - - def build_mp_db_for(self): - """ - usage: - - > python anselm build_mp_db_for mpid - - """ - parser = argparse.ArgumentParser( - description="builds the api for the mp given by id") - - parser.add_argument('mpid') - arg = parser.parse_args(sys.argv[2:3]) - - if len(arg.calid) < self.max_arg_len: - self.stm_pub(body_dict={ - 'do':'build_mp_db', - 'payload':{"id": arg.mpid} - }) - - self.stm_conn.close() - - def build_cal_mp_for(self): + def build_auxobj_mp_for(self): """ usage: @@ -91,16 +63,16 @@ class Anselm(System): parser = argparse.ArgumentParser( description="builds the api for the mp given by id") - parser.add_argument('calid') + parser.add_argument('id') arg = parser.parse_args(sys.argv[2:3]) - if len(arg.calid) < self.max_arg_len: - self.stm_pub(body_dict={ - 'do':'build_cal_db', - 'payload':{"id": arg.calid} + if len(arg.id) < self.max_arg_len: + self.ltm_pub(body_dict={ + 'do':'get_auxobj', + 'payload':{"id": arg.id} }) - self.stm_conn.close() + self.ctrl_conn.close() def read_exchange(self): diff --git a/anselm/ctrl.py b/anselm/ctrl.py index bd742ae94bd75f42a10e977880c02c0ff6255f7a..2ced0cbee88babbb380ff7ac7df46fae62b2f3c0 100644 --- a/anselm/ctrl.py +++ b/anselm/ctrl.py @@ -16,27 +16,57 @@ class Ctrl(System): def dispatch(self, ch, method, props, body): res = json.loads(body) - + found = False + contains = "" + source = "" + msg ="" + if 'payload' in res: - pl = res['payload'] + payload = res['payload'] if 'contains' in res: contains = res['contains'] + self.log.info("contains: {}".format(contains)) if 'source' in res: source = res['source'] + self.log.info("source: {}".format(source)) + if 'msg' in res: msg = res['msg'] + self.log.info("msg: {}".format(msg)) - if source == "ltm" and contains == "mps": + if source == "ltm" and contains == "mpdoc": self.stm_pub(body_dict={ 'do':"insert_mp_doc", - 'payload':pl + 'payload':payload + }) + found = True + + if source == "ltm" and contains == "auxobj": + self.stm_pub(body_dict={ + 'do':"insert_auxobj_doc", + 'payload':payload }) + found = True if source == "stm" and msg == "insert_mp_doc_complete": self.stm_pub(body_dict={ 'do':"build_mp_db", - 'payload':pl + 'payload':payload }) + found = True + + if source == "stm" and msg == "insert_auxobj_doc_complete": + self.stm_pub(body_dict={ + 'do':"build_auxobj_db", + 'payload':payload + }) + found = True + + + if found: + self.log.info("found branch for routing key") + else: + self.log.error("no branch found") diff --git a/anselm/long_term_memory.py b/anselm/long_term_memory.py index 2d87209d0a022cae46d2791a7148831838424a15..162c21905264e0e214f46c379cfb19975bdaf0a4 100644 --- a/anselm/long_term_memory.py +++ b/anselm/long_term_memory.py @@ -11,22 +11,36 @@ class LongTermMemory(System): self.init_ltm() self.log.info("long-term memory system start consuming") - self.init_stm_msg_prod() - self.init_ltm_msg_prod() + self.init_ctrl_msg_prod() self.init_ltm_msg_consume(callback=self.dispatch) def dispatch(self, ch, method, props, body): + res = json.loads(body) + do = res['do'] + found = False - do, pl = self.parse_body(body) - - if do == "start": - self.get_mp_defs() - + if 'payload' in res: + pl = res['payload'] + + if do == "get_mps": + self.get_mps() + found = True + + if do == "get_auxobj": + if 'id' in pl: + self.get_auxobj(pl['id']) + else: + self.log.error("payload contains no id") + found = True + if do == "store_doc": self.store_doc(pl) + found = True - self.log.info("dispatch to do: {}".format(do)) - + if found: + self.log.info("dispatch to do: {}".format(do)) + else: + self.log.error("found no dispatch case for {}".format(do)) def init_ltm(self): ltm_dict = self.config['couchdb'] @@ -37,28 +51,41 @@ class LongTermMemory(System): self.ltm_dict = ltm_dict self.ltm = couchdb.Server(url) self.ltm_db = self.ltm[self.ltm_dict['database']] - self.ltm_db_sav = self.ltm["{}_sav".format(self.ltm_dict['database'])] self.log.info("long-term memory system ok") def store_doc(self, doc): id = doc['_id'] - dbdoc = self.ltm_db_sav[id] + dbdoc = self.ltm_db[id] if dbdoc: doc['_rev'] = dbdoc['_rev'] else: doc.pop('_rev', None) - self.ltm_db_sav.save(doc) + self.ltm_db.save(doc) - def get_mp_defs(self): + def get_mps(self): view = self.ltm_dict['view']['mpd'] for mp in self.ltm_db.view(view): if mp.id and mp.key == "mpdoc": doc = self.ltm_db[mp.id] - self.stm_pub(body_dict={ - 'do':'insert_document', - 'payload':doc} + self.ctrl_pub(body_dict={ + 'contains':'mpdoc', + 'source':'ltm', + 'payload': doc} ) else: self.log.info( "document with id: {} will not be published".format(mp.id)) + + def get_auxobj(self, id): + doc = self.ltm_db[id] + if doc: + self.ctrl_pub(body_dict={ + 'contains':'auxobj', + 'source':'ltm', + 'payload': doc} + ) + + else: + self.log.info( + "document with id: {} will not found".format(id)) diff --git a/anselm/short_term_memory.py b/anselm/short_term_memory.py index 9dc3909875496cf99c67579cd58f63fcba3102d9..e459c33d12a2192c08fb05a4345554f93280cda7 100644 --- a/anselm/short_term_memory.py +++ b/anselm/short_term_memory.py @@ -26,11 +26,19 @@ class ShortTermMemory(System): self.insert_mp_doc(pl) found=True + if do == "insert_auxobj_doc": + self.insert_auxobj_doc(pl) + found=True + if do == "build_mp_db": self.build_mp_db(pl['id']) found=True - - if do == "clear_all": + + if do == "build_auxobj_db": + self.build_auxobj_db(pl['id']) + found=True + + if do == "clear_stm": self.clear_stm() found=True @@ -50,15 +58,18 @@ class ShortTermMemory(System): def init_stm(self): """Generates the api databases and the source doc collection. """ - self.exchange_db = self.stm['mp_exchange'] - self.container_description_db = self.stm['mp_container_description'] - self.container_definition_db = self.stm['mp_container_definition'] - self.container_element_db = self.stm['mp_container_element'] - self.container_ctrl_db = self.stm['mp_container_ctrl'] + self.mp_container_db = self.stm['mp_exchange'] + self.mp_container_description_db = self.stm['mp_container_description'] + self.mp_container_definition_db = self.stm['mp_container_definition'] + self.mp_container_element_db = self.stm['mp_container_element'] + self.mp_container_ctrl_db = self.stm['mp_container_ctrl'] - self.stm_mp_db = self.stm['mp_def'] + self.stm_mp_db = self.stm['mp'] self.mp_doc_coll = self.stm_mp_db['mp_doc'] - + + self.stm_auxobj_db = self.stm['auxobj'] + self.auxobj_doc_coll = self.stm_auxobj_db['auxobj_doc'] + self.log.info("short-term memory system ok") @@ -94,19 +105,59 @@ class ShortTermMemory(System): self.log.info("doc with same _id and _rev already exists") self.ctrl_pub(body_dict={ - 'source':'ltm' - 'msg': 'insert_mp_doc_comlete', - 'payload':doc['_id'] + 'source':'stm', + 'msg': 'insert_mp_doc_complete', + 'payload':{'id': doc['_id']} }) + + def insert_auxobj_doc(self, doc): + + if 'AuxObject' in doc: + ret = self.auxobj_doc_coll.find({'_id': doc['_id'], '_rev': doc['_rev']}) + + if ret.count() == 0: + res = self.auxobj_doc_coll.insert_one(doc) + self.log.info("insert with result: {}".format(res)) + + if ret.count() == 1: + self.log.info("doc with same _id and _rev already exists") + + self.ctrl_pub(body_dict={ + 'source':'stm', + 'msg': 'insert_auxobj_doc_complete', + 'payload':{'id': doc['_id']} + }) + else: + self.log.error("document is not an AuxObject") + + def build_auxobj_db(self, id): + doc = self.auxobj_doc_coll.find_one({'_id': id}) + doc = doc['AuxObject'] + + if 'Default' in doc: + defaults = doc['default'] + if 'Task' in doc: + tasks = doc['Task'] + + #for ... + # self.replace_defaults(task, defaults) def build_mp_db(self, id): doc = self.mp_doc_coll.find_one({'_id': id}) if doc and 'Mp' in doc: + self.log.info("found document with id: {}, start building collections".format(id)) mp = doc['Mp'] - self.log.info("found document, start building collections") - standard = mp['Standard'] - mp_name = mp['Name'] - # start with filling up exchange + + if 'Standard' in mp: + standard = mp['Standard'] + else: + standard ="none" + + if 'Name' in mp: + mp_name = mp['Name'] + else: + mp_name = "none" + self.write_exchange(id, {"StartTime":{"Type":"start", "Value":self.now()}}) if 'Exchange' in mp: @@ -115,9 +166,14 @@ class ShortTermMemory(System): for contno, entr in enumerate(mp['Container']): title = entr['Title'] - self.container_description_db[id].insert_one({'Description':entr['Description'], 'ContNo':contno, 'Title': title}) - self.container_element_db[id].insert_one({'Element':entr['Element'], 'ContNo':contno, 'Title': title}) - self.container_ctrl_db[id].insert_one({'Ctrl':entr['Ctrl'], 'ContNo':contno, 'Title': title}) + + self.mp_container_description_db[id].insert_one({'Description':entr['Description'], 'ContNo':contno, 'Title': title}) + self.mp_container_ctrl_db[id].insert_one({'Ctrl':entr['Ctrl'], 'ContNo':contno, 'Title': title}) + + if 'Element' in entr: + self.mp_container_element_db[id].insert_one({'Element':entr['Element'], 'ContNo':contno, 'Title': title}) + else: + self.mp_container_element_db[id].insert_one({'Element':[], 'ContNo':contno, 'Title': title}) definition = entr['Definition'] for serno, _ in enumerate(definition): @@ -130,21 +186,19 @@ class ShortTermMemory(System): t['MpName'] = mp_name t['Standard'] = standard - self.ltm_pub(body_dict={'do':'provide_task', 'payload':t}) else: - m = "can not find document with id: {}".format(id) self.log.error(m) sys.exit(m) - + def write_exchange(self, mpid, doc): if isinstance(doc, dict): - self.exchange_db[mpid].insert_one(doc) + self.mp_container_db[mpid].insert_one(doc) def read_exchange(self, id, find_set): - res = self.exchange_db[id].find(find_set) + res = self.mp_container_db[id].find(find_set) n = res.count() if n == 1: print(res[0]) diff --git a/anselm/system.py b/anselm/system.py index c11484a262c1d85ecfb807530428ed23ed6aec9e..1a20f87523e3991905d662778f684ae2f9984292 100644 --- a/anselm/system.py +++ b/anselm/system.py @@ -32,7 +32,6 @@ class System: self.log.info("logging system online") - def queue_factory(self, queue_name): conn = pika.BlockingConnection(self.msg_param) chan = conn.channel() @@ -52,8 +51,8 @@ class System: def init_ctrl_msg_prod(self): conn, chan = self.queue_factory(queue_name='ctrl') - self.ltm_conn = conn - self.ltm_chan = chan + self.ctrl_conn = conn + self.ctrl_chan = chan def init_ltm_msg_consume(self, callback): queue_name = 'ltm' @@ -86,7 +85,7 @@ class System: body=json.dumps(body_dict)) def ctrl_pub(self, body_dict): - self.stm_chan.basic_publish(exchange='', + self.ctrl_chan.basic_publish(exchange='', routing_key='ctrl', body=json.dumps(body_dict))