Skip to content
Snippets Groups Projects
Commit 01001c54 authored by wactbprot's avatar wactbprot
Browse files

merge

parents 015d84f5 ff669c9b
Branches
No related tags found
No related merge requests found
......@@ -18,9 +18,10 @@ class Anselm(System):
host = msg_dict['host']
self.msg_param = pika.ConnectionParameters(host=host)
self.init_ltm_msg_prod()
self.init_ctrl_msg_prod()
self.init_stm_msg_prod()
self.init_ltm_msg_prod()
parser = argparse.ArgumentParser(
description='check systems',
usage='''anselm <command> [<args>]''')
......@@ -38,50 +39,21 @@ class Anselm(System):
getattr(self, args.command)()
def start(self):
"""
"""
## dont work, see tagline
self.stm_pub(body_dict={
'do':'start',
def ini_mps(self):
self.ltm_pub(body_dict={
'do':'get_mps',
'payload':{}
})
self.ltm_conn.close()
def clear_stm(self):
"""
"""
self.stm_pub(body_dict={
'do':'clear_all',
'do':'clear_stm',
'payload':{}})
self.stm_conn.close()
def build_mp_db_for(self):
"""
usage:
> python anselm build_mp_db_for mpid
"""
parser = argparse.ArgumentParser(
description="builds the api for the mp given by id")
parser.add_argument('mpid')
arg = parser.parse_args(sys.argv[2:3])
if len(arg.calid) < self.max_arg_len:
self.stm_pub(body_dict={
'do':'build_mp_db',
'payload':{"id": arg.mpid}
})
self.stm_conn.close()
def build_cal_mp_for(self):
def build_auxobj_mp_for(self):
"""
usage:
......@@ -91,16 +63,16 @@ class Anselm(System):
parser = argparse.ArgumentParser(
description="builds the api for the mp given by id")
parser.add_argument('calid')
parser.add_argument('id')
arg = parser.parse_args(sys.argv[2:3])
if len(arg.calid) < self.max_arg_len:
self.stm_pub(body_dict={
'do':'build_cal_db',
'payload':{"id": arg.calid}
if len(arg.id) < self.max_arg_len:
self.ltm_pub(body_dict={
'do':'get_auxobj',
'payload':{"id": arg.id}
})
self.stm_conn.close()
self.ctrl_conn.close()
def read_exchange(self):
......
......@@ -16,27 +16,57 @@ class Ctrl(System):
def dispatch(self, ch, method, props, body):
res = json.loads(body)
found = False
contains = ""
source = ""
msg =""
if 'payload' in res:
pl = res['payload']
payload = res['payload']
if 'contains' in res:
contains = res['contains']
self.log.info("contains: {}".format(contains))
if 'source' in res:
source = res['source']
self.log.info("source: {}".format(source))
if 'msg' in res:
msg = res['msg']
self.log.info("msg: {}".format(msg))
if source == "ltm" and contains == "mps":
if source == "ltm" and contains == "mpdoc":
self.stm_pub(body_dict={
'do':"insert_mp_doc",
'payload':pl
'payload':payload
})
found = True
if source == "ltm" and contains == "auxobj":
self.stm_pub(body_dict={
'do':"insert_auxobj_doc",
'payload':payload
})
found = True
if source == "stm" and msg == "insert_mp_doc_complete":
self.stm_pub(body_dict={
'do':"build_mp_db",
'payload':pl
'payload':payload
})
found = True
if source == "stm" and msg == "insert_auxobj_doc_complete":
self.stm_pub(body_dict={
'do':"build_auxobj_db",
'payload':payload
})
found = True
if found:
self.log.info("found branch for routing key")
else:
self.log.error("no branch found")
......@@ -11,22 +11,36 @@ class LongTermMemory(System):
self.init_ltm()
self.log.info("long-term memory system start consuming")
self.init_stm_msg_prod()
self.init_ltm_msg_prod()
self.init_ctrl_msg_prod()
self.init_ltm_msg_consume(callback=self.dispatch)
def dispatch(self, ch, method, props, body):
res = json.loads(body)
do = res['do']
found = False
do, pl = self.parse_body(body)
if do == "start":
self.get_mp_defs()
if 'payload' in res:
pl = res['payload']
if do == "get_mps":
self.get_mps()
found = True
if do == "get_auxobj":
if 'id' in pl:
self.get_auxobj(pl['id'])
else:
self.log.error("payload contains no id")
found = True
if do == "store_doc":
self.store_doc(pl)
found = True
self.log.info("dispatch to do: {}".format(do))
if found:
self.log.info("dispatch to do: {}".format(do))
else:
self.log.error("found no dispatch case for {}".format(do))
def init_ltm(self):
ltm_dict = self.config['couchdb']
......@@ -37,28 +51,41 @@ class LongTermMemory(System):
self.ltm_dict = ltm_dict
self.ltm = couchdb.Server(url)
self.ltm_db = self.ltm[self.ltm_dict['database']]
self.ltm_db_sav = self.ltm["{}_sav".format(self.ltm_dict['database'])]
self.log.info("long-term memory system ok")
def store_doc(self, doc):
id = doc['_id']
dbdoc = self.ltm_db_sav[id]
dbdoc = self.ltm_db[id]
if dbdoc:
doc['_rev'] = dbdoc['_rev']
else:
doc.pop('_rev', None)
self.ltm_db_sav.save(doc)
self.ltm_db.save(doc)
def get_mp_defs(self):
def get_mps(self):
view = self.ltm_dict['view']['mpd']
for mp in self.ltm_db.view(view):
if mp.id and mp.key == "mpdoc":
doc = self.ltm_db[mp.id]
self.stm_pub(body_dict={
'do':'insert_document',
'payload':doc}
self.ctrl_pub(body_dict={
'contains':'mpdoc',
'source':'ltm',
'payload': doc}
)
else:
self.log.info(
"document with id: {} will not be published".format(mp.id))
def get_auxobj(self, id):
doc = self.ltm_db[id]
if doc:
self.ctrl_pub(body_dict={
'contains':'auxobj',
'source':'ltm',
'payload': doc}
)
else:
self.log.info(
"document with id: {} will not found".format(id))
......@@ -26,11 +26,19 @@ class ShortTermMemory(System):
self.insert_mp_doc(pl)
found=True
if do == "insert_auxobj_doc":
self.insert_auxobj_doc(pl)
found=True
if do == "build_mp_db":
self.build_mp_db(pl['id'])
found=True
if do == "clear_all":
if do == "build_auxobj_db":
self.build_auxobj_db(pl['id'])
found=True
if do == "clear_stm":
self.clear_stm()
found=True
......@@ -50,15 +58,18 @@ class ShortTermMemory(System):
def init_stm(self):
"""Generates the api databases and the source doc collection.
"""
self.exchange_db = self.stm['mp_exchange']
self.container_description_db = self.stm['mp_container_description']
self.container_definition_db = self.stm['mp_container_definition']
self.container_element_db = self.stm['mp_container_element']
self.container_ctrl_db = self.stm['mp_container_ctrl']
self.mp_container_db = self.stm['mp_exchange']
self.mp_container_description_db = self.stm['mp_container_description']
self.mp_container_definition_db = self.stm['mp_container_definition']
self.mp_container_element_db = self.stm['mp_container_element']
self.mp_container_ctrl_db = self.stm['mp_container_ctrl']
self.stm_mp_db = self.stm['mp_def']
self.stm_mp_db = self.stm['mp']
self.mp_doc_coll = self.stm_mp_db['mp_doc']
self.stm_auxobj_db = self.stm['auxobj']
self.auxobj_doc_coll = self.stm_auxobj_db['auxobj_doc']
self.log.info("short-term memory system ok")
......@@ -94,19 +105,59 @@ class ShortTermMemory(System):
self.log.info("doc with same _id and _rev already exists")
self.ctrl_pub(body_dict={
'source':'ltm'
'msg': 'insert_mp_doc_comlete',
'payload':doc['_id']
'source':'stm',
'msg': 'insert_mp_doc_complete',
'payload':{'id': doc['_id']}
})
def insert_auxobj_doc(self, doc):
if 'AuxObject' in doc:
ret = self.auxobj_doc_coll.find({'_id': doc['_id'], '_rev': doc['_rev']})
if ret.count() == 0:
res = self.auxobj_doc_coll.insert_one(doc)
self.log.info("insert with result: {}".format(res))
if ret.count() == 1:
self.log.info("doc with same _id and _rev already exists")
self.ctrl_pub(body_dict={
'source':'stm',
'msg': 'insert_auxobj_doc_complete',
'payload':{'id': doc['_id']}
})
else:
self.log.error("document is not an AuxObject")
def build_auxobj_db(self, id):
doc = self.auxobj_doc_coll.find_one({'_id': id})
doc = doc['AuxObject']
if 'Default' in doc:
defaults = doc['default']
if 'Task' in doc:
tasks = doc['Task']
#for ...
# self.replace_defaults(task, defaults)
def build_mp_db(self, id):
doc = self.mp_doc_coll.find_one({'_id': id})
if doc and 'Mp' in doc:
self.log.info("found document with id: {}, start building collections".format(id))
mp = doc['Mp']
self.log.info("found document, start building collections")
standard = mp['Standard']
mp_name = mp['Name']
# start with filling up exchange
if 'Standard' in mp:
standard = mp['Standard']
else:
standard ="none"
if 'Name' in mp:
mp_name = mp['Name']
else:
mp_name = "none"
self.write_exchange(id, {"StartTime":{"Type":"start", "Value":self.now()}})
if 'Exchange' in mp:
......@@ -115,9 +166,14 @@ class ShortTermMemory(System):
for contno, entr in enumerate(mp['Container']):
title = entr['Title']
self.container_description_db[id].insert_one({'Description':entr['Description'], 'ContNo':contno, 'Title': title})
self.container_element_db[id].insert_one({'Element':entr['Element'], 'ContNo':contno, 'Title': title})
self.container_ctrl_db[id].insert_one({'Ctrl':entr['Ctrl'], 'ContNo':contno, 'Title': title})
self.mp_container_description_db[id].insert_one({'Description':entr['Description'], 'ContNo':contno, 'Title': title})
self.mp_container_ctrl_db[id].insert_one({'Ctrl':entr['Ctrl'], 'ContNo':contno, 'Title': title})
if 'Element' in entr:
self.mp_container_element_db[id].insert_one({'Element':entr['Element'], 'ContNo':contno, 'Title': title})
else:
self.mp_container_element_db[id].insert_one({'Element':[], 'ContNo':contno, 'Title': title})
definition = entr['Definition']
for serno, _ in enumerate(definition):
......@@ -130,21 +186,19 @@ class ShortTermMemory(System):
t['MpName'] = mp_name
t['Standard'] = standard
self.ltm_pub(body_dict={'do':'provide_task', 'payload':t})
else:
m = "can not find document with id: {}".format(id)
self.log.error(m)
sys.exit(m)
def write_exchange(self, mpid, doc):
if isinstance(doc, dict):
self.exchange_db[mpid].insert_one(doc)
self.mp_container_db[mpid].insert_one(doc)
def read_exchange(self, id, find_set):
res = self.exchange_db[id].find(find_set)
res = self.mp_container_db[id].find(find_set)
n = res.count()
if n == 1:
print(res[0])
......
......@@ -32,7 +32,6 @@ class System:
self.log.info("logging system online")
def queue_factory(self, queue_name):
conn = pika.BlockingConnection(self.msg_param)
chan = conn.channel()
......@@ -52,8 +51,8 @@ class System:
def init_ctrl_msg_prod(self):
conn, chan = self.queue_factory(queue_name='ctrl')
self.ltm_conn = conn
self.ltm_chan = chan
self.ctrl_conn = conn
self.ctrl_chan = chan
def init_ltm_msg_consume(self, callback):
queue_name = 'ltm'
......@@ -86,7 +85,7 @@ class System:
body=json.dumps(body_dict))
def ctrl_pub(self, body_dict):
self.stm_chan.basic_publish(exchange='',
self.ctrl_chan.basic_publish(exchange='',
routing_key='ctrl',
body=json.dumps(body_dict))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment