"README.md" did not exist on "78bc4b7b28c5e7d65c05c191b1c59bb3dfa23189"
Newer
Older
//# MACScheduler.cc: Implementation of the MAC Scheduler task
//#
//# Copyright (C) 2002-2004
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#include <lofar_config.h>
#include <Common/LofarLogger.h>
#include <boost/shared_array.hpp>
#include <APS/ParameterSet.h>
#include "MACSchedulerDefines.h"
#include "MACScheduler.h"
using namespace LOFAR::GCF::Common;
using namespace LOFAR::GCF::TM;
using namespace LOFAR::GCF::PAL;
using namespace LOFAR::OTDB;
using namespace std;
namespace LOFAR {
using namespace APLCommon;
using namespace ACC::APS;
//
// MACScheduler()
//
// Constructs the MAC Scheduler task.
// - installs initial_state as the first state of the GCFTask, named MS_TASKNAME;
// - reads OTDBpollInterval, QueuePeriod and ClaimPeriod from the global ParameterSet;
// - creates and opens the ITC port towards ChildControl and creates the timer port;
// - registers the signal names of the CONTROLLER and PA protocols.
MACScheduler::MACScheduler() :
GCFTask ((State)&MACScheduler::initial_state,string(MS_TASKNAME)),
PropertySetAnswerHandlerInterface(),
itsPropertySetAnswer(*this),
itsPropertySet (),
itsSecondTimer (0),
itsQueuePeriod (0),
itsClaimPeriod (0),
itsOTDBconnection (0),
itsOTDBpollInterval (0),
itsNextOTDBpolltime (0)
{
LOG_TRACE_OBJ ("MACscheduler construction");
LOG_INFO_STR("MACProcessScope: " << globalParameterSet()->getString("prefix"));
// Read in some parameters from the ParameterSet.
itsOTDBpollInterval = globalParameterSet()->getTime("OTDBpollInterval");
itsQueuePeriod = globalParameterSet()->getTime("QueuePeriod");
itsClaimPeriod = globalParameterSet()->getTime("ClaimPeriod");
// NOTE(review): itsChildControl is dereferenced here, but nothing in the visible
// part of this constructor initialises it (nor itsChildPort/itsTimerPort in the
// initialiser list) -- confirm it is assigned before this point.
itsChildPort = new GCFITCPort (*this, *itsChildControl, "childITCport",
GCFPortInterface::SAP, CONTROLLER_PROTOCOL);
// NOTE(review): a plain new-expression reports failure by throwing, not by
// returning 0, so this assert presumably can never fire -- confirm.
ASSERTSTR(itsChildPort, "Cannot allocate ITCport for childcontrol");
itsChildPort->open(); // will result in F_CONNECTED
// need port for timers
itsTimerPort = new GCFTimerPort(*this, "Timerport");
itsObservations.reserve(10); // already reserve memory for 10 observations.
registerProtocol (CONTROLLER_PROTOCOL, CONTROLLER_PROTOCOL_signalnames);
registerProtocol(PA_PROTOCOL, PA_PROTOCOL_signalnames);
}
//
// ~MACScheduler()
//
// Publishes the FSM state "down" when a property set is available and
// releases the OTDB connection.
MACScheduler::~MACScheduler()
{
	LOG_TRACE_OBJ ("~MACscheduler");

	if (itsPropertySet) {
		// Note: an explicit disable is not necessary, the destructor of the
		// propertyset always takes care of that.
		itsPropertySet->setValue(string(PVSSNAME_FSM_STATE),GCFPVString("down"));
	}

	// Deleting a null pointer is a no-op, so no guard is needed.
	delete itsOTDBconnection;
}
//
// handlePropertySetAnswer(answer)
//
// Handles the answers that arrive for our own PropertySet.
//
//  F_MYPS_ENABLED          : logs an error when enabling failed and, in all cases,
//                            fires a zero-delay timer on itsTimerPort.
//  F_PS_CONFIGURED         : only logs whether the apc was loaded.
//  F_VGETRESP/F_VCHANGEMSG : when the property name contains PSN_MAC_SCHEDULER and
//                            the value is of type LPT_INTEGER, the value is taken
//                            over as the new QueuePeriod or ClaimPeriod, depending
//                            on which property name matches.
//  Any other signal is ignored.
void MACScheduler::handlePropertySetAnswer(GCFEvent& answer)
{
	LOG_DEBUG_STR ("handlePropertySetAnswer:" << evtstr(answer));

	switch(answer.signal) {
	case F_MYPS_ENABLED: {
		GCFPropSetAnswerEvent* pPropAnswer=static_cast<GCFPropSetAnswerEvent*>(&answer);
		if(pPropAnswer->result != GCF_NO_ERROR) {
			LOG_ERROR(formatString("%s : PropertySet %s NOT ENABLED",
								   getName().c_str(), pPropAnswer->pScope));
		}
		// always let timer expire so main task will continue.
		itsTimerPort->setTimer(0.0);
		break;
	}

	case F_PS_CONFIGURED:
	{
		GCFConfAnswerEvent* pConfAnswer=static_cast<GCFConfAnswerEvent*>(&answer);
		if(pConfAnswer->result == GCF_NO_ERROR) {
			LOG_DEBUG(formatString("%s : apc %s Loaded",
								   getName().c_str(), pConfAnswer->pApcName));
			//apcLoaded();
		}
		else {
			LOG_ERROR(formatString("%s : apc %s NOT LOADED",
								   getName().c_str(), pConfAnswer->pApcName));
		}
		break;
	}

	case F_VGETRESP:
	case F_VCHANGEMSG: {
		// check which property changed
		GCFPropValueEvent* pPropAnswer=static_cast<GCFPropValueEvent*>(&answer);

		// TODO: implement something usefull.
		// Only integer properties of the MACScheduler itself are of interest.
		if ((strstr(pPropAnswer->pPropName, PSN_MAC_SCHEDULER) != 0) &&
			(pPropAnswer->pValue->getType() == LPT_INTEGER)) {
			uint32 newVal = (uint32) ((GCFPVInteger*)pPropAnswer->pValue)->getValue();
			if (strstr(pPropAnswer->pPropName, PVSSNAME_MS_QUEUEPERIOD) != 0) {
				// change of queueTime
				LOG_INFO_STR ("Changing QueuePeriod from " << itsQueuePeriod <<
							  " to " << newVal);
				itsQueuePeriod = newVal;
			}
			else if (strstr(pPropAnswer->pPropName, PVSSNAME_MS_CLAIMPERIOD) != 0) {
				// change of claimTime
				LOG_INFO_STR ("Changing ClaimPeriod from " << itsClaimPeriod <<
							  " to " << newVal);
				itsClaimPeriod = newVal;
			}
		}
		break;
	}

	default:
		break;
	}
}
//
// initial_state(event, port)
//
// Setup all connections.
//
// State handler for the initial state of the task.
// The visible code creates the own PropertySet, publishes the initial values of
// the state/error/OTDB datapoints, connects to the OTDB with the credentials
// from the global ParameterSet and opens the ChildControl service.
//
// NOTE(review): this copy of the function is incomplete. No `case` labels are
// visible inside the switch, the GCFMyPropertySet constructor call is never
// closed and the braces do not balance, so lines appear to be missing.
// Restore the function from version control before building.
GCFEvent::TResult MACScheduler::initial_state(GCFEvent& event, GCFPortInterface& /*port*/)
{
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
itsPropertySet = GCFMyPropertySetPtr(new GCFMyPropertySet(PSN_MAC_SCHEDULER,
PST_MAC_SCHEDULER,
// Wait for timer that is set in PropertySetAnswer on ENABLED event.
}
break;
// Publish the start values of our datapoints.
itsPropertySet->setValue(string(PVSSNAME_FSM_STATE), GCFPVString ("initial"));
itsPropertySet->setValue(string(PVSSNAME_FSM_ERROR), GCFPVString (""));
itsPropertySet->setValue(string(PN_MS_OTDB_CONNECTED), GCFPVBool (false));
itsPropertySet->setValue(string(PN_MS_OTDB_LAST_POLL), GCFPVString (""));
itsPropertySet->setValue(string(PN_MS_OTDB_POLLINTERVAL), GCFPVUnsigned(itsOTDBpollInterval));
// Try to connect to the SAS database.
ACC::APS::ParameterSet* pParamSet = ACC::APS::globalParameterSet();
string username = pParamSet->getString("OTDBusername");
string DBname = pParamSet->getString("OTDBdatabasename");
string password = pParamSet->getString("OTDBpassword");
LOG_DEBUG ("Trying to connect to the OTDB");
itsOTDBconnection= new OTDBconnection(username, password, DBname);
ASSERTSTR (itsOTDBconnection, "Memory allocation error (OTDB)");
// NOTE(review): the connect() call is made inside the assert macro and the
// failure message contains the database password -- confirm both are intended.
ASSERTSTR (itsOTDBconnection->connect(),
"Unable to connect to database " << DBname << " using " <<
username << "," << password);
itsPropertySet->setValue(string(PN_MS_OTDB_CONNECTED),GCFPVBool(true));
// Start ChildControl task
LOG_DEBUG ("Enabling ChildControltask");
itsChildControl->openService(MAC_SVCMASK_SCHEDULERCTRL, 0);
status = GCFEvent::NOT_HANDLED;
break;
}
return (status);
}
//
// recover_state(event, port)
//
// Read last PVSS states, compare those to the SAS states and try to
// recover to the last situation.
//
// State handler for the recover state.
// F_INIT is accepted without action; F_ENTRY publishes the state "recover" with
// an empty error text and moves straight on to active_state. Every other signal
// is reported as NOT_HANDLED.
GCFEvent::TResult MACScheduler::recover_state(GCFEvent& event, GCFPortInterface& port)
{
	LOG_DEBUG_STR ("recover_state:" << evtstr(event) << "@" << port.getName());

	if (event.signal == F_INIT) {
		return (GCFEvent::HANDLED);
	}

	if (event.signal == F_ENTRY) {
		// update PVSS
		itsPropertySet->setValue(string(PVSSNAME_FSM_STATE),GCFPVString("recover"));
		itsPropertySet->setValue(string(PVSSNAME_FSM_ERROR),GCFPVString(""));

		//
		// TODO: do recovery
		TRAN(MACScheduler::active_state);
		return (GCFEvent::HANDLED);
	}

	// everything else is not ours
	return (GCFEvent::NOT_HANDLED);
}
//
// active_state(event, port)
//
// Normal operation state. Check OTDB every OTDBpollInterval seconds and control
// the running observations.
//
// State handler for normal operation.
// The visible code publishes the state "active" on entry, forwards
// F_CONNECTED/F_DISCONNECTED to the connection handlers, checks the OTDB from the
// one-second timer and logs the events received from ChildControl.
//
// NOTE(review): this copy of the function is incomplete. The `case` labels of the
// CONTROL events other than CONTROL_FINISH are not visible, the F_ENTRY block is
// never closed, one LOG_ERROR_STR statement is unterminated and the braces do not
// balance. Restore the function from version control before building.
GCFEvent::TResult MACScheduler::active_state(GCFEvent& event, GCFPortInterface& port)
{
LOG_DEBUG_STR ("active:" << evtstr(event) << "@" << port.getName());
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
case F_INIT:
break;
case F_ENTRY: {
// update PVSS
itsPropertySet->setValue(string(PVSSNAME_FSM_STATE),GCFPVString("active"));
itsPropertySet->setValue(string(PVSSNAME_FSM_ERROR),GCFPVString(""));
// Timers must be connected to ports, so abuse serverPort for second timer.
break;
case F_CONNECTED:
// Should be from the (lost) connection with the SD
_connectedHandler(port);
break;
case F_DISCONNECTED:
// Can be from StartDaemon or ObsController.
// StartDaemon: trouble! Try to reconnect asap.
// ObsController: ok when obs is finished, BIG TROUBLE when not!
_disconnectedHandler(port);
break;
case F_TIMER: { // secondTimer or reconnectTimer.
GCFTimerEvent& timerEvent=static_cast<GCFTimerEvent&>(event);
if (timerEvent.id == itsSecondTimer) {
// time to poll the OTDB?
if (time(0) >= itsNextOTDBpolltime) {
_doOTDBcheck();
// reinit polltime at multiple of intervaltime.
// (=more chance to hit hh.mm:00)
// NOTE(review): as visible here the poll time is only rounded down to a
// multiple of the interval and never advanced, and the modulo assumes
// itsOTDBpollInterval is non-zero -- confirm against the complete source.
itsNextOTDBpolltime -= (itsNextOTDBpolltime % itsOTDBpollInterval);
}
// re-arm the one-second timer on the port that delivered this event.
itsSecondTimer = port.setTimer(1.0);
}
// a connection was lost and a timer was set to try to reconnect.
// else if (...) {
// TODO
// map timer to port
// port.open();
// }
break;
}
// -------------------- EVENTS FROM CHILDCONTROL --------------------
// Child control received a message from the startDaemon that the
// observationController was started (or not)
CONTROLStartedEvent msg(event);
if (msg.successful) {
LOG_DEBUG_STR("Start of " << msg.cntlrName << " was successful");
}
else {
LOG_ERROR_STR("Observation controller " << msg.cntlrName <<
// The observationController has registered itself at childControl.
CONTROLConnectEvent conEvent(event);
LOG_DEBUG_STR("Received CONNECT(" << conEvent.cntlrName << ")");
case CONTROL_FINISH: {
// The observationController is going down.
CONTROLFinishEvent finishEvent(event);
LOG_DEBUG_STR("Received FINISH(" << finishEvent.cntlrName << ")");
LOG_DEBUG_STR("Removing observation " << finishEvent.cntlrName <<
" from activeList");
// NOTE: ignore all other CONTROL events, we are not interested in the
// states of the Observations. (not our job).
status = GCFEvent::NOT_HANDLED;
break;
}
return (status);
}
//
// _doOTDBcheck
//
// Check if a new action should be taken based on the contents of OTDB and our own
// administration.
//
// Fetches the executable trees from the OTDB and walks through them in the order
// returned (ordered on starttime according to the comment below). The walk stops
// at the first observation whose start lies more than itsQueuePeriod seconds in
// the future; for the others the controller state is compared with the state
// expected for the time that is left.
//
// NOTE(review): this copy of the function is incomplete. The `else` below has no
// matching `if`, the startChild() call is never closed, the closing braces of
// the loop and of the function are missing, and as visible the unconditional
// `idx++; continue;` makes the CLAIM-period check unreachable. Restore the
// function from version control before building.
void MACScheduler::_doOTDBcheck()
{
// get new list (list is ordered on starttime)
vector<OTDBtree> newTreeList = itsOTDBconnection->getExecutableTrees();
if (newTreeList.empty()) {
return;
}
LOG_DEBUG(formatString("OTDBCheck:First observation is at %s (tree=%d)",
to_simple_string(newTreeList[0].starttime).c_str(), newTreeList[0].treeID()));
// walk through the list and bring each observation in the right state when necc.
uint32 listSize = newTreeList.size();
uint32 idx = 0;
ptime currentTime = from_time_t(time(0));
ASSERTSTR (currentTime != not_a_date_time, "Can't determine systemtime, bailing out");
// REO: test pvss appl
while (idx < listSize) {
// timediff = time to go before start of Observation
time_duration timediff = newTreeList[idx].starttime - currentTime;
LOG_TRACE_VAR_STR("timediff=" << timediff);
// when queuetime is not reached yet we are finished with the list.
if (timediff > seconds(itsQueuePeriod)) {
break;
}
// get current state of Observation
string cntlrName = controllerName(CNTLRTYPE_OBSERVATIONCTRL,
0, newTreeList[idx].treeID());
CTState::CTstateNr requestedState= itsChildControl->getRequestedState(cntlrName);
// When in startup or claimtime we should try to start the controller.
if ((timediff > seconds(0)) && (requestedState != CTState::CONNECTED)) {
// no, let database construct the parset for the whole observation
OTDB::TreeMaintenance tm(itsOTDBconnection);
OTDB::treeIDType treeID = newTreeList[idx].treeID();
OTDBnode topNode = tm.getTopNode(treeID);
// NOTE: this name must be the same as in the ChildControl.
string filename = formatString("%s/Observation_%d",
LOFAR_SHARE_LOCATION, treeID);
else {
// fire request for new controller, will result in CONTROL_STARTED
itsChildControl->startChild(cntlrName,
treeID,
CNTLRTYPE_OBSERVATIONCTRL,
0, // instanceNr
// Note: controller is now in state NO_STATE/CONNECTED (C/R)
// register this Observation
ParameterSet obsPS(filename);
Observation newObs(&obsPS);
newObs.name = cntlrName;
newObs.treeID = treeID;
LOG_DEBUG_STR("Observation " << cntlrName << " added to active Observations");
}
idx++;
continue;
// in CLAIM period?
if ((timediff > seconds(0)) && (timediff <= seconds(itsClaimPeriod))) {
// Observation is somewhere in the claim period its should be up by now.
if (requestedState != CTState::CLAIMED) {
LOG_ERROR_STR("Observation " << cntlrName <<
" should have reached the CLAIMING state by now," <<
" check state of observation.");
}
// observation must be running (otherwise it would not be in the newTreeList)
// TODO: check if endtime is reached and observation is still running.
//
// _addActiveObservation(name)
//
// Adds newObs to the administration of active observations, unless an
// observation with the same name is already registered. On insertion the name
// is also appended to itsPVSSObsList and the complete list is written to the
// PN_MS_ACTIVE_OBSERVATIONS datapoint.
void MACScheduler::_addActiveObservation(const Observation& newObs)
{
	// Observation already in vector?
	vector<Observation>::iterator end = itsObservations.end();
	for (vector<Observation>::iterator iter = itsObservations.begin(); iter != end; ++iter) {
		if (iter->name == newObs.name) {
			return;
		}
	}

	// update own admin and PVSS datapoint
	itsObservations.push_back(newObs);
	itsPVSSObsList.push_back(new GCFPVString(newObs.name));
	itsPropertySet->setValue(PN_MS_ACTIVE_OBSERVATIONS, GCFPVDynArr(LPT_STRING, itsPVSSObsList));
	LOG_DEBUG_STR("Added observation " << newObs.name << " to active observation-list");
}
//
// _removeActiveObservation(name)
//
// Removes the observation with the given name from the administration of active
// observations. When no such observation is registered nothing is changed;
// otherwise the matching entry of itsPVSSObsList is deleted as well and the
// remaining list is written to the PN_MS_ACTIVE_OBSERVATIONS datapoint.
void MACScheduler::_removeActiveObservation(const string& name)
{
	// search observation.
	bool found(false);
	for (vector<Observation>::iterator iter = itsObservations.begin();
									   iter != itsObservations.end(); ++iter) {
		if (iter->name == name) {
			// erase() invalidates iter, so leave the loop right away.
			itsObservations.erase(iter);
			found = true;
			LOG_DEBUG_STR("Removed observation " << name << " from active observationList");
			break;
		}
	}

	if (!found) {
		return;
	}

	// remove the name from the PVSS list also; the list owns its elements.
	for (GCFPValueArray::iterator pIter = itsPVSSObsList.begin();
								  pIter != itsPVSSObsList.end(); ++pIter) {
		if ((static_cast<GCFPVString*>(*pIter))->getValue() == name) {
			delete *pIter;
			itsPVSSObsList.erase(pIter);
			break;
		}
	}

	itsPropertySet->setValue(PN_MS_ACTIVE_OBSERVATIONS, GCFPVDynArr(LPT_STRING, itsPVSSObsList));
}
// (line-number residue left by the repository web viewer was removed here)
//
// _connectedHandler(port)
//
void MACScheduler::_connectedHandler(GCFPortInterface& port)
{
}
//
// _disconnectedHandler(port)
//
// Called from active_state on F_DISCONNECTED: closes the port that reported the
// disconnect. The port-specific recovery below is compiled out.
void MACScheduler::_disconnectedHandler(GCFPortInterface& port)
{
	port.close();

#if 0
	string visd;	// only needed by the disabled code below
	if(_isServerPort(itsVIparentPort,port)) {
		LOG_FATAL("VI parent server closed");
		itsVIparentPort.open(); // server closed? reopen it
	}
	else if(_isVISDclientPort(port,visd)) {
		LOG_FATAL(formatString("VI Startdaemon port disconnected: %s",visd.c_str()));
		port.setTimer(3L);
	}
	else if(_isVIclientPort(port)) {
		LOG_FATAL("VI client port disconnected");
		// do something with the nodeId?
	}
#endif
}
};
};