diff --git a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/ACCmain_InputSection.h b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/ACCmain_InputSection.h new file mode 100644 index 0000000000000000000000000000000000000000..90753d98ccd8dbf5f162d92981fe19aebf94b117 --- /dev/null +++ b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/ACCmain_InputSection.h @@ -0,0 +1,42 @@ +//# ACCmain_InputSection.h: main loop that can be used by any ACC enabled program +//# +//# Copyright (C) 2006 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id$ + +#ifndef LOFAR_CS1_INPUTSECTION_ACCMAIN_INPUTSECTION_H +#define LOFAR_CS1_INPUTSECTION_ACCMAIN_INPUTSECTION_H + +// \file +// main loop that can be used by any ACC enabled program + +//# Never #include <config.h> or #include <lofar_config.h> in a header file! + +//# Includes +#include <PLC/ProcessControl.h> + +namespace LOFAR +{ + namespace CS1 + { + int ACCmain_InputSection (int argc, char* argv[], ACC::PLC::ProcessControl* theProcess); + } // namespace CS1 +} // namespace LOFAR + +#endif diff --git a/Appl/CEP/CS1/CS1_InputSection/src/ACCmainInputSection.cc b/Appl/CEP/CS1/CS1_InputSection/src/ACCmainInputSection.cc deleted file mode 100644 index be50423164b452102f6f4f771081f3f22f6672d1..0000000000000000000000000000000000000000 --- a/Appl/CEP/CS1/CS1_InputSection/src/ACCmainInputSection.cc +++ /dev/null @@ -1,240 +0,0 @@ -//# ACCmain.cc: main loop that can be used by any ACC enabled program -//# -//# Copyright (C) 2006 -//# ASTRON (Netherlands Foundation for Research in Astronomy) -//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl -//# -//# This program is free software; you can redistribute it and/or modify -//# it under the terms of the GNU General Public License as published by -//# the Free Software Foundation; either version 2 of the License, or -//# (at your option) any later version. -//# -//# This program is distributed in the hope that it will be useful, -//# but WITHOUT ANY WARRANTY; without even the implied warranty of -//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -//# GNU General Public License for more details. -//# -//# You should have received a copy of the GNU General Public License -//# along with this program; if not, write to the Free Software -//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -//# -//# $Id$ - -//# Always #include <lofar_config.h> first! -#include <lofar_config.h> - -//# Includes -#include <libgen.h> -#include <PLC/ACCmain.h> -#include <Common/LofarLogger.h> -#include <APS/ParameterSet.h> -#include <APS/Exceptions.h> -#include <PLC/ProcControlServer.h> -#ifdef HAVE_MPI -#include <Transport/TH_MPI.h> -#endif - -namespace LOFAR { - namespace CS1 { - - using ACC::APS::ParameterSet; - using ACC::APS::APSException; - - int ACCmainInputSection (int argc, char* orig_argv[], ACC::PLC::ProcessControl* theProcess) { - - char** argv = orig_argv; - -#ifdef HAVE_MPI - TH_MPI::initMPI(argc, orig_argv); - - int myRank = TH_MPI::getCurrentRank(); - - // The MPI standard does not demand that the commandline - // arguments are distributed, so we do it ourselves. - - // Broadcast number of arguments - MPI_Bcast(&argc, 1, MPI_INT, 0, MPI_COMM_WORLD); - // Some MPI implementations block on the Bcast. Synchronize - // the nodes to avoid deadlock. - MPI_Barrier(MPI_COMM_WORLD); - - if (myRank != 0) { - argv = new char*[argc]; - } - - for (int arg = 0; arg < argc; arg++) { - int arglen = 0; - if (myRank == 0) arglen = strlen(argv[arg]) + 1; - - // Broadcast the length of this argument - MPI_Bcast(&arglen, 1, MPI_INT, 0, MPI_COMM_WORLD); - - if (myRank != 0) { - argv[arg] = new char[arglen]; - } - // Broadcast the argument; - MPI_Bcast(argv[arg], arglen, MPI_BYTE, 0, MPI_COMM_WORLD); - } -#endif - - - string programName = basename(argv[0]); - - try { - - // Check invocation syntax - if ((argc!=3) || (strncmp("ACC", argv[1], 3) != 0)) { - - // we were not called by ACC - LOG_TRACE_FLOW(programName + " not started by ACC"); - - // See if there is a parameterset available - vector<string> possibleNames; - - // try to find a parameterset - // start with program name + .parset - // also try parts of the program name (splitted with "_") - string pName = programName; - string::size_type pos; - if (argc > 1) { - possibleNames.push_back(argv[1]); - } - - do { - possibleNames.push_back(pName + ".parset"); - //possibleNames.push_back(pName + ".cfg"); - //possibleNames.push_back(pName + ".ps"); - - pos = pName.rfind('_'); - pName = pName.substr(0, pos); - } while (pos != string::npos); - - ParameterSet* ps = 0; - vector<string>::iterator i; - for (i = possibleNames.begin(); i != possibleNames.end(); i++) { - try { - LOG_TRACE_FLOW_STR("Trying to use "<<*i<<" as parameterSet"); - ps = new ParameterSet(*i); - LOG_INFO_STR("Using "<<*i<<" as parameter set."); - break; - } catch (...) { - LOG_TRACE_FLOW_STR(*i << " not found"); - ps = 0; - } - } - - if (ps == 0) { - LOG_INFO_STR("Could not find a parameter set."); - } else { - try { - string prefix = ps->getString("parsetPrefix"); - ParameterSet ParamSet = ps->makeSubset(prefix); - ACC::APS::globalParameterSet()->adoptCollection(ParamSet); - } catch (APSException&) { - ACC::APS::globalParameterSet()->adoptCollection(*ps); - } - delete ps; - } - - LOG_TRACE_FLOW(programName + " starting define"); - if (!theProcess->define()) return 1; - LOG_TRACE_FLOW(programName + " initting"); - if (!theProcess->init()) return 1; - - LOG_TRACE_FLOW(programName + " running"); - int noRuns = atoi(argv[argc - 1]); - if (noRuns == 0) noRuns = 1; - for (int run = 0; run < noRuns; run++) { - if (!theProcess->run()) return 1; - } - - LOG_TRACE_FLOW(programName + " quitting"); - if (!theProcess->quit()) return 1; - LOG_TRACE_FLOW(programName + " deleting process"); - - } else { - - LOG_TRACE_FLOW("Main program started by ACC"); - - // Now all ACC processes expect "ACC" as first argument - // So the parameter file is the second argument - - // Read in parameterfile and get my name - - ParameterSet ParamSet(argv[2]); - string procID = ParamSet.getString("_processName"); - string prefix = ParamSet.getString("_parsetPrefix"); - - ACC::APS::globalParameterSet()->adoptCollection(ParamSet); - - ACC::PLC::ProcControlServer pcServer(ParamSet.getString(prefix + "_ACnode"), - ParamSet.getUint16(prefix + "_ACport"), - theProcess); - - LOG_TRACE_FLOW("Registering at ApplController"); - sleep(2); - pcServer.registerAtAC(procID); - LOG_TRACE_FLOW("Registered at ApplController"); - - // Main processing loop - bool isRunning = false; - while (1) { - LOG_TRACE_STAT("Polling ApplController for message"); - if (pcServer.pollForMessage()) { - LOG_TRACE_COND("Message received from ApplController"); - - // get pointer to received data - ACC::PLC::DH_ProcControl* newMsg = pcServer.getDataHolder(); - - if (newMsg->getCommand() == ACC::PLC::PCCmdRun) { - isRunning = true; - pcServer.sendResult(newMsg->getCommand(), ACC::PLC::PcCmdMaskOk); - } - else - { - isRunning = false; - } - - if (!isRunning) { - if (!pcServer.handleMessage(newMsg)) { - LOG_ERROR("ProcControlServer::handleMessage() failed"); - } - } - - if (newMsg->getCommand() == ACC::PLC::PCCmdQuit) { - break; - } - - } else if (isRunning == true) { - // Call run again. It is possible that a process doesn't - // support this. It should check for that itself. - // If run() fails, toggle the isRunning flag to false. - if (!theProcess->run()) { - isRunning = false; - } - } - } - LOG_INFO_STR("Shutting down: ApplicationController"); - pcServer.unregisterAtAC(""); // send to AC before quiting - } - } catch (Exception& ex) { - LOG_FATAL_STR("Caught exception: " << ex << endl); - LOG_FATAL_STR(argv[0] << " terminated by exception!"); - return 1; - } catch (std::exception& ex) { - LOG_FATAL_STR("Caught std::exception: " << ex.what()); - return 1; - } catch (...) { - LOG_FATAL_STR("Caught unknown exception, exitting"); - return 1; - } -#ifdef HAVE_MPI - TH_MPI::finalize(); -#endif - - LOG_INFO_STR(programName << " terminated normally"); - - return 0; - } - } // namespace CS1 -} // namespace LOFAR diff --git a/Appl/CEP/CS1/CS1_InputSection/src/ACCmain_InputSection.cc b/Appl/CEP/CS1/CS1_InputSection/src/ACCmain_InputSection.cc new file mode 100644 index 0000000000000000000000000000000000000000..33d9006d3879825fa60a0ee9746810cc54ff7ebd --- /dev/null +++ b/Appl/CEP/CS1/CS1_InputSection/src/ACCmain_InputSection.cc @@ -0,0 +1,235 @@ +//# ACCmain.cc: main loop that can be used by any ACC enabled program +//# +//# Copyright (C) 2006 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id$ + +//# Always #include <lofar_config.h> first! +#include <lofar_config.h> + +//# Includes +#include <libgen.h> +#include <Common/LofarLogger.h> +#include <Common/LofarLocators.h> +#include <APS/ParameterSet.h> +#include <APS/Exceptions.h> +#include <PLC/ProcControlServer.h> +#include <CS1_InputSection/ACCmain_InputSection.h> +#ifdef HAVE_MPI +#include <Transport/TH_MPI.h> +#endif + +namespace LOFAR { +namespace CS1 { + +using ACC::APS::ParameterSet; + +// +// ACCmain(argc, argv, procCtrl*) +// +int ACCmain_InputSection (int argc, char* orig_argv[], ACC::PLC::ProcessControl* theProcess) { + char** argv = orig_argv; + +#ifdef HAVE_MPI + TH_MPI::initMPI(argc, orig_argv); + + int myRank = TH_MPI::getCurrentRank(); + + // The MPI standard does not demand that the commandline + // arguments are distributed, so we do it ourselves. + + // Broadcast number of arguments + MPI_Bcast(&argc, 1, MPI_INT, 0, MPI_COMM_WORLD); + // Some MPI implementations block on the Bcast. Synchronize + // the nodes to avoid deadlock. + MPI_Barrier(MPI_COMM_WORLD); + + if (myRank != 0) { + argv = new char*[argc + 1]; + argv[argc] = 0; + } else { + char** argv = orig_argv; + } + + for (int arg = 0; arg < argc; arg++) { + int arglen = 0; + if (myRank == 0) { + arglen = strlen(argv[arg]) + 1; + } + + // Broadcast the length of this argument + MPI_Bcast(&arglen, 1, MPI_INT, 0, MPI_COMM_WORLD); + + if (myRank != 0) { + argv[arg] = new char[arglen]; + } + // Broadcast the argument; + MPI_Bcast(argv[arg], arglen, MPI_BYTE, 0, MPI_COMM_WORLD); + } +#endif + + // + string programName(basename(argv[0])); + bool ACCmode(true); + + try { + // Check invocation syntax: [ACC] parsetfile UniqProcesName + // When we are called by ACC the first argument is ACC. + // otherwise we do all states right after each other. + if ((argc < 2) || (strcmp("ACC", argv[1]) != 0)) { + // we were not called by ACC + LOG_DEBUG(programName + " not started by ACC"); + ACCmode = false; + } + else { + LOG_DEBUG(programName + " started by ACC"); + } + + // Read in the parameterset. + ConfigLocator CL; + string ParsetFile = CL.locate(argv[1 + (ACCmode ? 1 : 0)]); + ASSERTSTR(!ParsetFile.empty(), "Could not find parameterset " << argv[1]); + LOG_INFO_STR("Using parameterset " << ParsetFile); + ACC::APS::globalParameterSet()->adoptFile(ParsetFile); + + // When not under control of ACC execute all modes immediately + if (!ACCmode) { + LOG_DEBUG(programName + " starting define"); + if (!theProcess->define()) { + return (1); + } + + LOG_DEBUG(programName + " initializing"); + if (!theProcess->init()) { + return (1); + } + + LOG_DEBUG(programName + " running"); + int noRuns = atoi(argv[argc - 1]); + if (noRuns == 0) { + noRuns = 1; + } + for (int run = 0; run < noRuns; run++) { + if (!theProcess->run()) { + return (1); + } + } + + LOG_DEBUG(programName + " releasing"); + if (!theProcess->release()) { + return (1); + } + + LOG_DEBUG(programName + " quitting"); + if (!theProcess->quit()) { + return (1); + } + + LOG_DEBUG(programName + " deleting process"); + + } + else { + // we are under control of ACC + // Note args are: ACC parsetfile UniqProcesName + + string procID(argv[3]); + string prefix = ACC::APS::globalParameterSet()->getString("_parsetPrefix"); + + // connect to Application Controller + ACC::PLC::ProcControlServer pcServer(ACC::APS::globalParameterSet()->getString(prefix+"_ACnode"), + ACC::APS::globalParameterSet()->getUint16(prefix+"_ACport"), + theProcess); + + + // Tell AC who we are. + LOG_DEBUG_STR("Registering at ApplController as " << procID); + sleep(1); + pcServer.registerAtAC(procID); + + // Main processing loop + bool quiting(false); + while (!quiting) { + LOG_TRACE_STAT("Polling ApplController for message"); + if (pcServer.pollForMessage()) { + LOG_TRACE_COND("Message received from ApplController"); + + // get pointer to received data + ACC::PLC::DH_ProcControl* newMsg = pcServer.getDataHolder(); + + if (newMsg->getCommand() == ACC::PLC::PCCmdInit) { + pcServer.sendResult(newMsg->getCommand(), ACC::PLC::PcCmdMaskOk); + newMsg->setCommand(ACC::PLC::PCCmd(newMsg->getCommand() &~ ACC::PLC::PCCmdResult)); + } + + if (newMsg->getCommand() == ACC::PLC::PCCmdPause) { + pcServer.sendResult(newMsg->getCommand(), ACC::PLC::PcCmdMaskOk); + newMsg->setCommand(ACC::PLC::PCCmd(newMsg->getCommand() &~ ACC::PLC::PCCmdResult)); + } + + if (newMsg->getCommand() == ACC::PLC::PCCmdRelease) { + pcServer.sendResult(newMsg->getCommand(), ACC::PLC::PcCmdMaskOk); + newMsg->setCommand(ACC::PLC::PCCmd(newMsg->getCommand() &~ ACC::PLC::PCCmdResult)); + } + + if (newMsg->getCommand() == ACC::PLC::PCCmdQuit) { + quiting = true; + } + + if (!pcServer.handleMessage(newMsg)) { + LOG_ERROR("ProcControlServer::handleMessage() failed"); + } + + } else { + // no new command received. If we are in the runstate + // call the run-routine again. + if (theProcess->inRunState()) { + ACC::PLC::DH_ProcControl tmpMsg(ACC::PLC::PCCmdRun); + pcServer.handleMessage(&tmpMsg); + } + } + } + + LOG_INFO_STR("Shutting down: ApplicationController"); + pcServer.unregisterAtAC(""); // send to AC before quiting + } + } + catch (Exception& ex) { + LOG_FATAL_STR("Caught exception: " << ex << endl); + LOG_FATAL_STR(programName << " terminated by exception!"); + return (1); + } + catch (std::exception& ex) { + LOG_FATAL_STR("Caught std::exception: " << ex.what()); + return (1); + } + catch (...) { + LOG_FATAL_STR("Caught unknown exception, exitting"); + return (1); + } + +#ifdef HAVE_MPI + TH_MPI::finalize(); +#endif + + LOG_INFO_STR(programName << " terminated normally"); + return (0); +} + + } // namespace CS1 +} // namespace LOFAR diff --git a/Appl/CEP/CS1/CS1_InputSection/src/Makefile.am b/Appl/CEP/CS1/CS1_InputSection/src/Makefile.am index a93014c877ced4b37f2af2189d2a9f7c02395d90..7c2e826e81e77c637cb9914def8b01e33bd87b00 100644 --- a/Appl/CEP/CS1/CS1_InputSection/src/Makefile.am +++ b/Appl/CEP/CS1/CS1_InputSection/src/Makefile.am @@ -7,7 +7,7 @@ lib_LTLIBRARIES = libcs1_inputsection.la libcs1_inputsection_la_SOURCES = Connector.cc \ InputThread.cc \ BeamletBuffer.cc \ - ACCmainInputSection.cc \ + ACCmain_InputSection.cc \ AH_InputSection.cc \ WH_InputSection.cc diff --git a/Appl/CEP/CS1/CS1_InputSection/src/main.cc b/Appl/CEP/CS1/CS1_InputSection/src/main.cc index fde008440c4eb15ecc1b0b9e8a58ddc4743657cb..6eda03efcdc14e62f5626171b0fac71a5033edf8 100644 --- a/Appl/CEP/CS1/CS1_InputSection/src/main.cc +++ b/Appl/CEP/CS1/CS1_InputSection/src/main.cc @@ -29,7 +29,7 @@ #include <CS1_InputSection/AH_InputSection.h> #include <tinyCEP/Profiler.h> #include <tinyCEP/ApplicationHolderController.h> -#include <CS1_InputSection/ACCmainInputSection.h> +#include <CS1_InputSection/ACCmain_InputSection.h> #include <exception> @@ -51,7 +51,7 @@ int main(int argc, char **argv) AH_InputSection myAH; ApplicationHolderController myAHController(myAH, 1); //listen to ACC every 1 runs - return ACCmainInputSection(argc, argv, &myAHController); + return ACCmain_InputSection(argc, argv, &myAHController); } #else diff --git a/Appl/CEP/CS1/CS1_Interface/src/CS1_Parset.cc b/Appl/CEP/CS1/CS1_Interface/src/CS1_Parset.cc index 7c1663fd60c6f4efff0551b8c3cbcdd848de2a29..56a82eb1372a32430a99da9ee5bdaf2f573fd3d4 100644 --- a/Appl/CEP/CS1/CS1_Interface/src/CS1_Parset.cc +++ b/Appl/CEP/CS1/CS1_Interface/src/CS1_Parset.cc @@ -109,7 +109,7 @@ string CS1_Parset::getMSname(unsigned firstSB, unsigned lastSB) const replace_all(name, "${MINUTES}", splitStartTime[4]); replace_all(name, "${SECONDS}", splitStartTime[5]); - replace_all(name, "${MSNUMBER}", str(format("%05u") % getUint32("Observation.ObsId"))); + replace_all(name, "${MSNUMBER}", str(format("%05u") % getUint32("Observation.ObsID"))); replace_all(name, "${SUBBAND}", str(firstSB != lastSB ? format("%u-%u") % firstSB % lastSB : format("%d") % firstSB)); return name; diff --git a/Appl/CEP/CS1/CS1_Run/src/CS1_Run.py b/Appl/CEP/CS1/CS1_Run/src/CS1_Run.py index c407e3baaa3d0fed67834afb3a4e2788f8fa7249..ff6d8720921e1b463acfcdbae86f4df91c509651 100755 --- a/Appl/CEP/CS1/CS1_Run/src/CS1_Run.py +++ b/Appl/CEP/CS1/CS1_Run/src/CS1_Run.py @@ -161,7 +161,7 @@ if __name__ == '__main__': inf = open(runningNumberFile, 'r') measurementnumber = int(inf.readline()) inf.close() - parset['Observation.ObsId'] = measurementnumber + parset['Observation.ObsID'] = measurementnumber outf = open(runningNumberFile, 'w') outf.write(str(measurementnumber + 1) + '\n') outf.close() @@ -176,7 +176,7 @@ if __name__ == '__main__': MS = MS.replace('${HOURS}', dateStr[3]) MS = MS.replace('${MINUTES}', dateStr[4]) MS = MS.replace('${SECONDS}', dateStr[5]) - MS = MS.replace('${MSNUMBER}', '%05d' % parset['Observation.ObsId']) + MS = MS.replace('${MSNUMBER}', '%05d' % parset['Observation.ObsID']) MS = MS.replace('${SUBBAND}', '*') dbfile.write(MS + '\t' + ' '.join(dateStr[0:3]) + '\t' + nodesStr + '\n') diff --git a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/ACCmainInputSection.h b/Appl/CEP/CS1/CS1_Storage/include/CS1_Storage/ACCmain_Storage.h similarity index 81% rename from Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/ACCmainInputSection.h rename to Appl/CEP/CS1/CS1_Storage/include/CS1_Storage/ACCmain_Storage.h index bb3d24a2a680ccad54259b7056e0e82c8beec1bb..b603d658116357bd9a712cf38953101567fcea3f 100644 --- a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/ACCmainInputSection.h +++ b/Appl/CEP/CS1/CS1_Storage/include/CS1_Storage/ACCmain_Storage.h @@ -1,4 +1,4 @@ -//# ACCmainInputSection.h: main loop that can be used by any ACC enabled program +//# ACCmain_Storage.h: main loop that can be used by any ACC enabled program //# //# Copyright (C) 2006 //# ASTRON (Netherlands Foundation for Research in Astronomy) @@ -20,8 +20,8 @@ //# //# $Id$ -#ifndef LOFAR_CS1_INPUTSECTION_ACCMAININPUTSECTION_H -#define LOFAR_CS1_INPUTSECTION_ACCMAININPUTSECTION_H +#ifndef LOFAR_CS1_STORAGE_ACCMAIN_STORAGE_H +#define LOFAR_CS1_STORAGE_ACCMAIN_STORAGE_H // \file // main loop that can be used by any ACC enabled program @@ -35,7 +35,7 @@ namespace LOFAR { namespace CS1 { - int ACCmainInputSection (int argc, char* argv[], ProcessControl* theProcess); + int ACCmain_Storage (int argc, char* argv[], ACC::PLC::ProcessControl* theProcess); } // namespace CS1 } // namespace LOFAR diff --git a/Appl/CEP/CS1/CS1_Storage/src/ACCmain_Storage.cc b/Appl/CEP/CS1/CS1_Storage/src/ACCmain_Storage.cc new file mode 100644 index 0000000000000000000000000000000000000000..84da026ede96a111e3552e93409925c9ea29a365 --- /dev/null +++ b/Appl/CEP/CS1/CS1_Storage/src/ACCmain_Storage.cc @@ -0,0 +1,265 @@ +//# ACCmain_Storage.cc: main loop that can be used by any ACC enabled program +//# +//# Copyright (C) 2006 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id$ + +//# Always #include <lofar_config.h> first! +#include <lofar_config.h> + +//# Includes +#include <libgen.h> +#include <Common/LofarLogger.h> +#include <Common/LofarLocators.h> +#include <APS/ParameterSet.h> +#include <APS/Exceptions.h> +#include <PLC/ProcControlServer.h> +#include <sys/time.h> +#include <CS1_Storage/ACCmain_Storage.h> +#include <CS1_Interface/CS1_Parset.h> +#include <AMCBase/Epoch.h> + +#ifdef HAVE_MPI +#include <Transport/TH_MPI.h> +#endif + +namespace LOFAR { +namespace CS1 { + +using ACC::APS::ParameterSet; + +// +// ACCmain(argc, argv, procCtrl*) +// +int ACCmain_Storage (int argc, char* orig_argv[], ACC::PLC::ProcessControl* theProcess) { + char** argv = orig_argv; + +#ifdef HAVE_MPI + TH_MPI::initMPI(argc, orig_argv); + + int myRank = TH_MPI::getCurrentRank(); + + // The MPI standard does not demand that the commandline + // arguments are distributed, so we do it ourselves. + + // Broadcast number of arguments + MPI_Bcast(&argc, 1, MPI_INT, 0, MPI_COMM_WORLD); + // Some MPI implementations block on the Bcast. Synchronize + // the nodes to avoid deadlock. + MPI_Barrier(MPI_COMM_WORLD); + + if (myRank != 0) { + argv = new char*[argc + 1]; + argv[argc] = 0; + } else { + char** argv = orig_argv; + } + + for (int arg = 0; arg < argc; arg++) { + int arglen = 0; + if (myRank == 0) { + arglen = strlen(argv[arg]) + 1; + } + + // Broadcast the length of this argument + MPI_Bcast(&arglen, 1, MPI_INT, 0, MPI_COMM_WORLD); + + if (myRank != 0) { + argv[arg] = new char[arglen]; + } + // Broadcast the argument; + MPI_Bcast(argv[arg], arglen, MPI_BYTE, 0, MPI_COMM_WORLD); + } +#endif + + // + string programName(basename(argv[0])); + bool ACCmode(true); + + try { + // Check invocation syntax: [ACC] parsetfile UniqProcesName + // When we are called by ACC the first argument is ACC. + // otherwise we do all states right after each other. + if ((argc < 2) || (strcmp("ACC", argv[1]) != 0)) { + // we were not called by ACC + LOG_DEBUG(programName + " not started by ACC"); + ACCmode = false; + } + else { + LOG_DEBUG(programName + " started by ACC"); + } + + // Read in the parameterset. + ConfigLocator CL; + string ParsetFile = CL.locate(argv[1 + (ACCmode ? 1 : 0)]); + ASSERTSTR(!ParsetFile.empty(), "Could not find parameterset " << argv[1]); + LOG_INFO_STR("Using parameterset " << ParsetFile); + ACC::APS::globalParameterSet()->adoptFile(ParsetFile); + + // When not under control of ACC execute all modes immediately + if (!ACCmode) { + LOG_DEBUG(programName + " starting define"); + if (!theProcess->define()) { + return (1); + } + + LOG_DEBUG(programName + " initializing"); + if (!theProcess->init()) { + return (1); + } + + LOG_DEBUG(programName + " running"); + int noRuns = atoi(argv[argc - 1]); + if (noRuns == 0) { + noRuns = 1; + } + for (int run = 0; run < noRuns; run++) { + if (!theProcess->run()) { + return (1); + } + } + + LOG_DEBUG(programName + " releasing"); + if (!theProcess->release()) { + return (1); + } + + LOG_DEBUG(programName + " quitting"); + if (!theProcess->quit()) { + return (1); + } + + LOG_DEBUG(programName + " deleting process"); + + } + else { + // we are under control of ACC + // Note args are: ACC parsetfile UniqProcesName + + string procID(argv[3]); + string prefix = ACC::APS::globalParameterSet()->getString("_parsetPrefix"); + + // connect to Application Controller + ACC::PLC::ProcControlServer pcServer(ACC::APS::globalParameterSet()->getString(prefix+"_ACnode"), + ACC::APS::globalParameterSet()->getUint16(prefix+"_ACport"), + theProcess); + + + // Tell AC who we are. + LOG_DEBUG_STR("Registering at ApplController as " << procID); + sleep(1); + pcServer.registerAtAC(procID); + + // Main processing loop + + bool quiting(false); + uint nRuns = 1; + + while (!quiting) { + LOG_TRACE_STAT("Polling ApplController for message"); + if (pcServer.pollForMessage()) { + LOG_TRACE_COND("Message received from ApplController"); + + // get pointer to received data + ACC::PLC::DH_ProcControl* newMsg = pcServer.getDataHolder(); + + if (newMsg->getCommand() == ACC::PLC::PCCmdRun) { + pcServer.sendResult(newMsg->getCommand(), ACC::PLC::PcCmdMaskOk); + newMsg->setCommand(ACC::PLC::PCCmd(newMsg->getCommand() &~ ACC::PLC::PCCmdResult)); + } + + if (newMsg->getCommand() == ACC::PLC::PCCmdPause) { + CS1_Parset itsCS1PS = CS1_Parset(ACC::APS::globalParameterSet()); + + double startTime = itsCS1PS.startTime(); + double stopTime = itsCS1PS.stopTime(); + + double stepTime = itsCS1PS.BGLintegrationTime(); + + uint totalRuns = uint(1 + ceil((stopTime - startTime) / stepTime)); + + totalRuns = ((totalRuns+15)&~15); + + totalRuns = totalRuns / itsCS1PS.IONintegrationSteps(); + + uint nrRunsLeft = totalRuns - nRuns; + + time_t now = time(0); + char buf[26]; + ctime_r(&now, buf); + buf[24] = '\0'; + + cout << "time = " << buf << ", rank = " << TH_MPI::getCurrentRank() <<", nrRunsLeft = totalRuns - nRuns = " << totalRuns << " - " << nRuns << " = " << nrRunsLeft << endl; + + for (uint i=0; i<nrRunsLeft; i++) { + if (!theProcess->run()) { + return (1); + } + } + } + + if (newMsg->getCommand() == ACC::PLC::PCCmdQuit) { + pcServer.sendResult(newMsg->getCommand(), ACC::PLC::PcCmdMaskOk); + newMsg->setCommand(ACC::PLC::PCCmd(newMsg->getCommand() &~ ACC::PLC::PCCmdResult)); + quiting = true; + } + + if (!pcServer.handleMessage(newMsg)) { + LOG_ERROR("ProcControlServer::handleMessage() failed"); + } + + } else { + // no new command received. If we are in the runstate + // call the run-routine again. + if (theProcess->inRunState()) { + nRuns++; + ACC::PLC::DH_ProcControl tmpMsg(ACC::PLC::PCCmdRun); + pcServer.handleMessage(&tmpMsg); + } + } + } + + LOG_INFO_STR("Shutting down: ApplicationController"); + pcServer.unregisterAtAC(""); // send to AC before quiting + } + } + catch (Exception& ex) { + LOG_FATAL_STR("Caught exception: " << ex << endl); + LOG_FATAL_STR(programName << " terminated by exception!"); + return (1); + } + catch (std::exception& ex) { + LOG_FATAL_STR("Caught std::exception: " << ex.what()); + return (1); + } + catch (...) { + LOG_FATAL_STR("Caught unknown exception, exitting"); + return (1); + } + +#ifdef HAVE_MPI + TH_MPI::finalize(); +#endif + + LOG_INFO_STR(programName << " terminated normally"); + return (0); +} + + } // namespace CS1 +} // namespace LOFAR diff --git a/Appl/CEP/CS1/CS1_Storage/src/CS1_Storage_main.cc b/Appl/CEP/CS1/CS1_Storage/src/CS1_Storage_main.cc index f79d62a3db92f11bade6ec1eaf9112d9810ef22f..b6b6e509571f6cf7f23471bf02dfeda26e4c9c27 100644 --- a/Appl/CEP/CS1/CS1_Storage/src/CS1_Storage_main.cc +++ b/Appl/CEP/CS1/CS1_Storage/src/CS1_Storage_main.cc @@ -23,7 +23,7 @@ using namespace LOFAR; using namespace LOFAR::CS1; #if 1 -#include <PLC/ACCmain.h> +#include <CS1_Storage/ACCmain_Storage.h> int main(int argc, char* argv[]) { @@ -35,7 +35,7 @@ int main(int argc, char* argv[]) { AH_Storage myAH; ApplicationHolderController myAHC(myAH, 1); // listen to ACC Controller once every 1 runs. - return ACC::PLC::ACCmain(argc, argv, &myAHC); + return ACCmain_Storage(argc, argv, &myAHC); } #else diff --git a/Appl/CEP/CS1/CS1_Storage/src/Makefile.am b/Appl/CEP/CS1/CS1_Storage/src/Makefile.am index d09e97f9ab8d33bbd7d6adcd7958c6769fdba10f..7cfc73bceb132a60166285740a6e87487c9d59d9 100644 --- a/Appl/CEP/CS1/CS1_Storage/src/Makefile.am +++ b/Appl/CEP/CS1/CS1_Storage/src/Makefile.am @@ -2,6 +2,7 @@ lib_LTLIBRARIES = libcs1_storage.la libcs1_storage_la_SOURCES = AH_Storage.cc \ WH_SubbandWriter.cc \ + ACCmain_Storage.cc \ MSWriter.cc \ MSWriterImpl.cc