Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
PythonControl.cc 15.04 KiB
//#  PythonControl.cc: Implementation of the MAC Scheduler task
//#
//#  Copyright (C) 2010
//#  ASTRON (Netherlands Foundation for Research in Astronomy)
//#  P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//#  This program is free software; you can redistribute it and/or modify
//#  it under the terms of the GNU General Public License as published by
//#  the Free Software Foundation; either version 2 of the License, or
//#  (at your option) any later version.
//#
//#  This program is distributed in the hope that it will be useful,
//#  but WITHOUT ANY WARRANTY; without even the implied warranty of
//#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//#  GNU General Public License for more details.
//#
//#  You should have received a copy of the GNU General Public License
//#  along with this program; if not, write to the Free Software
//#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
//#
//#  $Id$
#include <lofar_config.h>
#include <Common/LofarLogger.h>
#include <Common/LofarLocators.h>

#include <signal.h>
#include <Common/StreamUtil.h>
//#include <Common/lofar_vector.h>
//#include <Common/lofar_string.h>
#include <Common/ParameterSet.h>
#include <Common/Exceptions.h>
#include <ApplCommon/StationInfo.h>
#include <GCF/PVSS/GCF_PVTypes.h>
#include <Common/SystemUtil.h>
#include <MACIO/MACServiceInfo.h>
#include <GCF/TM/GCF_Protocols.h>
#include <APL/APLCommon/APL_Defines.h>
#include <APL/APLCommon/APLUtilities.h>
#include <APL/APLCommon/Controller_Protocol.ph>
#include <APL/APLCommon/APLUtilities.h>
#include <APL/APLCommon/CTState.h>
#include <GCF/RTDB/DP_Protocol.ph>

#include "PythonControl.h"
#include "PVSSDatapointDefs.h"

using namespace LOFAR::GCF::PVSS;
using namespace LOFAR::GCF::TM;
using namespace LOFAR::GCF::RTDB;
using namespace std;

namespace LOFAR {
	using namespace APLCommon;
	namespace CEPCU {
	
// static pointer to this object for signal handler
static PythonControl*	thisPythonControl = 0;

//
// PythonControl()
//
PythonControl::PythonControl(const string&	cntlrName) :
	GCFTask 			((State)&PythonControl::initial_state,cntlrName),
	itsPropertySet		(0),
	itsPropertySetInitialized (false),
	itsParentControl	(0),
	itsParentPort		(0),
	itsTimerPort		(0),
	itsListener			(0),
	itsPythonPort		(0),
	itsState			(CTState::NOSTATE),
	itsTreePrefix       ("")
//	itsStopTimerID      (0)
{
	LOG_TRACE_OBJ_STR (cntlrName << " construction");

	// First readin our observation related config file.
	LOG_DEBUG_STR("Reading parset file:" << LOFAR_SHARE_LOCATION << "/" << cntlrName);
	globalParameterSet()->adoptFile(string(LOFAR_SHARE_LOCATION)+"/"+cntlrName);

	// Readin some parameters from the ParameterSet.
	itsTreePrefix = globalParameterSet()->getString("prefix");

	// attach to parent control task
	itsParentControl = ParentControl::instance();

	itsListener = new GCFTCPPort (*this, "listener", GCFPortInterface::MSPP, CONTROLLER_PROTOCOL);
	ASSERTSTR(itsListener, "Cannot allocate TCP for server port");

	// need port for timers.
	itsTimerPort = new GCFTimerPort(*this, "TimerPort");

	// for debugging purposes
	registerProtocol (CONTROLLER_PROTOCOL, CONTROLLER_PROTOCOL_STRINGS);
	registerProtocol (DP_PROTOCOL, 		   DP_PROTOCOL_STRINGS);
}


//
// ~PythonControl()
//
PythonControl::~PythonControl()
{
	LOG_TRACE_OBJ_STR (getName() << " destruction");
}

//
// signalHandler(signum)
//
void PythonControl::signalHandler(int	signum)
{
	LOG_DEBUG (formatString("SIGNAL %d detected", signum));

	if (thisPythonControl) {
		thisPythonControl->finish();
	}
}

//
// finish()
//
void PythonControl::finish()
{
	TRAN(PythonControl::finishing_state);
}


//
// _databaseEventHandler(event)
//
void PythonControl::_databaseEventHandler(GCFEvent& event)
{
	LOG_DEBUG_STR ("_databaseEventHandler:" << eventName(event));

	switch(event.signal) {

	case DP_CHANGED: {
		// TODO: implement something usefull.
		break;
	}  

	default:
		break;
	}  
}

//
// _startPython(execName, ObsNr, parentHost, parentservice)
//
bool PythonControl::_startPython(const string&	pythonProg,
								 int			obsID,
							 	 const string&	pythonHost,
							 	 const string&	parentService)
{
	bool	onRemoteMachine(pythonHost != myHostname(false)  && pythonHost != myHostname(true));

	// Copy observation parset to another machine if neccesary
	string	parSetName(observationParset(obsID));
	if (onRemoteMachine) {
		if (APLUtilities::remoteCopy(parSetName, pythonHost, parSetName) != 0) {
			return (false);
		}
	}

	// extra check if we run local.
	ProgramLocator		PL;
	string	executable = PL.locate(pythonProg);
	if (onRemoteMachine) {
		executable = pythonProg;
	}
	else {
		if (executable.empty()) {
			LOG_ERROR_STR("Executable '" << pythonProg << "' not found.");
			return (false);
		}
	}

	// construct system command
	string	startCmd;
	string	startScript("startPython.sh");
	itsPythonName = formatString("PythonServer{%d}@%s", obsID, pythonHost.c_str());

	if (onRemoteMachine) {
		startCmd = formatString("ssh %s %s %s %s %s %s %s", 
								pythonHost.c_str(), startScript.c_str(),
								executable.c_str(), parSetName.c_str(),
								myHostname(true).c_str(), parentService.c_str(), itsPythonName.c_str());
	}
	else {
		startCmd = formatString("%s %s %s %s %s %s", 
								startScript.c_str(),
								executable.c_str(), parSetName.c_str(),
								myHostname(true).c_str(), parentService.c_str(), itsPythonName.c_str());
	}
	LOG_INFO_STR("About to start: " << startCmd);

	int32	result = system (startCmd.c_str());
	LOG_INFO_STR ("Result of start = " << result);

	if (result == -1) {
		return (false);
	}
	
	return (true);
}



//
// initial_state(event, port)
//
// Setup all connections.
//
GCFEvent::TResult PythonControl::initial_state(GCFEvent& event, 
													GCFPortInterface& port)
{
	LOG_DEBUG_STR ("initial:" << eventName(event) << "@" << port.getName());

	GCFEvent::TResult status = GCFEvent::HANDLED;
  
	switch (event.signal) {
	case F_ENTRY:
		itsListener->open();	// will result in F_CONN
   		break;

    case F_INIT: {
#ifdef USE_PVSS_DATABASE
		// Get access to my own propertyset.
//		uint32	obsID = globalParameterSet()->getUint32("Observation.ObsID");
		string	propSetName(createPropertySetName(PSN_PYTHON_CONTROL, getName()));
		LOG_DEBUG_STR ("Activating PropertySet: "<< propSetName);
		itsPropertySet = new RTDBPropertySet(propSetName,
											 PST_PYTHON_CONTROL,
											 PSAT_RW,
											 this);
		}
		break;

	case DP_CREATED: {
		// NOTE: thsi function may be called DURING the construction of the PropertySet.
		// Always exit this event in a way that GCF can end the construction.
		DPCreatedEvent  dpEvent(event);
		LOG_DEBUG_STR("Result of creating " << dpEvent.DPname << " = " << dpEvent.result);
		itsTimerPort->cancelAllTimers();
		itsTimerPort->setTimer(0.0);
		}
		break;

	case F_TIMER: {	// must be timer that PropSet is online.
		// update PVSS.
		LOG_TRACE_FLOW ("Updateing state to PVSS");
		itsPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString("initial"));
		itsPropertySet->setValue(PN_FSM_ERROR, GCFPVString(""));
#endif	  
		// Start ParentControl task
		LOG_DEBUG ("Enabling ParentControl task");
		itsParentPort = itsParentControl->registerTask(this);	 // results in CONTROL_CONNECT
		}
		break;

	case CONTROL_CONNECT: {
		CONTROLConnectEvent		msg(event);
		itsMyName  = msg.cntlrName;

		// request from parent task to start up the child side.
		ParameterSet*   thePS  = globalParameterSet();      // shortcut to global PS.
		string  myPrefix(thePS->locateModule("PythonControl")+"PythonControl.");
		string	pythonProg(thePS->getString(myPrefix+"pythonProgram", "@pythonProgram@"));
		string	pythonHost(thePS->getString(myPrefix+"pythonHost", "@pythonHost@"));
		// START PYTHON
		bool	startOK = _startPython(pythonProg, getObservationNr(getName()), realHostname(pythonHost), itsListener->makeServiceName());
		if (!startOK) {
			LOG_ERROR("Failed to start the Python environment.");
			CONTROLConnectedEvent	answer;
			answer.cntlrName = msg.cntlrName;
			answer.result    = CONTROL_LOST_CONN_ERR;
			port.send(answer);
			TRAN(PythonControl::finishing_state);
		}
		else {
			LOG_DEBUG ("Started Python environment, going to waitForConnection state");
			TRAN(PythonControl::waitForConnection_state);
		}
	}
	break;

	case F_CONNECTED:
		break;

	case F_DISCONNECTED:
		port.close();
		break;
	
	default:
		LOG_DEBUG_STR ("initial, default");
		status = GCFEvent::NOT_HANDLED;
		break;
	}    
	return (status);
}


//
// waitForConnection_state(event, port)
//
// The only target in this state is to wat for the CONNECT msg of the start Python env.
//
GCFEvent::TResult PythonControl::waitForConnection_state(GCFEvent& event, GCFPortInterface& port)
{
	LOG_DEBUG_STR("PythonControl::waitForConnection_state: " << eventName(event) << "@" << port.getName());
  
	GCFEvent::TResult status = GCFEvent::HANDLED;
	switch (event.signal) {
	case F_ACCEPT_REQ: {
		itsPythonPort = new GCFTCPPort();
		itsPythonPort->init(*this, "client", GCFPortInterface::SPP, CONTROLLER_PROTOCOL);
		if (!itsListener->accept(*itsPythonPort)) {
			delete itsPythonPort;
			itsPythonPort = 0;
			LOG_ERROR("Connection with Python environment FAILED");
		}
		else {
			LOG_INFO("Connection with unknown client made, waiting for identification");
		}
	}
	break;

	case CONTROL_CONNECT: {
		// acknowledgement from python it is up and running.
		CONTROLConnectEvent		ccEvent(event);
		itsPythonName = ccEvent.cntlrName;
		LOG_INFO_STR("Python identified itself as " << itsPythonName << ". Going to operational state.");
		// inform python
		LOG_INFO_STR("Sending CONNECTED(" << itsPythonName << ") to python");
		CONTROLConnectedEvent	answer;
		answer.cntlrName = itsPythonName;
		answer.result = CT_RESULT_NO_ERROR;
		itsPythonPort->send(answer);
		// inform Parent
		LOG_INFO_STR("Sending CONNECTED(" << itsMyName << ") to Parent");
		answer.cntlrName = itsMyName;
		itsParentPort->send(answer);
		TRAN(PythonControl::operational_state);
	}
	break;
  
	case F_DISCONNECTED:		// Must be from itsListener port
		if (&port == itsPythonPort) {
			LOG_INFO("Unknown client broke connection, waiting for new client");
			itsPythonPort->close();
			delete itsPythonPort;
			itsPythonPort = 0;
		}
		else {
			LOG_ERROR("Disconnect of unexpected port");
		}
		break;

	default:
		LOG_DEBUG("PythonControl::waitForConnection_state, default");
		break;
	}    

	return (status);
}
//
// operational_state(event, port)
//
// Normal operation state. 
//
GCFEvent::TResult PythonControl::operational_state(GCFEvent& event, GCFPortInterface& port)
{
	LOG_DEBUG_STR ("operational:" << eventName(event) << "@" << port.getName());

	GCFEvent::TResult status = GCFEvent::HANDLED;

	switch (event.signal) {
	case F_ENTRY: {
#ifdef USE_PVSS_DATABASE
		// update PVSS
		itsPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString("active"));
		itsPropertySet->setValue(PN_FSM_ERROR, GCFPVString(""));
#endif
	}
	break;

	case F_DISCONNECTED: {
		port.close();
		if (&port == itsPythonPort) {
			LOG_FATAL_STR("Lost connection with Python, going to wait for a new connection");
			TRAN(PythonControl::waitForConnection_state);
		}
	}
	break;

	case DP_CHANGED:
		_databaseEventHandler(event);
		break;

	case F_TIMER:  {
//		GCFTimerEvent& timerEvent=static_cast<GCFTimerEvent&>(event);
//		if (timerEvent.id == itsStopTimerID) {
//			LOG_DEBUG("StopTimer expired, starting QUIT sequence");
//			startNewState(CTState::QUIT, ""/*options*/);
//		}
//		else {
//			LOG_WARN_STR("Received unknown timer event");
//		}
	}
	break;

	// -------------------- EVENTS RECEIVED FROM PARENT CONTROL --------------------
	case CONTROL_CONNECT: {
		CONTROLConnectEvent		msg(event);
		LOG_DEBUG_STR("Received CONNECT(" << msg.cntlrName << "), IGNORING!");
		break;
	}

	case CONTROL_SCHEDULED: {
		CONTROLScheduledEvent		msg(event);
		LOG_DEBUG_STR("Received SCHEDULED(" << msg.cntlrName << "), IGNORING!");
		// TODO: do something usefull with this information!
		break;
	}

	case CONTROL_CLAIM: {
		CONTROLClaimEvent		msg(event);
		LOG_DEBUG_STR("Received CLAIM(" << msg.cntlrName << ")");
		itsPythonPort->send(msg);
		break;
	}

	case CONTROL_PREPARE: {
		CONTROLPrepareEvent		msg(event);
		LOG_DEBUG_STR("Received PREPARE(" << msg.cntlrName << ")");
		itsPythonPort->send(msg);
		break;
	}

	case CONTROL_RESUME: {
		CONTROLResumeEvent		msg(event);
		LOG_DEBUG_STR("Received RESUME(" << msg.cntlrName << ")");
		itsPythonPort->send(msg);
		break;
	}

	case CONTROL_SUSPEND: {
		CONTROLSuspendEvent		msg(event);
		LOG_DEBUG_STR("Received SUSPEND(" << msg.cntlrName << ")");
		itsPythonPort->send(msg);
		break;
	}

	case CONTROL_RELEASE: {
		CONTROLReleaseEvent		msg(event);
		LOG_DEBUG_STR("Received RELEASE(" << msg.cntlrName << ")");
		itsPythonPort->send(msg);
		break;
	}

	case CONTROL_QUIT: {
		CONTROLQuitEvent		msg(event);
		LOG_DEBUG_STR("Received QUIT(" << msg.cntlrName << ")");
		itsPythonPort->send(msg);
		break;
	}

	// -------------------- RECEIVED FROM PYTHON SIDE --------------------
	case CONTROL_CLAIMED: {
		CONTROLClaimedEvent		msg(event);
		LOG_DEBUG_STR("Received CLAIMED(" << msg.cntlrName << ")");
		msg.cntlrName = itsMyName;
		msg.result = CT_RESULT_NO_ERROR;
		itsParentPort->send(msg);
		break;
	}

	case CONTROL_PREPARED: {
		CONTROLPreparedEvent		msg(event);
		LOG_DEBUG_STR("Received PREPARED(" << msg.cntlrName << ")");
		msg.cntlrName = itsMyName;
		msg.result = CT_RESULT_NO_ERROR;
		itsParentPort->send(msg);
		break;
	}

	case CONTROL_RESUMED: {
		CONTROLResumedEvent		msg(event);
		LOG_DEBUG_STR("Received RESUMED(" << msg.cntlrName << ")");
		msg.cntlrName = itsMyName;
		msg.result = CT_RESULT_NO_ERROR;
		itsParentPort->send(msg);
		break;
	}

	case CONTROL_SUSPENDED: {
		CONTROLSuspendedEvent		msg(event);
		LOG_DEBUG_STR("Received SUSPENDED(" << msg.cntlrName << ")");
		msg.cntlrName = itsMyName;
		msg.result = CT_RESULT_NO_ERROR;
		itsParentPort->send(msg);
		break;
	}

	case CONTROL_RELEASED: {
		CONTROLReleasedEvent		msg(event);
		LOG_DEBUG_STR("Received RELEASED(" << msg.cntlrName << ")");
		msg.cntlrName = itsMyName;
		msg.result = CT_RESULT_NO_ERROR;
		itsParentPort->send(msg);
		break;
	}

	case CONTROL_QUITED: {
		CONTROLQuitedEvent		msg(event);
		LOG_DEBUG_STR("Received QUITED(" << msg.cntlrName << ")");
		msg.cntlrName = itsMyName;
		msg.result = CT_RESULT_NO_ERROR;
		itsParentPort->send(msg);
		LOG_INFO("Python environment has quited, quiting too.");
		TRAN(PythonControl::finishing_state);
		break;
	}

	default:
		LOG_DEBUG("operational_state, default");
		status = GCFEvent::NOT_HANDLED;
		break;
	}

	return (status);
}

//
// finishing_state(event, port)
//
//
GCFEvent::TResult PythonControl::finishing_state(GCFEvent& event, GCFPortInterface& port)
{
	LOG_DEBUG_STR ("finishing:" << eventName(event) << "@" << port.getName());

	GCFEvent::TResult status = GCFEvent::HANDLED;

	switch (event.signal) {
	case F_ENTRY: {
		// update PVSS

#ifdef USE_PVSS_DATABASE
		itsPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString("finished"));
		itsPropertySet->setValue(PN_FSM_ERROR, GCFPVString(""));
#endif
		itsTimerPort->setTimer(3.0);
		break;
	}

	case F_TIMER:
		GCFScheduler::instance()->stop();
		break;

	case F_DISCONNECTED:
		port.close();
		break;
	
	default:
		LOG_DEBUG("finishing_state, default");
		status = GCFEvent::NOT_HANDLED;
		break;
	}

	return (status);
}

}; // CEPCU
}; // LOFAR