Newer
Older
//# MACScheduler.cc: Implementation of the MAC Scheduler task
//#
Alexander van Amesfoort
committed
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, softwaresupport@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#include <lofar_config.h>
#include <Common/LofarLogger.h>
Ruud Overeem
committed
#include <Common/SystemUtil.h>
Ruud Overeem
committed
#include <Common/StringUtil.h>
#include <Common/ParameterSet.h>
Ruud Overeem
committed
#include <MACIO/MACServiceInfo.h>
#include <MessageBus/ToBus.h>
#include <MessageBus/Protocols/TaskSpecificationSystem.h>
#include <GCF/PVSS/GCF_PVTypes.h>
Ruud Overeem
committed
#include <APL/RTDBCommon/CM_Protocol.ph>

Jorrit Schaap
committed
#include <boost/algorithm/string.hpp>
Ruud Overeem
committed
#include "PVSSDatapointDefs.h"

Marcel Loose
committed
#include <MainCU/Package__Version.h>

Marcel Loose
committed
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace LOFAR::GCF::PVSS;
using namespace LOFAR::Protocols;

Marcel Loose
committed
using namespace boost::posix_time;
using namespace Controller_Protocol;
using namespace DP_Protocol;
using namespace CM_Protocol;
namespace MainCU {
#define MAX_CONCURRENT_OBSERVATIONS 100
#define MIN2(a,b) (((a) < (b)) ? (a) : (b))
// static (this) pointer used for signal handling
static MACScheduler* pMacScheduler = 0;

Jorrit Schaap
committed
//
// MACScheduler()
//
MACScheduler::MACScheduler() :
Ruud Overeem
committed
GCFTask ((State)&MACScheduler::initial_state,string("MACScheduler")),
Ruud Overeem
committed
itsClaimerTask (0),
itsClaimerPort (0),
itsTimerPort (0),
Ruud Overeem
committed
itsNextPlannedTime (0),
itsNextActiveTime (0),
itsNextFinishedTime (0),
itsNrPlanned (0),
itsNrActive (0),
itsOTDBconnection (0),
itsTMSSconnection (0),
itsMsgQueue (0)
Ruud Overeem
committed
LOG_INFO_STR("MACProcessScope: " << PSN_MAC_SCHEDULER);
LOG_INFO(Version::getInfo<MainCUVersion>("MACScheduler"));
Ruud Overeem
committed
// Read timersettings from the ParameterSet
itsPlannedItv = globalParameterSet()->getTime("pollIntervalPlanned", 60);
itsActiveItv = globalParameterSet()->getTime("pollIntervalExecute", 5);
itsFinishedItv = globalParameterSet()->getTime("pollIntervalFinished", 60);
itsPlannedPeriod = globalParameterSet()->getTime("plannedPeriod", 86400) / 60; // in minutes
itsFinishedPeriod= globalParameterSet()->getTime("finishedPeriod", 86400) / 60; // in minutes
itsMaxPlanned = globalParameterSet()->getTime("maxPlannedList", 30);
itsMaxFinished = globalParameterSet()->getTime("maxFinishedList", 40);
Ruud Overeem
committed
itsExclPLcluster = globalParameterSet()->getString("excludePipelinesOnThisCluster", "");
if (itsExclPLcluster.length() > 0) {
LOG_INFO_STR("NOT running the pipelines on cluster: " << itsExclPLcluster);
// We need to exclude this cluster for pipelines so make sure the name begins with the 'not'-sign.
if (itsExclPLcluster[0] != '!') {
itsExclPLcluster.insert(0, 1, '!');
}
}

Jorrit Schaap
committed
// JS: 20200329: This is a working piece of code to test TMSSBridge functionality.
// The actual code is used below in _updatePlannedList, but that requires GCF calls which I don't have (yet) in my dockerimage.

Jorrit Schaap
committed
// HINT for TeamRed TMSS colleagues: uncomment this part, and compile and run MACScheduler in the ci_mac docker image; that makes testing/coding fast and easy.
// But keep it commented out if you want to run it on mcu199 !!!

Jorrit Schaap
committed
// ParameterSet* pParamSet = globalParameterSet();
// std::string tmss_username = pParamSet->getString("TMSSusername", "test");
// std::string tmss_password = pParamSet->getString("TMSSpassword", "test");
// std::string tmss_hostname = pParamSet->getString("TMSShostname", "127.0.0.1");
// int tmss_port = pParamSet->getInt("TMSSport", 8008);
//
// LOG_INFO_STR ("Trying to connect to the TMSS " << tmss_hostname << ":" << tmss_port << " user/pass:" << tmss_username << "/******" );
// itsTMSSconnection = new TMSSBridge(tmss_hostname, tmss_port, tmss_username, tmss_password);
// LOG_INFO ("Connected to the TMSSBridge");
//
// Json::Value upcomingSubTasks = itsTMSSconnection->getSubTasksStartingInThreeMinutes();
//

Jorrit Schaap
committed
// if (!upcomingSubTasks.empty()) {
// std::cout << formatString("TMSSCheck:First planned observation (%s) is at %s", upcomingSubTasks[0]["url"].asCString(), upcomingSubTasks[0]["start_time"].asCString()) << endl;

Jorrit Schaap
committed
//

Jorrit Schaap
committed
// Json::Value subtask = upcomingSubTasks[0];
// string updated_at(subtask["updated_at"].asString().replace(10, 1, " "));

Jorrit Schaap
committed
// ptime modTime = time_from_string(updated_at);
// cout << "modTime: " << modTime << endl;

Jorrit Schaap
committed
//
// // get subtask_id from url. I know, ugly, needs to be in json itself.
// vector<string> tmp;
// string url(subtask["url"].asString());
// boost::split(tmp, url, [](char c){return c == '/';});
// int subtask_id = stoi(tmp[tmp.size()-2]);
//
// string parsetText = itsTMSSconnection->getParsetAsText(subtask_id);
// std::cout << parsetText << std::endl << std::endl << std::endl;
//
// string filename(observationParset(subtask_id));
//

Jorrit Schaap
committed
// ParameterSet obsSpecs(false);
// obsSpecs.adoptBuffer(parsetText);
// obsSpecs.writeFile(filename);

Jorrit Schaap
committed
//
// std::cout << "Wrote parset to " << filename << std::endl;
// }

Jorrit Schaap
committed
// exit(0);

Jorrit Schaap
committed
ASSERTSTR(itsMaxPlanned + itsMaxFinished < MAX_CONCURRENT_OBSERVATIONS,
"maxPlannedList + maxFinishedList should be less than " << MAX_CONCURRENT_OBSERVATIONS);
Ruud Overeem
committed
// Read the schedule periods for starting observations.
itsQueuePeriod = globalParameterSet()->getTime("QueuePeriod");
Arno Schoenmakers
committed
LOG_INFO_STR("Queueperiod = " << itsQueuePeriod);
itsChildPort = new GCFITCPort (*this, *itsChildControl, "childITCport", GCFPortInterface::SAP, CONTROLLER_PROTOCOL);
ASSERTSTR(itsChildPort, "Cannot allocate ITCport for childcontrol");
itsChildPort->open(); // will result in F_CONNECTED
Ruud Overeem
committed
// create a PVSSprepare Task
itsClaimerTask = new ObsClaimer(this);
ASSERTSTR(itsClaimerTask, "Cannot construct a ObsClaimerTask");
itsClaimerPort = new GCFITCPort (*this, *itsClaimerTask, "ObsClaimerPort", GCFPortInterface::SAP, CM_PROTOCOL);
Ruud Overeem
committed
// need port for timers
itsTimerPort = new GCFTimerPort(*this, "Timerport");

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
// setup MsgQueue
string queueName = globalParameterSet()->getString("ParsetQueuename");
ASSERTSTR(!queueName.empty(), "Queuename for distributing parameterSets not specified");
itsMsgQueue = new ToBus(queueName);

Jorrit Schaap
committed
#endif
Ruud Overeem
committed
registerProtocol(CONTROLLER_PROTOCOL, CONTROLLER_PROTOCOL_STRINGS);
Ruud Overeem
committed
registerProtocol(DP_PROTOCOL, DP_PROTOCOL_STRINGS);
}
//
// ~MACScheduler()
//
MACScheduler::~MACScheduler()
{
LOG_TRACE_OBJ ("~MACscheduler");

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
if (itsOTDBconnection) {
delete itsOTDBconnection;
}

Jorrit Schaap
committed
#endif
if (itsTMSSconnection) {
delete itsTMSSconnection;
}

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
if (itsMsgQueue) {
delete itsMsgQueue;
}

Jorrit Schaap
committed
#endif
//
// sigintHandler(signum)
//
void MACScheduler::sigintHandler(int signum)
LOG_DEBUG (formatString("SIGINT signal detected (%d)",signum));
if (pMacScheduler) {
pMacScheduler->finish();
}
Ruud Overeem
committed
// LOG_DEBUG_STR ("_databaseEventHandler:" << eventName(event));
switch(event.signal) {
case DP_CHANGED: {
DPChangedEvent dpEvent(event);
Ruud Overeem
committed
#if 0
if (strstr(dpEvent.DPname.c_str(), PVSSNAME_MS_QUEUEPERIOD) != 0) {
uint32 newVal = ((GCFPVUnsigned*) (dpEvent.value._pValue))->getValue();
LOG_INFO_STR ("Changing QueuePeriod from " << itsQueuePeriod << " to " << newVal);
itsQueuePeriod = newVal;
}
Ruud Overeem
committed
#endif

Jorrit Schaap
committed
}

Jorrit Schaap
committed
}
}
//
// initial_state(event, port)
//
// Setup all connections.
//
GCFEvent::TResult MACScheduler::initial_state(GCFEvent& event, GCFPortInterface& /*port*/)
{

Jorrit Schaap
committed
LOG_INFO_STR ("initial_state:" << eventName(event));

Jorrit Schaap
committed

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
Ruud Overeem
committed
LOG_INFO_STR ("Activating my propertySet(" << PSN_MAC_SCHEDULER << ")");
itsPropertySet = new RTDBPropertySet(PSN_MAC_SCHEDULER,
PST_MAC_SCHEDULER,
Arno Schoenmakers
committed
PSAT_CW,

Jorrit Schaap
committed
#else
//HACK
itsTimerPort->cancelAllTimers();
itsTimerPort->setTimer(0.0);
#endif
case DP_CREATED: {
// NOTE: this function may be called DURING the construction of the PropertySet.
// Always exit this event in a way that GCF can end the construction.
DPCreatedEvent dpEvent(event);
LOG_DEBUG_STR("Result of creating " << dpEvent.DPname << " = " << dpEvent.result);
itsTimerPort->cancelAllTimers();
itsTimerPort->setTimer(0.0);
}
break;

Jorrit Schaap
committed

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
Ruud Overeem
committed
itsPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString ("initial"));
itsPropertySet->setValue(PN_FSM_ERROR, GCFPVString (""));
itsPropertySet->setValue(PN_MS_OTDB_CONNECTED, GCFPVBool (false));
itsPropertySet->setValue(PN_MS_OTDB_LAST_POLL, GCFPVString (""));
itsPropertySet->setValue(PN_MS_OTDB_POLLINTERVAL, GCFPVInteger (itsPlannedItv));
Ruud Overeem
committed
GCFPValueArray emptyArr;
itsPropertySet->setValue(PN_MS_ACTIVE_OBSERVATIONS, GCFPVDynArr(LPT_STRING, emptyArr));
itsPropertySet->setValue(PN_MS_PLANNED_OBSERVATIONS, GCFPVDynArr(LPT_STRING, emptyArr));
itsPropertySet->setValue(PN_MS_FINISHED_OBSERVATIONS, GCFPVDynArr(LPT_STRING, emptyArr));

Jorrit Schaap
committed
#endif

Jorrit Schaap
committed
ParameterSet* pParamSet = globalParameterSet();

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE

Auke Klazema
committed
try
{
username = pParamSet->getString("OTDBusername");

Auke Klazema
committed
}
catch(LOFAR::APSException& ex)
{
	// The OTDB user name has no default; report which key could not be read.
	LOG_ERROR_STR("MACScheduler code caught an exception while trying to "
		"read the OTDB database user name from its configuration file: "
		<< ex.what());
	// Rethrow the exception that is being handled. 'throw(ex)' would throw a
	// fresh copy of static type APSException and slice off any derived type.
	throw;
}

Auke Klazema
committed
try
{
DBname = pParamSet->getString("OTDBdatabasename");

Auke Klazema
committed
}
catch(LOFAR::APSException& ex)
{
	// The OTDB database name has no default; report which key could not be read.
	LOG_ERROR_STR("MACScheduler code caught an exception while trying to "
		"read the OTDB database name from its configuration file: "
		<< ex.what());
	// Rethrow the exception that is being handled. 'throw(ex)' would throw a
	// fresh copy of static type APSException and slice off any derived type.
	throw;
}

Auke Klazema
committed
try
{
password = pParamSet->getString("OTDBpassword");

Auke Klazema
committed
}
catch(LOFAR::APSException& ex)
{
	// The OTDB password has no default; report which key could not be read.
	LOG_ERROR_STR("MACScheduler code caught an exception while trying to "
		"read the OTDB database password from its configuration file: "
		<< ex.what());
	// Rethrow the exception that is being handled. 'throw(ex)' would throw a
	// fresh copy of static type APSException and slice off any derived type.
	throw;
}

Auke Klazema
committed
try
{
hostname = pParamSet->getString("OTDBhostname");

Auke Klazema
committed
}
catch(LOFAR::APSException& ex)
{
	// The OTDB host name has no default; report which key could not be read.
	LOG_ERROR_STR("MACScheduler code caught an exception while trying to "
		"read the OTDB database server host name from its configuration file: "
		<< ex.what());
	// Rethrow the exception that is being handled. 'throw(ex)' would throw a
	// fresh copy of static type APSException and slice off any derived type.
	throw;
}

Auke Klazema
committed
try
{

Auke Klazema
committed
}
catch(LOFAR::APSException& ex)
{
	// Report that the OTDB server port could not be read from the configuration.
	LOG_ERROR_STR("MACScheduler code caught an exception while trying to "
		"read the OTDB database server port from its configuration file: "
		<< ex.what());
	// Rethrow the exception that is being handled. 'throw(ex)' would throw a
	// fresh copy of static type APSException and slice off any derived type.
	throw;
}
Ruud Overeem
committed
LOG_INFO_STR ("Trying to connect to the OTDB on " << hostname);

Auke Klazema
committed
itsOTDBconnection= new OTDBconnection(username, password, DBname, hostname, port);
ASSERTSTR (itsOTDBconnection, "Memory allocation error (OTDB)");
// Connect to the OTDB; a failed connect is fatal for the scheduler.
// The password is masked in the failure message (as in the TMSS log line)
// so that credentials never end up in the assertion output.
ASSERTSTR (itsOTDBconnection->connect(),
	"Unable to connect to database " << DBname << " on " << hostname <<
	" port: " << port <<
	" using " << username << ",******");
itsPropertySet->setValue(PN_MS_OTDB_CONNECTED, GCFPVBool(true));

Jorrit Schaap
committed
#endif

Jorrit Schaap
committed
// Connection settings for the TMSS bridge. Every key is optional and falls
// back to the default given here.
std::string tmss_username = pParamSet->getString("TMSSusername", "test");
std::string tmss_password = pParamSet->getString("TMSSpassword", "test");
// Default to the loopback address. The previous default "120.0.0.1" is a
// routable address and evidently a typo of 127.0.0.1.
std::string tmss_hostname = pParamSet->getString("TMSShostname", "127.0.0.1");
int tmss_port = pParamSet->getInt("TMSSport", 8008);
// The password is deliberately masked in the log line.
LOG_INFO_STR ("Trying to connect to the TMSS " << tmss_hostname << ":" << tmss_port << " user/pass:" << tmss_username << "/******" );
itsTMSSconnection= new TMSSBridge(tmss_hostname, tmss_port, tmss_username, tmss_password);
LOG_INFO ("Connected to the TMSSBridge");

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
// Start ChildControl task
LOG_DEBUG ("Enabling ChildControltask");
itsChildControl->openService(MAC_SVCMASK_SCHEDULERCTRL, 0);

Jorrit Schaap
committed
#endif
Ruud Overeem
committed
// setup initial schedule: first planned, next run active, second run finished
itsNextPlannedTime = time(0);
itsNextActiveTime = itsNextPlannedTime + itsPlannedItv;
itsNextFinishedTime = itsNextPlannedTime + 2*itsPlannedItv;

Jorrit Schaap
committed

Jorrit Schaap
committed
}
return (status);
}
//
// recover_state(event, port)
//
// Read last PVSS states, compare those to the SAS states and try to
// recover to the last situation.
//
GCFEvent::TResult MACScheduler::recover_state(GCFEvent& event, GCFPortInterface& port)
{
LOG_DEBUG_STR ("recover_state:" << eventName(event) << "@" << port.getName());
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
case F_INIT:
break;
case F_ENTRY: {
// update PVSS

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
Ruud Overeem
committed
itsPropertySet->setValue(string(PN_FSM_CURRENT_ACTION),GCFPVString("recover"));
itsPropertySet->setValue(string(PN_FSM_ERROR),GCFPVString(""));

Jorrit Schaap
committed
#endif
//
// TODO: do recovery
TRAN(MACScheduler::active_state);

Jorrit Schaap
committed

Jorrit Schaap
committed

Jorrit Schaap
committed
}
return (status);
}
//
// active_state(event, port)
//
Ruud Overeem
committed
// Normal operation state. Check OTDB every itsActiveItv seconds and control
// the running observations.
//
GCFEvent::TResult MACScheduler::active_state(GCFEvent& event, GCFPortInterface& port)
{
LOG_DEBUG_STR ("active:" << eventName(event) << "@" << port.getName());
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
case F_INIT:
break;
case F_ENTRY: {

Jorrit Schaap
committed
// install my own signal handler. GCFTask also installs a handler so we have

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
Ruud Overeem
committed
itsPropertySet->setValue(string(PN_FSM_CURRENT_ACTION),GCFPVString("active"));
itsPropertySet->setValue(string(PN_FSM_ERROR),GCFPVString(""));

Jorrit Schaap
committed
#endif
Ruud Overeem
committed
// Start heartbeat timer.

Jorrit Schaap
committed
case F_CONNECTED:
// Should be from the (lost) connection with the SD
_connectedHandler(port);
break;

Jorrit Schaap
committed
case F_DISCONNECTED:
// Can be from StartDaemon or ObsController.
// StartDaemon: trouble! Try to reconnect asap.
// ObsController: ok when obs is finished, BIG TROUBLE when not!
_disconnectedHandler(port);
break;
case DP_CHANGED:
_databaseEventHandler(event);
break;
Ruud Overeem
committed
case CM_CLAIM_RESULT: {
// some observation was claimed by the claimMgr. Update our prepare_list.
CMClaimResultEvent cmEvent(event);
ltrim(cmEvent.nameInAppl, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_");
int obsID = atoi(cmEvent.nameInAppl.c_str());
Arno Schoenmakers
committed
if (cmEvent.result != CM_NO_ERR) {
LOG_ERROR_STR("Error during checking observation " << obsID);

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
Arno Schoenmakers
committed
OTDB::TreeMaintenance tm(itsOTDBconnection);
TreeStateConv tsc(itsOTDBconnection);
tm.setTreeState(obsID, tsc.get("aborted"));

Jorrit Schaap
committed
#endif
Arno Schoenmakers
committed
itsPreparedObs.erase(obsID);
break;
}
// claim was successful, update admin
LOG_INFO_STR("Observation " << obsID << " is mapped to " << cmEvent.DPname);
Ruud Overeem
committed
LOG_DEBUG_STR("PVSS preparation of observation " << obsID << " ready.");
Ruud Overeem
committed
itsPreparedObs[obsID].prepReady = true;
Ruud Overeem
committed
}
break;
case F_TIMER: { // secondTimer or reconnectTimer.
GCFTimerEvent& timerEvent=static_cast<GCFTimerEvent&>(event);
if (timerEvent.id == itsSecondTimer) {
// time to poll the OTDB?
Ruud Overeem
committed
// Note: We assume here that the PlannedItv is smaller than the ActiveItv and the FinishedItv.
// When it is not smaller (very unlikely) than the ActiveItv and FinishedItv those two
// intervals will default to the PlannedItv.
if (time(0) >= itsNextPlannedTime) { // check shortest interval
Ruud Overeem
committed
// reinit polltime at multiple of intervaltime.
// (=more chance to hit hh.mm:00)
Ruud Overeem
committed
itsNextPlannedTime = time(0) + itsPlannedItv;
itsNextPlannedTime -= (itsNextPlannedTime % itsPlannedItv);
itsSecondTimer = port.setTimer(1.0);
}
// a connection was lost and a timer was set to try to reconnect.
// else if (...) {
// TODO
// map timer to port
// port.open();
// }
break;
}
// -------------------- EVENTS FROM CHILDCONTROL --------------------
Ruud Overeem
committed
//

Jorrit Schaap
committed
// That must be events from the ObservationControllers that are currently
Ruud Overeem
committed
// started or running.
//
// Child control received a message from the startDaemon that the
// observationController was started (or not)
Ruud Overeem
committed
LOG_DEBUG_STR("Start of " << msg.cntlrName << " was successful, waiting for connection.");
Ruud Overeem
committed
LOG_ERROR_STR("Observation controller " << msg.cntlrName << " could not be started");
LOG_INFO_STR("Observation is be removed from administration, " << "restart will occur in next cycle");
Ruud Overeem
committed
itsControllerMap.erase(msg.cntlrName);
Ruud Overeem
committed
case CONTROL_CONNECTED: {
// The observationController has registered itself at childControl.
Ruud Overeem
committed
CONTROLConnectedEvent conEvent(event);
Ruud Overeem
committed
LOG_INFO_STR(conEvent.cntlrName << " is connected, updating SAS)");
// Ok, controller is really up, update SAS so that obs will not appear in
// in the SAS list again.
Ruud Overeem
committed
CMiter theObs(itsControllerMap.find(conEvent.cntlrName));
if (theObs == itsControllerMap.end()) {
Ruud Overeem
committed
LOG_WARN_STR("Cannot find controller " << conEvent.cntlrName << ". Can't update the SAS database");

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
OTDB::TreeMaintenance tm(itsOTDBconnection);
TreeStateConv tsc(itsOTDBconnection);
Ruud Overeem
committed
tm.setTreeState(theObs->second, tsc.get("queued"));

Jorrit Schaap
committed
#endif
Ruud Overeem
committed
case CONTROL_RESUMED: {
// update SAS database.
CONTROLResumedEvent msg(event);
CMiter theObs(itsControllerMap.find(msg.cntlrName));
if (theObs == itsControllerMap.end()) {
LOG_WARN_STR("Cannot find controller " << msg.cntlrName << ". Can't update the SAS database");
break;
}

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
Ruud Overeem
committed
OTDB::TreeMaintenance tm(itsOTDBconnection);
TreeStateConv tsc(itsOTDBconnection);
tm.setTreeState(theObs->second, tsc.get("active"));

Jorrit Schaap
committed
#endif
Ruud Overeem
committed
break;
}
case CONTROL_SUSPENDED: {
// update SAS database.
CONTROLSuspendedEvent msg(event);
CMiter theObs(itsControllerMap.find(msg.cntlrName));
if (theObs == itsControllerMap.end()) {
LOG_WARN_STR("Cannot find controller " << msg.cntlrName << ". Can't update the SAS database");
break;
}

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
Ruud Overeem
committed
OTDB::TreeMaintenance tm(itsOTDBconnection);
TreeStateConv tsc(itsOTDBconnection);
tm.setTreeState(theObs->second, tsc.get("completing"));

Jorrit Schaap
committed
#endif
Ruud Overeem
committed
break;
}
Ruud Overeem
committed
LOG_INFO_STR("Received QUITED(" << quitedEvent.cntlrName << "," << quitedEvent.result << ")");
Ruud Overeem
committed
CMiter theObs(itsControllerMap.find(quitedEvent.cntlrName));
if (theObs == itsControllerMap.end()) {
Ruud Overeem
committed
LOG_WARN_STR("Cannot find controller " << quitedEvent.cntlrName << ". Can't update the SAS database");

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
OTDB::TreeMaintenance tm(itsOTDBconnection);
TreeStateConv tsc(itsOTDBconnection);
// CT_RESULT_: MANUAL_REMOVED, MANUAL_ABORT, LOST_CONNECTION, NO_ERROR
Ruud Overeem
committed
tm.setTreeState(theObs->second, tsc.get("finished"));
Ruud Overeem
committed
tm.setTreeState(theObs->second, tsc.get("aborted"));

Jorrit Schaap
committed
#endif
Ruud Overeem
committed
// free claimed observation in PVSS
itsClaimerTask->freeObservation(observationName(theObs->second));
Ruud Overeem
committed
LOG_INFO_STR("Removing observation " << quitedEvent.cntlrName << " from activeList");
itsControllerMap.erase(quitedEvent.cntlrName);
// NOTE: ignore all other CONTROL events, we are not interested in the
// states of the Observations. (not our job).
status = GCFEvent::NOT_HANDLED;
break;
}
return (status);
}
//
// finishing_state(event, port)
//
// Write controller state to PVSS, wait for 1 second (using a timer) to let GCF handle the property change
// and close the controller
//
GCFEvent::TResult MACScheduler::finishing_state(GCFEvent& event, GCFPortInterface& port)
{
LOG_DEBUG_STR ("finishing_state:" << eventName(event) << "@" << port.getName());
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
case F_INIT:
break;
case F_ENTRY: {
// update PVSS

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
Ruud Overeem
committed
itsPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString("finished"));
itsPropertySet->setValue(PN_FSM_ERROR, GCFPVString(""));
itsPropertySet->setValue(PN_MS_OTDB_CONNECTED, GCFPVBool (false));

Jorrit Schaap
committed
#endif

Jorrit Schaap
committed

Jorrit Schaap
committed
default:
LOG_DEBUG("finishing_state, default");
status = GCFEvent::NOT_HANDLED;
break;

Jorrit Schaap
committed
}
return (status);
}
//
// finish
//
// Make the transition to the finishing state
//
void MACScheduler::finish()
{
// Request the state-machine transition to finishing_state.
// NOTE(review): sigintHandler() reaches this through pMacScheduler; confirm that
// performing the transition from that context is safe.
TRAN(MACScheduler::finishing_state);
}
//
// _doOTDBcheck
//
// Check if a new action should be taken based on the contents of OTDB and our own
// administration.
//
void MACScheduler::_doOTDBcheck()
{
Ruud Overeem
committed
time_t now = time(0);
ptime currentTime = from_time_t(now);

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
Ruud Overeem
committed
itsPropertySet->setValue(string(PN_MS_OTDB_LAST_POLL), GCFPVString(to_simple_string(currentTime)));

Jorrit Schaap
committed
#endif
Ruud Overeem
committed
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
// always update planned list because we might need to start some of those
// (and we assumed that the PlannedItv was the smallest)
_updatePlannedList();
// update active lists when it's time to do so.
if (now >= itsNextActiveTime) {
	_updateActiveList();
	if (itsActiveItv > 0) {
		// step the deadline forward in whole intervals until it lies in the future
		while (itsNextActiveTime <= now) {
			itsNextActiveTime += itsActiveItv;
		}
	}
	else {
		// A zero interval can never move the deadline past 'now'; the loop above
		// would spin forever. Poll again on the next tick instead.
		itsNextActiveTime = now + 1;
	}
}
// update finished lists when it's time to do so.
if (now >= itsNextFinishedTime) {
	_updateFinishedList();
	if (itsFinishedItv > 0) {
		// step the deadline forward in whole intervals until it lies in the future
		while (itsNextFinishedTime <= now) {
			itsNextFinishedTime += itsFinishedItv;
		}
	}
	else {
		// same guard as for the active list: avoid an endless loop on a zero interval
		itsNextFinishedTime = now + 1;
	}
}
}
//
// _updatePlannedList()
//
void MACScheduler::_updatePlannedList()
{

Jorrit Schaap
committed
LOG_DEBUG("_updatePlannedList()");
Ruud Overeem
committed
// get time info
time_t now = time(0);
ptime currentTime = from_time_t(now);
ASSERTSTR (currentTime != not_a_date_time, "Can't determine systemtime, bailing out");

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
Ruud Overeem
committed
// get new list (list is ordered on starttime) of planned observations

Jorrit Schaap
committed
vector<OTDBtree> plannedDBlist = itsOTDBconnection->getTreeGroup(1, itsPlannedPeriod, itsExclPLcluster);

Jorrit Schaap
committed
LOG_DEBUG(formatString("OTDBCheck:First planned observation (%d) is at %s (active over %d seconds)",
plannedDBlist[0].treeID(), to_simple_string(plannedDBlist[0].starttime).c_str(),
Arno Schoenmakers
committed
time_duration(plannedDBlist[0].starttime - currentTime).total_seconds()));
}
// NOTE: do not exit routine on emptylist: we need to write an empty list to clear the DB

Jorrit Schaap
committed
#endif
Json::Value upcomingSubTasks = itsTMSSconnection->getSubTasksStartingInThreeMinutes();
if (!upcomingSubTasks.empty()) {

Jorrit Schaap
committed
LOG_DEBUG(formatString("TMSSCheck:First planned observation (%s) is at %s",
upcomingSubTasks[0]["url"].asCString(), upcomingSubTasks[0]["start_time"].asCString()));
}
Arno Schoenmakers
committed
// make a copy of the current prepared observations (= observations shown in the navigator in the 'future'

Jorrit Schaap
committed
// list). By eliminating the observations that are in the current SAS list we end up (at the end of this function)
Arno Schoenmakers
committed
// with a list of observations that were in the SASlist the last time but now not anymore. Normally those observations
// will appear in the active-list and will be removed there from the prepared list but WHEN AN OPERATOR CHANGES
// THE STATUS MANUALLY into something different (e.g. ON-HOLD) the observation stays in the preparedlist.
// EVEN WORSE: when the observation re-enters the system with different settings (again as scheduled) the system
// still knows the observation and will use the OLD information of the observation.
ObsList backupObsList = itsPreparedObs;

Jorrit Schaap
committed
// walk through the plannedDBlist, prepare PVSS for the new obs, update own admin lists.
// after walking through the plannedDBlist, do same thing for upcomingSubTasks (TMSS)
Ruud Overeem
committed
GCFPValueArray plannedArr;

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE

Jorrit Schaap
committed
int32 idx = MIN2(plannedDBlist.size(), itsMaxPlanned) - 1;
Ruud Overeem
committed
for ( ; idx >= 0; idx--) {
if (plannedDBlist[idx].processType=="RESERVATION" || plannedDBlist[idx].processType=="MAINTENANCE") {
continue;
}
Ruud Overeem
committed
// construct name and timings info for observation
treeIDType obsID = plannedDBlist[idx].treeID();
string obsName(observationName(obsID));
Ruud Overeem
committed
ptime modTime = plannedDBlist[idx].modificationDate;
Ruud Overeem
committed
Arno Schoenmakers
committed
// remove obs from backup of the planned-list (it is in the list again)
OLiter oldObsIter = backupObsList.find(obsID);
if (oldObsIter != backupObsList.end()) {
backupObsList.erase(oldObsIter);
}
Ruud Overeem
committed
// must we claim this observation at the claimMgr?
OLiter prepIter = itsPreparedObs.find(obsID);

Jorrit Schaap
committed
if ((prepIter == itsPreparedObs.end()) || (prepIter->second.prepReady == false) ||
Ruud Overeem
committed
(prepIter->second.modTime != modTime)) {
Ruud Overeem
committed
// create a ParameterFile for this Observation
TreeMaintenance tm(itsOTDBconnection);
OTDBnode topNode = tm.getTopNode(obsID);
string filename(observationParset(obsID));
Ruud Overeem
committed
if (!tm.exportTree(obsID, topNode.nodeID(), filename)) {

Jorrit Schaap
committed
LOG_ERROR_STR ("Cannot create ParameterSet '" << filename <<
"' for new observation. Observation CANNOT BE STARTED!");
Ruud Overeem
committed
}
else {
// Claim a DP in PVSS and write obssettings to it so the operator can see it.
Ruud Overeem
committed
LOG_INFO_STR("Requesting preparation of PVSS for " << obsName);
Ruud Overeem
committed
itsClaimerTask->prepareObservation(obsName);
Ruud Overeem
committed
itsPreparedObs[obsID] = schedInfo(modTime, false); // requested claim but no answer yet.
Ruud Overeem
committed
}
else {
// only add observations to the PVSS list when the claim was successful
// otherwise things will go wrong in the Navigator
plannedArr.push_back(new GCFPVString(obsName));
}

Jorrit Schaap
committed
Ruud Overeem
committed
// should this observation (have) be(en) started?
Arno Schoenmakers
committed
int timeBeforeStart = time_duration(plannedDBlist[idx].starttime - currentTime).total_seconds();

Jorrit Schaap
committed
LOG_INFO(formatString("%s starts at %s which is in %d seconds",
obsName.c_str(),
to_simple_string(plannedDBlist[idx].starttime).c_str(),
timeBeforeStart));
Arno Schoenmakers
committed
if (timeBeforeStart > 0 && timeBeforeStart <= (int)itsQueuePeriod) {
Ruud Overeem
committed
if (itsPreparedObs[obsID].prepReady == false) {
Ruud Overeem
committed
LOG_INFO_STR("Observation " << obsID << " must be started but is not claimed yet.");
Ruud Overeem
committed
// starttime of observation lies in queuePeriod. Start the controller-chain,
// this will result in CONTROL_STARTED event in our main task
// Note: as soon as the ObservationController has reported itself to the MACScheduler
// the observation will not be returned in the 'plannedDBlist' anymore.
string cntlrName(controllerName(CNTLRTYPE_OBSERVATIONCTRL, 0, obsID));
Arno Schoenmakers
committed
if (itsControllerMap.find(cntlrName) == itsControllerMap.end()) {

Jorrit Schaap
committed
#ifdef DO_COMPILE_OTDB_AND_PVSS_CODE
Ruud Overeem
committed
LOG_INFO_STR("Requesting start of " << cntlrName);

Jorrit Schaap
committed
itsChildControl->startChild(CNTLRTYPE_OBSERVATIONCTRL,
obsID,
Arno Schoenmakers
committed
0, // instanceNr
myHostname(false));
Arno Schoenmakers
committed
// Note: controller is now in state NO_STATE/CONNECTED (C/R)

Jorrit Schaap
committed
#endif
Arno Schoenmakers
committed
// add controller to our 'monitor' administration
itsControllerMap[cntlrName] = obsID;

Jorrit Schaap
committed
LOG_DEBUG_STR("itsControllerMap[" << cntlrName << "]=" << obsID);
if (!itsPreparedObs[obsID].parsetDistributed) {
_setParsetOnMsgBus(observationParset(obsID));
itsPreparedObs[obsID].parsetDistributed = true;
}
Arno Schoenmakers
committed
}
else {

Jorrit Schaap
committed
LOG_DEBUG_STR("Observation " << obsID << " is already (being) started");
Arno Schoenmakers
committed
}

Jorrit Schaap
committed
} // process all planned obs from OTDB

Jorrit Schaap
committed
#endif

Jorrit Schaap
committed
// now walk through the upcomingSubTasks (TMSS), prepare PVSS for the new obs, update own admin lists.
//JS: 20200329: I decided to keep the loop simple at first, and then later add the same steps as in the loop above.
//That means, do all the stupid bookkeeping here in MAC as well, with its internal lists etc.

Jorrit Schaap
committed
int idx2 = MIN2(upcomingSubTasks.size(), itsMaxPlanned) - 1;
for ( ; idx2 >= 0; idx2--) {
Json::Value subtask = upcomingSubTasks[idx2];

Jorrit Schaap
committed
// get subtask_id from url. I know, ugly, needs to be in json itself.
vector<string> tmp;
string url(subtask["url"].asString());
boost::split(tmp, url, [](char c){return c == '/';});
int subtask_id = stoi(tmp[tmp.size()-2]);
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
// string parsetText = itsTMSSconnection->getParsetAsText(subtask_id);
// std::cout << parsetText << std::endl << std::endl << std::endl;
//
// string filename(observationParset(subtask_id));
//
// ParameterSet obsSpecs(false);
// obsSpecs.adoptBuffer(parsetText);
// obsSpecs.writeFile(filename);
//
// std::cout << "Wrote parset to " << filename << std::endl;
// _setParsetOnMsgBus(filename);
// construct name and timings info for observation
string obsName(observationName(subtask_id));
string updated_at(subtask["updated_at"].asString().replace(10, 1, " "));
ptime modTime = time_from_string(updated_at);
// // remove obs from backup of the planned-list (it is in the list again)
// OLiter oldObsIter = backupObsList.find(obsID);
// if (oldObsIter != backupObsList.end()) {
// backupObsList.erase(oldObsIter);
// }
//
// // must we claim this observation at the claimMgr?
// OLiter prepIter = itsPreparedObs.find(obsID);
// if ((prepIter == itsPreparedObs.end()) || (prepIter->second.prepReady == false) ||
// (prepIter->second.modTime != modTime)) {
// // create a ParameterFile for this Observation
// TreeMaintenance tm(itsOTDBconnection);
// OTDBnode topNode = tm.getTopNode(obsID);
// string filename(observationParset(obsID));
// if (!tm.exportTree(obsID, topNode.nodeID(), filename)) {
// LOG_ERROR_STR ("Cannot create ParameterSet '" << filename <<
// "' for new observation. Observation CANNOT BE STARTED!");
// }
// else {
// // Claim a DP in PVSS and write obssettings to it so the operator can see it.
// LOG_INFO_STR("Requesting preparation of PVSS for " << obsName);
// itsClaimerTask->prepareObservation(obsName);
// itsPreparedObs[obsID] = schedInfo(modTime, false); // requested claim but no answer yet.
// }
// }
// else {
// // only add observations to the PVSS list when the claim was succesfull
// // otherwise thing will go wrong in the Navigator
// plannedArr.push_back(new GCFPVString(obsName));
// }
//
// // should this observation (have) be(en) started?
// int timeBeforeStart = time_duration(plannedDBlist[idx].starttime - currentTime).total_seconds();
// LOG_INFO(formatString("%s starts at %s which is in %d seconds",
// obsName.c_str(),
// to_simple_string(plannedDBlist[idx].starttime).c_str(),
// timeBeforeStart));
//
// if (timeBeforeStart > 0 && timeBeforeStart <= (int)itsQueuePeriod) {
// if (itsPreparedObs[obsID].prepReady == false) {
// LOG_INFO_STR("Observation " << obsID << " must be started but is not claimed yet.");
// }
// else {
// // starttime of observation lays in queuePeriod. Start the controller-chain,
// // this will result in CONTROL_STARTED event in our main task
// // Note: as soon as the ObservationController has reported itself to the MACScheduler
// // the observation will not be returned in the 'plannedDBlist' anymore.
// string cntlrName(controllerName(CNTLRTYPE_OBSERVATIONCTRL, 0, obsID));
// if (itsControllerMap.find(cntlrName) == itsControllerMap.end()) {
// LOG_INFO_STR("Requesting start of " << cntlrName);
// itsChildControl->startChild(CNTLRTYPE_OBSERVATIONCTRL,
// obsID,
// 0, // instanceNr
// myHostname(false));
// // Note: controller is now in state NO_STATE/CONNECTED (C/R)
//
// // add controller to our 'monitor' administration
// itsControllerMap[cntlrName] = obsID;
// LOG_DEBUG_STR("itsControllerMap[" << cntlrName << "]=" << obsID);
// if (!itsPreparedObs[obsID].parsetDistributed) {
// _setParsetOnMsgBus(observationParset(obsID));
// itsPreparedObs[obsID].parsetDistributed = true;
// }
// }
// else {
// LOG_DEBUG_STR("Observation " << obsID << " is already (being) started");
// }
// }
// }