Skip to content
Snippets Groups Projects
Commit 015d84f5 authored by wactbprot's avatar wactbprot
Browse files

Merge branch 'master' of github.com:wactbprot/anselm

parents 3e4f3107 cdc7c061
No related branches found
No related tags found
No related merge requests found
......@@ -7,6 +7,9 @@ from anselm.system import System
class Anselm(System):
"""
https://chase-seibert.github.io/blog/2014/03/21/python-multilevel-argparse.html
always talk to short-term-memory, if there is somthing not in stm try to remember
"""
def __init__(self):
super().__init__()
......@@ -38,10 +41,9 @@ class Anselm(System):
def start(self):
"""
"""
parser = argparse.ArgumentParser(
description="sends a all to ltm exchange")
self.ltm_pub(body_dict={
## dont work, see tagline
self.stm_pub(body_dict={
'do':'start',
'payload':{}
})
......@@ -50,33 +52,59 @@ class Anselm(System):
def clear_stm(self):
"""
"""
parser = argparse.ArgumentParser(
description="sends a clear.all to stm exchange")
self.stm_pub(body_dict={
'do':'clear_all',
'payload':{}})
self.ltm_conn.close()
self.stm_conn.close()
def build_mp_db_for(self):
"""
usage:
> python anselm build_mp_db_for mpid
def build_api_for(self):
"""
parser = argparse.ArgumentParser(
description="checks if the systems are up")
description="builds the api for the mp given by id")
parser.add_argument('mpid')
arg = parser.parse_args(sys.argv[2:3])
if len(arg.mpid) < self.max_arg_len:
if len(arg.calid) < self.max_arg_len:
self.stm_pub(body_dict={
'do':'build_api',
'do':'build_mp_db',
'payload':{"id": arg.mpid}
})
self.stm_conn.close()
def read_exchange(self):
def build_cal_mp_for(self):
"""
usage:
> python anselm provide_excahnge_for calid
"""
parser = argparse.ArgumentParser(
description="read from exchange")
description="builds the api for the mp given by id")
self.stm_publish(body_dict={
parser.add_argument('calid')
arg = parser.parse_args(sys.argv[2:3])
if len(arg.calid) < self.max_arg_len:
self.stm_pub(body_dict={
'do':'build_cal_db',
'payload':{"id": arg.calid}
})
self.stm_conn.close()
def read_exchange(self):
self.stm_pub(body_dict={
'do':'read_exchange',
'payload':{"id":"mpd-ce3-calib", "find_set":{"StartTime.Type":"start"}}
})
......
import sys
from anselm.system import System
import json
class Ctrl(System):
def __init__(self):
super().__init__()
self.log.info("start long-term memory init function")
self.init_stm_msg_prod()
self.init_ltm_msg_prod()
self.init_ctrl_msg_prod()
self.init_ctrl_msg_consume(callback=self.dispatch)
def dispatch(self, ch, method, props, body):
res = json.loads(body)
if 'payload' in res:
pl = res['payload']
if 'contains' in res:
contains = res['contains']
if 'source' in res:
source = res['source']
if 'msg' in res:
msg = res['msg']
if source == "ltm" and contains == "mps":
self.stm_pub(body_dict={
'do':"insert_mp_doc",
'payload':pl
})
if source == "stm" and msg == "insert_mp_doc_complete":
self.stm_pub(body_dict={
'do':"build_mp_db",
'payload':pl
})
......@@ -13,16 +13,20 @@ class LongTermMemory(System):
self.log.info("long-term memory system start consuming")
self.init_stm_msg_prod()
self.init_ltm_msg_prod()
self.init_ltm_msg_consume()
self.init_ltm_msg_consume(callback=self.dispatch)
def dispatch(self, ch, method, props, body):
do, pl = self.parse_body(body)
if do == "start":
self.log.info("dispatch to do: {}".format(do))
self.get_mp_defs()
if do == "store_doc":
self.store_doc(pl)
self.log.info("dispatch to do: {}".format(do))
def init_ltm(self):
ltm_dict = self.config['couchdb']
......@@ -33,8 +37,19 @@ class LongTermMemory(System):
self.ltm_dict = ltm_dict
self.ltm = couchdb.Server(url)
self.ltm_db = self.ltm[self.ltm_dict['database']]
self.ltm_db_sav = self.ltm["{}_sav".format(self.ltm_dict['database'])]
self.log.info("long-term memory system ok")
def store_doc(self, doc):
id = doc['_id']
dbdoc = self.ltm_db_sav[id]
if dbdoc:
doc['_rev'] = dbdoc['_rev']
else:
doc.pop('_rev', None)
self.ltm_db_sav.save(doc)
def get_mp_defs(self):
view = self.ltm_dict['view']['mpd']
for mp in self.ltm_db.view(view):
......
......@@ -15,19 +15,19 @@ class ShortTermMemory(System):
self.stm = MongoClient(stm_dict['host'], stm_dict['port'])
self.init_stm()
self.init_ltm_msg_prod()
self.init_stm_msg_consume()
self.init_ctrl_msg_prod()
self.init_stm_msg_consume(callback=self.dispatch)
def dispatch(self, ch, method, props, body):
found = False
do, pl = self.parse_body(body)
if do == "insert_document":
self.insert_source_doc(pl)
if do == "insert_mp_doc":
self.insert_mp_doc(pl)
found=True
if do == "build_api":
self.build_api(pl['id'])
if do == "build_mp_db":
self.build_mp_db(pl['id'])
found=True
if do == "clear_all":
......@@ -38,6 +38,10 @@ class ShortTermMemory(System):
self.read_exchange(pl['id'], pl['find_set'])
found=True
if do == "mp_to_ltm":
self.mp_to_ltm(pl['id'])
found=True
if found:
self.log.info("found branch for routing key")
else:
......@@ -52,8 +56,8 @@ class ShortTermMemory(System):
self.container_element_db = self.stm['mp_container_element']
self.container_ctrl_db = self.stm['mp_container_ctrl']
self.stm_source_db = self.stm['mp_def']
self.source_doc_coll = self.stm_source_db['source_doc']
self.stm_mp_db = self.stm['mp_def']
self.mp_doc_coll = self.stm_mp_db['mp_doc']
self.log.info("short-term memory system ok")
......@@ -71,18 +75,32 @@ class ShortTermMemory(System):
self.log.info("amount of droped databases: {}".format(n))
def insert_source_doc(self, doc):
ret = self.source_doc_coll.find({'_id': doc['_id'], '_rev': doc['_rev']})
def mp_to_ltm(self, id):
doc = self.mp_doc_coll.find({'_id': id})
n = doc.count()
self.ltm_pub(body_dict={
'do':'store_doc',
'payload': doc[n-1]
})
def insert_mp_doc(self, doc):
ret = self.mp_doc_coll.find({'_id': doc['_id'], '_rev': doc['_rev']})
if ret.count() == 0:
res = self.source_doc_coll.insert_one(doc)
res = self.mp_doc_coll.insert_one(doc)
self.log.info("insert with result: {}".format(res))
if ret.count() == 1:
self.log.info("doc with same _id and _rev already exists")
def build_api(self, id):
doc = self.source_doc_coll.find_one({'_id': id})
self.ctrl_pub(body_dict={
'source':'ltm'
'msg': 'insert_mp_doc_comlete',
'payload':doc['_id']
})
def build_mp_db(self, id):
doc = self.mp_doc_coll.find_one({'_id': id})
if doc and 'Mp' in doc:
mp = doc['Mp']
self.log.info("found document, start building collections")
......@@ -114,6 +132,7 @@ class ShortTermMemory(System):
self.ltm_pub(body_dict={'do':'provide_task', 'payload':t})
else:
m = "can not find document with id: {}".format(id)
self.log.error(m)
sys.exit(m)
......
......@@ -32,6 +32,7 @@ class System:
self.log.info("logging system online")
def queue_factory(self, queue_name):
conn = pika.BlockingConnection(self.msg_param)
chan = conn.channel()
......@@ -49,18 +50,32 @@ class System:
self.ltm_conn = conn
self.ltm_chan = chan
def init_ltm_msg_consume(self):
def init_ctrl_msg_prod(self):
conn, chan = self.queue_factory(queue_name='ctrl')
self.ltm_conn = conn
self.ltm_chan = chan
def init_ltm_msg_consume(self, callback):
queue_name = 'ltm'
conn, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(self.dispatch,
_, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(callback,
queue=queue_name,
no_ack=True)
chan.start_consuming()
def init_stm_msg_consume(self):
def init_ctrl_msg_consume(self, callback):
queue_name = 'ctrl'
_, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(callback,
queue=queue_name,
no_ack=True)
chan.start_consuming()
def init_stm_msg_consume(self, callback):
queue_name = 'stm'
conn, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(self.dispatch,
_, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(callback,
queue=queue_name,
no_ack=True)
chan.start_consuming()
......@@ -70,6 +85,11 @@ class System:
routing_key='stm',
body=json.dumps(body_dict))
def ctrl_pub(self, body_dict):
self.stm_chan.basic_publish(exchange='',
routing_key='ctrl',
body=json.dumps(body_dict))
def ltm_pub(self, body_dict):
self.ltm_chan.basic_publish(exchange='',
routing_key='ltm',
......
from anselm.ctrl import Ctrl
if __name__ == "__main__":
Ctrl()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment