Skip to content
Snippets Groups Projects
Commit f88e6ff5 authored by blaakmeer's avatar blaakmeer
Browse files

BugID: 802

Transformed VirtualBackend to OnlineControl
parent 3c4674a3
No related branches found
No related tags found
No related merge requests found
//# CEPApplicationManager.cc: Implementation of the Virtual CEPApplicationManager task
//#
//# Copyright (C) 2002-2004
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#include <lofar_config.h>
#include "CEPApplicationManager.h"
namespace LOFAR
{
using namespace ACC::ALC;
namespace CEPCU
{
INIT_TRACER_CONTEXT(CEPApplicationManager, LOFARLOGGER_PACKAGE);
void CEPApplicationManager::workProc()
{
if (_continuePoll)
{
_acClient.processACmsgFromServer();
}
}
void CEPApplicationManager::handleAckMsg(ACCmd cmd,
uint16 result,
const string& info)
{
LOG_INFO(formatString("command: %d, result: %d, info: %s", cmd, result, info.c_str()));
switch (cmd)
{
case ACCmdBoot:
if (result == AcCmdMaskOk)
{
_lastOkCmd = cmd;
}
_interface.appBooted(result);
break;
case ACCmdQuit:
if (result == AcCmdMaskOk && result == 0)
{
_continuePoll = false;
}
_interface.appQuitDone(result);
break;
case ACCmdDefine:
if (result == AcCmdMaskOk)
{
_lastOkCmd = cmd;
}
_interface.appDefined(result);
break;
case ACCmdInit:
if (result == AcCmdMaskOk)
{
_lastOkCmd = cmd;
}
_interface.appInitialized(result);
break;
case ACCmdPause:
_interface.appPaused(result);
break;
case ACCmdRun:
if (result == AcCmdMaskOk)
{
_lastOkCmd = cmd;
}
_interface.appRunDone(result);
break;
case ACCmdSnapshot:
_interface.appSnapshotDone(result);
break;
case ACCmdRecover:
_interface.appRecovered(result);
break;
case ACCmdReinit:
_interface.appReinitialized(result);
break;
case ACCmdReplace:
_interface.appReplaced(result);
break;
default:
LOG_WARN_STR("Received command = " << cmd << ", result = " << result
<< ", info = " << info << " not handled!");
break;
}
}
void CEPApplicationManager::handleAnswerMsg (const string& answer)
{
_interface.appSupplyInfoAnswer(answer);
}
string CEPApplicationManager::supplyInfoFunc (const string& keyList)
{
return _interface.appSupplyInfo(keyList);
}
} // namespace CEPCU
} // namespace LOFAR
//# CEPApplicationManager.h: factory class for Virtual Backends.
//#
//# Copyright (C) 2002-2005
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#ifndef CEPAPPLICATIONMANAGER_H
#define CEPAPPLICATIONMANAGER_H
//# Includes
#include <ALC/ACAsyncClient.h>
#include <GCF/TM/GCF_Handler.h>
//# local includes
//# Common Includes
// forward declaration
namespace LOFAR
{
namespace CEPCU
{
class CEPApplicationManagerInterface
{
protected:
CEPApplicationManagerInterface() {}
public:
virtual ~CEPApplicationManagerInterface() {}
public:
virtual void appBooted(uint16 result) = 0;
virtual void appDefined(uint16 result) = 0;
virtual void appInitialized(uint16 result) = 0;
virtual void appRunDone(uint16 result) = 0;
virtual void appPaused(uint16 result) = 0;
virtual void appQuitDone(uint16 result) = 0;
virtual void appSnapshotDone(uint16 result) = 0;
virtual void appRecovered(uint16 result) = 0;
virtual void appReinitialized(uint16 result) = 0;
virtual void appReplaced(uint16 result) = 0;
virtual string appSupplyInfo(const string& keyList) = 0;
virtual void appSupplyInfoAnswer(const string& answer) = 0;
private:
// protected copy constructor
CEPApplicationManagerInterface(const CEPApplicationManagerInterface&);
// protected assignment operator
CEPApplicationManagerInterface& operator=(const CEPApplicationManagerInterface&);
};
class CEPApplicationManager : public ACC::ALC::ACClientFunctions,
GCF::TM::GCFHandler
{
public:
CEPApplicationManager(CEPApplicationManagerInterface& interface, const string& appName);
virtual ~CEPApplicationManager();
public: // methods may be called from specialized CEPApplicationManagerInterface
bool boot (const time_t scheduleTime,
const string& configID);
bool define (const time_t scheduleTime) const;
bool init (const time_t scheduleTime) const;
bool run (const time_t scheduleTime) const;
bool pause (const time_t scheduleTime,
const time_t maxWaitTime,
const string& condition) const;
bool quit (const time_t scheduleTime) const;
bool shutdown (const time_t scheduleTime) const;
bool snapshot (const time_t scheduleTime,
const string& destination) const;
bool recover (const time_t scheduleTime,
const string& source) const;
bool reinit (const time_t scheduleTime,
const string& configID) const;
bool replace (const time_t scheduleTime,
const string& processList,
const string& nodeList,
const string& configID) const;
string askInfo (const string& keylist) const;
bool cancelCmdQueue () const;
ACC::ALC::ACCmd getLastOkCmd() const;
private: // implemenation of abstract GCFHandler methods
friend class GCF::TM::GCFHandler;
void workProc();
void stop();
private: // implemenation of abstract ACClientFunctions methods
friend class ACC::ALC::ACClientFunctions;
void handleAckMsg (ACC::ALC::ACCmd cmd,
uint16 result,
const string& info);
void handleAnswerMsg (const string& answer);
string supplyInfoFunc (const string& keyList);
protected:
// protected copy constructor
CEPApplicationManager(const CEPApplicationManager&);
// protected assignment operator
CEPApplicationManager& operator=(const CEPApplicationManager&);
private:
CEPApplicationManagerInterface& _interface;
ACC::ALC::ACAsyncClient _acClient;
bool _continuePoll;
ACC::ALC::ACCmd _lastOkCmd;
ALLOC_TRACER_CONTEXT
};
inline CEPApplicationManager::CEPApplicationManager(
CEPApplicationManagerInterface& interface,
const string& appName) :
_interface(interface),
_acClient(this, appName, 10, 100, 1, 0),
_continuePoll(false),
_lastOkCmd(ACC::ALC::ACCmdNone)
{
use(); // to avoid that this object will be deleted in GCFTask::stop;
}
inline CEPApplicationManager::~CEPApplicationManager()
{
GCFTask::deregisterHandler(*this);
}
inline bool CEPApplicationManager::boot (const time_t scheduleTime,
const string& configID)
{
_continuePoll = true;
return _acClient.boot(scheduleTime, configID);
}
inline bool CEPApplicationManager::define (const time_t scheduleTime) const
{
return _acClient.define(scheduleTime);
}
inline bool CEPApplicationManager::init (const time_t scheduleTime) const
{
return _acClient.init(scheduleTime);
}
inline bool CEPApplicationManager::run (const time_t scheduleTime) const
{
return _acClient.run(scheduleTime);
}
inline bool CEPApplicationManager::pause (const time_t scheduleTime,
const time_t maxWaitTime,
const string& condition) const
{
return _acClient.pause(scheduleTime, maxWaitTime, condition);
}
inline bool CEPApplicationManager::quit (const time_t scheduleTime) const
{
return _acClient.quit(scheduleTime);
}
inline bool CEPApplicationManager::shutdown (const time_t scheduleTime) const
{
return _acClient.shutdown(scheduleTime);
}
inline bool CEPApplicationManager::snapshot (const time_t scheduleTime,
const string& destination) const
{
return _acClient.snapshot(scheduleTime, destination);
}
inline bool CEPApplicationManager::recover (const time_t scheduleTime,
const string& source) const
{
return _acClient.recover(scheduleTime, source);
}
inline bool CEPApplicationManager::reinit (const time_t scheduleTime,
const string& configID) const
{
return _acClient.reinit(scheduleTime, configID);
}
inline bool CEPApplicationManager::replace (const time_t scheduleTime,
const string& processList,
const string& nodeList,
const string& configID) const
{
return _acClient.replace(scheduleTime, processList, nodeList, configID);
}
inline bool CEPApplicationManager::cancelCmdQueue () const
{
return _acClient.cancelCmdQueue();
}
inline ACC::ALC::ACCmd CEPApplicationManager::getLastOkCmd() const
{
return _lastOkCmd;
}
inline void CEPApplicationManager::stop()
{
}
} // namespace CEPCU
} // namespace LOFAR
#endif
bin_PROGRAMS = OnlineControl
OnlineControl_CPPFLAGS = -Wno-deprecated \
-fmessage-length=0 \
-fdiagnostics-show-location=once
OnlineControl_SOURCES = CEPApplicationManager.cc \
OnlineControl.cc \
OnlineControlMain.cc
OnlineControl_LDADD = $(LOFAR_DEPEND)
OnlineControl_DEPENDENCIES = $(LOFAR_DEPEND)
NOINSTHDRS = CEPApplicationManager.h \
OnlineControl.h \
OnlineControlDefines.h
INSTHDRS =
pkginclude_HEADERS = $(NOINSTHDRS) $(INSTHDRS)
DOCHDRS = $(pkginclude_HEADERS) $(BUILT_SOURCES)
EXTRA_DIST = $(configfiles_DATA) $(sysconf_DATA)
#in case of make install these files will be copied to the bindir beside the test apps
configfilesdir=$(bindir)
configfiles_DATA =
sysconf_DATA = OnlineControl.log_prop
%.log_prop: %.log_prop.in
cp $< $@
clean-local:
rm -f *.ph
include $(top_srcdir)/Makefile.common
This diff is collapsed.
//# OnlineControl.cc: Implementation of the MAC Scheduler task
//#
//# Copyright (C) 2006
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#include <lofar_config.h>
#include <Common/LofarLogger.h>
#include <boost/shared_array.hpp>
#include <APS/ParameterSet.h>
#include <APS/Exceptions.h>
#include <GCF/GCF_PVTypes.h>
#include <GCF/PAL/GCF_PVSSInfo.h>
#include <GCF/Utils.h>
#include <GCF/GCF_ServiceInfo.h>
#include <GCF/Protocols/PA_Protocol.ph>
#include <APL/APLCommon/APL_Defines.h>
#include <APL/APLCommon/APLUtilities.h>
#include <APL/APLCommon/APLCommonExceptions.h>
#include <APL/APLCommon/Controller_Protocol.ph>
#include <APL/APLCommon/StationInfo.h>
#include <APL/APLCommon/APLUtilities.h>
#include "OnlineControl.h"
#include "OnlineControlDefines.h"
using namespace LOFAR::GCF::Common;
using namespace LOFAR::GCF::TM;
using namespace LOFAR::GCF::PAL;
using namespace std;
namespace LOFAR {
using namespace APLCommon;
using namespace ACC::APS;
using namespace ACC::ALC;
namespace CEPCU {
//
// OnlineControl()
//
OnlineControl::OnlineControl(const string& cntlrName) :
GCFTask ((State)&OnlineControl::initial_state,cntlrName),
PropertySetAnswerHandlerInterface(),
itsPropertySetAnswer(*this),
itsPropertySet (),
itsPropertySetInitialized (false),
itsParentControl (0),
itsParentPort (0),
itsTimerPort (0),
itsCepApplication (*this, cntlrName),
itsCepAppParams (),
itsResultParams (),
itsState (CTState::NOSTATE),
itsTreePrefix (""),
itsInstanceNr (0),
itsStartTime (),
itsStopTime (),
itsClaimPeriod (),
itsPreparePeriod ()
{
LOG_TRACE_OBJ_STR (cntlrName << " construction");
// First readin our observation related config file.
LOG_DEBUG_STR("Reading parset file:" << LOFAR_SHARE_LOCATION << "/" << cntlrName);
globalParameterSet()->adoptFile(string(LOFAR_SHARE_LOCATION)+"/"+cntlrName);
// Readin some parameters from the ParameterSet.
itsTreePrefix = globalParameterSet()->getString("prefix");
itsInstanceNr = globalParameterSet()->getUint32("_instanceNr");
// get Observation based information
itsStartTime = time_from_string(globalParameterSet()->
getString("Observation.startTime"));
itsStopTime = time_from_string(globalParameterSet()->
getString("Observation.stopTime"));
itsClaimPeriod = globalParameterSet()->getTime ("Observation.claimPeriod");
itsPreparePeriod = globalParameterSet()->getTime ("Observation.preparePeriod");
// attach to parent control task
itsParentControl = ParentControl::instance();
itsParentPort = new GCFITCPort (*this, *itsParentControl, "ParentITCport",
GCFPortInterface::SAP, CONTROLLER_PROTOCOL);
ASSERTSTR(itsParentPort, "Cannot allocate ITCport for Parentcontrol");
itsParentPort->open(); // will result in F_CONNECTED
// need port for timers.
itsTimerPort = new GCFTimerPort(*this, "TimerPort");
// for debugging purposes
registerProtocol (CONTROLLER_PROTOCOL, CONTROLLER_PROTOCOL_signalnames);
registerProtocol (PA_PROTOCOL, PA_PROTOCOL_signalnames);
setState(CTState::CREATED);
}
//
// ~OnlineControl()
//
OnlineControl::~OnlineControl()
{
LOG_TRACE_OBJ_STR (getName() << " destruction");
if (itsPropertySet) {
itsPropertySet->setValue(string(PVSSNAME_FSM_STATE),GCFPVString("down"));
itsPropertySet->disable();
}
// ...
}
//
// setState(CTstateNr)
//
void OnlineControl::setState(CTState::CTstateNr newState)
{
itsState = newState;
if (itsPropertySet) {
CTState cts;
itsPropertySet->setValue(string(PVSSNAME_FSM_STATE),
GCFPVString(cts.name(newState)));
}
}
//
// handlePropertySetAnswer(answer)
//
void OnlineControl::handlePropertySetAnswer(GCFEvent& answer)
{
LOG_DEBUG_STR ("handlePropertySetAnswer:" << evtstr(answer));
switch(answer.signal) {
case F_MYPS_ENABLED: {
GCFPropSetAnswerEvent* pPropAnswer=static_cast<GCFPropSetAnswerEvent*>(&answer);
if(pPropAnswer->result != GCF_NO_ERROR) {
LOG_ERROR(formatString("%s : PropertySet %s NOT ENABLED",
getName().c_str(), pPropAnswer->pScope));
}
// always let timer expire so main task will continue.
itsTimerPort->setTimer(1.0);
break;
}
case F_PS_CONFIGURED: {
GCFConfAnswerEvent* pConfAnswer=static_cast<GCFConfAnswerEvent*>(&answer);
if(pConfAnswer->result == GCF_NO_ERROR) {
LOG_DEBUG(formatString("%s : apc %s Loaded",
getName().c_str(), pConfAnswer->pApcName));
//apcLoaded();
}
else {
LOG_ERROR(formatString("%s : apc %s NOT LOADED",
getName().c_str(), pConfAnswer->pApcName));
}
break;
}
case F_VGETRESP:
case F_VCHANGEMSG: {
// check which property changed
// GCFPropValueEvent* pPropAnswer=static_cast<GCFPropValueEvent*>(&answer);
// TODO: implement something usefull.
break;
}
// case F_SUBSCRIBED:
// case F_UNSUBSCRIBED:
// case F_PS_CONFIGURED:
// case F_EXTPS_LOADED:
// case F_EXTPS_UNLOADED:
// case F_MYPS_ENABLED:
// case F_MYPS_DISABLED:
// case F_VGETRESP:
// case F_VCHANGEMSG:
// case F_SERVER_GONE:
default:
break;
}
}
//
// initial_state(event, port)
//
// Setup all connections.
//
GCFEvent::TResult OnlineControl::initial_state(GCFEvent& event,
GCFPortInterface& port)
{
LOG_DEBUG_STR ("initial:" << evtstr(event) << "@" << port.getName());
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
case F_ENTRY:
break;
case F_INIT: {
// Get access to my own propertyset.
LOG_DEBUG ("Activating PropertySet");
string propSetName = formatString(ONC_PROPSET_NAME, itsInstanceNr);
itsPropertySet = GCFMyPropertySetPtr(new GCFMyPropertySet(propSetName.c_str(),
ONC_PROPSET_TYPE,
PS_CAT_TEMPORARY,
&itsPropertySetAnswer));
itsPropertySet->enable();
// Wait for timer that is set in PropertySetAnswer on ENABLED event
}
break;
case F_TIMER:
if (!itsPropertySetInitialized) {
itsPropertySetInitialized = true;
// update PVSS.
LOG_TRACE_FLOW ("Updateing state to PVSS");
itsPropertySet->setValue(string(PVSSNAME_FSM_STATE),GCFPVString("initial"));
itsPropertySet->setValue(string(PVSSNAME_FSM_ERROR),GCFPVString(""));
// Start ParentControl task
LOG_DEBUG ("Enabling ParentControl task");
itsParentPort = itsParentControl->registerTask(this);
LOG_DEBUG ("Going to operational state");
TRAN(OnlineControl::active_state); // go to next state.
}
break;
case F_CONNECTED:
ASSERTSTR (&port == itsParentPort,
"F_CONNECTED event from port " << port.getName());
break;
case F_DISCONNECTED:
break;
default:
LOG_DEBUG_STR ("initial, default");
status = GCFEvent::NOT_HANDLED;
break;
}
return (status);
}
//
// active_state(event, port)
//
// Normal operation state.
//
GCFEvent::TResult OnlineControl::active_state(GCFEvent& event, GCFPortInterface& port)
{
LOG_DEBUG_STR ("active:" << evtstr(event) << "@" << port.getName());
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
case F_ENTRY: {
// update PVSS
itsPropertySet->setValue(string(PVSSNAME_FSM_STATE),GCFPVString("active"));
itsPropertySet->setValue(string(PVSSNAME_FSM_ERROR),GCFPVString(""));
break;
}
case F_INIT:
break;
case F_ACCEPT_REQ:
break;
case F_CONNECTED: {
ASSERTSTR (&port == itsParentPort, "F_CONNECTED event from port " << port.getName());
break;
}
case F_DISCONNECTED: {
port.close();
break;
}
case F_TIMER:
// GCFTimerEvent& timerEvent=static_cast<GCFTimerEvent&>(event);
break;
// -------------------- EVENTS RECEIVED FROM PARENT CONTROL --------------------
case CONTROL_CONNECT: {
CONTROLConnectEvent msg(event);
LOG_DEBUG_STR("Received CONNECT(" << msg.cntlrName << ")");
setState(CTState::CONNECTED);
CONTROLConnectedEvent answer;
answer.cntlrName = msg.cntlrName;
port.send(answer);
break;
}
case CONTROL_SCHEDULED: {
CONTROLScheduledEvent msg(event);
LOG_DEBUG_STR("Received SCHEDULED(" << msg.cntlrName << ")");
// TODO: do something usefull with this information!
break;
}
case CONTROL_CLAIM: {
CONTROLClaimEvent msg(event);
LOG_DEBUG_STR("Received CLAIM(" << msg.cntlrName << ")");
setState(CTState::CLAIM);
setState(CTState::CLAIMED);
CONTROLClaimedEvent answer;
answer.cntlrName = getName();
answer.result = doClaim(msg.cntlrName);
if(answer.result == CT_RESULT_NO_ERROR)
{
setState(CTState::CLAIMED);
}
port.send(answer);
break;
}
case CONTROL_PREPARE: {
CONTROLPrepareEvent msg(event);
LOG_DEBUG_STR("Received PREPARE(" << msg.cntlrName << ")");
setState(CTState::PREPARE);
CONTROLPreparedEvent answer;
answer.cntlrName = getName();
answer.result = doPrepare(msg.cntlrName);
if(answer.result == CT_RESULT_NO_ERROR)
{
setState(CTState::PREPARED);
}
port.send(answer);
break;
}
case CONTROL_RESUME: {
CONTROLResumeEvent msg(event);
LOG_DEBUG_STR("Received RESUME(" << msg.cntlrName << ")");
setState(CTState::ACTIVE);
// TODO: implement something useful
CONTROLResumedEvent answer;
answer.cntlrName = msg.cntlrName;
port.send(answer);
break;
}
case CONTROL_SUSPEND: {
CONTROLSuspendEvent msg(event);
LOG_DEBUG_STR("Received SUSPEND(" << msg.cntlrName << ")");
setState(CTState::SUSPENDED);
// TODO: implement something useful
CONTROLSuspendedEvent answer;
answer.cntlrName = msg.cntlrName;
port.send(answer);
break;
}
case CONTROL_RELEASE: {
CONTROLReleaseEvent msg(event);
LOG_DEBUG_STR("Received RELEASED(" << msg.cntlrName << ")");
setState(CTState::RELEASE);
doRelease(event);
setState(CTState::RELEASED);
CONTROLReleasedEvent answer;
answer.cntlrName = msg.cntlrName;
port.send(answer);
break;
}
default:
LOG_DEBUG("active_state, default");
status = GCFEvent::NOT_HANDLED;
break;
}
return (status);
}
//
// doPrepare(cntlrName)
//
uint16_t OnlineControl::doClaim(const string& cntlrName)
{
uint16_t result = CT_RESULT_NO_ERROR;
try
{
itsCepAppParams.clear();
itsCepAppParams.replace("AC.application", cntlrName);
itsCepAppParams.replace("AC.resultfile", formatString("./ACC-%s_result.param", cntlrName.c_str()));
string processScope("AC.process");
string onlineCtrlPrefix(globalParameterSet()->locateModule("OnlineCtrl") + "OnlineCtrl.");
string procName, newProcName, nodeName;
string ldName(getName().c_str());
procName = globalParameterSet()->getString(onlineCtrlPrefix+"ApplCtrl.application");
itsCepAppParams.adoptCollection(globalParameterSet()->makeSubset(onlineCtrlPrefix+"ApplCtrl", "AC"));
itsCepAppParams.adoptCollection(globalParameterSet()->makeSubset(onlineCtrlPrefix+procName, procName));
}
catch(APSException &)
{
// key not found. skip
result = CT_RESULT_UNSPECIFIED;
}
return result;
}
//
// doPrepare(cntlrName)
//
uint16_t OnlineControl::doPrepare(const string& cntlrName)
{
uint16_t result = CT_RESULT_NO_ERROR;
try
{
// TODO use parameterset of 'cntlrname' when being shared controller
string paramFileName(formatString("ACC-%s.param", cntlrName.c_str()));
itsCepAppParams.writeFile(paramFileName);
// schedule all ACC commands
time_t startTime = to_time_t(itsStartTime);
time_t initTime = startTime - itsCepAppParams.getTime("AC.timeout.init");
time_t defineTime = initTime - itsCepAppParams.getTime("AC.timeout.define") -
itsCepAppParams.getTime("AC.timeout.startup");
time_t bootTime = defineTime - itsCepAppParams.getTime("AC.timeout.createsubsets");
time_t now = time(0);
time_t stopTime = to_time_t(itsStopTime);
LOG_DEBUG(formatString("%d boot %s", bootTime, ctime(&bootTime)));
LOG_DEBUG(formatString("%d define %s", defineTime, ctime(&defineTime)));
LOG_DEBUG(formatString("%d init %s", initTime, ctime(&initTime)));
LOG_DEBUG(formatString("%d start %s", startTime, ctime(&startTime)));
LOG_DEBUG(formatString("%d now %s time %d", now, ctime(&now), time(0)));
LOG_DEBUG(formatString("%d stop %s", stopTime, ctime(&stopTime)));
string hostName, remoteParamPath;
hostName = itsCepAppParams.getString("AC.hostname");
remoteParamPath = itsCepAppParams.getString("AC.remoteParamPath");
if (now > bootTime)
{
APLCommon::APLUtilities::remoteCopy(paramFileName,hostName,remoteParamPath);
LOG_WARN("Cannot guarantee all CEP processes are started in time.");
}
else
{
switch (itsCepApplication.getLastOkCmd())
{
case ACCmdNone:
itsCepApplication.boot(bootTime, paramFileName);
break;
case ACCmdBoot:
itsCepApplication.define(defineTime);
break;
case ACCmdDefine:
case ACCmdInit:
case ACCmdRun:
itsCepApplication.recover(0, "snapshot-DB");
break;
default:
assert(0);
break;
}
APLCommon::APLUtilities::remoteCopy(paramFileName,hostName,remoteParamPath);
}
}
catch(APSException &)
{
// key not found. skip
result = CT_RESULT_UNSPECIFIED;
}
return result;
}
//
// doRelease(event)
//
void OnlineControl::doRelease(GCFEvent& /*event*/)
{
string hostName, remoteFile, resultFile;
hostName = itsCepAppParams.getString("AC.hostname");
resultFile = formatString("ACC-%s_result.param", getName().c_str());
remoteFile = itsCepAppParams.getString("AC.remoteParamPath") + string("/") + resultFile;
APLCommon::APLUtilities::copyFromRemote(hostName,remoteFile,resultFile);
itsResultParams.adoptFile(resultFile);
// itsResultParams.replace(KVpair(formatString("%s.quality", getName().c_str()), (int) _qualityGuard.getQuality()));
if (!itsResultParams.isDefined(formatString("%s.faultyNodes", getName().c_str())))
{
itsResultParams.add(formatString("%s.faultyNodes", getName().c_str()), "");
}
itsResultParams.writeFile(formatString("%s_result.param", getName().c_str()));
itsCepApplication.quit(0);
}
// _connectedHandler(port)
//
void OnlineControl::_connectedHandler(GCFPortInterface& /*port*/)
{
}
//
// _disconnectedHandler(port)
//
void OnlineControl::_disconnectedHandler(GCFPortInterface& port)
{
port.close();
}
void OnlineControl::appBooted(uint16 result)
{
if (result == (AcCmdMaskOk | AcCmdMaskScheduled))
{
time_t startTime = to_time_t(itsStartTime);
time_t initTime = startTime - itsCepAppParams.getTime("AC.timeout.init");
time_t defineTime = initTime - itsCepAppParams.getTime("AC.timeout.define") -
itsCepAppParams.getTime("AC.timeout.startup");
itsCepApplication.define(defineTime);
}
else if (result == 0) // Error
{
LOG_ERROR("Error in ACC. Stops CEP application and releases Online Control.");
itsCepApplication.quit(0);
// _doStateTransition(LOGICALDEVICE_STATE_RELEASING, LD_RESULT_LOW_QUALITY);
}
}
void OnlineControl::appDefined(uint16 result)
{
if (result == (AcCmdMaskOk | AcCmdMaskScheduled))
{
time_t startTime = to_time_t(itsStartTime);
time_t initTime = startTime - itsCepAppParams.getTime("AC.timeout.init");
itsCepApplication.init(initTime);
}
else if (result == 0) // Error
{
LOG_ERROR("Error in ACC. Stops CEP application and releases VB.");
itsCepApplication.quit(0);
// _doStateTransition(LOGICALDEVICE_STATE_RELEASING, LD_RESULT_LOW_QUALITY);
}
}
void OnlineControl::appInitialized(uint16 result)
{
if (result == AcCmdMaskOk)
{
// _doStateTransition(LOGICALDEVICE_STATE_SUSPENDED);
}
else if (result == (AcCmdMaskOk | AcCmdMaskScheduled))
{
itsCepApplication.run(to_time_t(itsStartTime));
}
else if (result == 0) // Error
{
LOG_ERROR("Error in ACC. Stops CEP application and releases VB.");
itsCepApplication.quit(0);
// _doStateTransition(LOGICALDEVICE_STATE_RELEASING, LD_RESULT_LOW_QUALITY);
}
}
void OnlineControl::appRunDone(uint16 result)
{
if (result == (AcCmdMaskOk | AcCmdMaskScheduled))
{
itsCepApplication.quit(to_time_t(itsStopTime));
}
else if (result == 0) // Error
{
LOG_ERROR("Error in ACC. Stops CEP application and releases VB.");
itsCepApplication.quit(0);
// _doStateTransition(LOGICALDEVICE_STATE_RELEASING, LD_RESULT_LOW_QUALITY);
}
}
void OnlineControl::appPaused(uint16 /*result*/)
{
}
void OnlineControl::appQuitDone(uint16 result)
{
if (result == AcCmdMaskOk)
{
//_qualityGuard.stopMonitoring(); // not in this increment
}
}
void OnlineControl::appSnapshotDone(uint16 /*result*/)
{
time_t rsto(0);
try
{
rsto = globalParameterSet()->getTime("rescheduleTimeOut");
}
catch (...) {}
itsCepApplication.pause(0, rsto, "condition");
}
void OnlineControl::appRecovered(uint16 /*result*/)
{
time_t startTime = to_time_t(itsStartTime);
time_t reinitTime = startTime - itsCepAppParams.getTime("AC.timeout.reinit");
string paramFileName(formatString("ACC-%s.param", getName().c_str()));
itsCepApplication.reinit(reinitTime, paramFileName);
}
void OnlineControl::appReinitialized(uint16 result)
{
if (result == AcCmdMaskOk)
{
// _doStateTransition(LOGICALDEVICE_STATE_SUSPENDED);
}
else if (result == (AcCmdMaskOk | AcCmdMaskScheduled))
{
itsCepApplication.run(to_time_t(itsStartTime));
}
}
void OnlineControl::appReplaced(uint16 /*result*/)
{
}
string OnlineControl::appSupplyInfo(const string& keyList)
{
string ret(keyList);
return ret;
}
void OnlineControl::appSupplyInfoAnswer(const string& answer)
{
LOG_INFO_STR("Answer: " << answer);
}
}; // CEPCU
}; // LOFAR
//# OnlineControl.h: Controller for the OnlineControl
//#
//# Copyright (C) 2006
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#ifndef ONLINECONTROL_H
#define ONLINECONTROL_H
//# Includes
#include <boost/shared_ptr.hpp>
//# GCF Includes
#include <GCF/PAL/GCF_MyPropertySet.h>
#include <GCF/TM/GCF_Port.h>
#include <GCF/TM/GCF_ITCPort.h>
#include <GCF/TM/GCF_TimerPort.h>
#include <GCF/TM/GCF_Task.h>
#include <GCF/TM/GCF_Event.h>
//# local includes
#include <APL/APLCommon/PropertySetAnswerHandlerInterface.h>
#include <APL/APLCommon/PropertySetAnswer.h>
#include <APL/APLCommon/APLCommonExceptions.h>
#include <APL/APLCommon/Controller_Protocol.ph>
#include <APL/APLCommon/ParentControl.h>
#include <APL/APLCommon/CTState.h>
#include <CEPApplicationManager.h>
//# Common Includes
#include <Common/lofar_string.h>
#include <Common/lofar_vector.h>
#include <Common/lofar_datetime.h>
#include <Common/LofarLogger.h>
//# ACC Includes
#include <APS/ParameterSet.h>
// forward declaration
namespace LOFAR {
namespace CEPCU {
using GCF::TM::GCFTimerPort;
using GCF::TM::GCFITCPort;
using GCF::TM::GCFPort;
using GCF::TM::GCFEvent;
using GCF::TM::GCFPortInterface;
using GCF::TM::GCFTask;
using APLCommon::ParentControl;
class OnlineControl : public GCFTask,
public APLCommon::PropertySetAnswerHandlerInterface,
public CEPApplicationManagerInterface
{
public:
explicit OnlineControl(const string& cntlrName);
~OnlineControl();
// PropertySetAnswerHandlerInterface method
void handlePropertySetAnswer(GCFEvent& answer);
// During the initial state all connections with the other programs are made.
GCFEvent::TResult initial_state (GCFEvent& e,
GCFPortInterface& p);
// Normal control mode.
GCFEvent::TResult active_state (GCFEvent& e,
GCFPortInterface& p);
protected: // implemenation of abstract CEPApplicationManagerInterface methods
void appBooted(uint16 result);
void appDefined(uint16 result);
void appInitialized(uint16 result);
void appRunDone(uint16 result);
void appPaused(uint16 result);
void appQuitDone(uint16 result);
void appSnapshotDone(uint16 result);
void appRecovered(uint16 result);
void appReinitialized(uint16 result);
void appReplaced(uint16 result);
string appSupplyInfo(const string& keyList);
void appSupplyInfoAnswer(const string& answer);
private:
// avoid defaultconstruction and copying
OnlineControl();
OnlineControl(const OnlineControl&);
OnlineControl& operator=(const OnlineControl&);
uint16_t doClaim(const string& cntlrName);
uint16_t doPrepare(const string& cntlrName);
void doRelease(GCFEvent& event);
void _connectedHandler(GCFPortInterface& port);
void _disconnectedHandler(GCFPortInterface& port);
void setState(CTState::CTstateNr newState);
typedef boost::shared_ptr<GCF::PAL::GCFMyPropertySet> GCFMyPropertySetPtr;
APLCommon::PropertySetAnswer itsPropertySetAnswer;
GCFMyPropertySetPtr itsPropertySet;
bool itsPropertySetInitialized;
// pointer to parent control task
ParentControl* itsParentControl;
GCFITCPort* itsParentPort;
GCFTimerPort* itsTimerPort;
CEPApplicationManager itsCepApplication;
ACC::APS::ParameterSet itsCepAppParams;
ACC::APS::ParameterSet itsResultParams;
CTState::CTstateNr itsState;
// ParameterSet variables
string itsTreePrefix;
uint32 itsInstanceNr;
ptime itsStartTime;
ptime itsStopTime;
uint32 itsClaimPeriod;
uint32 itsPreparePeriod;
};
};//CEPCU
};//LOFAR
#endif
# add your custom loggers and appenders here
#
log4cplus.rootLogger=DEBUG, STDOUT, FILE
log4cplus.logger.TRC=TRACE2
log4cplus.additivity.TRC=FALSE
log4cplus.appender.STDOUT=log4cplus::ConsoleAppender
log4cplus.appender.STDOUT.layout=log4cplus::PatternLayout
log4cplus.appender.STDOUT.layout.ConversionPattern=%D{%d-%m-%y %H:%M:%S.%q} %-5p %c{9} - %m [%.25l]%n
log4cplus.appender.STDOUT.logToStdErr=true
log4cplus.appender.FILE=log4cplus::RollingFileAppender
log4cplus.appender.FILE.File=../log/OnlineControl.log
log4cplus.appender.FILE.MaxFileSize=5MB
log4cplus.appender.FILE.MaxBackupIndex=5
log4cplus.appender.FILE.layout=log4cplus::PatternLayout
log4cplus.appender.FILE.layout.ConversionPattern=%D{%d-%m-%y %H:%M:%S.%q} %-5p %c{3} - %m [%.25l]%n
//# OnlineControlDefines.h: preprocessor definitions of various constants
//#
//# Copyright (C) 2002-2003
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#ifndef ONLINECONTROLDEFINES_H
#define ONLINECONTROLDEFINES_H
namespace LOFAR {
namespace CEPCU {
#define ONC_TASKNAME "OnlineCtrl"
#define ONC_PROPSET_NAME "LOFAR_ObsSW_ObsCtrl%d_OnlineCtrl"
#define ONC_PROPSET_TYPE "OnlineCtrl"
#define ONC_OBSERVATIONSTATE "observationState"
// next lines should be defined somewhere in Common.
#define PVSSNAME_FSM_STATE "state"
#define PVSSNAME_FSM_ERROR "error"
}; // MCU
}; // LOFAR
#endif
//# OnlineControlMain.cc: Main entry for the OnlineControl controller.
//#
//# Copyright (C) 2006
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
//#
#include <lofar_config.h>
#include <Common/LofarLogger.h>
#include "OnlineControl.h"
using namespace LOFAR::GCF::TM;
using namespace LOFAR::CEPCU;
int main(int argc, char* argv[])
{
// args: cntlrname
if(argc < 2)
{
printf("Unexpected number of arguments: %d\n",argc);
printf("%s usage: %s <controller name>\n",argv[0],argv[0]);
exit(-1);
}
GCFTask::init(argc, argv);
ParentControl* pc = ParentControl::instance();
pc->start(); // make initial transition
OnlineControl ofc(argv[1]);
ofc.start(); // make initial transition
GCFTask::run();
return 0;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment