ini
function ini()
Subscriptions
/**
 * Subscribes this module to the run, executed and stop channels, one after
 * another: the next subscription is only requested once the previous one
 * has been acknowledged without an error.
 *
 * A failed subscription is logged at error level and ends the chain; `cb`
 * is not invoked in that case.
 *
 * @param {Function} [cb] - Optional. Called with `ok` after all three
 *   subscriptions succeeded.
 */
var ini = function(cb){
  // [channel handed to mem.subscribe, name used in the log messages],
  // listed in subscription order.
  var channels = [ [cstr.run,  "run"]
                 , [cstr.exec, "executed"]
                 , ["stop",    "stop"]
                 ];

  // Subscribes to channels[idx] and, on success, continues with idx + 1.
  var subscribeNext = function(idx){
    if(idx >= channels.length){
      // every channel is subscribed: report success to the caller
      if(_.isFunction(cb)){
        cb(ok);
      }
      return;
    }

    var channel = channels[idx][0]
      , label   = channels[idx][1];

    mem.subscribe(channel, function(err){
      if(err){
        // remaining channels are intentionally not subscribed after a failure
        log.error({error: err}
                 , "error on " + label + " subscription in run.js");
        return;
      }
      log.info(ok
              , "run.js subscribed to " + label + " channel");
      subscribeNext(idx + 1);
    });
  };

  subscribeNext(0);
};
exports.ini = ini;
/**
 * Handles messages arriving on the subscribed channels.
 *
 * - "stop" / "executed": clears the polling timer stored in `timer` under
 *   the joined message path, if one is active.
 * - "run": starts a polling timer (period `deflt.container.heartbeat`)
 *   unless one is already stored for that path. Every tick reads
 *   [mpid, no, "state"] and starts each entry whose value equals
 *   `cstr.ready`.
 *
 * @param ch   - channel the message arrived on; compared against "stop",
 *               "executed" and "run"
 * @param {Array} path - message payload; path[0] and path[1] become the
 *               first two segments of every state/recipe path built here
 */
mem.on('message', function(ch, path){
  var strpath = path.join(" ")
    , mpid    = path[0]
    , no      = path[1];

  // Stops the polling timer of this path (if any); `evt` only feeds the log text.
  var clearTimer = function(evt){
    if(timer[strpath]){
      log.info(ok
              , "receice " + evt + " event, clear intervall timer id");
      clearInterval(timer[strpath]);
      // 0 is falsy, so a later "run" message may start a new timer
      timer[strpath] = 0;
    }
  };

  // Marks entry (s, p) as working, publishes the change, then loads the
  // recipe entry at the same position and hands it to run().
  // All paths are built per call so that the asynchronous callbacks log the
  // position they actually belong to.
  var startTask = function(s, p){
    var path_s = [mpid, no, "state", s, p]
      , path_r = [mpid, no, "recipe", s, p];

    mem.set(path_s, cstr.work, function(err){
      if(err){
        // do not announce or run a task whose state could not be written
        log.error({error: err}
                 , "can not set state at " + path_s.join(" "));
        return;
      }
      mem.publish("state", path_s.slice(), function(err){
        if(err){
          log.error({error: err}
                   , "can not publish state at " + path_s.join(" "));
          return;
        }
        mem.get(path_r, function(err, task){
          if(err){
            // NOTE(review): log.error (not log.err) to match the call used
            // for state-read failures below — confirm against the logger API
            log.error({error: err}
                     , "can not read recipe on position " + path_r.join(" "));
            return;
          }
          //------------------
          run(path, s, p, task);
          //------------------
        });
      }); // publish state
    }); // set work
  };

  // One polling tick: start every ready entry of the current step.
  var poll = function(){
    mem.get([mpid, no, "state"], function(err, state){
      if(err){
        log.error({error: err}
                 , "can not read state");
        return;
      }
      for(var i in state){
        for(var j in state[i]){
          if(state[i][j] == cstr.ready){
            startTask(i, j);
          }
        } // j

        // stay on step i (break) until all of its entries are executed
        var all_values_executed = _.every(_.values(state[i]), function(k){
          return k == cstr.exec;
        });
        if(!all_values_executed){
          break;
        }
      } // i
    }); // state
  };

  if(ch == "stop" || ch == "executed"){
    clearTimer(ch);
  }

  if(ch == "run"){
    log.info(ok
            , "receice run event, start intervall timer");
    if(!timer[strpath]){
      timer[strpath] = setInterval(poll, deflt.container.heartbeat);
    }else{
      log.warn({warn: "running"}
              , "container is already running");
    }
  }
});