Skip to content
Snippets Groups Projects
Commit cdc7c061 authored by wactbprot's avatar wactbprot
Browse files

ctzrl

parent 1df1f703
No related branches found
No related tags found
No related merge requests found
...@@ -7,6 +7,9 @@ from anselm.system import System ...@@ -7,6 +7,9 @@ from anselm.system import System
class Anselm(System): class Anselm(System):
""" """
https://chase-seibert.github.io/blog/2014/03/21/python-multilevel-argparse.html https://chase-seibert.github.io/blog/2014/03/21/python-multilevel-argparse.html
always talk to short-term-memory, if there is somthing not in stm try to remember
""" """
def __init__(self): def __init__(self):
super().__init__() super().__init__()
...@@ -38,10 +41,9 @@ class Anselm(System): ...@@ -38,10 +41,9 @@ class Anselm(System):
def start(self): def start(self):
""" """
""" """
parser = argparse.ArgumentParser(
description="sends a all to ltm exchange")
self.ltm_pub(body_dict={ ## dont work, see tagline
self.stm_pub(body_dict={
'do':'start', 'do':'start',
'payload':{} 'payload':{}
}) })
...@@ -50,45 +52,59 @@ class Anselm(System): ...@@ -50,45 +52,59 @@ class Anselm(System):
def clear_stm(self): def clear_stm(self):
""" """
""" """
parser = argparse.ArgumentParser(
description="sends a clear.all to stm exchange")
self.stm_pub(body_dict={ self.stm_pub(body_dict={
'do':'clear_all', 'do':'clear_all',
'payload':{}}) 'payload':{}})
self.ltm_conn.close() self.stm_conn.close()
def mp_to_ltm(self): def build_mp_db_for(self):
""" """
usage:
> python anselm build_mp_db_for mpid
""" """
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="sends a mp from stm back to ltm") description="builds the api for the mp given by id")
parser.add_argument('mpid') parser.add_argument('mpid')
arg = parser.parse_args(sys.argv[2:3]) arg = parser.parse_args(sys.argv[2:3])
if len(arg.calid) < self.max_arg_len:
self.stm_pub(body_dict={ self.stm_pub(body_dict={
'do':'mp_to_ltm', 'do':'build_mp_db',
'payload':{'id': arg.mpid}}) 'payload':{"id": arg.mpid}
self.ltm_conn.close() })
self.stm_conn.close()
def build_cal_mp_for(self):
"""
usage:
> python anselm provide_excahnge_for calid
def build_api_for(self): """
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="checks if the systems are up") description="builds the api for the mp given by id")
parser.add_argument('mpid') parser.add_argument('calid')
arg = parser.parse_args(sys.argv[2:3]) arg = parser.parse_args(sys.argv[2:3])
if len(arg.mpid) < self.max_arg_len: if len(arg.calid) < self.max_arg_len:
self.stm_pub(body_dict={ self.stm_pub(body_dict={
'do':'build_api', 'do':'build_cal_db',
'payload':{"id": arg.mpid} 'payload':{"id": arg.calid}
}) })
self.stm_conn.close() self.stm_conn.close()
def read_exchange(self): def read_exchange(self):
parser = argparse.ArgumentParser(description="read from exchange")
self.stm_publish(body_dict={ self.stm_pub(body_dict={
'do':'read_exchange', 'do':'read_exchange',
'payload':{"id":"mpd-ce3-calib", "find_set":{"StartTime.Type":"start"}} 'payload':{"id":"mpd-ce3-calib", "find_set":{"StartTime.Type":"start"}}
}) })
......
import sys
from anselm.system import System
import json
class Ctrl(System):
def __init__(self):
super().__init__()
self.log.info("start long-term memory init function")
self.init_stm_msg_prod()
self.init_ltm_msg_prod()
self.init_ctrl_msg_prod()
self.init_ctrl_msg_consume(callback=self.dispatch)
def dispatch(self, ch, method, props, body):
res = json.loads(body)
if 'payload' in res:
pl = res['payload']
if 'contains' in res:
contains = res['contains']
if 'source' in res:
source = res['source']
if 'msg' in res:
msg = res['msg']
if source == "ltm" and contains == "mps":
self.stm_pub(body_dict={
'do':"insert_mp_doc",
'payload':pl
})
if source == "stm" and msg == "insert_mp_doc_complete":
self.stm_pub(body_dict={
'do':"build_mp_db",
'payload':pl
})
...@@ -13,7 +13,7 @@ class LongTermMemory(System): ...@@ -13,7 +13,7 @@ class LongTermMemory(System):
self.log.info("long-term memory system start consuming") self.log.info("long-term memory system start consuming")
self.init_stm_msg_prod() self.init_stm_msg_prod()
self.init_ltm_msg_prod() self.init_ltm_msg_prod()
self.init_ltm_msg_consume() self.init_ltm_msg_consume(callback=self.dispatch)
def dispatch(self, ch, method, props, body): def dispatch(self, ch, method, props, body):
res = json.loads(body) res = json.loads(body)
......
...@@ -15,8 +15,8 @@ class ShortTermMemory(System): ...@@ -15,8 +15,8 @@ class ShortTermMemory(System):
self.stm = MongoClient(stm_dict['host'], stm_dict['port']) self.stm = MongoClient(stm_dict['host'], stm_dict['port'])
self.init_stm() self.init_stm()
self.init_ltm_msg_prod() self.init_ctrl_msg_prod()
self.init_stm_msg_consume() self.init_stm_msg_consume(callback=self.dispatch)
...@@ -29,12 +29,12 @@ class ShortTermMemory(System): ...@@ -29,12 +29,12 @@ class ShortTermMemory(System):
if 'payload' in res: if 'payload' in res:
pl = res['payload'] pl = res['payload']
if do == "insert_document": if do == "insert_mp_doc":
self.insert_source_doc(pl) self.insert_mp_doc(pl)
found=True found=True
if do == "build_api": if do == "build_mp_db":
self.build_api(pl['id']) self.build_mp_db(pl['id'])
found=True found=True
if do == "clear_all": if do == "clear_all":
...@@ -63,8 +63,8 @@ class ShortTermMemory(System): ...@@ -63,8 +63,8 @@ class ShortTermMemory(System):
self.container_element_db = self.stm['mp_container_element'] self.container_element_db = self.stm['mp_container_element']
self.container_ctrl_db = self.stm['mp_container_ctrl'] self.container_ctrl_db = self.stm['mp_container_ctrl']
self.stm_source_db = self.stm['mp_def'] self.stm_mp_db = self.stm['mp_def']
self.source_doc_coll = self.stm_source_db['source_doc'] self.mp_doc_coll = self.stm_mp_db['mp_doc']
self.log.info("short-term memory system ok") self.log.info("short-term memory system ok")
...@@ -83,25 +83,31 @@ class ShortTermMemory(System): ...@@ -83,25 +83,31 @@ class ShortTermMemory(System):
self.log.info("amount of droped databases: {}".format(n)) self.log.info("amount of droped databases: {}".format(n))
def mp_to_ltm(self, id): def mp_to_ltm(self, id):
doc = self.source_doc_coll.find({'_id': id}) doc = self.mp_doc_coll.find({'_id': id})
n = doc.count() n = doc.count()
self.ltm_pub(body_dict={ self.ltm_pub(body_dict={
'do':'store_doc', 'do':'store_doc',
'payload': doc[n-1] 'payload': doc[n-1]
}) })
def insert_source_doc(self, doc): def insert_mp_doc(self, doc):
ret = self.source_doc_coll.find({'_id': doc['_id'], '_rev': doc['_rev']}) ret = self.mp_doc_coll.find({'_id': doc['_id'], '_rev': doc['_rev']})
if ret.count() == 0: if ret.count() == 0:
res = self.source_doc_coll.insert_one(doc) res = self.mp_doc_coll.insert_one(doc)
self.log.info("insert with result: {}".format(res)) self.log.info("insert with result: {}".format(res))
if ret.count() == 1: if ret.count() == 1:
self.log.info("doc with same _id and _rev already exists") self.log.info("doc with same _id and _rev already exists")
def build_api(self, id): self.ctrl_pub(body_dict={
doc = self.source_doc_coll.find_one({'_id': id}) 'source':'ltm'
'msg': 'insert_mp_doc_comlete',
'payload':doc['_id']
})
def build_mp_db(self, id):
doc = self.mp_doc_coll.find_one({'_id': id})
if doc and 'Mp' in doc: if doc and 'Mp' in doc:
mp = doc['Mp'] mp = doc['Mp']
self.log.info("found document, start building collections") self.log.info("found document, start building collections")
...@@ -133,6 +139,7 @@ class ShortTermMemory(System): ...@@ -133,6 +139,7 @@ class ShortTermMemory(System):
self.ltm_pub(body_dict={'do':'provide_task', 'payload':t}) self.ltm_pub(body_dict={'do':'provide_task', 'payload':t})
else: else:
m = "can not find document with id: {}".format(id) m = "can not find document with id: {}".format(id)
self.log.error(m) self.log.error(m)
sys.exit(m) sys.exit(m)
......
...@@ -32,11 +32,6 @@ class System: ...@@ -32,11 +32,6 @@ class System:
self.log.info("logging system online") self.log.info("logging system online")
def connection_off(self):
print("------------------")
print("------------------")
print("------------------")
print("------------------")
def queue_factory(self, queue_name): def queue_factory(self, queue_name):
conn = pika.BlockingConnection(self.msg_param) conn = pika.BlockingConnection(self.msg_param)
...@@ -55,18 +50,32 @@ class System: ...@@ -55,18 +50,32 @@ class System:
self.ltm_conn = conn self.ltm_conn = conn
self.ltm_chan = chan self.ltm_chan = chan
def init_ltm_msg_consume(self): def init_ctrl_msg_prod(self):
conn, chan = self.queue_factory(queue_name='ctrl')
self.ltm_conn = conn
self.ltm_chan = chan
def init_ltm_msg_consume(self, callback):
queue_name = 'ltm' queue_name = 'ltm'
conn, chan = self.queue_factory(queue_name=queue_name) _, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(self.dispatch, chan.basic_consume(callback,
queue=queue_name, queue=queue_name,
no_ack=True) no_ack=True)
chan.start_consuming() chan.start_consuming()
def init_stm_msg_consume(self): def init_ctrl_msg_consume(self, callback):
queue_name = 'ctrl'
_, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(callback,
queue=queue_name,
no_ack=True)
chan.start_consuming()
def init_stm_msg_consume(self, callback):
queue_name = 'stm' queue_name = 'stm'
conn, chan = self.queue_factory(queue_name=queue_name) _, chan = self.queue_factory(queue_name=queue_name)
chan.basic_consume(self.dispatch, chan.basic_consume(callback,
queue=queue_name, queue=queue_name,
no_ack=True) no_ack=True)
chan.start_consuming() chan.start_consuming()
...@@ -76,6 +85,11 @@ class System: ...@@ -76,6 +85,11 @@ class System:
routing_key='stm', routing_key='stm',
body=json.dumps(body_dict)) body=json.dumps(body_dict))
def ctrl_pub(self, body_dict):
self.stm_chan.basic_publish(exchange='',
routing_key='ctrl',
body=json.dumps(body_dict))
def ltm_pub(self, body_dict): def ltm_pub(self, body_dict):
self.ltm_chan.basic_publish(exchange='', self.ltm_chan.basic_publish(exchange='',
routing_key='ltm', routing_key='ltm',
......
from anselm.ctrl import Ctrl
if __name__ == "__main__":
Ctrl()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment