//# MACScheduler.cc: Implementation of the MAC Scheduler task
//#
//# Copyright (C) 2002-2004
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#include <lofar_config.h>
#include <Common/LofarLogger.h>
#include <boost/shared_array.hpp>
#include <APS/ParameterSet.h>
#include "MACSchedulerDefines.h"
#include "MACScheduler.h"
using namespace LOFAR::GCF::Common;
using namespace LOFAR::GCF::TM;
using namespace LOFAR::GCF::PAL;
using namespace LOFAR::OTDB;
using namespace std;
namespace LOFAR {
using namespace APLCommon;
using namespace ACC::APS;
// static (this) pointer used for signal handling: sigintHandler() forwards
// SIGINT to the scheduler instance stored here.
// NOTE(review): no assignment to pMacScheduler is visible in this copy of the
// file; while it stays 0, sigintHandler() only logs — confirm it is set
// (e.g. when the signal handler is installed).
static MACScheduler* pMacScheduler = 0;
//
// MACScheduler()
//
// Construct the scheduler task: starts in initial_state, reads the OTDB poll
// interval, queue period and claim period from the global ParameterSet,
// creates and opens the ITC port to child control, creates the timer port and
// registers the protocols whose signal names are used in logging.
//
MACScheduler::MACScheduler() :
GCFTask ((State)&MACScheduler::initial_state,string(MS_TASKNAME)),
PropertySetAnswerHandlerInterface(),
itsPropertySetAnswer(*this),
itsPropertySet (),
itsSecondTimer (0),
itsQueuePeriod (0),
itsClaimPeriod (0),
itsOTDBconnection (0),
itsOTDBpollInterval (0),
// 0 means the first timer check in active_state (time(0) >= 0) polls at once.
itsNextOTDBpolltime (0)
{
LOG_TRACE_OBJ ("MACscheduler construction");
LOG_INFO_STR("MACProcessScope: " << globalParameterSet()->getString("prefix"));
// Readin some parameters from the ParameterSet.
itsOTDBpollInterval = globalParameterSet()->getTime("OTDBpollInterval");
itsQueuePeriod = globalParameterSet()->getTime("QueuePeriod");
itsClaimPeriod = globalParameterSet()->getTime("ClaimPeriod");
// NOTE(review): itsChildControl is dereferenced here but is neither in the
// initializer list nor assigned earlier in this body — confirm it is set
// before this point (lines may be missing from this copy).
itsChildPort = new GCFITCPort (*this, *itsChildControl, "childITCport",
GCFPortInterface::SAP, CONTROLLER_PROTOCOL);
// NOTE(review): plain `new` throws std::bad_alloc on failure, so this check
// cannot fire for a null result.
ASSERTSTR(itsChildPort, "Cannot allocate ITCport for childcontrol");
itsChildPort->open(); // will result in F_CONNECTED
// need port for timers
// NOTE(review): itsChildPort and itsTimerPort are allocated with new but
// ~MACScheduler() does not delete them — confirm ownership.
itsTimerPort = new GCFTimerPort(*this, "Timerport");
itsObservations.reserve(10); // already reserve memory for 10 observations.
registerProtocol(CONTROLLER_PROTOCOL, CONTROLLER_PROTOCOL_signalnames);
registerProtocol(PA_PROTOCOL, PA_PROTOCOL_signalnames);
registerProtocol(F_PML_PROTOCOL, F_PML_PROTOCOL_signalnames);
}
//
// ~MACScheduler()
//
// Release the OTDB connection that initial_state created.
//
MACScheduler::~MACScheduler()
{
	LOG_TRACE_OBJ ("~MACscheduler");

	// Disabling the propertyset is not necessary here: the destructor of the
	// propertyset always takes care of that.

	// NOTE(review): itsChildPort and itsTimerPort are allocated with new in the
	// constructor but are not deleted here — confirm who owns them.

	// 'delete' on a null pointer is a no-op, so no explicit check is needed.
	delete itsOTDBconnection;
}
//
// sigintHandler(signum)
//
void MACScheduler::sigintHandler(int signum)
LOG_DEBUG (formatString("SIGINT signal detected (%d)",signum));
if (pMacScheduler) {
pMacScheduler->finish();
}
//
// handlePropertySetAnswer(answer)
//
// Handle GCF answers and value messages concerning our own propertyset:
//  - F_MYPS_ENABLED : log an error when enabling failed; in all cases fire the
//                     timer port immediately so the main task continues
//                     (initial_state waits for that timer).
//  - F_PS_CONFIGURED: log whether the apc was loaded or not.
//  - F_VGETRESP / F_VCHANGEMSG: an integer value of a MACScheduler property
//                     whose name contains the queue-period resp. claim-period
//                     name replaces itsQueuePeriod resp. itsClaimPeriod.
// All other signals are ignored.
//
void MACScheduler::handlePropertySetAnswer(GCFEvent& answer)
{
	LOG_DEBUG_STR ("handlePropertySetAnswer:" << evtstr(answer));

	switch(answer.signal) {
	case F_MYPS_ENABLED: {
		GCFPropSetAnswerEvent* pPropAnswer=static_cast<GCFPropSetAnswerEvent*>(&answer);
		if(pPropAnswer->result != GCF_NO_ERROR) {
			LOG_ERROR(formatString("%s : PropertySet %s NOT ENABLED",
									getName().c_str(), pPropAnswer->pScope));
		}
		// always let timer expire so main task will continue.
		itsTimerPort->setTimer(0.0);
		break;
	}

	case F_PS_CONFIGURED: {
		GCFConfAnswerEvent* pConfAnswer=static_cast<GCFConfAnswerEvent*>(&answer);
		if(pConfAnswer->result == GCF_NO_ERROR) {
			LOG_DEBUG(formatString("%s : apc %s Loaded",
									getName().c_str(), pConfAnswer->pApcName));
			//apcLoaded();
		}
		else {
			LOG_ERROR(formatString("%s : apc %s NOT LOADED",
									getName().c_str(), pConfAnswer->pApcName));
		}
		break;
	}

	case F_VGETRESP:
	case F_VCHANGEMSG: {
		// check which property changed
		GCFPropValueEvent* pPropAnswer=static_cast<GCFPropValueEvent*>(&answer);

		// Only integer values of our own (MACScheduler) properties are used:
		// they carry a new queue period or claim period.
		if ((strstr(pPropAnswer->pPropName, PSN_MAC_SCHEDULER) != 0) &&
			(pPropAnswer->pValue->getType() == LPT_INTEGER)) {
			uint32 newVal = (uint32) ((GCFPVInteger*)pPropAnswer->pValue)->getValue();
			if (strstr(pPropAnswer->pPropName, PVSSNAME_MS_QUEUEPERIOD) != 0) {
				LOG_INFO_STR ("Changing QueuePeriod from " << itsQueuePeriod <<
							  " to " << newVal);
				itsQueuePeriod = newVal;
			}
			else if (strstr(pPropAnswer->pPropName, PVSSNAME_MS_CLAIMPERIOD) != 0) {
				LOG_INFO_STR ("Changing ClaimPeriod from " << itsClaimPeriod <<
							  " to " << newVal);
				itsClaimPeriod = newVal;
			}
		}
		break;
	}

	default:
		break;
	}
}
//
// initial_state(event, port)
//
// Setup all connections: create our propertyset, publish initial values to
// PVSS, connect to the OTDB (SAS database) and open the ChildControl service.
//
// NOTE(review): this copy of the function is incomplete and does not compile.
// The `case` labels of the switch are missing, the GCFMyPropertySet constructor
// call below is cut off, and the braces do not balance (the `}` after the
// "Wait for timer" comment ends the switch, leaving `break;` and the
// following statements outside it). No transition to a next state is visible.
// Restore the missing lines from version control before building.
//
GCFEvent::TResult MACScheduler::initial_state(GCFEvent& event, GCFPortInterface& /*port*/)
{
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
// NOTE(review): statements placed before the first `case` label of a switch
// are never executed; a case label (presumably F_ENTRY) is missing here.
itsPropertySet = GCFMyPropertySetPtr(new GCFMyPropertySet(PSN_MAC_SCHEDULER,
PST_MAC_SCHEDULER,
// Wait for timer that is set in PropertySetAnswer on ENABLED event.
}
break;
// NOTE(review): a case label (presumably F_TIMER, fired from
// handlePropertySetAnswer on F_MYPS_ENABLED) appears to be missing here.
// Publish the initial state of the scheduler to PVSS.
itsPropertySet->setValue(PVSSNAME_FSM_STATE, GCFPVString ("initial"));
itsPropertySet->setValue(PVSSNAME_FSM_ERROR, GCFPVString (""));
itsPropertySet->setValue(PN_MS_OTDB_CONNECTED, GCFPVBool (false));
itsPropertySet->setValue(PN_MS_OTDB_LAST_POLL, GCFPVString (""));
itsPropertySet->setValue(PN_MS_OTDB_POLLINTERVAL, GCFPVUnsigned(itsOTDBpollInterval));
itsPropertySet->setValue(PN_MS_ACTIVE_OBSERVATIONS, GCFPVDynArr(LPT_STRING, itsPVSSObsList));
// Try to connect to the SAS database.
ACC::APS::ParameterSet* pParamSet = ACC::APS::globalParameterSet();
string username = pParamSet->getString("OTDBusername");
string DBname = pParamSet->getString("OTDBdatabasename");
string password = pParamSet->getString("OTDBpassword");
LOG_DEBUG ("Trying to connect to the OTDB");
itsOTDBconnection= new OTDBconnection(username, password, DBname);
ASSERTSTR (itsOTDBconnection, "Memory allocation error (OTDB)");
// NOTE(review): the failure message below includes the OTDB password in clear
// text; consider leaving it out of the assertion/log output.
ASSERTSTR (itsOTDBconnection->connect(),
"Unable to connect to database " << DBname << " using " <<
username << "," << password);
itsPropertySet->setValue(string(PN_MS_OTDB_CONNECTED),GCFPVBool(true));
// Start ChildControl task
LOG_DEBUG ("Enabling ChildControltask");
itsChildControl->openService(MAC_SVCMASK_SCHEDULERCTRL, 0);
// NOTE(review): this assignment looks like it belongs to a missing `default:`
// label rather than to the OTDB/ChildControl setup above — confirm.
status = GCFEvent::NOT_HANDLED;
break;
}
return (status);
}
//
// recover_state(event, port)
//
// Read last PVSS states, compare those to the SAS states and try to
// recover to the last situation.
// Recovery itself is not implemented yet: on F_ENTRY the FSM state in PVSS is
// set to "recover" and the task moves straight on to active_state.
// F_INIT is accepted; every other event is reported as not handled.
//
GCFEvent::TResult MACScheduler::recover_state(GCFEvent& event, GCFPortInterface& port)
{
	LOG_DEBUG_STR ("recover_state:" << evtstr(event) << "@" << port.getName());

	if (event.signal == F_INIT) {
		return (GCFEvent::HANDLED);
	}

	if (event.signal != F_ENTRY) {
		return (GCFEvent::NOT_HANDLED);
	}

	// publish the new state to PVSS
	itsPropertySet->setValue(string(PVSSNAME_FSM_STATE),GCFPVString("recover"));
	itsPropertySet->setValue(string(PVSSNAME_FSM_ERROR),GCFPVString(""));

	// TODO: do recovery
	TRAN(MACScheduler::active_state);

	return (GCFEvent::HANDLED);
}
//
// active_state(event, port)
//
// Normal operation state. Check OTDB every OTDBpollInterval seconds and control
// the running observations.
//
// NOTE(review): this copy of the function is incomplete and does not compile.
// The `{` opened at `case F_ENTRY:` is never closed, the case labels for the
// ChildControl events (started / connect) are missing, and the
// LOG_ERROR_STR( call in the "started" branch is cut off. Restore the missing
// lines from version control before building.
//
GCFEvent::TResult MACScheduler::active_state(GCFEvent& event, GCFPortInterface& port)
{
LOG_DEBUG_STR ("active:" << evtstr(event) << "@" << port.getName());
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
case F_INIT:
break;
case F_ENTRY: {
// install my own signal handler. GCFTask also installs a handler so we have
// to install our handler later than the GCFTask handler.
// NOTE(review): no signal-handler installation (nor an assignment to
// pMacScheduler) is visible here.
// update PVSS
itsPropertySet->setValue(string(PVSSNAME_FSM_STATE),GCFPVString("active"));
itsPropertySet->setValue(string(PVSSNAME_FSM_ERROR),GCFPVString(""));
// Timers must be connected to ports, so abuse serverPort for second timer.
// NOTE(review): no statement that starts the second timer is visible; the
// F_TIMER branch below only re-arms it after it has fired.
break;
case F_CONNECTED:
// Should be from the (lost) connection with the SD
_connectedHandler(port);
break;
case F_DISCONNECTED:
// Can be from StartDaemon or ObsController.
// StartDaemon: trouble! Try to reconnect asap.
// ObsController: ok when obs is finished, BIG TROUBLE when not!
_disconnectedHandler(port);
break;
case F_TIMER: { // secondTimer or reconnectTimer.
GCFTimerEvent& timerEvent=static_cast<GCFTimerEvent&>(event);
if (timerEvent.id == itsSecondTimer) {
// time to poll the OTDB?
if (time(0) >= itsNextOTDBpolltime) {
_doOTDBcheck();
// reinit polltime at multiple of intervaltime.
// (=more chance to hit hh.mm:00)
// NOTE(review): itsNextOTDBpolltime is only rounded down here, never
// advanced, so once time(0) has passed it the OTDB is polled on every
// one-second tick. A line such as
// `itsNextOTDBpolltime = time(0) + itsOTDBpollInterval;` seems to be
// missing. Also, an OTDBpollInterval of 0 makes this a modulo by zero.
itsNextOTDBpolltime -= (itsNextOTDBpolltime % itsOTDBpollInterval);
}
// re-arm the one-second timer
itsSecondTimer = port.setTimer(1.0);
}
// a connection was lost and a timer was set to try to reconnect.
// else if (...) {
// TODO
// map timer to port
// port.open();
// }
break;
}
// -------------------- EVENTS FROM CHILDCONTROL --------------------
// Child control received a message from the startDaemon that the
// observationController was started (or not)
// NOTE(review): the case label for this event (presumably CONTROL_STARTED)
// is missing.
CONTROLStartedEvent msg(event);
if (msg.successful) {
LOG_DEBUG_STR("Start of " << msg.cntlrName << " was successful");
}
else {
LOG_ERROR_STR("Observation controller " << msg.cntlrName <<
// The observationController has registered itself at childControl.
// NOTE(review): the rest of the LOG_ERROR_STR above and the case label for
// the connect event (presumably CONTROL_CONNECT) are missing.
CONTROLConnectEvent conEvent(event);
LOG_DEBUG_STR("Received CONNECT(" << conEvent.cntlrName << ")");
case CONTROL_FINISH: {
// The observationController is going down.
CONTROLFinishEvent finishEvent(event);
LOG_DEBUG_STR("Received FINISH(" << finishEvent.cntlrName << ")");
// NOTE(review): the log below announces a removal, but no call that removes
// the observation (e.g. _removeActiveObservation) is visible.
LOG_DEBUG_STR("Removing observation " << finishEvent.cntlrName <<
" from activeList");
// NOTE: ignore all other CONTROL events, we are not interested in the
// states of the Observations. (not our job).
status = GCFEvent::NOT_HANDLED;
break;
}
return (status);
}
//
// finishing_state(event, port)
//
// Write controller state to PVSS, wait for 1 second (using a timer) to let GCF
// handle the property change and close the controller.
//  - F_ENTRY: publish "finished" to PVSS and start a one-second timer.
//  - F_TIMER: stop the task.
//  - F_INIT : accepted, nothing to do.
// Every other event is logged and reported as not handled.
//
GCFEvent::TResult MACScheduler::finishing_state(GCFEvent& event, GCFPortInterface& port)
{
	LOG_DEBUG_STR ("finishing_state:" << evtstr(event) << "@" << port.getName());

	switch (event.signal) {
	case F_INIT:
		return (GCFEvent::HANDLED);

	case F_ENTRY:
		// publish the final state, then give GCF a second to deliver it
		itsPropertySet->setValue(string(PVSSNAME_FSM_STATE),GCFPVString("finished"));
		itsPropertySet->setValue(string(PVSSNAME_FSM_ERROR),GCFPVString(""));
		itsTimerPort->setTimer(1L);
		return (GCFEvent::HANDLED);

	case F_TIMER:
		GCFTask::stop();
		return (GCFEvent::HANDLED);

	default:
		LOG_DEBUG("finishing_state, default");
		return (GCFEvent::NOT_HANDLED);
	}
}
//
// finish
//
// Make the transition to the finishing state. finishing_state publishes
// "finished" to PVSS and stops the task one second later.
// Called from sigintHandler().
// NOTE(review): when reached via sigintHandler this runs in signal-handler
// context; TRAN is presumably not async-signal-safe — confirm.
//
void MACScheduler::finish()
{
TRAN(MACScheduler::finishing_state);
}
//
// _doOTDBcheck
//
// Check if a new action should be taken based on the contents of OTDB and our own
// administration.
// Publishes the current time as PN_MS_OTDB_LAST_POLL, fetches the executable
// trees from the OTDB and walks them until the first tree that starts more
// than itsQueuePeriod seconds from now.
//
// NOTE(review): this copy of the function is incomplete and does not compile.
// The `else {` below has no matching `if`, the loop index `idx` is never
// incremented (the loop would not terminate unless it breaks), and the
// closing braces of the inner blocks, the loop and the function are missing.
// Restore the missing lines from version control before building.
//
void MACScheduler::_doOTDBcheck()
{
ptime currentTime = from_time_t(time(0));
itsPropertySet->setValue(string(PN_MS_OTDB_LAST_POLL),
GCFPVString(to_simple_string(currentTime)));
// get new list (list is ordered on starttime)
vector<OTDBtree> newTreeList = itsOTDBconnection->getExecutableTrees();
if (newTreeList.empty()) {
return;
}
LOG_DEBUG(formatString("OTDBCheck:First observation is at %s (tree=%d)",
to_simple_string(newTreeList[0].starttime).c_str(), newTreeList[0].treeID()));
// walk through the list and bring each observation in the right state when necessary.
uint32 listSize = newTreeList.size();
uint32 idx = 0;
// NOTE(review): currentTime has already been published to PVSS above before
// this sanity check runs; consider checking it right after it is computed.
ASSERTSTR (currentTime != not_a_date_time, "Can't determine systemtime, bailing out");
while (idx < listSize) {
// timediff = time to go before start of Observation
time_duration timediff = newTreeList[idx].starttime - currentTime;
LOG_TRACE_VAR_STR("timediff=" << timediff);
// when queuetime is not reached yet we are finished with the list.
if (timediff > seconds(itsQueuePeriod)) {
break;
}
// get current state of Observation
string cntlrName = controllerName(CNTLRTYPE_OBSERVATIONCTRL,
0, newTreeList[idx].treeID());
CTState::CTstateNr requestedState= itsChildControl->getRequestedState(cntlrName);
// When in startup or claimtime we should try to start the controller.
if ((timediff > seconds(0)) && (requestedState != CTState::CONNECTED)) {
// no, let database construct the parset for the whole observation
OTDB::TreeMaintenance tm(itsOTDBconnection);
OTDB::treeIDType treeID = newTreeList[idx].treeID();
OTDBnode topNode = tm.getTopNode(treeID);
// NOTE: this name must be the same as in the ChildControl.
string filename = formatString("%s/Observation_%d",
LOFAR_SHARE_LOCATION, treeID);
// NOTE(review): an `if (...)` (presumably on writing the parset to
// 'filename') and its body are missing before this `else`.
else {
// fire request for new controller, will result in CONTROL_STARTED
// Note: controller is now in state NO_STATE/CONNECTED (C/R)
// register this Observation
// NOTE(review): newObs is built below but no call that registers it
// (e.g. _addActiveObservation(newObs)) is visible, although the log
// line says it was added.
ParameterSet obsPS(filename);
Observation newObs(&obsPS);
newObs.name = cntlrName;
newObs.treeID = treeID;
LOG_DEBUG_STR("Observation " << cntlrName << " added to active Observations");
TreeStateConv tsc(itsOTDBconnection);
tm.setTreeState(treeID, tsc.get("queued"));
// in CLAIM period?
if ((timediff > seconds(0)) && (timediff <= seconds(itsClaimPeriod))) {
// Observation is somewhere in the claim period it should be up by now.
if (requestedState != CTState::CLAIMED) {
LOG_ERROR_STR("Observation " << cntlrName <<
" should have reached the CLAIMING state by now," <<
" check state of observation.");
}
// observation must be running (otherwise it would not be in the newTreeList)
// TODO: check if endtime is reached and observation is still running.
//
// _addActiveObservation(newObs)
//
// Register newObs in the administration of active observations
// (itsObservations), add "Observation<treeID>" to the PVSS list and publish
// that list as PN_MS_ACTIVE_OBSERVATIONS. Does nothing when an observation
// with the same name is already registered.
//
void MACScheduler::_addActiveObservation(const Observation& newObs)
{
	// Observation already in vector?
	// (the iterator must be advanced, otherwise a non-matching first entry
	//  makes this loop spin forever)
	vector<Observation>::iterator	end  = itsObservations.end();
	for (vector<Observation>::iterator iter = itsObservations.begin(); iter != end; ++iter) {
		if (iter->name == newObs.name) {
			return;
		}
	}

	// update own admin and PVSS datapoint
	itsObservations.push_back(newObs);
	itsPVSSObsList.push_back(new GCFPVString(formatString("Observation%d", newObs.treeID)));
	itsPropertySet->setValue(PN_MS_ACTIVE_OBSERVATIONS, GCFPVDynArr(LPT_STRING, itsPVSSObsList));
	LOG_DEBUG_STR("Added observation " << newObs.name << " to active observation-list");
}
//
// _removeActiveObservation(name)
//
void MACScheduler::_removeActiveObservation(const string& name)
{
// search observation.
vector<Observation>::iterator end = itsObservations.end();
vector<Observation>::iterator iter = itsObservations.begin();
bool found(false);
while (!found && (iter != end)) {
if (iter->name == name) {
found = true;
itsObservations.erase(iter);
LOG_DEBUG_STR("Removed observation " << name << " from active observationList");
}
GCFPValueArray::iterator pEnd = itsPVSSObsList.end();
GCFPValueArray::iterator pIter = itsPVSSObsList.begin();
while (pIter != pEnd) {
if ((static_cast<GCFPVString*>(*pIter))->getValue() == obsName) {
delete *pIter;
itsPVSSObsList.erase(pIter);
break;
}
}
itsPropertySet->setValue(PN_MS_ACTIVE_OBSERVATIONS, GCFPVDynArr(LPT_STRING, itsPVSSObsList));
}
{
}
//
// _disconnectedHandler(port)
//
// Handle an F_DISCONNECTED event by closing the port that reported it.
// The port-specific recovery below is disabled (#if 0) and kept for
// reference only.
//
void MACScheduler::_disconnectedHandler(GCFPortInterface& port)
{
	port.close();

#if 0
	string visd;	// only used by the disabled code below
	if(_isServerPort(itsVIparentPort,port)) {
		LOG_FATAL("VI parent server closed");
		itsVIparentPort.open(); // server closed? reopen it
	}
	else if(_isVISDclientPort(port,visd)) {
		LOG_FATAL(formatString("VI Startdaemon port disconnected: %s",visd.c_str()));
		port.setTimer(3L);
	}
	else if(_isVIclientPort(port)) {
		LOG_FATAL("VI client port disconnected");
		// do something with the nodeId?
	}
#endif
}
};
};