Skip to content
Snippets Groups Projects
Commit e00d0bc5 authored by Ruud Overeem's avatar Ruud Overeem
Browse files

Task #7360: Modified to work on the MessageBus.

parent cb4539ee
No related branches found
No related tags found
No related merge requests found
......@@ -31,6 +31,7 @@
#include <Common/Exceptions.h>
#include <Common/SystemUtil.h>
#include <Common/hexdump.h>
#include <MessageBus/Message.h>
#include <ApplCommon/StationInfo.h>
#include <ApplCommon/Observation.h>
#include <ApplCommon/LofarDirs.h>
......@@ -65,6 +66,8 @@ namespace LOFAR {
// static pointer to this object for signal handler
static OnlineControl* thisOnlineControl = 0;
const double QUEUE_POLL_TIMEOUT = 1.0;
//
// OnlineControl()
//
......@@ -80,9 +83,9 @@ OnlineControl::OnlineControl(const string& cntlrName) :
itsForcedQuitTimer (0),
itsLogControlPort (0),
itsState (CTState::NOSTATE),
itsFeedbackListener (0), // QUICK FIX #4022
itsFeedbackPort (0), // QUICK FIX #4022
itsFeedbackResult (CT_RESULT_NO_ERROR), // QUICK FIX #4022
itsMsgQueue (0),
itsQueueTimer (0),
itsFeedbackResult (CT_RESULT_NO_ERROR),
itsTreePrefix (""),
itsInstanceNr (0),
itsStartTime (),
......@@ -110,14 +113,13 @@ OnlineControl::OnlineControl(const string& cntlrName) :
// need port for timers.
itsTimerPort = new GCFTimerPort(*this, "TimerPort");
ASSERTSTR(itsTimerPort, "Can't allocate the timer!");
itsQueueTimer = new GCFTimerPort(*this, "MsgQTimer");
ASSERTSTR(itsQueueTimer, "Cannot allocate queue timer");
// Controlport to logprocessor
itsLogControlPort = new GCFTCPPort(*this, MAC_SVCMASK_CEPLOGCONTROL, GCFPortInterface::SAP, CONTROLLER_PROTOCOL);
ASSERTSTR(itsLogControlPort, "Can't allocate the logControlPort");
// QUICK FIX #4022
itsFeedbackListener = new GCFTCPPort (*this, "Feedbacklistener", GCFPortInterface::MSPP, CONTROLLER_PROTOCOL);
ASSERTSTR(itsFeedbackListener, "Cannot allocate TCP port for feedback");
itsForcedQuitTimer = new GCFTimerPort(*this, "EmergencyTimer");
ASSERTSTR(itsForcedQuitTimer, "Can't allocate the emergency timer!");
itsForceTimeout = globalParameterSet()->getTime("emergencyTimeout", 3600);
......@@ -141,14 +143,9 @@ OnlineControl::~OnlineControl()
delete itsLogControlPort;
}
if (itsTimerPort) {
delete itsTimerPort;
}
if (itsFeedbackListener) {
itsFeedbackListener->close();
delete itsFeedbackListener;
}
delete itsTimerPort;
delete itsQueueTimer;
delete itsMsgQueue;
}
//
......@@ -268,11 +265,12 @@ GCFEvent::TResult OnlineControl::initial_state(GCFEvent& event, GCFPortInterface
itsParentPort = itsParentControl->registerTask(this);
// results in CONTROL_CONNECT
// QUICK FIX #4022
uint32 obsID = globalParameterSet()->getUint32("Observation.ObsID");
LOG_INFO_STR("Openening feedback port for OLAP: " << MAC_ONLINE_FEEDBACK_QF + obsID%1000);
itsFeedbackListener->setPortNumber(MAC_ONLINE_FEEDBACK_QF + obsID%1000);
itsFeedbackListener->open(); // will result in F_CONN
// open connection with messagebus
if (!itsMsgQueue) {
string queueName = globalParameterSet()->getString("TaskStateQueue");
itsMsgQueue = new FromBus(queueName);
LOG_INFO_STR("Starting to listen on " << queueName);
}
LOG_DEBUG ("Going to operational state");
TRAN(OnlineControl::active_state); // go to next state.
......@@ -310,11 +308,7 @@ GCFEvent::TResult OnlineControl::active_state(GCFEvent& event, GCFPortInterface&
// update PVSS
itsPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString("active"));
itsPropertySet->setValue(PN_FSM_ERROR, GCFPVString(""));
} break;
// QUICKFIX #4022
case F_ACCEPT_REQ: {
_handleAcceptRequest(port);
itsQueueTimer->setTimer(QUEUE_POLL_TIMEOUT);
} break;
case F_CONNECTED: {
......@@ -326,18 +320,30 @@ GCFEvent::TResult OnlineControl::active_state(GCFEvent& event, GCFPortInterface&
_handleDisconnect(port);
} break;
// QUICKFIX #4022
case F_DATAIN: {
_handleDataIn(port);
} break;
case DP_CHANGED:
_databaseEventHandler(event);
break;
case F_TIMER: {
GCFTimerEvent& timerEvent=static_cast<GCFTimerEvent&>(event);
if (timerEvent.id == itsStopTimerID) {
if (&port == itsQueueTimer) {
Message msg;
if (itsMsgQueue->getMessage(msg, 0.1)) {
string result = msg.getXMLvalue("message.payload.task.state");
if (result == "ABORT") {
itsFeedbackResult = CT_RESULT_PIPELINE_FAILED;
}
else if (result != "FINISHED") {
LOG_FATAL_STR("Unknown result received from correlator: " << result << " assuming failure!");
itsFeedbackResult = CT_RESULT_PIPELINE_FAILED;
}
LOG_INFO_STR("Received finish result on messagebus: " << itsFeedbackResult);
TRAN(OnlineControl::finishing_state);
break;
}
itsQueueTimer->setTimer(QUEUE_POLL_TIMEOUT);
}
else if (timerEvent.id == itsStopTimerID) {
LOG_DEBUG("StopTimer expired, starting QUIT sequence");
itsStopTimerID = 0;
_setState(CTState::QUIT);
......@@ -442,45 +448,6 @@ GCFEvent::TResult OnlineControl::active_state(GCFEvent& event, GCFPortInterface&
return (status);
}
//
// completing_state(event, port)
//
//
GCFEvent::TResult OnlineControl::completing_state(GCFEvent& event, GCFPortInterface& port)
{
LOG_INFO_STR ("completing:" << eventName(event) << "@" << port.getName());
switch (event.signal) {
case F_ENTRY: {
// update PVSS
itsPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString("completing"));
itsPropertySet->setValue(PN_FSM_ERROR, GCFPVString(""));
itsInFinishState = true;
_passMetadatToOTDB();
TRAN(OnlineControl::finishing_state);
} break;
case F_TIMER:
break;
case F_DISCONNECTED:
_handleDisconnect(port);
break;
case F_DATAIN:
_handleDataIn(port);
break;
default:
LOG_DEBUG("completing state default");
return (GCFEvent::NOT_HANDLED);
}
return (GCFEvent::HANDLED);
}
//
// finishing_state(event, port)
//
......@@ -511,10 +478,6 @@ GCFEvent::TResult OnlineControl::finishing_state(GCFEvent& event, GCFPortInterfa
_handleDisconnect(port);
break;
case F_DATAIN:
_handleDataIn(port);
break;
default:
LOG_DEBUG("finishing_state default");
return (GCFEvent::NOT_HANDLED);
......@@ -798,91 +761,6 @@ void OnlineControl::_clearCobaltDatapoints()
delete myDPservice;
}
//
// _passMetadatToOTDB();
// THIS ROUTINE IS A MODIFIED COPY FROM PYTHONCONTROL.CC
//
void OnlineControl::_passMetadatToOTDB()
{
// No name specified?
bool metadataFileAvailable (true);
uint32 obsID(globalParameterSet()->getUint32("Observation.ObsID", 0));
string feedbackFile = observationParset(obsID)+"_feedback";
LOG_INFO_STR ("Expecting metadata to be in file " << feedbackFile);
if (feedbackFile.empty()) {
metadataFileAvailable = false;
}
// read parameterset
// Try to setup the connection with the database
string confFile = globalParameterSet()->getString("OTDBconfFile", "SASGateway.conf");
ConfigLocator CL;
string filename = CL.locate(confFile);
LOG_INFO_STR("Trying to read database information from file " << filename);
ParameterSet otdbconf;
otdbconf.adoptFile(filename);
string database = otdbconf.getString("SASGateway.OTDBdatabase");
string dbhost = otdbconf.getString("SASGateway.OTDBhostname");
OTDBconnection conn("paulus", "boskabouter", database, dbhost);
if (!conn.connect()) {
LOG_FATAL_STR("Cannot connect to database " << database << " on machine " << dbhost);
// WE DO HAVE A PROBLEM HERE BECAUSE THIS PIPELINE CANNOT BE SET TO FINISHED IN SAS.
return;
}
LOG_INFO_STR("Connected to database " << database << " on machine " << dbhost);
if (metadataFileAvailable) {
try {
TreeValue tv(&conn, getObservationNr(getName()));
ParameterSet metadata;
metadata.adoptFile(feedbackFile);
// Loop over the parameterset and send the information to the KVTlogger.
// During the transition phase from parameter-based to record-based storage in OTDB the
// nodenames ending in '_' are implemented both as parameter and as record.
ParameterSet::iterator iter = metadata.begin();
ParameterSet::iterator end = metadata.end();
while (iter != end) {
string key(iter->first); // make destoyable copy
rtrim(key, "[]0123456789");
// bool doubleStorage(key[key.size()-1] == '_');
bool isRecord(iter->second.isRecord());
// isRecord doubleStorage
// --------------------------------------------------------------
// Y Y store as record and as parameters
// Y N store as parameters
// N * store parameter
if (!isRecord) {
LOG_DEBUG_STR("BASIC: " << iter->first << " = " << iter->second);
tv.addKVT(iter->first, iter->second, ptime(microsec_clock::local_time()));
}
else {
// if (doubleStorage) {
// LOG_DEBUG_STR("RECORD: " << iter->first << " = " << iter->second);
// tv.addKVT(iter->first, iter->second, ptime(microsec_clock::local_time()));
// }
// to store is a node/param values the last _ should be stipped of
key = iter->first; // destroyable copy
// string::size_type pos = key.find_last_of('_');
// key.erase(pos,1);
ParameterRecord pr(iter->second.getRecord());
ParameterRecord::const_iterator prIter = pr.begin();
ParameterRecord::const_iterator prEnd = pr.end();
while (prIter != prEnd) {
LOG_DEBUG_STR("ELEMENT: " << key+"."+prIter->first << " = " << prIter->second);
tv.addKVT(key+"."+prIter->first, prIter->second, ptime(microsec_clock::local_time()));
prIter++;
}
}
iter++;
}
LOG_INFO_STR(metadata.size() << " metadata values send to SAS");
}
catch (APSException &e) {
// Parameterfile not found
LOG_FATAL(e.text());
}
}
}
// -------------------- Application-order administration --------------------
//
......@@ -891,55 +769,6 @@ void OnlineControl::_passMetadatToOTDB()
void OnlineControl::_handleDisconnect(GCFPortInterface& port)
{
port.close();
// QUICKFIX #4022
if (&port == itsFeedbackPort) {
LOG_INFO_STR("Lost connection with Feedback of OLAP.");
delete itsFeedbackPort;
itsFeedbackPort = 0;
}
}
//
// _handleAcceptRequest(port)
//
void OnlineControl::_handleAcceptRequest(GCFPortInterface& port)
{
ASSERTSTR(&port == itsFeedbackListener, "Incoming connection on main listener iso feedbackListener");
itsFeedbackPort = new GCFTCPPort();
itsFeedbackPort->init(*this, "feedback", GCFPortInterface::SPP, 0, true); // raw port
if (!itsFeedbackListener->accept(*itsFeedbackPort)) {
delete itsFeedbackPort;
itsFeedbackPort = 0;
LOG_ERROR("Connection with Python feedback FAILED");
}
else {
LOG_INFO("Connection made on feedback port, accepting commands");
}
}
//
// _handleDataIn(port)
//
void OnlineControl::_handleDataIn(GCFPortInterface& port)
{
ASSERTSTR(&port == itsFeedbackPort, "Didn't expect raw data on port " << port.getName());
char buf[1024];
ssize_t btsRead = port.recv((void*)&buf[0], 1023);
buf[btsRead] = '\0';
string s;
hexdump(s, buf, btsRead);
LOG_INFO_STR("Received command on feedback port: " << s);
if (!strcmp(buf, "ABORT")) {
itsFeedbackResult = CT_RESULT_PIPELINE_FAILED;
TRAN(OnlineControl::completing_state); // pass metadata
}
else if (!strcmp(buf, "FINISHED")) {
TRAN(OnlineControl::completing_state);
}
else {
LOG_FATAL_STR("Received command on feedback port unrecognized");
}
}
}; // CEPCU
......
......@@ -27,10 +27,11 @@
#include <Common/lofar_string.h>
#include <Common/lofar_vector.h>
#include <Common/LofarLogger.h>
//# ACC Includes
#include <Common/ParameterSet.h>
//# MsgBus Includes
#include <MessageBus/MsgBus.h>
//# GCF Includes
#include <GCF/TM/GCF_Control.h>
#include <GCF/PVSS/PVSSservice.h>
......@@ -90,11 +91,8 @@ private:
void _stopApplications();
void _finishController (uint16_t result);
void _handleDisconnect (GCFPortInterface& port);
void _handleAcceptRequest (GCFPortInterface& port);
void _handleDataIn (GCFPortInterface& port);
void _setState (CTState::CTstateNr newState);
void _databaseEventHandler(GCFEvent& event);
void _passMetadatToOTDB ();
void _clearCobaltDatapoints();
// ----- datamembers -----
......@@ -115,9 +113,8 @@ private:
CTState::CTstateNr itsState;
// QUICK FIX #4022
GCFTCPPort* itsFeedbackListener;
GCFTCPPort* itsFeedbackPort;
FromBus* itsMsgQueue;
GCFTimerPort* itsQueueTimer;
int itsFeedbackResult;
// ParameterSet variables
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment