diff --git a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc index 120af530645b91ee0ae5e141d3df4dbf9ebf7dbc..6dea75d6cfa5aebbd30269256bed1883a0c7bb2f 100644 --- a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc +++ b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc @@ -77,7 +77,8 @@ MACScheduler::MACScheduler() : itsNextFinishedTime (0), itsNrPlanned (0), itsNrActive (0), - itsOTDBconnection (0) + itsOTDBconnection (0), + itsMsgQueue (0) { LOG_TRACE_OBJ ("MACscheduler construction"); @@ -93,7 +94,8 @@ MACScheduler::MACScheduler() : itsMaxPlanned = globalParameterSet()->getTime("maxPlannedList", 30); itsMaxFinished = globalParameterSet()->getTime("maxFinishedList", 40); - ASSERTSTR(itsMaxPlanned + itsMaxFinished < MAX_CONCURRENT_OBSERVATIONS, "maxPlannedList + maxFinishedList should be less than " << MAX_CONCURRENT_OBSERVATIONS); + ASSERTSTR(itsMaxPlanned + itsMaxFinished < MAX_CONCURRENT_OBSERVATIONS, + "maxPlannedList + maxFinishedList should be less than " << MAX_CONCURRENT_OBSERVATIONS); // Read the schedule periods for starting observations. itsQueuePeriod = globalParameterSet()->getTime("QueuePeriod"); @@ -101,20 +103,23 @@ MACScheduler::MACScheduler() : // attach to child control task itsChildControl = ChildControl::instance(); - itsChildPort = new GCFITCPort (*this, *itsChildControl, "childITCport", - GCFPortInterface::SAP, CONTROLLER_PROTOCOL); + itsChildPort = new GCFITCPort (*this, *itsChildControl, "childITCport", GCFPortInterface::SAP, CONTROLLER_PROTOCOL); ASSERTSTR(itsChildPort, "Cannot allocate ITCport for childcontrol"); itsChildPort->open(); // will result in F_CONNECTED // create an PVSSprepare Task itsClaimerTask = new ObsClaimer(this); ASSERTSTR(itsClaimerTask, "Cannot construct a ObsClaimerTask"); - itsClaimerPort = new GCFITCPort (*this, *itsClaimerTask, "ObsClaimerPort", - GCFPortInterface::SAP, CM_PROTOCOL); + itsClaimerPort = new GCFITCPort (*this, *itsClaimerTask, "ObsClaimerPort", GCFPortInterface::SAP, CM_PROTOCOL); // need port for timers itsTimerPort = new GCFTimerPort(*this, "Timerport"); + // setup MsgQueue + string queueName = globalParameterSet()->getString("ParsetQueuename"); + ASSERTSTR(!queueName.empty(), "Queuename for distributing parameterSets not specified"); + itsMsgQueue = new ToBus(queueName); + registerProtocol(CONTROLLER_PROTOCOL, CONTROLLER_PROTOCOL_STRINGS); registerProtocol(DP_PROTOCOL, DP_PROTOCOL_STRINGS); } @@ -134,6 +139,10 @@ MACScheduler::~MACScheduler() if (itsOTDBconnection) { delete itsOTDBconnection; } + + if (itsMsgQueue) { + delete itsMsgQueue; + } } // @@ -696,6 +705,10 @@ void MACScheduler::_updatePlannedList() // add controller to our 'monitor' administration itsControllerMap[cntlrName] = obsID; LOG_DEBUG_STR("itsControllerMap[" << cntlrName << "]=" << obsID); + if (!itsPreparedObs[obsID].parsetDistributed) { + _setParsetOnMsgBus(observationParset(obsID)); + itsPreparedObs[obsID].parsetDistributed = true; + } } else { LOG_DEBUG_STR("Observation " << obsID << " is already (being) started"); @@ -809,6 +822,25 @@ void MACScheduler::_updateFinishedList() } } +// +// _setParsetOnMsgBus(parsetFile) +// +void MACScheduler::_setParsetOnMsgBus(const string& filename) const +{ + // open file + ParameterSet obsSpecs(filename); + string obsPrefix = obsSpecs.fullModuleName("Observation"); + string momID = obsSpecs.getString(obsPrefix + ".momID"); + string sasID = obsSpecs.getString(obsPrefix + ".otdbID"); + + // from, forUser, summary, protocol, protocolVersion, momID, sasID + Message outMsg("LOFAR.MACScheduler", "", "", "task.specification.system", "1.0", momID, sasID); + stringstream ss; + obsSpecs.writeStream(ss); + outMsg.setTXTPayload(ss.str()); + cout << outMsg << endl; + itsMsgQueue->send(outMsg); +} // // _connectedHandler(port) diff --git a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.conf.in b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.conf.in index e4ac723afce39d2d10bc94678d610d2ca78644cd..911f75463162324556fcb8d03141d9aece3381b6 100644 --- a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.conf.in +++ b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.conf.in @@ -29,3 +29,4 @@ maxFinishedList = 40 # Never show more finished observations #ChildControl.StartupRetryInterval = 10s #ChildControl.MaxStartupRetry = 5 +ParsetQueuename = lofar.task.specification.system diff --git a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.h b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.h index d45b6280b397e95fefae5e66a0ad91454acc0ad1..0c54e1d274e82eb5a4bf8be72bbfc391387e1aa8 100644 --- a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.h +++ b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.h @@ -39,6 +39,7 @@ #include <Common/lofar_vector.h> #include <Common/LofarLogger.h> #include <ApplCommon/Observation.h> +#include <MessageBus/MsgBus.h> //# ACC Includes #include <OTDB/OTDBconnection.h> @@ -107,6 +108,7 @@ private: void _updatePlannedList(); void _updateActiveList(); void _updateFinishedList(); + void _setParsetOnMsgBus(const string& filename) const; // ----- DATA MEMBERS ----- // Our own propertySet in PVSS to inform the operator @@ -130,8 +132,9 @@ private: public: ptime modTime; bool prepReady; - schedInfo(ptime t, bool p) : modTime(t), prepReady(p) {}; - schedInfo() : modTime(min_date_time), prepReady(false) {}; + bool parsetDistributed; + schedInfo(ptime t, bool p) : modTime(t), prepReady(p), parsetDistributed(false) {}; + schedInfo() : modTime(min_date_time), prepReady(false), parsetDistributed(false) {}; }; typedef map<int /*obsID*/, schedInfo /*prepReady*/> ObsList; typedef map<int ,schedInfo>::iterator OLiter; @@ -163,8 +166,10 @@ private: uint32 itsQueuePeriod; // period between queueing and start // OTDB related variables. - OTDB::OTDBconnection* itsOTDBconnection; // connection to the database + OTDB::OTDBconnection* itsOTDBconnection; // connection to the database + // Messagebus related variables + ToBus* itsMsgQueue; // Bus used for sending }; };//MainCU