Skip to content
Snippets Groups Projects
Commit f573e753 authored by wactbprot's avatar wactbprot
Browse files

+ message worker

parent 3e97646a
No related branches found
No related tags found
No related merge requests found
...@@ -165,7 +165,7 @@ var build_container = function(path, container, cb){ ...@@ -165,7 +165,7 @@ var build_container = function(path, container, cb){
log.trace(ok log.trace(ok
, "add title to container: " + strpath); , "add title to container: " + strpath);
mem.set(path.concat(["message"]), container["Message"] || "", function(err){ mem.set(path.concat(["message"]),"no", function(err){
if(!err){ if(!err){
log.trace(ok log.trace(ok
, "add message channel to container: " + strpath); , "add message channel to container: " + strpath);
......
...@@ -14,6 +14,7 @@ exports.TCP = nodeRelay; ...@@ -14,6 +14,7 @@ exports.TCP = nodeRelay;
exports.UDP = nodeRelay; exports.UDP = nodeRelay;
exports.wait = require("./worker.wait"); exports.wait = require("./worker.wait");
exports.message = require("./worker.message");
exports.getTime = require("./worker.getTime"); exports.getTime = require("./worker.getTime");
exports.getDate = require("./worker.getDate"); exports.getDate = require("./worker.getDate");
exports.writeExchange = require("./worker.writeExchange"); exports.writeExchange = require("./worker.writeExchange");
......
/**
* @module work.message
*/
var _ = require("underscore")
, bunyan = require("bunyan")
, logStrm = require("bunyan-couchdb-stream")
, conf = require("./conf")
, utils = require("./utils")
, log = bunyan.createLogger({name: conf.app.name + ".worker.message",
streams: conf.log.streams
})
, broker = require("sc-broker")
, mem = broker.createClient({port: conf.mem.port})
, ro = {ok: true}
, err;
/**
* ```message()``` writes messages to the path
* [mpid, no, message]. a timer is started wich
* ends the funtion when message is "" again
*
* @method message
* @param {Object} task Task-Objekt
* @param {Function} cb Callback Funktion
*/
module.exports = function(task, cb){
var path = task.Path
, mpid = path[0]
, no = path[1];
log.trace(ro,
"call function message");
mem.set([mpid, no, "message"], task.Message, function(err){
if(!err){
var iid = setInterval(function (){
mem.get([mpid, no, "message"], function(err, msg){
if(!err){
log.trace(ro
, "message lookup:" + msg);
if(msg == "ok"){
log.trace(ro
, "message reset; clear interval");
clearInterval(iid);
if(_.isFunction (cb)){
cb(null, ro);
}
}
}else{
log.error(err
, "error on attempt to read from message channel");
}
});
}, 500);
}
});
};
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
"author": "wactbprot", "author": "wactbprot",
"name": "ssmp", "name": "ssmp",
"description": "server side measurment program", "description": "server side measurment program",
"version": "0.8.0", "version": "0.9.0",
"repository": { "repository": {
"type": "git", "type": "git",
"url": "https://github.com/wactbprot/ssmp" "url": "https://github.com/wactbprot/ssmp"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment