diff --git a/MAC/APL/CEPCU/src/OnlineControl/OnlineControl.cc b/MAC/APL/CEPCU/src/OnlineControl/OnlineControl.cc index 8127b683a13f9a6a76f5c7bae35abf3de6fc11cb..913f61a12f1027d7d8d2ca90983030e325923d46 100644 --- a/MAC/APL/CEPCU/src/OnlineControl/OnlineControl.cc +++ b/MAC/APL/CEPCU/src/OnlineControl/OnlineControl.cc @@ -31,6 +31,7 @@ #include <Common/Exceptions.h> #include <Common/SystemUtil.h> #include <Common/hexdump.h> +#include <MessageBus/Message.h> #include <ApplCommon/StationInfo.h> #include <ApplCommon/Observation.h> #include <ApplCommon/LofarDirs.h> @@ -65,6 +66,8 @@ namespace LOFAR { // static pointer to this object for signal handler static OnlineControl* thisOnlineControl = 0; +const double QUEUE_POLL_TIMEOUT = 1.0; + // // OnlineControl() // @@ -80,9 +83,9 @@ OnlineControl::OnlineControl(const string& cntlrName) : itsForcedQuitTimer (0), itsLogControlPort (0), itsState (CTState::NOSTATE), - itsFeedbackListener (0), // QUICK FIX #4022 - itsFeedbackPort (0), // QUICK FIX #4022 - itsFeedbackResult (CT_RESULT_NO_ERROR), // QUICK FIX #4022 + itsMsgQueue (0), + itsQueueTimer (0), + itsFeedbackResult (CT_RESULT_NO_ERROR), itsTreePrefix (""), itsInstanceNr (0), itsStartTime (), @@ -110,14 +113,13 @@ OnlineControl::OnlineControl(const string& cntlrName) : // need port for timers. itsTimerPort = new GCFTimerPort(*this, "TimerPort"); ASSERTSTR(itsTimerPort, "Can't allocate the timer!"); + itsQueueTimer = new GCFTimerPort(*this, "MsgQTimer"); + ASSERTSTR(itsQueueTimer, "Cannot allocate queue timer"); // Controlport to logprocessor itsLogControlPort = new GCFTCPPort(*this, MAC_SVCMASK_CEPLOGCONTROL, GCFPortInterface::SAP, CONTROLLER_PROTOCOL); ASSERTSTR(itsLogControlPort, "Can't allocate the logControlPort"); - // QUICK FIX #4022 - itsFeedbackListener = new GCFTCPPort (*this, "Feedbacklistener", GCFPortInterface::MSPP, CONTROLLER_PROTOCOL); - ASSERTSTR(itsFeedbackListener, "Cannot allocate TCP port for feedback"); itsForcedQuitTimer = new GCFTimerPort(*this, "EmergencyTimer"); ASSERTSTR(itsForcedQuitTimer, "Can't allocate the emergency timer!"); itsForceTimeout = globalParameterSet()->getTime("emergencyTimeout", 3600); @@ -141,14 +143,9 @@ OnlineControl::~OnlineControl() delete itsLogControlPort; } - if (itsTimerPort) { - delete itsTimerPort; - } - - if (itsFeedbackListener) { - itsFeedbackListener->close(); - delete itsFeedbackListener; - } + delete itsTimerPort; + delete itsQueueTimer; + delete itsMsgQueue; } // @@ -268,11 +265,12 @@ GCFEvent::TResult OnlineControl::initial_state(GCFEvent& event, GCFPortInterface itsParentPort = itsParentControl->registerTask(this); // results in CONTROL_CONNECT - // QUICK FIX #4022 - uint32 obsID = globalParameterSet()->getUint32("Observation.ObsID"); - LOG_INFO_STR("Openening feedback port for OLAP: " << MAC_ONLINE_FEEDBACK_QF + obsID%1000); - itsFeedbackListener->setPortNumber(MAC_ONLINE_FEEDBACK_QF + obsID%1000); - itsFeedbackListener->open(); // will result in F_CONN + // open connection with messagebus + if (!itsMsgQueue) { + string queueName = globalParameterSet()->getString("TaskStateQueue"); + itsMsgQueue = new FromBus(queueName); + LOG_INFO_STR("Starting to listen on " << queueName); + } LOG_DEBUG ("Going to operational state"); TRAN(OnlineControl::active_state); // go to next state. @@ -310,11 +308,7 @@ GCFEvent::TResult OnlineControl::active_state(GCFEvent& event, GCFPortInterface& // update PVSS itsPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString("active")); itsPropertySet->setValue(PN_FSM_ERROR, GCFPVString("")); - } break; - - // QUICKFIX #4022 - case F_ACCEPT_REQ: { - _handleAcceptRequest(port); + itsQueueTimer->setTimer(QUEUE_POLL_TIMEOUT); } break; case F_CONNECTED: { @@ -326,18 +320,30 @@ GCFEvent::TResult OnlineControl::active_state(GCFEvent& event, GCFPortInterface& _handleDisconnect(port); } break; - // QUICKFIX #4022 - case F_DATAIN: { - _handleDataIn(port); - } break; - case DP_CHANGED: _databaseEventHandler(event); break; case F_TIMER: { GCFTimerEvent& timerEvent=static_cast<GCFTimerEvent&>(event); - if (timerEvent.id == itsStopTimerID) { + if (&port == itsQueueTimer) { + Message msg; + if (itsMsgQueue->getMessage(msg, 0.1)) { + string result = msg.getXMLvalue("message.payload.task.state"); + if (result == "ABORT") { + itsFeedbackResult = CT_RESULT_PIPELINE_FAILED; + } + else if (result != "FINISHED") { + LOG_FATAL_STR("Unknown result received from correlator: " << result << " assuming failure!"); + itsFeedbackResult = CT_RESULT_PIPELINE_FAILED; + } + LOG_INFO_STR("Received finish result on messagebus: " << itsFeedbackResult); + TRAN(OnlineControl::finishing_state); + break; + } + itsQueueTimer->setTimer(QUEUE_POLL_TIMEOUT); + } + else if (timerEvent.id == itsStopTimerID) { LOG_DEBUG("StopTimer expired, starting QUIT sequence"); itsStopTimerID = 0; _setState(CTState::QUIT); @@ -442,45 +448,6 @@ GCFEvent::TResult OnlineControl::active_state(GCFEvent& event, GCFPortInterface& return (status); } -// -// completing_state(event, port) -// -// -GCFEvent::TResult OnlineControl::completing_state(GCFEvent& event, GCFPortInterface& port) -{ - LOG_INFO_STR ("completing:" << eventName(event) << "@" << port.getName()); - - switch (event.signal) { - case F_ENTRY: { - // update PVSS - itsPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString("completing")); - itsPropertySet->setValue(PN_FSM_ERROR, GCFPVString("")); - itsInFinishState = true; - - _passMetadatToOTDB(); - - TRAN(OnlineControl::finishing_state); - } break; - - case F_TIMER: - break; - - case F_DISCONNECTED: - _handleDisconnect(port); - break; - - case F_DATAIN: - _handleDataIn(port); - break; - - default: - LOG_DEBUG("completing state default"); - return (GCFEvent::NOT_HANDLED); - } - - return (GCFEvent::HANDLED); -} - // // finishing_state(event, port) // @@ -511,10 +478,6 @@ GCFEvent::TResult OnlineControl::finishing_state(GCFEvent& event, GCFPortInterfa _handleDisconnect(port); break; - case F_DATAIN: - _handleDataIn(port); - break; - default: LOG_DEBUG("finishing_state default"); return (GCFEvent::NOT_HANDLED); @@ -798,91 +761,6 @@ void OnlineControl::_clearCobaltDatapoints() delete myDPservice; } -// -// _passMetadatToOTDB(); -// THIS ROUTINE IS A MODIFIED COPY FROM PYTHONCONTROL.CC -// -void OnlineControl::_passMetadatToOTDB() -{ - // No name specified? - bool metadataFileAvailable (true); - uint32 obsID(globalParameterSet()->getUint32("Observation.ObsID", 0)); - string feedbackFile = observationParset(obsID)+"_feedback"; - LOG_INFO_STR ("Expecting metadata to be in file " << feedbackFile); - if (feedbackFile.empty()) { - metadataFileAvailable = false; - } - - // read parameterset - // Try to setup the connection with the database - string confFile = globalParameterSet()->getString("OTDBconfFile", "SASGateway.conf"); - ConfigLocator CL; - string filename = CL.locate(confFile); - LOG_INFO_STR("Trying to read database information from file " << filename); - ParameterSet otdbconf; - otdbconf.adoptFile(filename); - string database = otdbconf.getString("SASGateway.OTDBdatabase"); - string dbhost = otdbconf.getString("SASGateway.OTDBhostname"); - OTDBconnection conn("paulus", "boskabouter", database, dbhost); - if (!conn.connect()) { - LOG_FATAL_STR("Cannot connect to database " << database << " on machine " << dbhost); - // WE DO HAVE A PROBLEM HERE BECAUSE THIS PIPELINE CANNOT BE SET TO FINISHED IN SAS. - return; - } - LOG_INFO_STR("Connected to database " << database << " on machine " << dbhost); - - if (metadataFileAvailable) { - try { - TreeValue tv(&conn, getObservationNr(getName())); - ParameterSet metadata; - metadata.adoptFile(feedbackFile); - // Loop over the parameterset and send the information to the KVTlogger. - // During the transition phase from parameter-based to record-based storage in OTDB the - // nodenames ending in '_' are implemented both as parameter and as record. - ParameterSet::iterator iter = metadata.begin(); - ParameterSet::iterator end = metadata.end(); - while (iter != end) { - string key(iter->first); // make destoyable copy - rtrim(key, "[]0123456789"); - // bool doubleStorage(key[key.size()-1] == '_'); - bool isRecord(iter->second.isRecord()); - // isRecord doubleStorage - // -------------------------------------------------------------- - // Y Y store as record and as parameters - // Y N store as parameters - // N * store parameter - if (!isRecord) { - LOG_DEBUG_STR("BASIC: " << iter->first << " = " << iter->second); - tv.addKVT(iter->first, iter->second, ptime(microsec_clock::local_time())); - } - else { - // if (doubleStorage) { - // LOG_DEBUG_STR("RECORD: " << iter->first << " = " << iter->second); - // tv.addKVT(iter->first, iter->second, ptime(microsec_clock::local_time())); - // } - // to store is a node/param values the last _ should be stipped of - key = iter->first; // destroyable copy - // string::size_type pos = key.find_last_of('_'); - // key.erase(pos,1); - ParameterRecord pr(iter->second.getRecord()); - ParameterRecord::const_iterator prIter = pr.begin(); - ParameterRecord::const_iterator prEnd = pr.end(); - while (prIter != prEnd) { - LOG_DEBUG_STR("ELEMENT: " << key+"."+prIter->first << " = " << prIter->second); - tv.addKVT(key+"."+prIter->first, prIter->second, ptime(microsec_clock::local_time())); - prIter++; - } - } - iter++; - } - LOG_INFO_STR(metadata.size() << " metadata values send to SAS"); - } - catch (APSException &e) { - // Parameterfile not found - LOG_FATAL(e.text()); - } - } -} // -------------------- Application-order administration -------------------- // @@ -891,55 +769,6 @@ void OnlineControl::_passMetadatToOTDB() void OnlineControl::_handleDisconnect(GCFPortInterface& port) { port.close(); - // QUICKFIX #4022 - if (&port == itsFeedbackPort) { - LOG_INFO_STR("Lost connection with Feedback of OLAP."); - delete itsFeedbackPort; - itsFeedbackPort = 0; - } -} - -// -// _handleAcceptRequest(port) -// -void OnlineControl::_handleAcceptRequest(GCFPortInterface& port) -{ - ASSERTSTR(&port == itsFeedbackListener, "Incoming connection on main listener iso feedbackListener"); - itsFeedbackPort = new GCFTCPPort(); - itsFeedbackPort->init(*this, "feedback", GCFPortInterface::SPP, 0, true); // raw port - if (!itsFeedbackListener->accept(*itsFeedbackPort)) { - delete itsFeedbackPort; - itsFeedbackPort = 0; - LOG_ERROR("Connection with Python feedback FAILED"); - } - else { - LOG_INFO("Connection made on feedback port, accepting commands"); - } -} - -// -// _handleDataIn(port) -// -void OnlineControl::_handleDataIn(GCFPortInterface& port) -{ - ASSERTSTR(&port == itsFeedbackPort, "Didn't expect raw data on port " << port.getName()); - char buf[1024]; - ssize_t btsRead = port.recv((void*)&buf[0], 1023); - buf[btsRead] = '\0'; - string s; - hexdump(s, buf, btsRead); - LOG_INFO_STR("Received command on feedback port: " << s); - - if (!strcmp(buf, "ABORT")) { - itsFeedbackResult = CT_RESULT_PIPELINE_FAILED; - TRAN(OnlineControl::completing_state); // pass metadata - } - else if (!strcmp(buf, "FINISHED")) { - TRAN(OnlineControl::completing_state); - } - else { - LOG_FATAL_STR("Received command on feedback port unrecognized"); - } } }; // CEPCU diff --git a/MAC/APL/CEPCU/src/OnlineControl/OnlineControl.h b/MAC/APL/CEPCU/src/OnlineControl/OnlineControl.h index 3fda39dc026f130746e587e67e6611f2a79e5ec2..fdfff17c6bc3089696b4e5e887e61b22ada37af8 100644 --- a/MAC/APL/CEPCU/src/OnlineControl/OnlineControl.h +++ b/MAC/APL/CEPCU/src/OnlineControl/OnlineControl.h @@ -27,10 +27,11 @@ #include <Common/lofar_string.h> #include <Common/lofar_vector.h> #include <Common/LofarLogger.h> - -//# ACC Includes #include <Common/ParameterSet.h> +//# MsgBus Includes +#include <MessageBus/MsgBus.h> + //# GCF Includes #include <GCF/TM/GCF_Control.h> #include <GCF/PVSS/PVSSservice.h> @@ -90,11 +91,8 @@ private: void _stopApplications(); void _finishController (uint16_t result); void _handleDisconnect (GCFPortInterface& port); - void _handleAcceptRequest (GCFPortInterface& port); - void _handleDataIn (GCFPortInterface& port); void _setState (CTState::CTstateNr newState); void _databaseEventHandler(GCFEvent& event); - void _passMetadatToOTDB (); void _clearCobaltDatapoints(); // ----- datamembers ----- @@ -115,9 +113,8 @@ private: CTState::CTstateNr itsState; - // QUICK FIX #4022 - GCFTCPPort* itsFeedbackListener; - GCFTCPPort* itsFeedbackPort; + FromBus* itsMsgQueue; + GCFTimerPort* itsQueueTimer; int itsFeedbackResult; // ParameterSet variables