Skip to content
Snippets Groups Projects
MACScheduler.cc 39.5 KiB
Newer Older
Ruud Overeem's avatar
Ruud Overeem committed
//#  MACScheduler.cc: Implementation of the MAC Scheduler task
//#
//#  Copyright (C) 2004-2012
Ruud Overeem's avatar
Ruud Overeem committed
//#  ASTRON (Netherlands Foundation for Research in Astronomy)
//#  P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, softwaresupport@astron.nl
Ruud Overeem's avatar
Ruud Overeem committed
//#
//#  This program is free software; you can redistribute it and/or modify
//#  it under the terms of the GNU General Public License as published by
//#  the Free Software Foundation; either version 2 of the License, or
//#  (at your option) any later version.
//#
//#  This program is distributed in the hope that it will be useful,
//#  but WITHOUT ANY WARRANTY; without even the implied warranty of
//#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//#  GNU General Public License for more details.
//#
//#  You should have received a copy of the GNU General Public License
//#  along with this program; if not, write to the Free Software
//#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
//#
//#  $Id$
#include <lofar_config.h>
#include <Common/LofarLogger.h>
#include <Common/Version.h>
Ruud Overeem's avatar
Ruud Overeem committed

#include <Common/ParameterSet.h>
Ruud Overeem's avatar
Ruud Overeem committed
#include <GCF/TM/GCF_Protocols.h>
#include <MessageBus/Protocols/TaskSpecificationSystem.h>
#include <GCF/PVSS/GCF_PVTypes.h>
Ruud Overeem's avatar
Ruud Overeem committed
#include <APL/APLCommon/APL_Defines.h>
Ruud Overeem's avatar
Ruud Overeem committed
#include <APL/APLCommon/ControllerDefines.h>
Ruud Overeem's avatar
Ruud Overeem committed
#include <APL/APLCommon/Controller_Protocol.ph>
Ruud Overeem's avatar
Ruud Overeem committed
#include <GCF/RTDB/DP_Protocol.ph>
#include <APL/RTDBCommon/CM_Protocol.ph>
Ruud Overeem's avatar
Ruud Overeem committed
#include <OTDB/TreeStateConv.h>
blaakmeer's avatar
blaakmeer committed
#include <signal.h>
Ruud Overeem's avatar
Ruud Overeem committed

#include "MACScheduler.h"
Ruud Overeem's avatar
Ruud Overeem committed

#include <boost/date_time/posix_time/posix_time.hpp>

using namespace LOFAR::GCF::PVSS;
Ruud Overeem's avatar
Ruud Overeem committed
using namespace LOFAR::GCF::TM;
Ruud Overeem's avatar
Ruud Overeem committed
using namespace LOFAR::GCF::RTDB;
Ruud Overeem's avatar
Ruud Overeem committed
using namespace LOFAR::OTDB;
using namespace LOFAR::Protocols;
Ruud Overeem's avatar
Ruud Overeem committed
using namespace std;

namespace LOFAR {
	using namespace Controller_Protocol;
	using namespace DP_Protocol;
	using namespace CM_Protocol;
Ruud Overeem's avatar
Ruud Overeem committed
	using namespace APLCommon;
blaakmeer's avatar
blaakmeer committed

#define	MAX_CONCURRENT_OBSERVATIONS		100
#define MIN2(a,b) (((a) < (b)) ? (a) : (b))

blaakmeer's avatar
blaakmeer committed
// static (this) pointer used for signal handling
static MACScheduler* pMacScheduler = 0;
Ruud Overeem's avatar
Ruud Overeem committed
//
// MACScheduler()
//
MACScheduler::MACScheduler() :
	GCFTask 			((State)&MACScheduler::initial_state,string("MACScheduler")),
Ruud Overeem's avatar
Ruud Overeem committed
	itsPropertySet		(0),
Ruud Overeem's avatar
Ruud Overeem committed
	itsChildControl		(0),
Ruud Overeem's avatar
Ruud Overeem committed
	itsChildPort		(0),
	itsClaimerTask		(0),
	itsClaimerPort		(0),
	itsTimerPort		(0),
Ruud Overeem's avatar
Ruud Overeem committed
	itsSecondTimer		(0),
	itsNextPlannedTime	(0),
	itsNextActiveTime	(0),
	itsNextFinishedTime	(0),
	itsNrPlanned		(0),
	itsNrActive			(0),
Ruud Overeem's avatar
Ruud Overeem committed
{
	LOG_TRACE_OBJ ("MACscheduler construction");

	LOG_INFO_STR("MACProcessScope: " << PSN_MAC_SCHEDULER);
	LOG_INFO(Version::getInfo<MainCUVersion>("MACScheduler"));
Ruud Overeem's avatar
Ruud Overeem committed

	// Read timersettings from the ParameterSet
	itsPlannedItv	 = globalParameterSet()->getTime("pollIntervalPlanned", 60);
	itsActiveItv	 = globalParameterSet()->getTime("pollIntervalExecute", 5);
	itsFinishedItv	 = globalParameterSet()->getTime("pollIntervalFinished", 60);
	itsPlannedPeriod = globalParameterSet()->getTime("plannedPeriod",  86400) / 60;	// in minutes
	itsFinishedPeriod= globalParameterSet()->getTime("finishedPeriod", 86400) / 60; // in minutes
	itsMaxPlanned    = globalParameterSet()->getTime("maxPlannedList",  30);
	itsMaxFinished   = globalParameterSet()->getTime("maxFinishedList", 40);
    itsExclPLcluster = globalParameterSet()->getString("excludePipelinesOnThisCluster", "");
    if (itsExclPLcluster.length() > 0) {
        LOG_INFO_STR("NOT running the pipelines on cluster: " << itsExclPLcluster);
    	// We need to exclude this cluster for pipelines so make sure the name begins with the 'not'-sign.
        if (itsExclPLcluster[0] != '!') {
            itsExclPLcluster.insert(0, 1, '!');
		}
	}
//    JS: 20200329: This is a working piece of code to test TMSSBridge functionality.
//                  The actual code is used below in _updatePlannedList, but that requires GCF calls which I don't have (yet) in my dockerimage.
//  HINT for TeamRed TMSS collegues: uncomment this part, and compile and run MACScheduler in the ci_mac docker image, that makes testing/coding fast and easy.
//  But keep it commented out if you want to run it on mcu199 !!!
//    ParameterSet* pParamSet = globalParameterSet();
//    std::string tmss_username = pParamSet->getString("TMSSusername", "test");
//    std::string tmss_password = pParamSet->getString("TMSSpassword", "test");
//    std::string tmss_hostname = pParamSet->getString("TMSShostname", "127.0.0.1");
//    int tmss_port = pParamSet->getInt("TMSSport", 8008);
//
//    LOG_INFO_STR ("Trying to connect to the TMSS " << tmss_hostname << ":" << tmss_port << " user/pass:" << tmss_username << "/******" );
//    itsTMSSconnection = new TMSSBridge(tmss_hostname, tmss_port, tmss_username, tmss_password);
//    LOG_INFO ("Connected to the TMSSBridge");
//
//    Json::Value upcomingSubTasks = itsTMSSconnection->getSubTasksStartingInThreeMinutes();
//
//     if (!upcomingSubTasks.empty()) {
//             std::cout << formatString("TMSSCheck:First planned observation (%s) is at %s", upcomingSubTasks[0]["url"].asCString(), upcomingSubTasks[0]["start_time"].asCString()) << endl;
//        Json::Value subtask = upcomingSubTasks[0];
//        string updated_at(subtask["updated_at"].asString().replace(10, 1, " "));
//             ptime  modTime = time_from_string(updated_at);
//             cout << "modTime: " << modTime << endl;
//
//        // get subtask_id from url. I know, ugly, needs to be in json itself.
//        vector<string> tmp;
//        string url(subtask["url"].asString());
//        boost::split(tmp, url, [](char c){return c == '/';});
//        int subtask_id = stoi(tmp[tmp.size()-2]);
//
//        string parsetText = itsTMSSconnection->getParsetAsText(subtask_id);
//        std::cout << parsetText << std::endl << std::endl << std::endl;
//
//        string filename(observationParset(subtask_id));
//
//     ParameterSet obsSpecs(false);
//     obsSpecs.adoptBuffer(parsetText);
//     obsSpecs.writeFile(filename);
//
//        std::cout << "Wrote parset to " << filename << std::endl;
//    }
	ASSERTSTR(itsMaxPlanned + itsMaxFinished < MAX_CONCURRENT_OBSERVATIONS,
				"maxPlannedList + maxFinishedList should be less than " << MAX_CONCURRENT_OBSERVATIONS);

	// Read the schedule periods for starting observations.
Ruud Overeem's avatar
Ruud Overeem committed
	itsQueuePeriod 		= globalParameterSet()->getTime("QueuePeriod");
Ruud Overeem's avatar
Ruud Overeem committed

Ruud Overeem's avatar
Ruud Overeem committed
	// attach to child control task
Ruud Overeem's avatar
Ruud Overeem committed
	itsChildControl = ChildControl::instance();
	itsChildPort = new GCFITCPort (*this, *itsChildControl, "childITCport", GCFPortInterface::SAP, CONTROLLER_PROTOCOL);
Ruud Overeem's avatar
Ruud Overeem committed
	ASSERTSTR(itsChildPort, "Cannot allocate ITCport for childcontrol");
	itsChildPort->open();		// will result in F_CONNECTED
Ruud Overeem's avatar
Ruud Overeem committed

	// create an PVSSprepare Task
	itsClaimerTask = new ObsClaimer(this);
	ASSERTSTR(itsClaimerTask, "Cannot construct a ObsClaimerTask");
	itsClaimerPort = new GCFITCPort (*this, *itsClaimerTask, "ObsClaimerPort", GCFPortInterface::SAP, CM_PROTOCOL);
Ruud Overeem's avatar
Ruud Overeem committed
	// need port for timers
	itsTimerPort = new GCFTimerPort(*this, "Timerport");

	// setup MsgQueue
	string queueName = globalParameterSet()->getString("ParsetQueuename");
	ASSERTSTR(!queueName.empty(), "Queuename for distributing parameterSets not specified");
	itsMsgQueue = new ToBus(queueName);
	registerProtocol(CONTROLLER_PROTOCOL, CONTROLLER_PROTOCOL_STRINGS);
	registerProtocol(DP_PROTOCOL, 		  DP_PROTOCOL_STRINGS);
Ruud Overeem's avatar
Ruud Overeem committed
}


//
// ~MACScheduler()
//
MACScheduler::~MACScheduler()
{
	LOG_TRACE_OBJ ("~MACscheduler");

Ruud Overeem's avatar
Ruud Overeem committed
	if (itsPropertySet) {
		delete itsPropertySet;
	}
Ruud Overeem's avatar
Ruud Overeem committed

	if (itsOTDBconnection) {
		delete itsOTDBconnection;
	}
	if (itsTMSSconnection) {
		delete itsTMSSconnection;
	}

Ruud Overeem's avatar
Ruud Overeem committed
}

Ruud Overeem's avatar
Ruud Overeem committed
//
// sigintHandler(signum)
//
void MACScheduler::sigintHandler(int signum)
blaakmeer's avatar
blaakmeer committed
{
Ruud Overeem's avatar
Ruud Overeem committed
	LOG_DEBUG (formatString("SIGINT signal detected (%d)",signum));

	if (pMacScheduler) {
		pMacScheduler->finish();
	}
blaakmeer's avatar
blaakmeer committed
}

Ruud Overeem's avatar
Ruud Overeem committed

//
Ruud Overeem's avatar
Ruud Overeem committed
// _databaseEventHandler(event)
Ruud Overeem's avatar
Ruud Overeem committed
//
Ruud Overeem's avatar
Ruud Overeem committed
void MACScheduler::_databaseEventHandler(GCFEvent& event)
Ruud Overeem's avatar
Ruud Overeem committed
{
Ruud Overeem's avatar
Ruud Overeem committed

//	LOG_DEBUG_STR ("_databaseEventHandler:" << eventName(event));
Ruud Overeem's avatar
Ruud Overeem committed

Ruud Overeem's avatar
Ruud Overeem committed
	switch(event.signal) {
	case DP_CHANGED: {
		DPChangedEvent	dpEvent(event);
Ruud Overeem's avatar
Ruud Overeem committed

Ruud Overeem's avatar
Ruud Overeem committed
		// TODO: implement something usefull.
Ruud Overeem's avatar
Ruud Overeem committed
		if (strstr(dpEvent.DPname.c_str(), PVSSNAME_MS_QUEUEPERIOD) != 0) {
			uint32	newVal = ((GCFPVUnsigned*) (dpEvent.value._pValue))->getValue();
			LOG_INFO_STR ("Changing QueuePeriod from " << itsQueuePeriod << " to " << newVal);
			itsQueuePeriod = newVal;
		}
Ruud Overeem's avatar
Ruud Overeem committed
	break;
Ruud Overeem's avatar
Ruud Overeem committed

	default:
		break;
Ruud Overeem's avatar
Ruud Overeem committed
}


//
// initial_state(event, port)
//
// Setup all connections.
//
GCFEvent::TResult MACScheduler::initial_state(GCFEvent& event, GCFPortInterface& /*port*/)
{
	LOG_INFO_STR ("initial_state:" << eventName(event));
Ruud Overeem's avatar
Ruud Overeem committed

Ruud Overeem's avatar
Ruud Overeem committed
	GCFEvent::TResult status = GCFEvent::HANDLED;
Ruud Overeem's avatar
Ruud Overeem committed
	switch (event.signal) {
Ruud Overeem's avatar
Ruud Overeem committed
    case F_INIT:
Ruud Overeem's avatar
Ruud Overeem committed
   		break;

Ruud Overeem's avatar
Ruud Overeem committed
	case F_ENTRY: {
Ruud Overeem's avatar
Ruud Overeem committed
		// Get access to my own propertyset.
		LOG_INFO_STR ("Activating my propertySet(" << PSN_MAC_SCHEDULER << ")");
Ruud Overeem's avatar
Ruud Overeem committed
		itsPropertySet = new RTDBPropertySet(PSN_MAC_SCHEDULER,
											 PST_MAC_SCHEDULER,
Ruud Overeem's avatar
Ruud Overeem committed
											 this);
#else
    //HACK
    itsTimerPort->cancelAllTimers();
    itsTimerPort->setTimer(0.0);
#endif
Ruud Overeem's avatar
Ruud Overeem committed
		}
		break;
Ruud Overeem's avatar
Ruud Overeem committed

	case DP_CREATED: {
			// NOTE: thsi function may be called DURING the construction of the PropertySet.
			// Always exit this event in a way that GCF can end the construction.
			DPCreatedEvent	dpEvent(event);
			LOG_DEBUG_STR("Result of creating " << dpEvent.DPname << " = " << dpEvent.result);
			itsTimerPort->cancelAllTimers();
			itsTimerPort->setTimer(0.0);
        }
		break;
Ruud Overeem's avatar
Ruud Overeem committed
	case F_TIMER: {		// must be timer that PropSet is enabled.
Ruud Overeem's avatar
Ruud Overeem committed
		// update PVSS.
Ruud Overeem's avatar
Ruud Overeem committed
		LOG_TRACE_FLOW ("Updateing state to PVSS");
		itsPropertySet->setValue(PN_FSM_CURRENT_ACTION,		  GCFPVString  ("initial"));
		itsPropertySet->setValue(PN_FSM_ERROR,				  GCFPVString  (""));
		itsPropertySet->setValue(PN_MS_OTDB_CONNECTED,    	  GCFPVBool    (false));
		itsPropertySet->setValue(PN_MS_OTDB_LAST_POLL,    	  GCFPVString  (""));
		itsPropertySet->setValue(PN_MS_OTDB_POLLINTERVAL, 	  GCFPVInteger (itsPlannedItv));
		GCFPValueArray	emptyArr;
		itsPropertySet->setValue(PN_MS_ACTIVE_OBSERVATIONS,   GCFPVDynArr(LPT_STRING, emptyArr));
		itsPropertySet->setValue(PN_MS_PLANNED_OBSERVATIONS,  GCFPVDynArr(LPT_STRING, emptyArr));
		itsPropertySet->setValue(PN_MS_FINISHED_OBSERVATIONS, GCFPVDynArr(LPT_STRING, emptyArr));
Ruud Overeem's avatar
Ruud Overeem committed
		// Try to connect to the SAS database.
		ParameterSet* pParamSet = globalParameterSet();
Auke Klazema's avatar
Auke Klazema committed
        std::string username;
Auke Klazema's avatar
Auke Klazema committed
            username = pParamSet->getString("OTDBusername");
        }
        catch(LOFAR::APSException& ex)
        {
            LOG_ERROR_STR("MACScheduler code caught an exception while trying to "
                          "read the OTDB database user name from its configuration file:  "
                          << ex.what());
            throw(ex);
        }
Auke Klazema's avatar
Auke Klazema committed
        std::string DBname;
Auke Klazema's avatar
Auke Klazema committed
            DBname = pParamSet->getString("OTDBdatabasename");
        }
        catch(LOFAR::APSException& ex)
        {
            LOG_ERROR_STR("MACScheduler code caught an exception while trying to "
                          "read the OTDB database name from its configuration file:  "
                          << ex.what());
            throw(ex);
        }
Auke Klazema's avatar
Auke Klazema committed
        std::string password;
Auke Klazema's avatar
Auke Klazema committed
            password = pParamSet->getString("OTDBpassword");
        }
        catch(LOFAR::APSException& ex)
        {
            LOG_ERROR_STR("MACScheduler code caught an exception while trying to "
                          "read the OTDB database password from its configuration file:  "
                          << ex.what());
            throw(ex);
        }
Auke Klazema's avatar
Auke Klazema committed
        std::string hostname;
Auke Klazema's avatar
Auke Klazema committed
            hostname = pParamSet->getString("OTDBhostname");
        }
        catch(LOFAR::APSException& ex)
        {
            LOG_ERROR_STR("MACScheduler code caught an exception while trying to "
                          "read the OTDB database server host name from its configuration file:  "
                          << ex.what());
            throw(ex);
        }
Auke Klazema's avatar
Auke Klazema committed
        std::string port;
Auke Klazema's avatar
Auke Klazema committed
            port = pParamSet->getString("OTDBport");
        }
        catch(LOFAR::APSException& ex)
        {
            LOG_ERROR_STR("MACScheduler code caught an exception while trying to "
                          "read the OTDB database server port from its configuration file:  "
                          << ex.what());
            throw(ex);
        }
Ruud Overeem's avatar
Ruud Overeem committed

		LOG_INFO_STR ("Trying to connect to the OTDB on " << hostname);
		itsOTDBconnection= new OTDBconnection(username, password, DBname, hostname, port);
Ruud Overeem's avatar
Ruud Overeem committed
		ASSERTSTR (itsOTDBconnection, "Memory allocation error (OTDB)");
		ASSERTSTR (itsOTDBconnection->connect(),
                   "Unable to connect to database " << DBname << " on " << hostname <<
                   " port: " << port <<
                   " using " << username << "," << password);
Ruud Overeem's avatar
Ruud Overeem committed
		LOG_INFO ("Connected to the OTDB");
Ruud Overeem's avatar
Ruud Overeem committed
		itsPropertySet->setValue(PN_MS_OTDB_CONNECTED, GCFPVBool(true));
		std::string tmss_username = pParamSet->getString("TMSSusername", "test");
		std::string tmss_password = pParamSet->getString("TMSSpassword", "test");
		std::string tmss_hostname = pParamSet->getString("TMSShostname", "120.0.0.1");
		int tmss_port = pParamSet->getInt("TMSSport", 8008);

		LOG_INFO_STR ("Trying to connect to the TMSS " << tmss_hostname << ":" << tmss_port << " user/pass:" << tmss_username << "/******" );
		itsTMSSconnection= new TMSSBridge(tmss_hostname, tmss_port, tmss_username, tmss_password);
Ruud Overeem's avatar
Ruud Overeem committed
		// Start ChildControl task
		LOG_DEBUG ("Enabling ChildControltask");
		itsChildControl->openService(MAC_SVCMASK_SCHEDULERCTRL, 0);
Ruud Overeem's avatar
Ruud Overeem committed
		itsChildControl->registerCompletionPort(itsChildPort);
Ruud Overeem's avatar
Ruud Overeem committed

		// setup initial schedule: first planned, next run active, second run finished
		itsNextPlannedTime  = time(0);
		itsNextActiveTime   = itsNextPlannedTime +   itsPlannedItv;
		itsNextFinishedTime = itsNextPlannedTime + 2*itsPlannedItv;

Ruud Overeem's avatar
Ruud Overeem committed
		TRAN(MACScheduler::recover_state);				// go to next state.
Ruud Overeem's avatar
Ruud Overeem committed
		}
		break;
Ruud Overeem's avatar
Ruud Overeem committed

Ruud Overeem's avatar
Ruud Overeem committed
	case F_CONNECTED:
		break;
Ruud Overeem's avatar
Ruud Overeem committed

Ruud Overeem's avatar
Ruud Overeem committed
	case F_DISCONNECTED:
Ruud Overeem's avatar
Ruud Overeem committed
		break;
Ruud Overeem's avatar
Ruud Overeem committed
	default:
Ruud Overeem's avatar
Ruud Overeem committed
		LOG_DEBUG ("MACScheduler::initial, default");
Ruud Overeem's avatar
Ruud Overeem committed
		status = GCFEvent::NOT_HANDLED;
		break;
Ruud Overeem's avatar
Ruud Overeem committed
	return (status);
}


//
// recover_state(event, port)
//
// Read last PVSS states, compare those to the SAS states and try to
// recover to the last situation.
//
GCFEvent::TResult MACScheduler::recover_state(GCFEvent& event, GCFPortInterface& port)
{
Ruud Overeem's avatar
Ruud Overeem committed
	LOG_DEBUG_STR ("recover_state:" << eventName(event) << "@" << port.getName());
Ruud Overeem's avatar
Ruud Overeem committed

Ruud Overeem's avatar
Ruud Overeem committed
	GCFEvent::TResult status = GCFEvent::HANDLED;

	switch (event.signal) {
	case F_INIT:
		break;

	case F_ENTRY: {
		// update PVSS
		itsPropertySet->setValue(string(PN_FSM_CURRENT_ACTION),GCFPVString("recover"));
		itsPropertySet->setValue(string(PN_FSM_ERROR),GCFPVString(""));
Ruud Overeem's avatar
Ruud Overeem committed
		//
		// TODO: do recovery

		TRAN(MACScheduler::active_state);
Ruud Overeem's avatar
Ruud Overeem committed
		break;
	}
Ruud Overeem's avatar
Ruud Overeem committed
	default:
Ruud Overeem's avatar
Ruud Overeem committed
		LOG_DEBUG("recover_state, default");
Ruud Overeem's avatar
Ruud Overeem committed
		status = GCFEvent::NOT_HANDLED;
		break;
Ruud Overeem's avatar
Ruud Overeem committed
	return (status);
}

//
// active_state(event, port)
//
// Normal operation state. Check OTDB every itsActiveItv seconds and control
Ruud Overeem's avatar
Ruud Overeem committed
// the running observations.
//
GCFEvent::TResult MACScheduler::active_state(GCFEvent& event, GCFPortInterface& port)
{
Ruud Overeem's avatar
Ruud Overeem committed
	LOG_DEBUG_STR ("active:" << eventName(event) << "@" << port.getName());
Ruud Overeem's avatar
Ruud Overeem committed

Ruud Overeem's avatar
Ruud Overeem committed
	GCFEvent::TResult status = GCFEvent::HANDLED;

	switch (event.signal) {
	case F_INIT:
		break;

	case F_ENTRY: {
  	    // install my own signal handler. GCFTask also installs a handler so we have
Ruud Overeem's avatar
Ruud Overeem committed
		// to install our handler later than the GCFTask handler.
blaakmeer's avatar
blaakmeer committed
	    pMacScheduler = this;
Ruud Overeem's avatar
Ruud Overeem committed
		signal (SIGINT, MACScheduler::sigintHandler);	// ctrl-c
Ruud Overeem's avatar
Ruud Overeem committed
		signal (SIGTERM, MACScheduler::sigintHandler);	// kill
blaakmeer's avatar
blaakmeer committed

Ruud Overeem's avatar
Ruud Overeem committed
		// update PVSS
		itsPropertySet->setValue(string(PN_FSM_CURRENT_ACTION),GCFPVString("active"));
		itsPropertySet->setValue(string(PN_FSM_ERROR),GCFPVString(""));
Ruud Overeem's avatar
Ruud Overeem committed
		itsSecondTimer = itsTimerPort->setTimer(1L);
Ruud Overeem's avatar
Ruud Overeem committed
		break;
	}

Ruud Overeem's avatar
Ruud Overeem committed
	case F_ACCEPT_REQ:
Ruud Overeem's avatar
Ruud Overeem committed
		break;

Ruud Overeem's avatar
Ruud Overeem committed
		// Should be from the (lost) connection with the SD
		_connectedHandler(port);
		break;

Ruud Overeem's avatar
Ruud Overeem committed
		// Can be from StartDaemon or ObsController.
		// StartDaemon: trouble! Try to reconnect asap.
		// ObsController: ok when obs is finished, BIG TROUBLE when not!
		_disconnectedHandler(port);
		break;

Ruud Overeem's avatar
Ruud Overeem committed
	case DP_CHANGED:
		_databaseEventHandler(event);
		break;

	case CM_CLAIM_RESULT: {
			// some observation was claimed by the claimMgr. Update our prepare_list.
			CMClaimResultEvent	cmEvent(event);
			ltrim(cmEvent.nameInAppl, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_");
			int		obsID = atoi(cmEvent.nameInAppl.c_str());
			if (cmEvent.result != CM_NO_ERR) {
				LOG_ERROR_STR("Error during checking observation " << obsID);
				OTDB::TreeMaintenance	tm(itsOTDBconnection);
				TreeStateConv			tsc(itsOTDBconnection);
				tm.setTreeState(obsID, tsc.get("aborted"));
				itsPreparedObs.erase(obsID);
				break;
			}
			// claim was successful, update admin
			LOG_INFO_STR("Observation " << obsID << " is mapped to " << cmEvent.DPname);
			LOG_DEBUG_STR("PVSS preparation of observation " << obsID << " ready.");
Ruud Overeem's avatar
Ruud Overeem committed
	case F_TIMER: {		// secondTimer or reconnectTimer.
		GCFTimerEvent& timerEvent=static_cast<GCFTimerEvent&>(event);
		if (timerEvent.id == itsSecondTimer) {
			// time to poll the OTDB?
			// Note: We assume here that the PlannedItv is smaller than the ActiveItv and the FinishedItv.
			//		 When it is not smaller (very unlikely) than the ActiveItv and FinishedItv those two
			//		 intervals will default to the PlannedItv.
			if (time(0) >= itsNextPlannedTime) {		// check shortest interval
Ruud Overeem's avatar
Ruud Overeem committed
				_doOTDBcheck();
Ruud Overeem's avatar
Ruud Overeem committed
				// reinit polltime at multiple of intervaltime.
				// (=more change to hit hh.mm:00)
				itsNextPlannedTime = time(0) + itsPlannedItv;
				itsNextPlannedTime -= (itsNextPlannedTime % itsPlannedItv);
Ruud Overeem's avatar
Ruud Overeem committed
			}
Ruud Overeem's avatar
Ruud Overeem committed
			port.cancelTimer(itsSecondTimer);
Ruud Overeem's avatar
Ruud Overeem committed
			itsSecondTimer = port.setTimer(1.0);
		}
		// a connection was lost and a timer was set to try to reconnect.
//		else if (...) {
			// TODO
//			map timer to port
//			port.open();
//		}
		break;
	}

Ruud Overeem's avatar
Ruud Overeem committed
	// -------------------- EVENTS FROM CHILDCONTROL --------------------
	// That must be events from the ObservationControllers that are currently
Ruud Overeem's avatar
Ruud Overeem committed
	case CONTROL_STARTED: {
Ruud Overeem's avatar
Ruud Overeem committed
		// Child control received a message from the startDaemon that the
		// observationController was started (or not)
Ruud Overeem's avatar
Ruud Overeem committed
		CONTROLStartedEvent	msg(event);
		if (msg.successful) {
			LOG_DEBUG_STR("Start of " << msg.cntlrName << " was successful, waiting for connection.");
Ruud Overeem's avatar
Ruud Overeem committed
		}
		else {
			LOG_ERROR_STR("Observation controller " << msg.cntlrName << " could not be started");
			LOG_INFO_STR("Observation is be removed from administration, " << "restart will occur in next cycle");
Ruud Overeem's avatar
Ruud Overeem committed
		}
Ruud Overeem's avatar
Ruud Overeem committed
		break;
	}
Ruud Overeem's avatar
Ruud Overeem committed

Ruud Overeem's avatar
Ruud Overeem committed
		// The observationController has registered itself at childControl.
		LOG_INFO_STR(conEvent.cntlrName << " is connected, updating SAS)");
Ruud Overeem's avatar
Ruud Overeem committed

		// Ok, controller is really up, update SAS so that obs will not appear in
		// in the SAS list again.
		CMiter	theObs(itsControllerMap.find(conEvent.cntlrName));
		if (theObs == itsControllerMap.end()) {
			LOG_WARN_STR("Cannot find controller " << conEvent.cntlrName << ". Can't update the SAS database");
Ruud Overeem's avatar
Ruud Overeem committed
			break;
		}
Ruud Overeem's avatar
Ruud Overeem committed
		OTDB::TreeMaintenance	tm(itsOTDBconnection);
		TreeStateConv			tsc(itsOTDBconnection);
		tm.setTreeState(theObs->second, tsc.get("queued"));
Ruud Overeem's avatar
Ruud Overeem committed
		break;
	}

	case CONTROL_RESUMED: {
		// update SAS database.
		CONTROLResumedEvent		msg(event);
		CMiter	theObs(itsControllerMap.find(msg.cntlrName));
		if (theObs == itsControllerMap.end()) {
			LOG_WARN_STR("Cannot find controller " << msg.cntlrName << ". Can't update the SAS database");
			break;
		}
		OTDB::TreeMaintenance	tm(itsOTDBconnection);
		TreeStateConv			tsc(itsOTDBconnection);
		tm.setTreeState(theObs->second, tsc.get("active"));
		break;
	}

	case CONTROL_SUSPENDED: {
		// update SAS database.
		CONTROLSuspendedEvent		msg(event);
		CMiter	theObs(itsControllerMap.find(msg.cntlrName));
		if (theObs == itsControllerMap.end()) {
			LOG_WARN_STR("Cannot find controller " << msg.cntlrName << ". Can't update the SAS database");
			break;
		}
		OTDB::TreeMaintenance	tm(itsOTDBconnection);
		TreeStateConv			tsc(itsOTDBconnection);
		tm.setTreeState(theObs->second, tsc.get("completing"));
Ruud Overeem's avatar
Ruud Overeem committed
	case CONTROL_QUITED: {
Ruud Overeem's avatar
Ruud Overeem committed
		// The observationController is going down.
Ruud Overeem's avatar
Ruud Overeem committed
		CONTROLQuitedEvent quitedEvent(event);
		LOG_INFO_STR("Received QUITED(" << quitedEvent.cntlrName << "," << quitedEvent.result << ")");
Ruud Overeem's avatar
Ruud Overeem committed

		// update SAS database.
		CMiter	theObs(itsControllerMap.find(quitedEvent.cntlrName));
		if (theObs == itsControllerMap.end()) {
			LOG_WARN_STR("Cannot find controller " << quitedEvent.cntlrName << ". Can't update the SAS database");
Ruud Overeem's avatar
Ruud Overeem committed
			break;
		}
Ruud Overeem's avatar
Ruud Overeem committed
		OTDB::TreeMaintenance	tm(itsOTDBconnection);
		TreeStateConv			tsc(itsOTDBconnection);
Ruud Overeem's avatar
Ruud Overeem committed
		// CT_RESULT_: MANUAL_REMOVED, MANUAL_ABORT, LOST_CONNECTION, NO_ERROR
Ruud Overeem's avatar
Ruud Overeem committed
		if (quitedEvent.result == CT_RESULT_NO_ERROR) {
			tm.setTreeState(theObs->second, tsc.get("finished"));
Ruud Overeem's avatar
Ruud Overeem committed
		}
		else {
			tm.setTreeState(theObs->second, tsc.get("aborted"));
Ruud Overeem's avatar
Ruud Overeem committed
		}
		// free claimed observation in PVSS
		itsClaimerTask->freeObservation(observationName(theObs->second));

Ruud Overeem's avatar
Ruud Overeem committed
		// update our administration
		LOG_INFO_STR("Removing observation " << quitedEvent.cntlrName << " from activeList");
		itsControllerMap.erase(quitedEvent.cntlrName);
Ruud Overeem's avatar
Ruud Overeem committed
		break;
	}
Ruud Overeem's avatar
Ruud Overeem committed

Ruud Overeem's avatar
Ruud Overeem committed
	// NOTE: ignore all other CONTROL events, we are not interested in the
	// states of the Observations. (not our job).
Ruud Overeem's avatar
Ruud Overeem committed
	default:
Ruud Overeem's avatar
Ruud Overeem committed
		LOG_DEBUG("MACScheduler::active, default");
Ruud Overeem's avatar
Ruud Overeem committed
		status = GCFEvent::NOT_HANDLED;
		break;
	}

	return (status);
}

blaakmeer's avatar
blaakmeer committed
//
// finishing_state(event, port)
//
// Write controller state to PVSS, wait for 1 second (using a timer) to let GCF handle the property change
// and close the controller
//
GCFEvent::TResult MACScheduler::finishing_state(GCFEvent& event, GCFPortInterface& port)
{
Ruud Overeem's avatar
Ruud Overeem committed
	LOG_DEBUG_STR ("finishing_state:" << eventName(event) << "@" << port.getName());
blaakmeer's avatar
blaakmeer committed

	GCFEvent::TResult status = GCFEvent::HANDLED;

	switch (event.signal) {
	case F_INIT:
		break;

	case F_ENTRY: {
		// update PVSS
		itsPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString("finished"));
		itsPropertySet->setValue(PN_FSM_ERROR, 			GCFPVString(""));
		itsPropertySet->setValue(PN_MS_OTDB_CONNECTED,  GCFPVBool  (false));
blaakmeer's avatar
blaakmeer committed
		itsTimerPort->setTimer(1L);
		break;
	}
blaakmeer's avatar
blaakmeer committed
    case F_TIMER:
Ruud Overeem's avatar
Ruud Overeem committed
      GCFScheduler::instance()->stop();
blaakmeer's avatar
blaakmeer committed
      break;
blaakmeer's avatar
blaakmeer committed
	default:
		LOG_DEBUG("finishing_state, default");
		status = GCFEvent::NOT_HANDLED;
		break;
blaakmeer's avatar
blaakmeer committed
	return (status);
}

//
// finish
//
// Make the transition to the finishing state
//
void MACScheduler::finish()
{
  TRAN(MACScheduler::finishing_state);
}

Ruud Overeem's avatar
Ruud Overeem committed
//
// _doOTDBcheck
//
// Check if a new action should be taken based on the contents of OTDB and our own
// administration.
//
void MACScheduler::_doOTDBcheck()
{
Ruud Overeem's avatar
Ruud Overeem committed
	// update PVSS database with polltime
	time_t		now = time(0);
	ptime	currentTime = from_time_t(now);
	itsPropertySet->setValue(string(PN_MS_OTDB_LAST_POLL), GCFPVString(to_simple_string(currentTime)));
	// always update planned list because we might need to start some of those
	// (and we assumed that the PlannedItv was the smallest)
	_updatePlannedList();

	// update active lists when its time to do so.
	if (now >= itsNextActiveTime) {
		_updateActiveList();
		while (itsNextActiveTime <= now) {
			itsNextActiveTime += itsActiveItv;
		}
	}

	// update finished lists when its time to do so.
	if (now >= itsNextFinishedTime) {
		_updateFinishedList();
		while (itsNextFinishedTime <= now) {
			itsNextFinishedTime += itsFinishedItv;
		}
	}
}

//
// _updatePlannedList()
//
void MACScheduler::_updatePlannedList()
{
Ruud Overeem's avatar
Ruud Overeem committed

	// get time info
	time_t	now = time(0);
	ptime	currentTime = from_time_t(now);
	ASSERTSTR (currentTime != not_a_date_time, "Can't determine systemtime, bailing out");

	// get new list (list is ordered on starttime) of planned observations
	vector<OTDBtree> plannedDBlist = itsOTDBconnection->getTreeGroup(1, itsPlannedPeriod, itsExclPLcluster);
Ruud Overeem's avatar
Ruud Overeem committed

	if (!plannedDBlist.empty()) {
		LOG_DEBUG(formatString("OTDBCheck:First planned observation (%d) is at %s (active over %d seconds)",
				plannedDBlist[0].treeID(), to_simple_string(plannedDBlist[0].starttime).c_str(),
				time_duration(plannedDBlist[0].starttime - currentTime).total_seconds()));
	}
	// NOTE: do not exit routine on emptylist: we need to write an empty list to clear the DB
Ruud Overeem's avatar
Ruud Overeem committed

    Json::Value upcomingSubTasks = itsTMSSconnection->getSubTasksStartingInThreeMinutes();

	if (!upcomingSubTasks.empty()) {
		LOG_DEBUG(formatString("TMSSCheck:First planned observation (%s) is at %s",
				upcomingSubTasks[0]["url"].asCString(), upcomingSubTasks[0]["start_time"].asCString()));
	}

	// make a copy of the current prepared observations (= observations shown in the navigator in the 'future'
	// list). By eliminating the observations that are in the current SAS list we end up (at the end of this function)
	// with a list of observations that were in the SASlist the last time but now not anymore. Normally those observations
	// will appear in the active-list and will be removed there from the prepared list but WHEN AN OPERATOR CHANGES
	// THE STATUS MANUALLY into something different (e.g. ON-HOLD) the observation stays in the preparedlist.
	// EVEN WORSE: when the observation re-enters the system with different settings (again as scheduled) the system
	// still knows the observation and will use the OLD information of the observation.
	ObsList		backupObsList = itsPreparedObs;

	// walk through the plannedDBlist, prepare PVSS for the new obs, update own admin lists.
	// after walking through the plannedDBlist, do same thing for upcomingSubTasks (TMSS)
Ruud Overeem's avatar
Ruud Overeem committed

	int32			idx = MIN2(plannedDBlist.size(), itsMaxPlanned) - 1;
	for ( ; idx >= 0; idx--)  {
		if (plannedDBlist[idx].processType=="RESERVATION" || plannedDBlist[idx].processType=="MAINTENANCE") {
			continue;
		}

		// construct name and timings info for observation
		treeIDType		obsID = plannedDBlist[idx].treeID();
		string			obsName(observationName(obsID));
		ptime			modTime = plannedDBlist[idx].modificationDate;
		// remove obs from backup of the planned-list (it is in the list again)
		OLiter	oldObsIter = backupObsList.find(obsID);
		if (oldObsIter != backupObsList.end()) {
			backupObsList.erase(oldObsIter);
		}

		// must we claim this observation at the claimMgr?
		OLiter	prepIter = itsPreparedObs.find(obsID);
		if ((prepIter == itsPreparedObs.end()) || (prepIter->second.prepReady == false) ||
												  (prepIter->second.modTime != modTime)) {
			// create a ParameterFile for this Observation
			TreeMaintenance		tm(itsOTDBconnection);
			OTDBnode			topNode = tm.getTopNode(obsID);
			string				filename(observationParset(obsID));
			if (!tm.exportTree(obsID, topNode.nodeID(), filename)) {
				LOG_ERROR_STR ("Cannot create ParameterSet '" << filename <<
								"' for new observation. Observation CANNOT BE STARTED!");
			}
			else {
				// Claim a DP in PVSS and write obssettings to it so the operator can see it.
				LOG_INFO_STR("Requesting preparation of PVSS for " << obsName);
				itsClaimerTask->prepareObservation(obsName);
				itsPreparedObs[obsID] = schedInfo(modTime, false);	// requested claim but no answer yet.
Ruud Overeem's avatar
Ruud Overeem committed
		}
		else {
			// only add observations to the PVSS list when the claim was succesfull
			// otherwise thing will go wrong in the Navigator
			plannedArr.push_back(new GCFPVString(obsName));
		}
		// should this observation (have) be(en) started?
		int		timeBeforeStart = time_duration(plannedDBlist[idx].starttime - currentTime).total_seconds();
                LOG_INFO(formatString("%s starts at %s which is in %d seconds",
                                obsName.c_str(),
                                to_simple_string(plannedDBlist[idx].starttime).c_str(),
                                timeBeforeStart));

		if (timeBeforeStart > 0 && timeBeforeStart <= (int)itsQueuePeriod) {
			if (itsPreparedObs[obsID].prepReady == false) {
				LOG_INFO_STR("Observation " << obsID << " must be started but is not claimed yet.");
Ruud Overeem's avatar
Ruud Overeem committed
			}
Ruud Overeem's avatar
Ruud Overeem committed
			else {
				// starttime of observation lays in queuePeriod. Start the controller-chain,
				// this will result in CONTROL_STARTED event in our main task
				// Note: as soon as the ObservationController has reported itself to the MACScheduler
				//		 the observation will not be returned in the 'plannedDBlist' anymore.
				string	cntlrName(controllerName(CNTLRTYPE_OBSERVATIONCTRL, 0, obsID));
				if (itsControllerMap.find(cntlrName) == itsControllerMap.end()) {
					itsChildControl->startChild(CNTLRTYPE_OBSERVATIONCTRL,
												obsID,
					// Note: controller is now in state NO_STATE/CONNECTED (C/R)
					// add controller to our 'monitor' administration
					itsControllerMap[cntlrName] =  obsID;
					LOG_DEBUG_STR("itsControllerMap[" << cntlrName << "]=" <<  obsID);
					if (!itsPreparedObs[obsID].parsetDistributed) {
						_setParsetOnMsgBus(observationParset(obsID));
						itsPreparedObs[obsID].parsetDistributed = true;
					}
					LOG_DEBUG_STR("Observation " << obsID << " is already (being) started");
Ruud Overeem's avatar
Ruud Overeem committed
			}
Ruud Overeem's avatar
Ruud Overeem committed
		}

	// now walk through the upcomingSubTasks (TMSS), prepare PVSS for the new obs, update own admin lists.
    //JS: 20200329: I decided to keep the loop simple at first, and then later add the same steps as in the loop above.
    //That means, do all the stupid bookkeeping here in MAC as well, with its internal lists etc.
	int idx2 = MIN2(upcomingSubTasks.size(), itsMaxPlanned) - 1;
	for ( ; idx2 >= 0; idx2--)  {
	    Json::Value subtask = upcomingSubTasks[idx2];

        // get subtask_id from url. I know, ugly, needs to be in json itself.
        vector<string> tmp;
        string url(subtask["url"].asString());
        boost::split(tmp, url, [](char c){return c == '/';});
        int subtask_id = stoi(tmp[tmp.size()-2]);

//        string parsetText = itsTMSSconnection->getParsetAsText(subtask_id);
//        std::cout << parsetText << std::endl << std::endl << std::endl;
//
//        string filename(observationParset(subtask_id));
//
//    	ParameterSet obsSpecs(false);
//    	obsSpecs.adoptBuffer(parsetText);
//    	obsSpecs.writeFile(filename);
//
//        std::cout << "Wrote parset to " << filename << std::endl;
//        _setParsetOnMsgBus(filename);


		// construct name and timings info for observation
		string			obsName(observationName(subtask_id));
        string          updated_at(subtask["updated_at"].asString().replace(10, 1, " "));
		ptime           modTime = time_from_string(updated_at);

//		// remove obs from backup of the planned-list (it is in the list again)
//		OLiter	oldObsIter = backupObsList.find(obsID);
//		if (oldObsIter != backupObsList.end()) {
//			backupObsList.erase(oldObsIter);
//		}
//
//		// must we claim this observation at the claimMgr?
//		OLiter	prepIter = itsPreparedObs.find(obsID);
//		if ((prepIter == itsPreparedObs.end()) || (prepIter->second.prepReady == false) ||
//												  (prepIter->second.modTime != modTime)) {
//			// create a ParameterFile for this Observation
//			TreeMaintenance		tm(itsOTDBconnection);
//			OTDBnode			topNode = tm.getTopNode(obsID);
//			string				filename(observationParset(obsID));
//			if (!tm.exportTree(obsID, topNode.nodeID(), filename)) {
//				LOG_ERROR_STR ("Cannot create ParameterSet '" << filename <<
//								"' for new observation. Observation CANNOT BE STARTED!");
//			}
//			else {
//				// Claim a DP in PVSS and write obssettings to it so the operator can see it.
//				LOG_INFO_STR("Requesting preparation of PVSS for " << obsName);
//				itsClaimerTask->prepareObservation(obsName);
//				itsPreparedObs[obsID] = schedInfo(modTime, false);	// requested claim but no answer yet.
//			}
//		}
//		else {
//			// only add observations to the PVSS list when the claim was succesfull
//			// otherwise thing will go wrong in the Navigator
//			plannedArr.push_back(new GCFPVString(obsName));
//		}
//
//		// should this observation (have) be(en) started?
//		int		timeBeforeStart = time_duration(plannedDBlist[idx].starttime - currentTime).total_seconds();
//                LOG_INFO(formatString("%s starts at %s which is in %d seconds",
//                                obsName.c_str(),
//                                to_simple_string(plannedDBlist[idx].starttime).c_str(),
//                                timeBeforeStart));
//
//		if (timeBeforeStart > 0 && timeBeforeStart <= (int)itsQueuePeriod) {
//			if (itsPreparedObs[obsID].prepReady == false) {
//				LOG_INFO_STR("Observation " << obsID << " must be started but is not claimed yet.");
//			}
//			else {
//				// starttime of observation lays in queuePeriod. Start the controller-chain,
//				// this will result in CONTROL_STARTED event in our main task
//				// Note: as soon as the ObservationController has reported itself to the MACScheduler
//				//		 the observation will not be returned in the 'plannedDBlist' anymore.
//				string	cntlrName(controllerName(CNTLRTYPE_OBSERVATIONCTRL, 0, obsID));
//				if (itsControllerMap.find(cntlrName) == itsControllerMap.end()) {
//					LOG_INFO_STR("Requesting start of " << cntlrName);
//					itsChildControl->startChild(CNTLRTYPE_OBSERVATIONCTRL,
//												obsID,
//												0,		// instanceNr
//												myHostname(false));
//					// Note: controller is now in state NO_STATE/CONNECTED (C/R)
//
//					// add controller to our 'monitor' administration
//					itsControllerMap[cntlrName] =  obsID;
//					LOG_DEBUG_STR("itsControllerMap[" << cntlrName << "]=" <<  obsID);
//					if (!itsPreparedObs[obsID].parsetDistributed) {
//						_setParsetOnMsgBus(observationParset(obsID));
//						itsPreparedObs[obsID].parsetDistributed = true;
//					}
//				}
//				else {
//					LOG_DEBUG_STR("Observation " << obsID << " is already (being) started");
//				}
//			}
//		}