Skip to content
Snippets Groups Projects
Commit 3110bd87 authored by Ruud Overeem's avatar Ruud Overeem
Browse files

Task #7420: MACScheduler pushes parset on the bus just before starting...

Task #7420: MACScheduler pushes parset on the bus just before starting ObservationControl. Care is taken that the parset is pushed only once.
parent 4800e4e4
No related branches found
No related tags found
No related merge requests found
......@@ -77,7 +77,8 @@ MACScheduler::MACScheduler() :
itsNextFinishedTime (0),
itsNrPlanned (0),
itsNrActive (0),
itsOTDBconnection (0)
itsOTDBconnection (0),
itsMsgQueue (0)
{
LOG_TRACE_OBJ ("MACscheduler construction");
......@@ -93,7 +94,8 @@ MACScheduler::MACScheduler() :
itsMaxPlanned = globalParameterSet()->getTime("maxPlannedList", 30);
itsMaxFinished = globalParameterSet()->getTime("maxFinishedList", 40);
ASSERTSTR(itsMaxPlanned + itsMaxFinished < MAX_CONCURRENT_OBSERVATIONS, "maxPlannedList + maxFinishedList should be less than " << MAX_CONCURRENT_OBSERVATIONS);
ASSERTSTR(itsMaxPlanned + itsMaxFinished < MAX_CONCURRENT_OBSERVATIONS,
"maxPlannedList + maxFinishedList should be less than " << MAX_CONCURRENT_OBSERVATIONS);
// Read the schedule periods for starting observations.
itsQueuePeriod = globalParameterSet()->getTime("QueuePeriod");
......@@ -101,20 +103,23 @@ MACScheduler::MACScheduler() :
// attach to child control task
itsChildControl = ChildControl::instance();
itsChildPort = new GCFITCPort (*this, *itsChildControl, "childITCport",
GCFPortInterface::SAP, CONTROLLER_PROTOCOL);
itsChildPort = new GCFITCPort (*this, *itsChildControl, "childITCport", GCFPortInterface::SAP, CONTROLLER_PROTOCOL);
ASSERTSTR(itsChildPort, "Cannot allocate ITCport for childcontrol");
itsChildPort->open(); // will result in F_CONNECTED
// create an PVSSprepare Task
itsClaimerTask = new ObsClaimer(this);
ASSERTSTR(itsClaimerTask, "Cannot construct a ObsClaimerTask");
itsClaimerPort = new GCFITCPort (*this, *itsClaimerTask, "ObsClaimerPort",
GCFPortInterface::SAP, CM_PROTOCOL);
itsClaimerPort = new GCFITCPort (*this, *itsClaimerTask, "ObsClaimerPort", GCFPortInterface::SAP, CM_PROTOCOL);
// need port for timers
itsTimerPort = new GCFTimerPort(*this, "Timerport");
// setup MsgQueue
string queueName = globalParameterSet()->getString("ParsetQueuename");
ASSERTSTR(!queueName.empty(), "Queuename for distributing parameterSets not specified");
itsMsgQueue = new ToBus(queueName);
registerProtocol(CONTROLLER_PROTOCOL, CONTROLLER_PROTOCOL_STRINGS);
registerProtocol(DP_PROTOCOL, DP_PROTOCOL_STRINGS);
}
......@@ -134,6 +139,10 @@ MACScheduler::~MACScheduler()
if (itsOTDBconnection) {
delete itsOTDBconnection;
}
if (itsMsgQueue) {
delete itsMsgQueue;
}
}
//
......@@ -696,6 +705,10 @@ void MACScheduler::_updatePlannedList()
// add controller to our 'monitor' administration
itsControllerMap[cntlrName] = obsID;
LOG_DEBUG_STR("itsControllerMap[" << cntlrName << "]=" << obsID);
if (!itsPreparedObs[obsID].parsetDistributed) {
_setParsetOnMsgBus(observationParset(obsID));
itsPreparedObs[obsID].parsetDistributed = true;
}
}
else {
LOG_DEBUG_STR("Observation " << obsID << " is already (being) started");
......@@ -809,6 +822,25 @@ void MACScheduler::_updateFinishedList()
}
}
//
// _setParsetOnMsgBus(parsetFile)
//
void MACScheduler::_setParsetOnMsgBus(const string& filename) const
{
// open file
ParameterSet obsSpecs(filename);
string obsPrefix = obsSpecs.fullModuleName("Observation");
string momID = obsSpecs.getString(obsPrefix + ".momID");
string sasID = obsSpecs.getString(obsPrefix + ".otdbID");
// from, forUser, summary, protocol, protocolVersion, momID, sasID
Message outMsg("LOFAR.MACScheduler", "", "", "task.specification.system", "1.0", momID, sasID);
stringstream ss;
obsSpecs.writeStream(ss);
outMsg.setTXTPayload(ss.str());
cout << outMsg << endl;
itsMsgQueue->send(outMsg);
}
//
// _connectedHandler(port)
......
......@@ -29,3 +29,4 @@ maxFinishedList = 40 # Never show more finished observations
#ChildControl.StartupRetryInterval = 10s
#ChildControl.MaxStartupRetry = 5
ParsetQueuename = lofar.task.specification.system
......@@ -39,6 +39,7 @@
#include <Common/lofar_vector.h>
#include <Common/LofarLogger.h>
#include <ApplCommon/Observation.h>
#include <MessageBus/MsgBus.h>
//# ACC Includes
#include <OTDB/OTDBconnection.h>
......@@ -107,6 +108,7 @@ private:
void _updatePlannedList();
void _updateActiveList();
void _updateFinishedList();
void _setParsetOnMsgBus(const string& filename) const;
// ----- DATA MEMBERS -----
// Our own propertySet in PVSS to inform the operator
......@@ -130,8 +132,9 @@ private:
public:
ptime modTime;
bool prepReady;
schedInfo(ptime t, bool p) : modTime(t), prepReady(p) {};
schedInfo() : modTime(min_date_time), prepReady(false) {};
bool parsetDistributed;
schedInfo(ptime t, bool p) : modTime(t), prepReady(p), parsetDistributed(false) {};
schedInfo() : modTime(min_date_time), prepReady(false), parsetDistributed(false) {};
};
typedef map<int /*obsID*/, schedInfo /*prepReady*/> ObsList;
typedef map<int ,schedInfo>::iterator OLiter;
......@@ -163,8 +166,10 @@ private:
uint32 itsQueuePeriod; // period between queueing and start
// OTDB related variables.
OTDB::OTDBconnection* itsOTDBconnection; // connection to the database
OTDB::OTDBconnection* itsOTDBconnection; // connection to the database
// Messagebus related variables
ToBus* itsMsgQueue; // Bus used for sending
};
};//MainCU
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment