Skip to content
Snippets Groups Projects
Commit cba4363d authored by wactbprot's avatar wactbprot
Browse files

next: branch w/o rabbitmq

parent ff669c9b
No related branches found
No related tags found
No related merge requests found
......@@ -72,7 +72,26 @@ class Anselm(System):
'payload':{"id": arg.id}
})
self.ctrl_conn.close()
self.ltm_conn.close()
def run_task(self):
parser = argparse.ArgumentParser(
description="builds the api for the mp given by id")
print(sys.argv)
parser.add_argument('id')
parser.add_argument('task')
arg = parser.parse_args(sys.argv[2:4])
if len(arg.id) < self.max_arg_len and len(arg.task) < self.max_arg_len:
self.stm_pub(body_dict={
'do':'trigger_run_task',
'payload':{"id": arg.id, "task":arg.task}
})
self.stm_conn.close()
def read_exchange(self):
......
......@@ -12,7 +12,7 @@ class Ctrl(System):
self.init_stm_msg_prod()
self.init_ltm_msg_prod()
self.init_ctrl_msg_prod()
self.init_ctrl_msg_consume(callback=self.dispatch)
self.init_msg_consume(queue_name='ctrl', callback=self.dispatch)
def dispatch(self, ch, method, props, body):
res = json.loads(body)
......@@ -32,40 +32,37 @@ class Ctrl(System):
source = res['source']
self.log.info("source: {}".format(source))
if 'msg' in res:
msg = res['msg']
self.log.info("msg: {}".format(msg))
if source == "ltm" and contains == "mpdoc":
self.stm_pub(body_dict={
'do':"insert_mp_doc",
'payload':payload
})
if source == "stm" and contains == "task":
print(payload)
found = True
if source == "ltm" and contains == "auxobj":
self.stm_pub(body_dict={
'do':"insert_auxobj_doc",
'payload':payload
'do':"build_auxobj_db",
'payload':payload
})
found = True
if source == "stm" and msg == "insert_mp_doc_complete":
if source == "ltm" and contains == "mpdoc":
self.stm_pub(body_dict={
'do':"build_mp_db",
'payload':payload
})
found = True
if source == "stm" and msg == "insert_auxobj_doc_complete":
if source == "stm" and msg == "insert_mp_doc_complete":
self.stm_pub(body_dict={
'do':"build_auxobj_db",
'do':"build_mp_db",
'payload':payload
})
found = True
if found:
self.log.info("found branch for routing key")
else:
......
......@@ -12,7 +12,7 @@ class LongTermMemory(System):
self.log.info("long-term memory system start consuming")
self.init_ctrl_msg_prod()
self.init_ltm_msg_consume(callback=self.dispatch)
self.init_msg_consume(queue_name='ltm', callback=self.dispatch)
def dispatch(self, ch, method, props, body):
res = json.loads(body)
......
......@@ -13,12 +13,9 @@ class ShortTermMemory(System):
stm_dict = self.config['mongodb']
self.stm_dict = stm_dict
self.stm = MongoClient(stm_dict['host'], stm_dict['port'])
self.init_stm()
self.init_ctrl_msg_prod()
self.init_stm_msg_consume(callback=self.dispatch)
self.init_msg_consume(queue_name='stm', callback=self.dispatch)
def dispatch(self, ch, method, props, body):
self.log.info("start dispatch with routing key: {}".format(method.routing_key))
......@@ -29,185 +26,180 @@ class ShortTermMemory(System):
if 'payload' in res:
pl = res['payload']
if do == "insert_mp_doc":
self.insert_mp_doc(pl)
found=True
if do == "insert_auxobj_doc":
self.insert_auxobj_doc(pl)
found=True
if do == "build_mp_db":
self.build_mp_db(pl['id'])
self.build_mp_db(pl)
found=True
if do == "build_auxobj_db":
self.build_auxobj_db(pl['id'])
self.build_auxobj_db(pl)
found=True
if do == "clear_stm":
self.clear_stm()
found=True
if do == "read_exchange":
self.read_exchange(pl['id'], pl['find_set'])
found=True
if do == "mp_to_ltm":
self.mp_to_ltm(pl['id'])
found=True
if do == "trigger_run_task":
self.trigger_run_task(pl['id'], pl['task'])
found=True
if found:
self.log.info("found branch for routing key")
else:
self.log.error("no branch found for routing key: {}".format(do))
def init_stm(self):
"""Generates the api databases and the source doc collection.
"""
self.mp_container_db = self.stm['mp_exchange']
self.mp_container_description_db = self.stm['mp_container_description']
self.mp_container_definition_db = self.stm['mp_container_definition']
self.mp_container_element_db = self.stm['mp_container_element']
self.mp_container_ctrl_db = self.stm['mp_container_ctrl']
self.stm_mp_db = self.stm['mp']
self.mp_doc_coll = self.stm_mp_db['mp_doc']
self.stm_auxobj_db = self.stm['auxobj']
self.auxobj_doc_coll = self.stm_auxobj_db['auxobj_doc']
self.log.info("short-term memory system ok")
def trigger_run_task(self, id, taskname, cdid=False):
task = self.stm[id]['tasks'].find({"TaskName": taskname})
n = task.count()
print(task[n-1])
self.ctrl_pub(body_dict={
'source':'stm',
'contains': 'task',
'payload': {'id': id, 'task': task[n-1]}
})
def clear_stm(self):
"""
Clears the stm by droping all databasese starting with ``mp_``.
Clears the stm by droping all databasese.
"""
n=0
for database in self.stm.database_names():
if database.startswith("mp_"):
n=n+1
self.stm.drop_database(database)
self.log.info("drop databes {}".format(database))
for database in self.stm.database_names():
n=n+1
self.stm.drop_database(database)
self.log.info("drop databes {}".format(database))
self.log.info("amount of droped databases: {}".format(n))
def mp_to_ltm(self, id):
doc = self.mp_doc_coll.find({'_id': id})
doc = self.stm[id]['org'].find({'_id': id})
n = doc.count()
self.ltm_pub(body_dict={
'do':'store_doc',
'payload': doc[n-1]
})
def insert_mp_doc(self, doc):
ret = self.mp_doc_coll.find({'_id': doc['_id'], '_rev': doc['_rev']})
def build_auxobj_db(self, doc):
if ret.count() == 0:
res = self.mp_doc_coll.insert_one(doc)
self.log.info("insert with result: {}".format(res))
if '_id' in doc:
id = doc['_id']
if ret.count() == 1:
self.log.info("doc with same _id and _rev already exists")
db = self.stm[id]
db_coll_org = db['org']
db_coll_org = doc
self.ctrl_pub(body_dict={
'source':'stm',
'msg': 'insert_mp_doc_complete',
'payload':{'id': doc['_id']}
})
def insert_auxobj_doc(self, doc):
if 'AuxObject' in doc:
ret = self.auxobj_doc_coll.find({'_id': doc['_id'], '_rev': doc['_rev']})
if ret.count() == 0:
res = self.auxobj_doc_coll.insert_one(doc)
self.log.info("insert with result: {}".format(res))
db_coll_task = db['tasks']
if ret.count() == 1:
self.log.info("doc with same _id and _rev already exists")
if 'AuxObject' in doc:
doc = doc['AuxObject']
self.ctrl_pub(body_dict={
'source':'stm',
'msg': 'insert_auxobj_doc_complete',
'payload':{'id': doc['_id']}
})
if 'Defaults' in doc:
defaults = doc['Defaults']
else:
self.log.error("document is not an AuxObject")
def build_auxobj_db(self, id):
doc = self.auxobj_doc_coll.find_one({'_id': id})
doc = doc['AuxObject']
self.log.warning("no defaults in AuxObject with id: {}".format(id))
if 'Default' in doc:
defaults = doc['default']
if 'Task' in doc:
tasks = doc['Task']
else:
self.log.error("no task in AuxObject with id: {}".format(id))
#for ...
# self.replace_defaults(task, defaults)
def build_mp_db(self, id):
doc = self.mp_doc_coll.find_one({'_id': id})
if doc and 'Mp' in doc:
self.log.info("found document with id: {}, start building collections".format(id))
mp = doc['Mp']
if 'Standard' in mp:
standard = mp['Standard']
else:
standard ="none"
if 'Name' in mp:
mp_name = mp['Name']
else:
mp_name = "none"
self.write_exchange(id, {"StartTime":{"Type":"start", "Value":self.now()}})
if 'Exchange' in mp:
for _, entr in mp['Exchange'].items():
self.write_exchange(id, entr)
if 'tasks' in locals() and 'defaults' in locals():
for _, task in enumerate(tasks):
task = self.replace_defaults(task, defaults)
task['_id'] = "{}@{}".format(task['TaskName'], id)
for contno, entr in enumerate(mp['Container']):
title = entr['Title']
db_coll_task.insert_one(task)
self.ctrl_pub(body_dict={
'source':'stm',
'msg': 'build_auxobj_db_complete',
'payload':{'id': id}
})
def replace_defaults(self, task, defaults):
strtask = json.dumps(task)
if isinstance(defaults, dict):
for key, val in defaults.items():
if isinstance(val, int) or isinstance(val, float):
val = '{}'.format(val)
val = val.replace('\n', '\\n')
val = val.replace('\r', '\\r')
self.mp_container_description_db[id].insert_one({'Description':entr['Description'], 'ContNo':contno, 'Title': title})
self.mp_container_ctrl_db[id].insert_one({'Ctrl':entr['Ctrl'], 'ContNo':contno, 'Title': title})
if 'Element' in entr:
self.mp_container_element_db[id].insert_one({'Element':entr['Element'], 'ContNo':contno, 'Title': title})
else:
self.mp_container_element_db[id].insert_one({'Element':[], 'ContNo':contno, 'Title': title})
definition = entr['Definition']
for serno, _ in enumerate(definition):
for parno, _ in enumerate(definition[serno]):
t = definition[serno][parno]
t['ContNo'] = contno
t['SerNo'] = serno
t['ParNo'] = parno
t['MpName'] = mp_name
t['Standard'] = standard
strtask = strtask.replace(key, val)
else:
m = "can not find document with id: {}".format(id)
self.log.error(m)
sys.exit(m)
self.log.error("defaults is not a dict")
def write_exchange(self, mpid, doc):
if isinstance(doc, dict):
self.mp_container_db[mpid].insert_one(doc)
try:
task = json.loads(strtask)
except:
self.log.error("replacing defaults fails for")
def read_exchange(self, id, find_set):
res = self.mp_container_db[id].find(find_set)
n = res.count()
if n == 1:
print(res[0])
else:
print("found nothing")
return task
def build_mp_db(self, id):
pass
# doc = self.{'_id': id})
# if doc and 'Mp' in doc:
# self.log.info("found document with id: {}, start building collections".format(id))
# mp = doc['Mp']
#
# if 'Standard' in mp:
# standard = mp['Standard']
# else:
# standard ="none"
#
# if 'Name' in mp:
# mp_name = mp['Name']
# else:
# mp_name = "none"
#
# self.write_exchange(id, {"StartTime":{"Type":"start", "Value":self.now()}})
#
# if 'Exchange' in mp:
# for _, entr in mp['Exchange'].items():
# self.write_exchange(id, entr)
#
# for contno, entr in enumerate(mp['Container']):
# title = entr['Title']
#
# self.mp_container_description_db[id].insert_one({'Description':entr['Description'], 'ContNo':contno, 'Title': title})
# self.mp_container_ctrl_db[id].insert_one({'Ctrl':entr['Ctrl'], 'ContNo':contno, 'Title': title})
#
# if 'Element' in entr:
# self.mp_container_element_db[id].insert_one({'Element':entr['Element'], 'ContNo':contno, 'Title': title})
# else:
# self.mp_container_element_db[id].insert_one({'Element':[], 'ContNo':contno, 'Title': title})
#
# definition = entr['Definition']
# for serno, _ in enumerate(definition):
# for parno, _ in enumerate(definition[serno]):
#
# t = definition[serno][parno]
# t['ContNo'] = contno
# t['SerNo'] = serno
# t['ParNo'] = parno
# t['MpName'] = mp_name
# t['Standard'] = standard
#
# else:
# m = "can not find document with id: {}".format(id)
# self.log.error(m)
# sys.exit(m)
#
# def write_exchange(self, mpid, doc):
# if isinstance(doc, dict):
# self.mp_container_db[mpid].insert_one(doc)
# def read_exchange(self, id, find_set):
# res = self.mp_container_db[id].find(find_set)
# n = res.count()
# if n == 1:
# print(res[0])
# else:
# print("found nothing")
......@@ -3,6 +3,8 @@ import pika
import coloredlogs
import logging
import datetime
import time
class System:
......@@ -22,7 +24,8 @@ class System:
self.init_log()
self.msg_param = pika.ConnectionParameters(
host=self.config['rabbitmq']['host'])
host=self.config['rabbitmq']['host'],
heartbeat_interval=10)
self.log.info("system __init__ complete")
def init_log(self):
......@@ -38,7 +41,14 @@ class System:
chan.queue_declare(queue=queue_name)
return conn, chan
def init_msg_consume(self, queue_name, callback):
_, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(callback,
queue=queue_name,
no_ack=True)
chan.start_consuming()
def init_stm_msg_prod(self):
conn, chan = self.queue_factory(queue_name='stm')
self.stm_conn = conn
......@@ -54,31 +64,6 @@ class System:
self.ctrl_conn = conn
self.ctrl_chan = chan
def init_ltm_msg_consume(self, callback):
queue_name = 'ltm'
_, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(callback,
queue=queue_name,
no_ack=True)
chan.start_consuming()
def init_ctrl_msg_consume(self, callback):
queue_name = 'ctrl'
_, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(callback,
queue=queue_name,
no_ack=True)
chan.start_consuming()
def init_stm_msg_consume(self, callback):
queue_name = 'stm'
_, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(callback,
queue=queue_name,
no_ack=True)
chan.start_consuming()
def stm_pub(self, body_dict):
self.stm_chan.basic_publish(exchange='',
routing_key='stm',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment