From 9b04b71adce90433bd8d10f236c7f48a758dc254 Mon Sep 17 00:00:00 2001 From: Ger van Diepen <diepen@astron.nl> Date: Thu, 29 May 2008 12:09:24 +0000 Subject: [PATCH] bug 1193: First version --- .gitattributes | 7 + CEP/BB/MWCommon/Makefile.am | 3 + CEP/BB/MWCommon/bootstrap | 3 + CEP/BB/MWCommon/configure.in | 66 +++++++ .../MWCommon/include/MWCommon/ClusterDesc.h | 83 +++++++++ CEP/BB/MWCommon/include/MWCommon/Controller.h | 137 ++++++++++++++ .../include/MWCommon/ControllerBase.h | 92 ++++++++++ .../MWCommon/include/MWCommon/DomainShape.h | 63 +++++++ .../MWCommon/include/MWCommon/MPIConnection.h | 77 ++++++++ .../include/MWCommon/MPIConnectionSet.h | 77 ++++++++ CEP/BB/MWCommon/include/MWCommon/MWBlobIO.h | 133 ++++++++++++++ .../MWCommon/include/MWCommon/MWConnection.h | 98 ++++++++++ .../include/MWCommon/MWConnectionSet.h | 70 ++++++++ CEP/BB/MWCommon/include/MWCommon/MWError.h | 36 ++++ .../MWCommon/include/MWCommon/MWGlobalStep.h | 40 +++++ CEP/BB/MWCommon/include/MWCommon/MWIos.h | 54 ++++++ .../MWCommon/include/MWCommon/MWLocalStep.h | 42 +++++ .../MWCommon/include/MWCommon/MWMultiStep.h | 80 +++++++++ CEP/BB/MWCommon/include/MWCommon/MWStep.h | 85 +++++++++ .../MWCommon/include/MWCommon/MWStepFactory.h | 47 +++++ .../MWCommon/include/MWCommon/MWStepVisitor.h | 84 +++++++++ CEP/BB/MWCommon/include/MWCommon/Makefile.am | 37 ++++ .../MWCommon/include/MWCommon/MasterControl.h | 125 +++++++++++++ .../MWCommon/include/MWCommon/MemConnection.h | 69 +++++++ .../include/MWCommon/MemConnectionSet.h | 69 +++++++ CEP/BB/MWCommon/include/MWCommon/NodeDesc.h | 65 +++++++ CEP/BB/MWCommon/include/MWCommon/ObsDomain.h | 85 +++++++++ .../include/MWCommon/ParameterHandler.h | 104 +++++++++++ .../include/MWCommon/SocketConnection.h | 81 +++++++++ .../include/MWCommon/SocketConnectionSet.h | 81 +++++++++ .../include/MWCommon/SocketListener.h | 45 +++++ CEP/BB/MWCommon/include/MWCommon/VdsDesc.h | 83 +++++++++ .../MWCommon/include/MWCommon/VdsPartDesc.h | 100 +++++++++++ .../include/MWCommon/WorkDomainSpec.h | 127 +++++++++++++ .../MWCommon/include/MWCommon/WorkerControl.h | 46 +++++ .../MWCommon/include/MWCommon/WorkerFactory.h | 48 +++++ CEP/BB/MWCommon/include/MWCommon/WorkerInfo.h | 72 ++++++++ .../MWCommon/include/MWCommon/WorkerProxy.h | 93 ++++++++++ .../MWCommon/include/MWCommon/WorkersDesc.h | 76 ++++++++ CEP/BB/MWCommon/include/Makefile.am | 3 + CEP/BB/MWCommon/src/Makefile.am | 51 ++++++ CEP/BB/MWCommon/src/socketrun | 40 +++++ CEP/BB/MWCommon/src/startdistproc | 170 ++++++++++++++++++ CEP/BB/MWCommon/test/Makefile.am | 38 ++++ CEP/BB/MWCommon/test/tClusterDesc.cc | 76 ++++++++ CEP/BB/MWCommon/test/tClusterDesc.sh | 2 + CEP/BB/MWCommon/test/tNodeDesc.cc | 51 ++++++ CEP/BB/MWCommon/test/tNodeDesc.sh | 2 + CEP/BB/MWCommon/test/tSocketConnection.cc | 76 ++++++++ CEP/BB/MWCommon/test/tSocketConnection.run | 40 +++++ CEP/BB/MWCommon/test/tSocketConnection.sh | 2 + CEP/BB/MWCommon/test/tSocketConnection.stdout | 22 +++ CEP/BB/MWCommon/test/tVdsDesc.cc | 128 +++++++++++++ CEP/BB/MWCommon/test/tVdsDesc.sh | 2 + CEP/BB/MWCommon/test/tVdsPartDesc.cc | 76 ++++++++ CEP/BB/MWCommon/test/tVdsPartDesc.sh | 2 + CEP/BB/MWCommon/test/tWorkersDesc.cc | 139 ++++++++++++++ CEP/BB/MWCommon/test/tWorkersDesc.sh | 2 + CEP/BB/MWCommon/test/tfinddproc.in_cd | 6 + CEP/BB/MWCommon/test/tfinddproc.in_vd | 41 +++++ CEP/BB/MWCommon/test/tfinddproc.run | 3 + CEP/BB/MWCommon/test/tfinddproc.sh | 2 + CEP/BB/MWCommon/test/tfinddproc.stdout | 3 + CEP/BB/MWCommon/test/tstartdproc.in_cd | 6 + CEP/BB/MWCommon/test/tstartdproc.in_vd | 41 +++++ CEP/BB/MWCommon/test/tstartdproc.run | 21 +++ CEP/BB/MWCommon/test/tstartdproc.sh | 2 + CEP/BB/MWCommon/test/tstartdproc.stdout | 22 +++ 68 files changed, 3852 insertions(+) create mode 100644 CEP/BB/MWCommon/Makefile.am create mode 100755 CEP/BB/MWCommon/bootstrap create mode 100644 CEP/BB/MWCommon/configure.in create mode 100644 CEP/BB/MWCommon/include/MWCommon/ClusterDesc.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/Controller.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/ControllerBase.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/DomainShape.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MPIConnection.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MPIConnectionSet.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MWBlobIO.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MWConnection.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MWConnectionSet.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MWError.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MWGlobalStep.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MWIos.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MWLocalStep.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MWMultiStep.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MWStep.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MWStepFactory.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MWStepVisitor.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/Makefile.am create mode 100644 CEP/BB/MWCommon/include/MWCommon/MasterControl.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MemConnection.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/MemConnectionSet.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/NodeDesc.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/ObsDomain.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/ParameterHandler.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/SocketConnection.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/SocketConnectionSet.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/SocketListener.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/VdsDesc.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/VdsPartDesc.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/WorkDomainSpec.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/WorkerControl.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/WorkerFactory.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/WorkerInfo.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/WorkerProxy.h create mode 100644 CEP/BB/MWCommon/include/MWCommon/WorkersDesc.h create mode 100644 CEP/BB/MWCommon/include/Makefile.am create mode 100644 CEP/BB/MWCommon/src/Makefile.am create mode 100755 CEP/BB/MWCommon/src/socketrun create mode 100755 CEP/BB/MWCommon/src/startdistproc create mode 100644 CEP/BB/MWCommon/test/Makefile.am create mode 100644 CEP/BB/MWCommon/test/tClusterDesc.cc create mode 100755 CEP/BB/MWCommon/test/tClusterDesc.sh create mode 100644 CEP/BB/MWCommon/test/tNodeDesc.cc create mode 100755 CEP/BB/MWCommon/test/tNodeDesc.sh create mode 100644 CEP/BB/MWCommon/test/tSocketConnection.cc create mode 100755 CEP/BB/MWCommon/test/tSocketConnection.run create mode 100755 CEP/BB/MWCommon/test/tSocketConnection.sh create mode 100644 CEP/BB/MWCommon/test/tSocketConnection.stdout create mode 100644 CEP/BB/MWCommon/test/tVdsDesc.cc create mode 100755 CEP/BB/MWCommon/test/tVdsDesc.sh create mode 100644 CEP/BB/MWCommon/test/tVdsPartDesc.cc create mode 100755 CEP/BB/MWCommon/test/tVdsPartDesc.sh create mode 100644 CEP/BB/MWCommon/test/tWorkersDesc.cc create mode 100755 CEP/BB/MWCommon/test/tWorkersDesc.sh create mode 100644 CEP/BB/MWCommon/test/tfinddproc.in_cd create mode 100644 CEP/BB/MWCommon/test/tfinddproc.in_vd create mode 100755 CEP/BB/MWCommon/test/tfinddproc.run create mode 100755 CEP/BB/MWCommon/test/tfinddproc.sh create mode 100644 CEP/BB/MWCommon/test/tfinddproc.stdout create mode 100644 CEP/BB/MWCommon/test/tstartdproc.in_cd create mode 100644 CEP/BB/MWCommon/test/tstartdproc.in_vd create mode 100755 CEP/BB/MWCommon/test/tstartdproc.run create mode 100755 CEP/BB/MWCommon/test/tstartdproc.sh create mode 100644 CEP/BB/MWCommon/test/tstartdproc.stdout diff --git a/.gitattributes b/.gitattributes index 2dab6bb4306..182524a5d83 100644 --- a/.gitattributes +++ b/.gitattributes @@ -102,6 +102,13 @@ CEP/BB/BBSKernel/test/tParmMerge.in_mep2/table.f0 -text svneol=unset#unset CEP/BB/BBSKernel/test/tParmMerge.in_mep2/table.f0i -text svneol=unset#unset CEP/BB/BBSKernel/test/tParmMerge.in_mep2/table.lock -text svneol=unset#unset CEP/BB/BB_GUI/make_jar.sh -text svneol=native#application/octet-stream +CEP/BB/MWCommon/bootstrap -text +CEP/BB/MWCommon/src/socketrun -text +CEP/BB/MWCommon/src/startdistproc -text +CEP/BB/MWCommon/test/tfinddproc.in_cd -text +CEP/BB/MWCommon/test/tfinddproc.in_vd -text +CEP/BB/MWCommon/test/tstartdproc.in_cd -text +CEP/BB/MWCommon/test/tstartdproc.in_vd -text CEP/BB/MWImager/bootstrap -text CEP/BB/MWImager/src/mwimager -text CEP/BB/SourceDB/bootstrap -text diff --git a/CEP/BB/MWCommon/Makefile.am b/CEP/BB/MWCommon/Makefile.am new file mode 100644 index 00000000000..c58f05d8e33 --- /dev/null +++ b/CEP/BB/MWCommon/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src test include + +include $(top_srcdir)/Makefile.common diff --git a/CEP/BB/MWCommon/bootstrap b/CEP/BB/MWCommon/bootstrap new file mode 100755 index 00000000000..06f18cde1db --- /dev/null +++ b/CEP/BB/MWCommon/bootstrap @@ -0,0 +1,3 @@ +#!/bin/sh + +../../../autoconf_share/bootstrap ../../../autoconf_share diff --git a/CEP/BB/MWCommon/configure.in b/CEP/BB/MWCommon/configure.in new file mode 100644 index 00000000000..89d37054d4f --- /dev/null +++ b/CEP/BB/MWCommon/configure.in @@ -0,0 +1,66 @@ +dnl +dnl Process this file with autoconf to produce a configure script. +dnl +AC_INIT() +dnl AC_CONFIG_AUX_DIR(config) +dnl AM_CONFIG_HEADER(config/config.h) +AM_CONFIG_HEADER(config.h) +AM_INIT_AUTOMAKE(MWCommon, 0.1, no-define) + +dnl Initialize for LOFAR (may set compilers) +lofar_INIT + +dnl Checks for programs. +AC_PROG_AWK +AC_PROG_YACC +AC_PROG_CC +AC_PROG_CXX +AM_PROG_LEX +AC_PROG_INSTALL +AC_PROG_LN_S +AC_DISABLE_SHARED +AC_PROG_LIBTOOL + +dnl Checks for libraries. + +dnl dnl Replace `main' with a function in -lfl: +dnl AC_CHECK_LIB(fl, main) +dnl dnl Replace `main' with a function in -lcosev_r: +dnl AC_CHECK_LIB(cosev_r, main) +dnl dnl Replace `main' with a function in -lcosnm_r: +dnl AC_CHECK_LIB(cosnm_r, main) +dnl dnl Replace `main' with a function in -lorb_r: +dnl AC_CHECK_LIB(orb_r, main) +dnl dnl Replace `main' with a function in -lpthread: +dnl AC_CHECK_LIB(pthread, main) +dnl dnl Replace `main' with a function in -lvport_r: +dnl AC_CHECK_LIB(vport_r, main) + +dnl Checks for header files. +AC_HEADER_STDC +AC_CHECK_HEADERS(unistd.h) + +dnl Checks for typedefs, structures, and compiler characteristics. +AC_C_CONST +AC_TYPE_SIZE_T + +dnl Checks for library functions. +AC_FUNC_VPRINTF + +dnl +dnl Check for LOFAR specific things +dnl +lofar_GENERAL +lofar_INTERNAL(LCS/Blob,Blob,,1,Blob/BlobHeader.h,,) +lofar_INTERNAL(LCS/ACC/APS,APS,,1,APS/ParameterSet.h,,) + +dnl +dnl Output Makefiles +dnl +AC_OUTPUT( +include/Makefile +include/MWCommon/Makefile +src/Makefile +test/Makefile +Makefile +) diff --git a/CEP/BB/MWCommon/include/MWCommon/ClusterDesc.h b/CEP/BB/MWCommon/include/MWCommon/ClusterDesc.h new file mode 100644 index 00000000000..29762fd46a6 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/ClusterDesc.h @@ -0,0 +1,83 @@ +/// @file +/// @brief Description of a cluster and the nodes in it. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_CLUSTERDESC_H +#define LOFAR_MWCOMMON_CLUSTERDESC_H + +//# Includes +#include <MWCommon/NodeDesc.h> +#include <string> +#include <vector> +#include <iosfwd> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Description of a cluster and the nodes in it. + + /// This class holds the basic description of a cluster. + /// It defines which nodes are part of the cluster and which file systems + /// each node has access to. + /// If a data set is distributed over many file systems, the cluster + /// description tells which node can handle a data set part on a particular + /// file system. + /// + /// Currently the information is made persistent in a LOFAR .parset file. + /// In the future it needs to use the Centrol Processor Resource Manager. + + class ClusterDesc + { + public: + /// Construct an empty object. + ClusterDesc() + {} + + /// Construct from the given parameterset. + /// @{ + explicit ClusterDesc (const std::string& parsetName); + explicit ClusterDesc (const ParameterSet& parset) + { init (parset); } + /// @} + + /// Set cluster name. + void setName (const std::string& name) + { itsName = name; } + + /// Add a file system it has access to. + void addNode (const NodeDesc& node); + + /// Write it in parset format. + void write (std::ostream& os) const; + + /// Get the name. + const std::string& getName() const + { return itsName; } + + /// Get all nodes. + const std::vector<NodeDesc>& getNodes() const + { return itsNodes; } + + /// Get the map of file system to node. + const std::map<std::string, std::vector<std::string> >& getMap() const + { return itsFS2Nodes; } + + private: + /// Fill the object from the given parset file. + void init (const ParameterSet& parset); + + /// Add entries to the mapping of FileSys to Nodes. + void add2Map (const NodeDesc& node); + + std::string itsName; + std::vector<NodeDesc> itsNodes; + std::map<std::string, std::vector<std::string> > itsFS2Nodes; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/Controller.h b/CEP/BB/MWCommon/include/MWCommon/Controller.h new file mode 100644 index 00000000000..103feedd3c2 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/Controller.h @@ -0,0 +1,137 @@ +//# Controller.h: Class to execute the master and workers +//# +//# Copyright (C) 2008 +//# +//# $Id$ + +#ifndef LOFAR_MWCOMMON_CONTROLLER_H +#define LOFAR_MWCOMMON_CONTROLLER_H + +//# Includes +#include <MWCommon/ControllerBase.h> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Class to execute the master and the workers + + /// This templated class does the execution of the master and the local and + /// global workers. A local worker is the worker that operates on the data. + /// A global worker is used to combine results from local workers (e.g. + /// to do a global solve). + /// The class is used by the master as well as the workers. The process rank + /// determines if it is used as the master (rank 0) or a worker. + /// The <tt>execute</tt> functions does most of the work. A so-called + /// Runner object (the template parameter) is used to do application + /// specific operations. + /// The <tt>execute</tt> function operates in several stages: + /// <ol> + /// <li> It uses the Runner object to ask for the data set to process. + /// Such a data set is a description file (handled by VdsDesc) telling + /// the number of data set parts and where each part resides. + /// <li> The connections between master and workers are set up. It will use + /// MPI if compiled in and required. Otherwise it can use sockets. + /// If there is only one process, everything will run in that single + /// process and memory connections will be used. This is particularly + /// useful for debugging. + /// It is checked if there is a local worker for each data set part. + /// <li> The processes will start executing. The workers wait for messages + /// and act upon it. A message contains a type (see MasterControl) and + /// usually an MWStep object. That object tells the worker what to do. + /// A factory (WorkerFactory) is used to create the correct MWStep object. + /// In this way the framework is very general and any step can be used. + /// <br>The master will do the following: + /// <ol> + /// <li> Obtain the domain of the data set and send it with the + /// ParameterSet obtained from the Runner to the workers to let + /// them initialise themselves. + /// <li> Ask all workers what kind of work they can perform. + /// <li> Ask the Runner to do the application-specific work. This is + /// done by calling the appropriate functions in MasterControl. + /// They will send commands to the workers and act upon their replies. + /// <li> After the Runner has finished, it quits MasterControl which + /// sends quit messages to the workers. + /// </ol> + /// </ol> + + /// <example> + /// Here follows an example of a Runner class taken from the test + /// program tMWControl. It shows the functions which have to be + /// defined in the class. Also the copy constructor must be available + /// (in this example it is implemented by the compiler). + /// <srcblock> + /// class Runner + /// { + /// public: + /// explicit Runner (const string& parsetName) + /// : itsParams (ParameterSet(parsetName)) + /// { + /// // Define the functions creating the proxy workers. + /// // The names localWorker and globalWorker are mandatory. + /// itsFactory.push_back ("LocalWorker", PredifferTest::create); + /// itsFactory.push_back ("GlobalWorker", SolverTest::create); + /// } + /// + /// string getDataSetName() const + /// { return itsParams.getDataSetName(); } + /// + /// const WorkerFactory& getFactory() const + /// { return itsFactory; } + /// + /// ParameterSet getParSet() const + /// { return itsParams; } + /// + /// void run (MasterControl& mc) + /// { + /// // Assemble all steps defined in the parameters into a single spec. + /// vector<MWStrategySpec> strategySpecs = itsParams.getStrategies(); + /// // Loop through all strategies. + /// for (vector<MWStrategySpec>::const_iterator iter=strategySpecs.begin(); + /// iter!=strategySpecs.end(); + /// ++iter) { + /// mc.setWorkDomainSpec (iter->getWorkDomainSpec()); + /// // Execute the steps. + /// mc.processSteps (iter->getSteps()); + /// } + /// } + /// + /// private: + /// MWParameterHandler itsParams; + /// WorkerFactory itsFactory; + /// }; + /// </srcblock> + /// </example> + + template<typename Runner> + class Controller : public ControllerBase + { + public: + // Construct the controller to process the given (distributed) data set + // on the given cluster (using its cluster description name). + // Standard output is logged in the given log file. + Controller (const Runner& runner, + const string& clusterName, + const string& logFileName) + : ControllerBase (runner.getFactory(), + runner.getParSet(), + runner.getDataSetName(), + clusterName, + logFileName), + itsRunner (runner) + {} + + virtual ~Controller() + {} + + // Let the Runner run. + virtual void run (MasterControl& mc) + { itsRunner.run (mc); } + + private: + //# Data members + Runner itsRunner; + }; + +}} //# end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/ControllerBase.h b/CEP/BB/MWCommon/include/MWCommon/ControllerBase.h new file mode 100644 index 00000000000..50893d4eef2 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/ControllerBase.h @@ -0,0 +1,92 @@ +//# ControllerBase.h: Base class to execute the master and workers +//# +//# Copyright (C) 2008 +//# +//# $Id$ + +#ifndef LOFAR_MWCOMMON_CONTROLLERBASE_H +#define LOFAR_MWCOMMON_CONTROLLERBASE_H + +//# Includes +#include <MWCommon/MWConnectionSet.h> +#include <MWCommon/WorkerFactory.h> +#include <MWCommon/MasterControl.h> +#include <MWCommon/VdsDesc.h> +#include <MWCommon/MWIos.h> + +namespace LOFAR { namespace CEP { + + //# Forward Declarations. + class VdsDesc; + + /// @ingroup mwcommon + /// @brief Base class to execute the master and the workers + + /// This non-templated class factors out all non-templated code of + /// the Controller class. + + class ControllerBase + { + public: + // Construct the controller to process the given (distributed) data set + // on the given cluster (using its cluster description name). + // Standard output is logged in the given log file. + ControllerBase (const WorkerFactory&, + const ParameterSet&, + const string& dsDescName, + const string& clusterDescName, + const string& logFileName); + + virtual ~ControllerBase(); + + // Execute the run using sockets or a single process. + // If the host or port name is empty, a single process is used. + // If sockets are used, it is assumed that <tt>nproc</tt> identical + // processes (including master) have been started. Each process should + // have a unique rank, where rank 0 will be the master. + // <br>It returns a non-zero value on failure. + int execute (const string& host, const string& port, + int nproc, int rank); + + // Execute the run using MPI or a single process. + // <br>It returns a non-zero value on failure. + int execute(); + + private: + // Let the Runner in the derived class run. + virtual void run (MasterControl&) = 0; + + // Do the execute using MPI, sockets, or single process. + int doExecute (const string& host, const string& port, + int nrNode, int rank, bool useMPI); + + // Run the master process. + void runMaster (const string& port, + int globalWorkerRank, + int nworkers, int nparts); + + // Run a local worker. + void runLocalWorker (const string& host, const string& port); + + // Run a global worker. + void runGlobalWorker (const string& host, const string& port); + + // Setup all the workers. + // Find out what they can do, i.e. if they work locally or globally. + // It returns a vector telling which data part is handled by local worker i. + std::vector<std::string> setAllWorkers (MWConnectionSet& workers, + int nworkers); + + //# Data members + WorkerFactory itsFactory; + ParameterSet itsParSet; + VdsDesc itsDsDesc; + std::string itsClusterName; + std::string itsLogName; + MWConnectionSet::ShPtr itsLocalWorkers; + MWConnectionSet::ShPtr itsGlobalWorkers; + }; + +}} //# end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/DomainShape.h b/CEP/BB/MWCommon/include/MWCommon/DomainShape.h new file mode 100644 index 00000000000..12e05bc2cc7 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/DomainShape.h @@ -0,0 +1,63 @@ +/// @file +/// @brief Define the shape of a domain. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_DOMAINSHAPE_H +#define LOFAR_MWCOMMON_DOMAINSHAPE_H + +#include <Blob/BlobOStream.h> +#include <Blob/BlobIStream.h> +#include <iosfwd> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Define the shape of a domain. + + /// This class defines the shape of a domain. + /// Currently this can only be done for time and frequency. + /// + /// This object can be used by ObsDomain to iterate over its observation + /// domain in chunk of this domain shape. + + class DomainShape + { + public: + /// Set default shape to all frequencies and times. + DomainShape(); + + /// Set from frequency in Hz and time in sec. + DomainShape (double freqSize, double timeSize); + + /// Get the shape. + /// @{ + double getFreqSize() const + { return itsFreqSize; } + double getTimeSize() const + { return itsTimeSize; } + /// @} + + /// Convert to/from blob. + /// @{ + friend LOFAR::BlobOStream& operator<< (LOFAR::BlobOStream&, + const DomainShape&); + friend LOFAR::BlobIStream& operator>> (LOFAR::BlobIStream&, + DomainShape&); + /// @} + + /// Print. + friend std::ostream& operator<< (std::ostream&, + const DomainShape&); + + private: + double itsFreqSize; + double itsTimeSize; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MPIConnection.h b/CEP/BB/MWCommon/include/MWCommon/MPIConnection.h new file mode 100644 index 00000000000..7a8ef0acb1b --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MPIConnection.h @@ -0,0 +1,77 @@ +/// @file +/// @brief Connection to workers based on MPI. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MPICONNECTION_H +#define LOFAR_MWCOMMON_MPICONNECTION_H + +#include <MWCommon/MWConnection.h> +#include <boost/shared_ptr.hpp> + + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Connection to workers based on MPI. + + /// This class handles the MPI connection between two processes by + /// giving it the correct MPI rank of the other (destination) process. + /// + /// The length of a message to receive is determined using \a MPI_Probe. + /// + /// It has some static methods to do the basic MPI handling + /// (init, finalize, get nrnodes, get rank). + /// + /// This class requires compile variable HAVE_MPI to be set in order to + /// use MPI. If not set, it will compile fine, but cannot really be used. + /// Only the static functions can be used which will nothing and return + /// a default value. + + class MPIConnection: public MWConnection + { + public: + /// Define a shared pointer to this object. + typedef boost::shared_ptr<MPIConnection> ShPtr; + + /// Set up a connection to the given destination. + /// The tag can be used to define the type of destination + /// (e.g. prediffer or solver). + MPIConnection (int destinationRank, int tag); + + virtual ~MPIConnection(); + + /// Check the state of the connection. Default is true. + virtual bool isConnected() const; + + /// Get the length of the message. + virtual int getMessageLength(); + + /// Receive the data sent by the destination + /// and wait until data has been received into buf. + virtual void receive (void* buf, unsigned size); + + /// Send the data to the destination + /// and wait until the data has been sent. + virtual void send (const void* buf, unsigned size); + + /// Functions to access MPI. + /// If no MPI available, getRank returns 0 and getNrNodes returns 1. + /// @{ + static void initMPI (int argc, const char**& argv); + static void endMPI(); + static int getRank(); + static int getNrNodes(); + /// @} + + private: + int itsDestRank; + int itsTag; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MPIConnectionSet.h b/CEP/BB/MWCommon/include/MWCommon/MPIConnectionSet.h new file mode 100644 index 00000000000..c5cb3d8858e --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MPIConnectionSet.h @@ -0,0 +1,77 @@ +/// @file +/// @brief Class to hold a set of MPI connections. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +/// $Id$ + +#ifndef LOFAR_MWCOMMON_MPICONNECTIONSET_H +#define LOFAR_MWCOMMON_MPICONNECTIONSET_H + +#include <MWCommon/MWConnectionSet.h> +#include <MWCommon/MPIConnection.h> +#include <vector> + + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Class to hold a set of MPI connections. + + /// This class represents a set of MPI connections. Typically it is used + /// to group connections to workers of a specific type. + /// The main reason for having this class is the ability to check if any + /// connection in the group is ready to receive data (i.e. if the other + /// side of the connection has sent data). This is done using MPI_Probe + /// with the tag of the first connection, so all connections in the group + /// should have the same (and unique) tag. + /// + /// @todo Implement getReadyConnection. + + class MPIConnectionSet: public MWConnectionSet + { + public: + /// Define a shared pointer to this object. + typedef boost::shared_ptr<MPIConnectionSet> ShPtr; + + /// Set up a connection set to destinations using MPI. + MPIConnectionSet(); + + virtual ~MPIConnectionSet(); + + /// Clone the derived object to contain only the connections + /// as indexed in the given vector. + virtual MWConnectionSet::ShPtr clone(const std::vector<int>&) const; + + /// Add a connection to the given rank using the tag. + /// The tag can be used to define the type of destination + /// (e.g. prediffer or solver). + /// It returns the sequence nr of the connection. + int addConnection (int rank, int tag); + + /// Get the number of connections. + virtual int size() const; + + /// Get seqnr of connection that is ready to receive. + /// <0 means no connection ready yet. + virtual int getReadyConnection(); + + /// Read the data into the BlobString buffer using the connection + /// with the given sequence nr. + virtual void read (int seqnr, LOFAR::BlobString&); + + /// Write the data from the BlobString buffer using the connection + /// with the given sequence nr. + virtual void write (int seqnr, const LOFAR::BlobString&); + + /// Write the data from the BlobString buffer to all connections. + virtual void writeAll (const LOFAR::BlobString&); + + private: + std::vector<MPIConnection::ShPtr> itsConns; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MWBlobIO.h b/CEP/BB/MWCommon/include/MWCommon/MWBlobIO.h new file mode 100644 index 00000000000..61c569e33d0 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MWBlobIO.h @@ -0,0 +1,133 @@ +/// @file +/// @brief Classes to convert a message to/from a blob. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MWBLOBIO_H +#define LOFAR_MWCOMMON_MWBLOBIO_H + +#include <Blob/BlobString.h> +#include <Blob/BlobOBufString.h> +#include <Blob/BlobIBufString.h> +#include <Blob/BlobOStream.h> +#include <Blob/BlobIStream.h> +#include <casa/OS/Timer.h> +#include <Common/Timer.h> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Class to convert a message from a blob. + + /// This class is the opposite of MWBlobOut. + /// It can be used to obtain operation and streamId and to read the message. + + class MWBlobIn + { + public: + /// Start reading back a message from the buffer. It reads the operation + /// and streamId which can be obtained using their \a get functions. + /// The message itself can be read from the \a blobStream(). + explicit MWBlobIn (const LOFAR::BlobString& buf); + + /// Get the operation, streamId, or workerId. + /// @{ + int getOperation() const + { return itsOper; } + int getStreamId() const + { return itsStreamId; } + int getWorkerId() const + { return itsWorkerId; } + /// @} + + /// Get the timings which can be the low-precision elapsed, system, + /// and user time, and the high-precision elapsed time. + /// All times are in seconds. + /// @{ + float getElapsedTime() const; + float getSystemTime() const; + float getUserTime() const; + double getPrecTime() const; + /// @} + + /// Get the blobstream to read the data from. + LOFAR::BlobIStream& blobStream() + { return itsStream; } + + /// End the Blob processing. + void finish() + { itsStream.getEnd(); } + + private: + LOFAR::BlobIBufString itsBuf; + LOFAR::BlobIStream itsStream; + LOFAR::int32 itsOper; + LOFAR::int32 itsStreamId; + LOFAR::int32 itsWorkerId; + float itsElapsedTime; + float itsSystemTime; + float itsUserTime; + double itsPrecTime; + }; + + + /// @ingroup mwcommon + /// @brief Class to convert a message to a blob. + + /// This class forms the envelope of messages used in the MW framework. + /// MW messages are transmitted as blobs. + /// The envelope consist of the basic blob header with type 'mw'. The + /// blob header defines things like endianness, version, and length. + /// The envelope also contains the operation type and streamId. + /// The operation type tells the worker what is has to do. + /// The streamId is for future use to make it possible to have parallel + /// work streams in a worker to keep it busy. + /// The workerId gives the id of the worker. + /// + /// The envelope has room for timings. In this way the master can know + /// how much time it took for a worker to execute a command. + /// The \a setTimes function can be used to set the timings. It uses + /// the casa Timer class to get the low-precision elapsed, user, and system + /// times. The LOFAR NSTimer class is used for high-precision elapsed time. + /// + /// The message proper has to be written by the user of this class + /// into the supplied \a blobStream. + /// After all data are written, the \a finish function has to be called. + + class MWBlobOut + { + public: + /// Start a message blob in the buffer and put the given operation, + /// streamId, and workerId into it. + /// The message itself can be put into the \a blobStream(). + MWBlobOut (LOFAR::BlobString& buf, + int operation, int streamId, int workerId=-1); + + /// Reset the operation. + void setOperation (int operation); + + /// Set the times it took to do the operation. + void setTimes (const casa::Timer&, const LOFAR::NSTimer&); + + /// Get the blobstream to write the data in. + LOFAR::BlobOStream& blobStream() + { return itsStream; } + + /// End the Blob processing. + void finish() + { itsStream.putEnd(); } + + private: + LOFAR::BlobOBufString itsBuf; + LOFAR::BlobOStream itsStream; + //# Remember where to put operation or times. + int itsOperOffset; + int itsTimeOffset; + }; + +}} //end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MWConnection.h b/CEP/BB/MWCommon/include/MWCommon/MWConnection.h new file mode 100644 index 00000000000..4a404ea79b0 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MWConnection.h @@ -0,0 +1,98 @@ +/// @file +/// @brief Abstract base class for all MWConnections. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MWCONNECTION_H +#define LOFAR_MWCOMMON_MWCONNECTION_H + +#include <boost/shared_ptr.hpp> + +//# Forward Declarations +namespace LOFAR { + class BlobString; +} + + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Abstract base class for all MWConnections. + + /// This class defines the base class for classes to transport data. + /// Actually, the data transport is done between two MWConnection objects + /// of the same type. + /// + /// The data are packed in LOFAR Blob objects to support heterogeneous + /// machines (with different endianness). It also makes it possible to + /// version the data to make future upgrades possible. Finally as blob + /// contains a length making it easily possible to support varying length + /// messages. + /// + /// To support varying length messages for both socket and MPI connections, + /// the length can be determined first. If found, the message length is + /// known. Otherwise the blob header is read to find the message length. + /// This is needed because in MPI a message has to be read in one receive, + /// while sockets have no direct means to determine the message length. + /// + /// Derived classes (e.g. MPIConnection) implement the concrete transport + /// classes. + + class MWConnection + { + public: + /// Define a shared pointer to this object. + typedef boost::shared_ptr<MWConnection> ShPtr; + + MWConnection() + {} + + virtual ~MWConnection(); + + /// Initialize the Transport; this may for instance open a file, + /// port or dbms connection. + /// Default does nothing. + virtual void init(); + + /// Check the state of this MWConnection. Default is true. + virtual bool isConnected() const; + + /// Receive the data blob sent by the connected MWConnection + /// and wait until data has been received into \a buf. + /// The buffer is resized as needed. + /// By default it uses the functions \a getMessageLength and \a receive + /// to determine the length of the message and to receive the data. + virtual void read (LOFAR::BlobString& buf); + + /// Send the data to the connected MWConnection + /// and wait until the data has been sent. + /// By default is uses function \a send to send the data. + virtual void write (const LOFAR::BlobString& buf); + + private: + /// Cannot make a copy of this object (thus also of derived classes). + /// @{ + MWConnection (const MWConnection&); + MWConnection& operator=(const MWConnection&); + /// @} + + /// Try to get the length of the message. + /// -1 is returned if it could not determine it. + /// In such a case the length needs to be read from the blob header. + virtual int getMessageLength() = 0; + + /// Receive the given amount of data in the buffer. + /// and wait until data has been received into buf. + virtual void receive (void* buf, unsigned size) = 0; + + /// Send the fixed sized data to the connected MWConnection + /// and wait until the data has been sent. + virtual void send (const void* buf, unsigned size) = 0; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MWConnectionSet.h b/CEP/BB/MWCommon/include/MWCommon/MWConnectionSet.h new file mode 100644 index 00000000000..287faefa7be --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MWConnectionSet.h @@ -0,0 +1,70 @@ +/// @file +/// @brief Abstract base class for all MWConnectionSets. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MWCONNECTIONSET_H +#define LOFAR_MWCOMMON_MWCONNECTIONSET_H + +#include <Blob/BlobString.h> +#include <boost/shared_ptr.hpp> +#include <vector> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Abstract base class for all MWConnectionSets. + + /// This class defines the abstract base class for all MWConnectionSet + /// classes. + /// The object can be cloned, where it is possible to only use the + /// given connections. In this way connections can be regrouped asd needed. + /// Note that a cloned object uses the same MWConnection objects as + /// the original. + /// + /// See class MWConnection for a description of connections. + + class MWConnectionSet + { + public: + /// Define a shared pointer to this object. + typedef boost::shared_ptr<MWConnectionSet> ShPtr; + + MWConnectionSet() + {} + + virtual ~MWConnectionSet(); + + /// Clone the derived object, optionally to contain only the connections + /// as indexed in the given vector. + /// It uses the same connections as the original. + /// @{ + MWConnectionSet::ShPtr clone() const; + virtual MWConnectionSet::ShPtr clone(const std::vector<int>&) const = 0; + /// @} + + /// Get the number of connections. + virtual int size() const = 0; + + /// Get seqnr of connection that is ready to receive. + /// <0 means no connection ready yet. + virtual int getReadyConnection() = 0; + + /// Read the data into the BlobString buffer using the connection + /// with the given sequence nr. + virtual void read (int seqnr, LOFAR::BlobString&) = 0; + + /// Write the data from the BlobString buffer using the connection + /// with the given sequence nr. + virtual void write (int seqnr, const LOFAR::BlobString&) = 0; + + /// Write the data from the BlobString buffer to all connections. + virtual void writeAll (const LOFAR::BlobString&) = 0; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MWError.h b/CEP/BB/MWCommon/include/MWCommon/MWError.h new file mode 100644 index 00000000000..465b76abdfd --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MWError.h @@ -0,0 +1,36 @@ +/// @file +/// @brief Basic exception for master/worker related errors. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MWERROR_H +#define LOFAR_MWCOMMON_MWERROR_H + +#include <Common/Exception.h> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Basic exception for master/worker related errors. + + /// This class defines the basic MW exception. + /// Only this basic exception is defined so far. In the future, some more + /// fine-grained exceptions might be derived from it. + + class MWError: public Exception + { + public: + /// Create the exception object with the given message. + explicit MWError (const std::string& text, const std::string& file="", + int line=0, const std::string& func=""); + + virtual ~MWError() throw(); + }; + + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MWGlobalStep.h b/CEP/BB/MWCommon/include/MWCommon/MWGlobalStep.h new file mode 100644 index 00000000000..cc27d5c6512 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MWGlobalStep.h @@ -0,0 +1,40 @@ +/// @file +/// @brief Base classes for global MW commands (like subtract) +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MWGLOBALSTEP_H +#define LOFAR_MWCOMMON_MWGLOBALSTEP_H + +#include <MWCommon/MWStep.h> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Base class for a step to process a global MW command. + + /// This class defines a class that serves as the base class for a + /// global MW step. A global MW step is a step that cannot be executed + /// directly by a worker without the need of interaction between workers. + + class MWGlobalStep: public MWStep + { + public: + MWGlobalStep() + {} + + virtual ~MWGlobalStep(); + + /// Visit the object, so the visitor can process it. + /// The default implementation uses the MWStepVisitor::visitGlobal + /// function. + virtual void visit (MWStepVisitor&) const; + }; + + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MWIos.h b/CEP/BB/MWCommon/include/MWCommon/MWIos.h new file mode 100644 index 00000000000..c7c7baa790e --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MWIos.h @@ -0,0 +1,54 @@ +//# MWIos.h: +//# +//# Copyright (C) 2007 +//# +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MWIOSTREAM_H +#define LOFAR_MWCOMMON_MWIOSTREAM_H + +#include <iostream> +#include <fstream> +#include <string> + +#define MWCOUT MWIos::os() + +namespace LOFAR { namespace CEP { + + /// MPI has the problem that the output of cout is unpredictable. + /// Therefore the output of tMWControl is using a separate output + /// file for each rank. + /// This class makes this possible. The alias MWCOUT can be used for it. + /// + /// Not that everything is static, so no destructor is called. + /// The clear function can be called at the end of the program to + /// delete the internal object, otherwise tools like valgrind will + /// complain about a memory leak. + + class MWIos + { + public: + /// Define the name of the output file. + static void setName (const std::string& name) + { itsName = name; } + + /// Get access to its ostream. + /// It creates the ostream if not done yet. + static std::ofstream& os() + { if (!itsIos) makeIos(); return *itsIos; } + + /// Remove the ostream (otherwise there'll be a memory leak). + static void clear() + { delete itsIos; } + + private: + /// Make the ostream if not done yet. + static void makeIos(); + + static std::string itsName; + static std::ofstream* itsIos; + }; + +}} // end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MWLocalStep.h b/CEP/BB/MWCommon/include/MWCommon/MWLocalStep.h new file mode 100644 index 00000000000..c3c60471505 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MWLocalStep.h @@ -0,0 +1,42 @@ +/// @file +/// @brief Base classes for local MW commands +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MWLOCALSTEP_H +#define LOFAR_MWCOMMON_MWLOCALSTEP_H + +#include <MWCommon/MWStep.h> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Base class for a step to process a local MW command. + + /// This class defines a class that serves as the base class for a + /// local MW step. A local MW step is a step that can be executed + /// directly by a worker without the need of interaction between workers. + /// An example is a subtract or correct. A solve is not a local step, + /// because it requires interaction between workers. + + class MWLocalStep: public MWStep + { + public: + MWLocalStep() + {} + + virtual ~MWLocalStep(); + + /// Visit the object, so the visitor can process it. + /// The default implementation uses the MWStepVisitor::visitlocal + /// function. + virtual void visit (MWStepVisitor&) const; + }; + + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MWMultiStep.h b/CEP/BB/MWCommon/include/MWCommon/MWMultiStep.h new file mode 100644 index 00000000000..ffc5d145a1c --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MWMultiStep.h @@ -0,0 +1,80 @@ +/// @file +/// @brief A step consisting of several other steps. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MWMULTISTEP_H +#define LOFAR_MWCOMMON_MWMULTISTEP_H + +#include <MWCommon/MWStep.h> +#include <list> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief A step consisting of several other steps. + + /// This class makes it possible to form a list of MWStep objects. + /// Note that the class itself is an MWStep, so the list can be nested. + /// The \a visit function will call \a visit of each step in the list. + /// + /// It uses the standard MWStep functionality (factory and visitor) to + /// create and process the object. + /// The object can be converted to/from blob, so it can be sent to workers. + + class MWMultiStep: public MWStep + { + public: + virtual ~MWMultiStep(); + + /// Clone the step object. + virtual MWMultiStep* clone() const; + + /// Create a new object of this type. + static MWStep::ShPtr create(); + + /// Register the create function in the MWStepFactory. + static void registerCreate(); + + /// Add a clone of a step object. + void push_back (const MWStep&); + + /// Add a step object. + void push_back (const MWStep::ShPtr&); + + /// Give the (unique) class name of the MWStep. + virtual std::string className() const; + + /// Visit the object, which visits each step. + virtual void visit (MWStepVisitor&) const; + + /// Convert to/from blob. + /// Note that reading back from a blob uses MWStepFactory to + /// create the correct objects. + /// @{ + virtual void toBlob (LOFAR::BlobOStream&) const; + virtual void fromBlob (LOFAR::BlobIStream&); + /// @} + + /// Print the contents and type. Indent as needed. + virtual void print (std::ostream& os, const std::string& indent) const; + + /// Define functions and so to iterate in the STL way. + /// @{ + typedef std::list<MWStep::ShPtr>::const_iterator const_iterator; + const_iterator begin() const + { return itsSteps.begin(); } + const_iterator end() const + { return itsSteps.end(); } + /// @} + + private: + std::list<MWStep::ShPtr> itsSteps; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MWStep.h b/CEP/BB/MWCommon/include/MWCommon/MWStep.h new file mode 100644 index 00000000000..89dd915959e --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MWStep.h @@ -0,0 +1,85 @@ +/// @file +/// @brief Abstract base class for steps to process MW commands. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MWSTEP_H +#define LOFAR_MWCOMMON_MWSTEP_H + +#include <MWCommon/MWStepVisitor.h> +#include <MWCommon/ParameterHandler.h> +#include <Blob/BlobOStream.h> +#include <Blob/BlobIStream.h> +#include <boost/shared_ptr.hpp> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Abstract base class for steps to process MW commands. + + /// This class is the abstract base class for all possible steps that + /// can be executed in the Master-Control framework. + /// A step must be able to store and retrieve itself into/from a blob. + /// + /// The \a visit function uses the visitor pattern to get access to + /// a concrete MWStep object, for example to execute the step. + /// It means that a function needs to be added to the visitor classes + /// for each newly derived MWStep class. + /// + /// The MWStepFactory class is a class containing a map of type name to + /// a \a create function that can create an MWStep object of the required + /// type. At the beginning of a program the required create functions have + /// to be registered in the factory. Note that the user can choose which + /// create function maps to a given name, which makes it possible to + /// use different implementations of similar functionality. + + class MWStep + { + public: + /// Define a shared pointer to this object. + typedef boost::shared_ptr<MWStep> ShPtr; + + virtual ~MWStep(); + + /// Clone the step object. + virtual MWStep* clone() const = 0; + + /// Give the (unique) class name of the MWStep. + virtual std::string className() const = 0; + + /// Get the parameter set. + // The default implementation returns an empty set. + virtual ParameterSet getParms() const; + + /// Visit the object, so the visitor can process it. + /// The default implementation uses the MWStepVisitor::visit function. + virtual void visit (MWStepVisitor&) const; + + /// Print the contents and type. Indent as needed. + /// The default implementation does nothing. + virtual void print (std::ostream& os, const std::string& indent) const; + + /// Convert to/from blob. + /// @{ + virtual void toBlob (LOFAR::BlobOStream&) const = 0; + virtual void fromBlob (LOFAR::BlobIStream&) = 0; + /// @} + + /// Convert to/from blob. + /// @{ + friend LOFAR::BlobOStream& operator<< (LOFAR::BlobOStream& bs, + const MWStep& step) + { step.toBlob(bs); return bs; } + friend LOFAR::BlobIStream& operator>> (LOFAR::BlobIStream& bs, + MWStep& step) + { step.fromBlob(bs); return bs; } + /// @} + }; + + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MWStepFactory.h b/CEP/BB/MWCommon/include/MWCommon/MWStepFactory.h new file mode 100644 index 00000000000..d01ab86f132 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MWStepFactory.h @@ -0,0 +1,47 @@ +/// @file +/// @brief Factory pattern to make the correct MWStep object +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MWSTEPFACTORY_H +#define LOFAR_MWCOMMON_MWSTEPFACTORY_H + +#include <MWCommon/MWStep.h> +#include <map> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Factory pattern to make the correct MWStep object + + /// This class contains a map of names to \a create functions + /// of derived MWStep objects. It is used to reconstruct the correct + /// MWStep object when reading it back from a blob. + /// + /// The map is static, so there is only one instance in a program. + /// Usually the functions will be registered at the beginning of a program. + + class MWStepFactory + { + public: + /// Define the signature of the function to create an MWStep object. + typedef MWStep::ShPtr Creator(); + + /// Add a creator function. + static void push_back (const std::string& name, Creator*); + + /// Create the derived MWStep object with the given name. + /// An exception is thrown if the name is not in the map. + static MWStep::ShPtr create (const std::string& name); + + private: + static std::map<std::string, Creator*> itsMap; + }; + + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MWStepVisitor.h b/CEP/BB/MWCommon/include/MWCommon/MWStepVisitor.h new file mode 100644 index 00000000000..e5f299d4420 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MWStepVisitor.h @@ -0,0 +1,84 @@ +/// @file +/// @brief Base visitor class to visit an MWStep hierarchy. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MWSTEPVISITOR_H +#define LOFAR_MWCOMMON_MWSTEPVISITOR_H + +#include <boost/shared_ptr.hpp> +#include <string> +#include <map> + +namespace LOFAR { namespace CEP { + + //# Forward Declarations + class MWStep; + class MWMultiStep; + class MWGlobalStep; + class MWLocalStep; + + + /// @ingroup mwcommon + /// @brief Base visitor class to visit an MWStep hierarchy. + + /// This is a class to traverse a MWStep composite using the visitor + /// pattern (see Design Patterns, Gamma et al, 1995). + /// It is the base class for all visitor classes. + /// + /// For each step in the composite, a visitXXX function is called where + /// XXX is the step type. In this way many different visitors can be + /// used without the need of implementing such functions in the MWStep + /// classes. The downside is that a visitYYY function needs to be added + /// to all visitor classes if an new step type YYY is created. + + class MWStepVisitor + { + public: + /// Define the visit function for an arbitrary MWStep object. + typedef void VisitFunc (MWStepVisitor&, const MWStep&); + + /// Destructor. + virtual ~MWStepVisitor(); + + /// Visit the different predefined step types. + /// The default implementation throws an exception that the step cannot + /// be handled. + virtual void visitMulti (const MWMultiStep&); + virtual void visitGlobal (const MWGlobalStep&); + virtual void visitLocal (const MWLocalStep&); + + /// Visit for an arbitrary \a MWStep type. + /// The default implementation calls the \a VisitFunc function which + /// is registered for the type name of the \a MWStep object. + /// If not registered, it calls visitStep. + virtual void visit (const MWStep&); + + /// Visit for an arbitrary \a MWStep type. + /// The default implementation throws an exception that the step cannot + /// be handled. + virtual void visitStep (const MWStep&); + + /// Register a visit function for an MWStep with the given name. + /// This can be used for other types of MWStep objects. + /// The given function will usually be a static function in a derived + /// visitor class calling a class member function. It can look like: + /// <pre> + /// void MyVisitor::doXXX (MWStepVisitor& visitor, const MWStep& step) + /// { dynamic_cast<MyVisitor&>(visitor).visitXXX( + /// dynamic_cast<const MWSTepXXX&>(step)); } + /// </pre> + /// The casts are kind of ugly, but unavoidable. + /// The doXXX functions can be registered by the constructor. + void registerVisit (const std::string& name, VisitFunc*); + + private: + std::map<std::string, VisitFunc*> itsMap; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/Makefile.am b/CEP/BB/MWCommon/include/MWCommon/Makefile.am new file mode 100644 index 00000000000..08390d105ce --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/Makefile.am @@ -0,0 +1,37 @@ +pkginclude_HEADERS = Package__Version.h \ +ClusterDesc.h \ +ControllerBase.h \ +Controller.h \ +DomainShape.h \ +MPIConnection.h \ +MPIConnectionSet.h \ +MWBlobIO.h \ +MWConnection.h \ +MWConnectionSet.h \ +MWError.h \ +MWGlobalStep.h \ +MWIos.h \ +MWLocalStep.h \ +MWMultiStep.h \ +MWStep.h \ +MWStepFactory.h \ +MWStepVisitor.h \ +MasterControl.h \ +MemConnection.h \ +MemConnectionSet.h \ +NodeDesc.h \ +ObsDomain.h \ +ParameterHandler.h \ +SocketConnection.h \ +SocketConnectionSet.h \ +SocketListener.h \ +VdsDesc.h \ +VdsPartDesc.h \ +WorkDomainSpec.h \ +WorkerControl.h \ +WorkerFactory.h \ +WorkerInfo.h \ +WorkerProxy.h \ +WorkersDesc.h + +include $(top_srcdir)/Makefile.common diff --git a/CEP/BB/MWCommon/include/MWCommon/MasterControl.h b/CEP/BB/MWCommon/include/MWCommon/MasterControl.h new file mode 100644 index 00000000000..282a4dfd372 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MasterControl.h @@ -0,0 +1,125 @@ +/// @file +/// @brief Master control of a distributed process. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MASTERCONTROL_H +#define LOFAR_MWCOMMON_MASTERCONTROL_H + +#include <MWCommon/MWStepVisitor.h> +#include <MWCommon/MWStep.h> +#include <MWCommon/ObsDomain.h> +#include <MWCommon/WorkDomainSpec.h> +#include <MWCommon/MWConnectionSet.h> +#include <MWCommon/ParameterHandler.h> +#include <string> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Master control of a distributed process. + + /// This class does the overall control of the master/worker framework. + /// It defines the basic operations (see the enum) that can be done by the + /// workers. + /// + /// Its operations are as follows: + /// <ol> + /// <li> The \a setInitInfo function sends the basic info to all + /// workers like the name of the VDS to be used. + /// <li> The \a setWorkDomain function defines the work domain info + /// in a WorkDomainSpec object. + /// <li> The processSteps function does the actual processing. + /// It loops over the entire observation domain in work domain chunks. + /// For each work domain it loops over the steps to be processed. + /// This is done by using the MasterControl as a visitor to an MWStep. + /// <li> After all steps are processed, it sends a quit command to the + /// workers. + /// </ol> + /// As said above, a step is processed by using the MasterControl as + /// an MWStepVisitor object. Usually step maps directly to an operation + /// and prcoessing the step simply consists of sending a single command + /// to the workers. + /// However, in case of a solve it is more ivolved. + /// It consists of sending multiple operations to localWorkers and globalWorker + /// and testing if the globalWorker has converged. This is all handled in the + /// \a visitSolve function. + /// + /// Instead of using MasterControl as the visitor, it might also be + /// possible to pass a visitor object to the MasterControl. However, + /// apart from processing the steps the MasterControl is doing hardly + /// anything at all, so it might be better to have anther XXXControl + /// class resembling this one. + /// (It might be better to rename MasterControl to BBSControl as it is + /// modeled after the BBSKernel functionality). + + class MasterControl: public MWStepVisitor + { + public: + /// Define the possible standard operations. + enum Operation { + /// initialize + Init=1, + /// set work domain + SetWd, + /// process a step + Step, + /// global initial info (e.g. solvable parm info) + GlobalInit, + /// get all iteration info (e.g. normal equations) + GlobalInfo, + /// execute a global iteration (e.g. solve) + GlobalExec, + /// end the processing of a work domain + EndWd + }; + + /// Provide a descriptive string for the standard operations + /// @param op Enumeration to be described + friend std::ostream& operator<<(std::ostream& os, MasterControl::Operation op); + + /// Create the master control with the given localWorker and globalWorker + /// connections. + MasterControl (const MWConnectionSet::ShPtr& localWorkers, + const MWConnectionSet::ShPtr& globalWorkers); + + ~MasterControl(); + + /// Set the MS name to process. + void setInitInfo (const ParameterSet& parms, + const std::vector<std::string>& dataPartNames, + const ObsDomain&); + + /// Set the work domain specification. + void setWorkDomainSpec (const WorkDomainSpec&); + + /// Process a step (which can consist of multiple steps). + void processSteps (const MWStep&); + + /// End the processing. + void quit(); + + private: + /// Process the various MWStep types. + /// @{ + virtual void visitGlobal (const MWGlobalStep&); + virtual void visitLocal (const MWLocalStep&); + /// @} + + /// Read the result from all localWorkers and/or globalWorkers. + /// This is merely to see if the workers have performed the step. + void readAllWorkers (bool localWorkers, bool globalWorkers); + + //# Data members. + ObsDomain itsFullDomain; + WorkDomainSpec itsWds; + MWConnectionSet::ShPtr itsLocalWorkers; + MWConnectionSet::ShPtr itsGlobalWorkers; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MemConnection.h b/CEP/BB/MWCommon/include/MWCommon/MemConnection.h new file mode 100644 index 00000000000..de43b330eb3 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MemConnection.h @@ -0,0 +1,69 @@ +/// @file +/// @brief Connection to workers based on memory. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MEMCONNECTION_H +#define LOFAR_MWCOMMON_MEMCONNECTION_H + +#include <MWCommon/MWConnection.h> +#include <MWCommon/WorkerProxy.h> +#include <Blob/BlobString.h> +#include <boost/shared_ptr.hpp> + + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Connection to workers based on memory. + + /// This class acts as the MW communication mechanism in memory. + /// It makes it possible to use the MW framework in a single process + /// which makes debugging easier. + /// + /// It is used in the same way as a SocketConnection or MPIConnection, but + /// because everything is synchronous in a single process, a WorkerProxy + /// object must be registered with the connection. Its \a handleData function + /// function is immediately called when data are sent. + /// The result is stored in a buffer in the MemConnection object, which + /// can thereafter be read. + /// After a read the buffer is cleared to ensure it is not read twice + /// (as is also the case in a 'normal' connection). + + class MemConnection: public MWConnection + { + public: + /// Define a shared pointer to this object. + typedef boost::shared_ptr<MemConnection> ShPtr; + + /// Set up a connection to the given destination and attach a worker. + explicit MemConnection (const WorkerProxy::ShPtr& worker); + + virtual ~MemConnection(); + + /// Get the length of the message. + /// It returns the length of the data in the result buffer. + virtual int getMessageLength(); + + /// Receive the data (i.e. the result of a worker from \a itsResult). + /// The \a itsResult buffer is cleared hereafter. + virtual void receive (void* buf, unsigned size); + + /// Write the data and process it by the worker. + /// The result is stored in \a itsResult. + virtual void write (const LOFAR::BlobString& buf); + + private: + /// This function cannot be called as \a write is implemented. + virtual void send (const void* buf, unsigned size); + + WorkerProxy::ShPtr itsWorker; + LOFAR::BlobString itsResult; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/MemConnectionSet.h b/CEP/BB/MWCommon/include/MWCommon/MemConnectionSet.h new file mode 100644 index 00000000000..8765fcdcbd2 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/MemConnectionSet.h @@ -0,0 +1,69 @@ +/// @file +/// @brief Set of Memory connections. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_MEMCONNECTIONSET_H +#define LOFAR_MWCOMMON_MEMCONNECTIONSET_H + +#include <MWCommon/MWConnectionSet.h> +#include <MWCommon/MemConnection.h> +#include <vector> + + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Set of Memory connections. + + /// This class represents a set of memory connections. Typically it is used + /// to group connections to workers of a specific type. + /// In practice memory connections will hardly be used, but they come + /// in handy for debugging purposes. + + class MemConnectionSet: public MWConnectionSet + { + public: + /// Set up a connection set to destinations with the given tag. + /// The tag can be used to define the type of destination + /// (e.g. prediffer or solver). + explicit MemConnectionSet(); + + virtual ~MemConnectionSet(); + + /// Clone the derived object to contain only the connections + /// as indexed in the given vector. + virtual MWConnectionSet::ShPtr clone(const std::vector<int>&) const; + + /// Add a connection to the given worker. + /// It returns the sequence nr of the connection. + int addConnection (const WorkerProxy::ShPtr& worker); + + /// Get the number of connections. + virtual int size() const; + + /// Get seqnr of connection that is ready to receive. + /// Is not really useful for this type of connection, so always returns 0. + virtual int getReadyConnection(); + + /// Read the data into the BlobString buffer using the connection + /// with the given sequence nr. + virtual void read (int seqnr, LOFAR::BlobString&); + + /// Write the data from the BlobString buffer using the connection + /// with the given sequence nr. + virtual void write (int seqnr, const LOFAR::BlobString&); + + /// Write the data from the BlobString buffer to all connections. + virtual void writeAll (const LOFAR::BlobString&); + + private: + std::vector<MemConnection::ShPtr> itsConns; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/NodeDesc.h b/CEP/BB/MWCommon/include/MWCommon/NodeDesc.h new file mode 100644 index 00000000000..2e67771d909 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/NodeDesc.h @@ -0,0 +1,65 @@ +/// @file +/// @brief Description of a node in a cluster. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_NODEDESC_H +#define LOFAR_MWCOMMON_NODEDESC_H + +//# Includes +#include <MWCommon/ParameterHandler.h> +#include <string> +#include <vector> +#include <iosfwd> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Description of a node in a cluster. + + /// This class holds the basic description of a node. + /// It tells the name of the node and which file systems it has access to. + /// + /// Currently the information is made persistent in a LOFAR .parset file. + /// In the future it needs to use the Centrol Processor Resource Manager. + + class NodeDesc + { + public: + /// Construct an empty object. + NodeDesc() + {} + + /// Construct from the given parameterset. + explicit NodeDesc (const ParameterSet&); + + /// Set node name. + void setName (const std::string& name) + { itsName = name; } + + /// Add a file system it has access to. + void addFileSys (const std::string& fsName) + { itsFileSys.push_back (fsName); } + + /// Write it in parset format. + void write (std::ostream& os, const std::string& prefix) const; + + /// Get the name. + const std::string& getName() const + { return itsName; } + + /// Get the file systems it has access to. + const std::vector<std::string>& getFileSys() const + { return itsFileSys; } + + private: + std::string itsName; //# full name of the node + std::vector<std::string> itsFileSys; //# name of file systems + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/ObsDomain.h b/CEP/BB/MWCommon/include/MWCommon/ObsDomain.h new file mode 100644 index 00000000000..f71a96c806f --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/ObsDomain.h @@ -0,0 +1,85 @@ +/// @file +/// @brief Define the boundary values of a domain. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_OBSDOMAIN_H +#define LOFAR_MWCOMMON_OBSDOMAIN_H + +#include <MWCommon/DomainShape.h> +#include <Blob/BlobOStream.h> +#include <Blob/BlobIStream.h> +#include <iosfwd> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Define the boundary values of a domain. + + /// This class defines the boundaries of an observation domain. + /// Currently it only defines a single range in time and freq. + /// In the future it will probably need to be extended to multiple bands. + /// + /// Furthermore it offers a function to get the next work domain + /// given a work domain shape defined by a DomainShape object. + /// The master control uses this function to iterate over work domains. + + class ObsDomain + { + public: + /// Set default shape to all frequencies and times. + ObsDomain(); + + /// Form the starting work domain from the full observation domain + /// and the work domain shape. + ObsDomain (const ObsDomain& fullDomain, + const DomainShape& workDomainShape); + + /// Set frequency range (in Hz). + void setFreq (double startFreq, double endFreq); + + /// Set time range (in sec). + void setTime (double startTime, double endTime); + + /// Get the values. + /// @{ + double getStartFreq() const + { return itsStartFreq; } + double getEndFreq() const + { return itsEndFreq; } + double getStartTime() const + { return itsStartTime; } + double getEndTime() const + { return itsEndTime; } + /// @} + + /// Go to the next work domain. + /// Return false if no more work domains. + bool getNextWorkDomain (ObsDomain& workDomain, + const DomainShape& workDomainShape) const; + + /// Convert to/from blob. + /// @{ + friend LOFAR::BlobOStream& operator<< (LOFAR::BlobOStream& bs, + const ObsDomain& ds); + friend LOFAR::BlobIStream& operator>> (LOFAR::BlobIStream& bs, + ObsDomain&); + /// @} + + /// Print the object. + friend std::ostream& operator<< (std::ostream& os, + const ObsDomain& ds); + + private: + double itsStartFreq; + double itsEndFreq; + double itsStartTime; + double itsEndTime; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/ParameterHandler.h b/CEP/BB/MWCommon/include/MWCommon/ParameterHandler.h new file mode 100644 index 00000000000..2f76260d995 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/ParameterHandler.h @@ -0,0 +1,104 @@ +/// @file +/// @brief Handle a LOFAR .parset file +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_PARAMETERHANDLER_H +#define LOFAR_MWCOMMON_PARAMETERHANDLER_H + +#include <APS/ParameterSet.h> +#include <Blob/BlobIStream.h> +#include <Blob/BlobOStream.h> + +namespace LOFAR { namespace CEP { + + //# Put ParameterSet into LOFAR::CEP namespace for ease of use. + using LOFAR::ACC::APS::ParameterSet; + + /// @ingroup mwcontrol + /// @brief Handle a LOFAR .parset file + + /// This class handles the processing of a LOFAR .parset file + /// It augments the LOFAR ParameterSet class with functions that can deal + /// with undefined parameters. There is a set of functions that return + /// a default value if undefined and a set of functions that leave the + /// value untouched if undefined. + + class ParameterHandler + { + public: + ParameterHandler (const ParameterSet&); + + /// Get a parameter value. + /// An exception is thrown if it does not exist. + /// @{ + std::string getString (const std::string& parm) const; + double getDouble (const std::string& parm) const; + unsigned getUint (const std::string& parm) const; + bool getBool (const std::string& parm) const; + std::vector<std::string> getStringVector (const std::string& parm) const; + /// @} + + /// Get a parameter value. + /// If it does not exist, the default value is used instead. + /// @{ + std::string getString (const std::string& parm, + const std::string& defVal) const; + double getDouble (const std::string& parm, + double defVal) const; + unsigned getUint (const std::string& parm, + unsigned defVal) const; + bool getBool (const std::string& parm, + bool defVal) const; + std::vector<std::string> getStringVector + (const std::string& parm, const std::vector<std::string>& defVal) const; + /// @} + + /// Get a parameter value and fill \a value with it. + /// If it does not exist, nothing is done. + /// @{ + void fillString (const std::string& parm, + std::string& value) const; + void fillDouble (const std::string& parm, + double& value) const; + void fillUint (const std::string& parm, + unsigned& value) const; + void fillBool (const std::string& parm, + bool& value) const; + void fillStringVector (const std::string& parm, + std::vector<std::string>& value) const; + /// @} + + // Convert automatically to a ParameterSet. + operator const ParameterSet& () const + { return itsParms; } + + protected: + ParameterSet itsParms; + }; + + + // Write/read a ParameterSet into/from a blob. + // @{ + LOFAR::BlobOStream& operator<< (LOFAR::BlobOStream&, const ParameterSet&); + LOFAR::BlobIStream& operator>> (LOFAR::BlobIStream&, ParameterSet&); + // @} + + inline std::string ParameterHandler::getString (const std::string& parm) const + { return itsParms.getString (parm); } + inline double ParameterHandler::getDouble (const std::string& parm) const + { return itsParms.getDouble (parm); } + inline unsigned ParameterHandler::getUint (const std::string& parm) const + { return itsParms.getUint32 (parm); } + inline bool ParameterHandler::getBool (const std::string& parm) const + { return itsParms.getBool (parm); } + inline std::vector<std::string> ParameterHandler::getStringVector + (const std::string& parm) const + { return itsParms.getStringVector (parm); } + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/SocketConnection.h b/CEP/BB/MWCommon/include/MWCommon/SocketConnection.h new file mode 100644 index 00000000000..a1804dd41d5 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/SocketConnection.h @@ -0,0 +1,81 @@ +/// @file +/// @brief Connection to workers based on a socket. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_SOCKETCONNECTION_H +#define LOFAR_MWCOMMON_SOCKETCONNECTION_H + +#include <MWCommon/MWConnection.h> +#include <Common/Net/Socket.h> +#include <boost/shared_ptr.hpp> +#include <string> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Connection to workers based on a socket. + + /// This class handles the socket connection between two processes. + /// For a client it can set up the connection to a server on a given + /// host and port. + /// For a server it can hold the connection created by SocketListener. + /// + /// It is meant to send and receive blobs. The length of a message to + /// receive is read (by base class MWConnection) from the blob header. + + class SocketConnection: public MWConnection + { + public: + /// Define a shared pointer to this object. + typedef boost::shared_ptr<SocketConnection> ShPtr; + + /// Set up the client side of a connection. + /// Upon the first send or receive it connects to the server + /// on the given host and port. + /// If the making the connection fails, it will sleep one second and try + /// again for up to 60 attempts. In this way the case is handled + /// where a server is started a bit later than a client. + SocketConnection (const std::string& hostName, const std::string& port); + + /// Add a socket from the server when it accepted a connection + /// (used by SocketListener). + /// It takes over the ownership of the pointer. + explicit SocketConnection (LOFAR::Socket*); + + virtual ~SocketConnection(); + + /// Check the state of the connection. + virtual bool isConnected() const; + + /// Get the length of the message. + /// Always returns -1 indicating that the length has to be read + /// from the header. + virtual int getMessageLength(); + + /// Receive the data sent by the destination + /// and wait until data has been received into buf. + virtual void receive (void* buf, unsigned size); + + /// Send the data to the destination + /// and wait until the data has been sent. + virtual void send (const void* buf, unsigned size); + + /// Get the name of the host this process is running on. + /// If sockets are not supported (e.g. Cray), it returns an empty string. + static std::string getHostName(); + + private: + /// Initialize the connection. + void init(); + + LOFAR::Socket itsConnSocket; + LOFAR::Socket* itsDataSocket; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/SocketConnectionSet.h b/CEP/BB/MWCommon/include/MWCommon/SocketConnectionSet.h new file mode 100644 index 00000000000..901a0aab91f --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/SocketConnectionSet.h @@ -0,0 +1,81 @@ +/// @file +/// @brief Set of socket connections. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_SOCKETCONNECTIONSET_H +#define LOFAR_MWCOMMON_SOCKETCONNECTIONSET_H + +#include <MWCommon/MWConnectionSet.h> +#include <MWCommon/SocketListener.h> +#include <MWCommon/SocketConnection.h> +#include <vector> + + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Set of socket connections. + + /// This class represents a set of socket connections. Typically it is used + /// to group connections to workers of a specific type. + /// The main reason for having this class is the ability to check if any + /// connection in the group is ready to receive data (i.e. if the other + /// side of the connection has sent data). This is done using the \a select + /// function on the fd-s of the sockets in the set. + /// + /// The SocketConnectionSet class creates a socket listener. Thus it is + /// the server side of a connection and is typically used by the master + /// control. + /// + /// @todo Implement getReadyConnection. + + class SocketConnectionSet: public MWConnectionSet + { + public: + /// Set up a connection set for a server. + /// It creates a SocketListener object on the given port. + explicit SocketConnectionSet (const std::string& port); + + /// Set up a connection from an existing SocketListener. + /// It makes a (shallow) copy of the listener object. + explicit SocketConnectionSet (const SocketListener&); + + virtual ~SocketConnectionSet(); + + /// Clone the derived object to contain only the connections + /// as indexed in the given vector. + virtual MWConnectionSet::ShPtr clone(const std::vector<int>&) const; + + /// Accept connections from the given number of clients to the server. + void addConnections (int nr); + + /// Get the number of connections. + virtual int size() const; + + /// Get seqnr of connection that is ready to receive. + /// <0 means no connection ready yet. + virtual int getReadyConnection(); + + /// Read the data into the BlobString buffer using the connection + /// with the given sequence nr. + virtual void read (int seqnr, LOFAR::BlobString&); + + /// Write the data from the BlobString buffer using the connection + /// with the given sequence nr. + virtual void write (int seqnr, const LOFAR::BlobString&); + + /// Write the data from the BlobString buffer to all connections. + virtual void writeAll (const LOFAR::BlobString&); + + private: + SocketListener itsListener; + std::vector<SocketConnection::ShPtr> itsConns; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/SocketListener.h b/CEP/BB/MWCommon/include/MWCommon/SocketListener.h new file mode 100644 index 00000000000..3113c4d6f3f --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/SocketListener.h @@ -0,0 +1,45 @@ +/// @file +/// @brief Class that creates a socket and accepts connections. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_SOCKETLISTENER_H +#define LOFAR_MWCOMMON_SOCKETLISTENER_H + +#include <MWCommon/SocketConnection.h> +#include <Common/Net/Socket.h> +#include <boost/shared_ptr.hpp> +#include <string> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Class that creates a socket and accepts connections. + + /// This class sets up a socket listener. It is used by SocketConnectionSet + /// to accept connection requests from workers. + /// + /// Internally the class uses a shared pointer to a socket object. + /// It means that a copy of a SocketListener object can be made, but that + /// copies share the same underlying socket object. + + class SocketListener + { + public: + /// Set up the server side of a listener. + explicit SocketListener (const std::string& port); + + /// Listen to a connection and accept it. + /// It blocks until another process wants to connect. + SocketConnection::ShPtr accept(); + + private: + boost::shared_ptr<LOFAR::Socket> itsConnSocket; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/VdsDesc.h b/CEP/BB/MWCommon/include/MWCommon/VdsDesc.h new file mode 100644 index 00000000000..aefa2608557 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/VdsDesc.h @@ -0,0 +1,83 @@ +/// @file +/// @brief Describe an entire visibility data set +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_VDSDESC_H +#define LOFAR_MWCOMMON_VDSDESC_H + +//# Includes +#include <MWCommon/VdsPartDesc.h> +#include <casa/Utilities/Regex.h> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Describe an entire visibility data set + + /// This class holds the description of an entire visibility data set (VDS). + /// In VdsPartDesc objects it describes the parts it consists of and + /// on which file systems they are located. + /// A VdsPartDesc object is also used to describe the entire VDS. + /// Furthermore it contains the names of all antennae, which can be used + /// to map the antenna name to the antenna number when a selection on + /// antenna names is done. + /// + /// Currently the information is made persistent in a LOFAR .parset file. + /// In the future it needs to use the Centrol Processor Resource Manager. + + class VdsDesc + { + public: + /// Construct with a description of the entire visibility data set. + /// Also supply a vector mapping antenna name to number. + VdsDesc (const VdsPartDesc&, const std::vector<std::string>& antNames); + + /// Construct from the given parameterset. + /// @{ + explicit VdsDesc (const std::string& parsetName); + explicit VdsDesc (const ParameterSet& parset) + { init (parset); } + /// @} + + /// Add a part. + void addPart (const VdsPartDesc& part) + { itsParts.push_back (part); } + + /// Get the description of the parts. + const std::vector<VdsPartDesc>& getParts() const + { return itsParts; } + + /// Get the description of the VDS. + const VdsPartDesc& getDesc() const + { return itsDesc; } + + /// Get antennas names. + const std::vector<std::string>& getAntNames() const + { return itsAntNames; } + + /// Convert an antenna name to its index. + /// -1 is returned if not found. + int antNr (const std::string& name) const; + + /// Convert an antenna regex to indices. + std::vector<int> antNrs (const casa::Regex& names) const; + + /// Write it in parset format. + void write (std::ostream& os) const; + + private: + /// Fill the object from the given parset file. + void init (const ParameterSet& parset); + + VdsPartDesc itsDesc; + std::vector<VdsPartDesc> itsParts; + std::vector<std::string> itsAntNames; //# maps antennanr to name + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/VdsPartDesc.h b/CEP/BB/MWCommon/include/MWCommon/VdsPartDesc.h new file mode 100644 index 00000000000..0deb1d5f1e1 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/VdsPartDesc.h @@ -0,0 +1,100 @@ +/// @file +/// @brief Description of a visibility data set or part thereof. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_VDSPARTDESC_H +#define LOFAR_MWCOMMON_VDSPARTDESC_H + +//# Includes +#include <MWCommon/ParameterHandler.h> +#include <string> +#include <vector> +#include <iosfwd> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Description of a visibility data set or part thereof. + + /// This class holds the description of a visibility data set (VDS) part. + /// It defines the name of the part and on which file system it is located. + /// Using the ClusterDesc object it can be derived on which node this + /// VDS part can be processed best. This is done by the WorkersDesc + /// class. + /// + /// The description of the VDS also contains info about the time, + /// frequency, and baseline domain of the visibility data. + /// + /// Currently the information is made persistent in a LOFAR .parset file. + /// In the future it needs to use the Centrol Processor Resource Manager. + + class VdsPartDesc + { + public: + /// Construct an empty object. + VdsPartDesc() + : itsStartTime(0), itsEndTime(0) + {} + + /// Construct from the given parameterset. + explicit VdsPartDesc (const ParameterSet&); + + /// Set VDS name and file system. + void setName (const std::string& name, const std::string& fileSys); + + /// Set the start and end time. + void setTimes (double startTime, double endTime); + + /// Add a band. + void addBand (int nchan, double startFreq, double endFreq); + + /// Set the baselines. + void setBaselines (const std::vector<int>& ant1, + const std::vector<int>& ant2); + + /// Write it in parset format. + void write (std::ostream& os, const std::string& prefix) const; + + /// Get the values. + /// @{ + const std::string& getName() const + { return itsName; } + const std::string& getFileSys() const + { return itsFileSys; } + double getStartTime() const + { return itsStartTime; } + double getEndTime() const + { return itsEndTime; } + int getNBand() const + { return itsNChan.size(); } + const std::vector<int>& getNChan() const + { return itsNChan; } + const std::vector<double>& getStartFreqs() const + { return itsStartFreqs; } + const std::vector<double>& getEndFreqs() const + { return itsEndFreqs; } + const std::vector<int>& getAnt1() const + { return itsAnt1; } + const std::vector<int>& getAnt2() const + { return itsAnt2; } + /// @} + + private: + std::string itsName; //# full name of the VDS + std::string itsFileSys; //# name of file system the VDS resides on + double itsStartTime; + double itsEndTime; + std::vector<int> itsNChan; //# nr of channels per band + std::vector<double> itsStartFreqs; //# start freq of each band + std::vector<double> itsEndFreqs; //# end freq of each band + std::vector<int> itsAnt1; //# 1st antenna of each baseline + std::vector<int> itsAnt2; //# 2nd antenna of each baseline + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/WorkDomainSpec.h b/CEP/BB/MWCommon/include/MWCommon/WorkDomainSpec.h new file mode 100644 index 00000000000..048921337d0 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/WorkDomainSpec.h @@ -0,0 +1,127 @@ +/// @file +/// @brief Define the specifications of the work domain. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_WORKDOMAINSPEC_H +#define LOFAR_MWCOMMON_WORKDOMAINSPEC_H + +#include <MWCommon/DomainShape.h> +#include <Blob/BlobOStream.h> +#include <Blob/BlobIStream.h> +#include <vector> +#include <string> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Define the specifications of the work domain. + + /// This class defines the properties of a work domain. It contains: + /// <ul> + /// <li> The size in time and freq. + /// <li> The integration to be done in time and/or freq. + /// <li> The input data column. + /// <li> The antenna numbers or names to be selected. + /// <li> If autocorrelations between antennas are to be selected. + /// <li> The polarisation correlations to be selected. + /// </ul> + /// A work domain defines the amount of data a worker can hold in memory. + /// + /// The control will iterate over the entire data set in chunk of the + /// work domain size. For each chunk it will perform the steps as defined + /// by an MWMultiStep object on the data in the work domain or a subset + /// of them. + + class WorkDomainSpec + { + public: + /// Default constructor (for containers). + WorkDomainSpec() + : itsInColumn("DATA"), + itsAutoCorr(false) + {} + + /// Set/get work domain shape. + /// @{ + void setShape (const DomainShape& shape) + { itsShape = shape; } + const DomainShape& getShape() const + { return itsShape; } + /// @} + + /// Set/get integration interval in frequency or time. + /// @{ + void setFreqIntegration (double hz) + { itsFreqInt = hz; } + void setTimeIntegration (double sec) + { itsTimeInt = sec; } + double getFreqIntegration() const + { return itsFreqInt; } + double getTImeIntegration() const + { return itsTimeInt; } + /// @} + + /// Set/get the input data column to use. + /// @{ + void setInColumn (const std::string& inColumn) + { itsInColumn = inColumn; } + const std::string& getInColumn() const + { return itsInColumn; } + /// @} + + /// Set/get the antennas to use (0-based numbers). + /// @{ + void setAntennas (const std::vector<int>& antNrs); + const std::vector<int>& getAntennas() const + { return itsAntNrs; } + /// @} + + /// Set/get antennas by name patterns. + /// Each name can be a filename-like pattern. + /// @{ + void setAntennaNames (const std::vector<std::string>& antNamePatterns); + const std::vector<std::string>& getAntennaNames() const + { return itsAntNames; } + /// @} + + /// Set/get the autocorrelations flag. + /// @{ + void setAutoCorr (bool autoCorr) + { itsAutoCorr = autoCorr; } + bool getAutoCorr() const + { return itsAutoCorr; } + /// @} + + /// Set/get the correlations to use. + /// @{ + void setCorr (const std::vector<bool>& corr); + const std::vector<bool>& getCorr() const + { return itsCorr; } + /// @} + + /// Write or read the object into/from a blob stream. + /// @{ + friend LOFAR::BlobOStream& operator<< (LOFAR::BlobOStream&, + const WorkDomainSpec&); + friend LOFAR::BlobIStream& operator>> (LOFAR::BlobIStream&, + WorkDomainSpec&); + /// @} + + private: + std::string itsInColumn; + std::vector<int> itsAntNrs; + std::vector<std::string> itsAntNames; + bool itsAutoCorr; + std::vector<bool> itsCorr; + DomainShape itsShape; + double itsFreqInt; + double itsTimeInt; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/WorkerControl.h b/CEP/BB/MWCommon/include/MWCommon/WorkerControl.h new file mode 100644 index 00000000000..694207c5a0f --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/WorkerControl.h @@ -0,0 +1,46 @@ +/// @file +/// @brief High level worker control. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_WORKERCONTROL_H +#define LOFAR_MWCOMMON_WORKERCONTROL_H + +#include <MWCommon/WorkerProxy.h> +#include <MWCommon/MWConnection.h> + + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief High level worker control. + + /// This class if the high level control of a proxy worker. + /// The \a init function sets up the connection and does the initialisation. + /// The \a run function receives commands from the master + /// control, lets the proxy execute them, and sends replies back. + /// When the quit command is received, the \a run function will end. + + class WorkerControl + { + public: + /// Construct with the given proxy, that will execute the commands. + WorkerControl (const WorkerProxy::ShPtr& proxy); + + /// Initialise the connection and send an init message to the master. + void init (const MWConnection::ShPtr& connection); + + /// Receive and execute messages until an end message is received. + void run(); + + private: + MWConnection::ShPtr itsConnection; + WorkerProxy::ShPtr itsProxy; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/WorkerFactory.h b/CEP/BB/MWCommon/include/MWCommon/WorkerFactory.h new file mode 100644 index 00000000000..062bb58f375 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/WorkerFactory.h @@ -0,0 +1,48 @@ +/// @file +/// @brief Factory pattern to generate a WorkerProxy object. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_WORKERFACTORY_H +#define LOFAR_MWCOMMON_WORKERFACTORY_H + +#include <MWCommon/WorkerProxy.h> +#include <map> +#include <string> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Factory pattern to generate a WorkerProxy object. + + /// This class contains a map of names to \a create functions + /// of derived WorkerProxy objects. It is used to construct the correct + /// WorkerProxy object given a type name. + /// In this way one can choose which worker to use. For example, it makes + /// it possible to use simple test workers to process prediffer and + /// solver operations to check the control logic. + + class WorkerFactory + { + public: + /// Define the signature of the function to create the worker. + typedef WorkerProxy::ShPtr Creator (); + + /// Add a creator function. + void push_back (const std::string& name, Creator*); + + /// Create the object of the given name. + /// An exception is thrown if the name is not in the map. + WorkerProxy::ShPtr create (const std::string& name) const; + + private: + std::map<std::string, Creator*> itsMap; + }; + + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/WorkerInfo.h b/CEP/BB/MWCommon/include/MWCommon/WorkerInfo.h new file mode 100644 index 00000000000..a0b3c002fe6 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/WorkerInfo.h @@ -0,0 +1,72 @@ +/// @file +/// @brief Information about a worker. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_WORKERINFO_H +#define LOFAR_MWCOMMON_WORKERINFO_H + +#include <string> +#include <vector> + +//# Forward Declarations. +namespace LOFAR { + class BlobOStream; + class BlobIStream; +} + + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Information about a worker. + + /// This class contains the information describing a worker. + /// It contains the name of the host it is running on and a vector + /// with the types of work in can perform. Currently only the first + /// work type is taken into account. + /// + /// @todo Take all work types into account. + + class WorkerInfo + { + public: + /// Creatye empty object. + WorkerInfo(); + + /// Construct the object from the given info. + WorkerInfo (const std::string& hostName, + const std::vector<int>& workTypes); + + ~WorkerInfo(); + + /// Get the host name. + const std::string& getHostName() const + { return itsHostName; } + + /// Get the work types. + const std::vector<int>& getWorkTypes() const + { return itsWorkTypes; } + + /// Get the first work type. Returns 0 if no work types. + int getWorkType() const; + + /// Read or write the info from/into a blob. + /// @{ + friend LOFAR::BlobOStream& operator<< (LOFAR::BlobOStream& bs, + const WorkerInfo& info); + friend LOFAR::BlobIStream& operator>> (LOFAR::BlobIStream& bs, + WorkerInfo& info); + /// @} + + private: + std::string itsHostName; + std::vector<int> itsWorkTypes; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/WorkerProxy.h b/CEP/BB/MWCommon/include/MWCommon/WorkerProxy.h new file mode 100644 index 00000000000..c1af7215457 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/WorkerProxy.h @@ -0,0 +1,93 @@ +/// @file +/// @brief Abstract base class for all worker proxies. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_WORKERPROXY_H +#define LOFAR_MWCOMMON_WORKERPROXY_H + +#include <MWCommon/WorkerInfo.h> +#include <MWCommon/ParameterHandler.h> +#include <string> +#include <vector> +#include <boost/shared_ptr.hpp> + +//# Forward Declarations. +namespace LOFAR { + class BlobString; + class BlobIStream; +} + + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Abstract base class for all worker proxies. + + /// This class is the abstract base class for the possible workers. + /// Usually a worker is a proxy class to a class doing the actual work. + /// The WorkerControl class uses a WorkerProxy to do the actual work. + /// + /// Functions to create a worker proxy from a given type name can be + /// registered in a WorkerFactory object. It gives the user the freedom + /// to choose which function is registered making it possible to use + /// some simple test classes instead of the full-blown real classes to + /// test the control flow. + + class WorkerProxy + { + public: + /// Define a shared pointer to this object. + typedef boost::shared_ptr<WorkerProxy> ShPtr; + + WorkerProxy(); + + virtual ~WorkerProxy(); + + /// Fill the buffer with the worker proxy info (like host and work types). + /// This is used at initialisation time to make the worker capabilities + /// known to the master. + void putWorkerInfo (LOFAR::BlobString& out); + + /// Get the worker info from the blob string. It is used by the master + /// to extract it from a message. + static WorkerInfo getWorkerInfo (LOFAR::BlobString& in); + + /// Process the command and data that has been received in the input + /// buffer and write the possible result into the output buffer. + /// If the input buffer contains the \a quit command, the \a quit function + /// is called and the status \a false is returned. + /// Otherwise the \a process function is called to do the actual + /// processing. + bool handleMessage (const LOFAR::BlobString& in, LOFAR::BlobString& out); + + private: + /// Get the work types supported by the proxy. + virtual std::vector<int> getWorkTypes() const = 0; + + /// Let a derived class set the initial info. + virtual void setInitInfo (const ParameterSet&, + const std::string& dataPartName) = 0; + + /// Let a derived class process the received data. + /// The returned operation will be put into the reply message. + /// If the returned operation is < 0, no reply message will be sent. + virtual int process (int operation, int streamId, + LOFAR::BlobIStream& in, + LOFAR::BlobOStream& out) = 0; + + /// Let a derived class end its processing. + /// The default implementation does nothing. + virtual void quit(); + + + /// The workerId is set at the beginning. + int itsWorkerId; + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/MWCommon/WorkersDesc.h b/CEP/BB/MWCommon/include/MWCommon/WorkersDesc.h new file mode 100644 index 00000000000..a3c2a00a6a0 --- /dev/null +++ b/CEP/BB/MWCommon/include/MWCommon/WorkersDesc.h @@ -0,0 +1,76 @@ +/// @file +/// @brief Description of all workers. +/// +/// @copyright (c) 2007 ASKAP, All Rights Reserved. +/// @author Ger van Diepen (diepen AT astron nl) +/// +//# $Id$ + +#ifndef LOFAR_MWCOMMON_WORKERSDESC_H +#define LOFAR_MWCOMMON_WORKERSDESC_H + +//# Includes +#include <MWCommon/ClusterDesc.h> +#include <string> +#include <vector> + +namespace LOFAR { namespace CEP { + + /// @ingroup mwcommon + /// @brief Description of all workers. + + /// This class holds the description of the workers in an MW run. + /// For each worker it describes on which node it runs and which types + /// of work it can perform. + /// Combined with the cluster description, this information is used + /// to determine which worker can be used to perform a given type of work + /// on data on a given file system. In determining this, it keeps track of + /// the workload to avoid that the same worker is selected again and again. + + class WorkersDesc + { + public: + /// Construct from the given cluster description. + WorkersDesc (const ClusterDesc&); + + /// Add a worker with the given id. + /// It can do the work types given in the vector on the given node. + void addWorker (unsigned id, const std::string& node, + const std::vector<int>& workTypes); + + /// Increase or decrease the load for the given worker. + /// Return the new load. + /// @{ + int incrLoad (unsigned worker) + { return ++itsLoad[worker]; } + int decrLoad (unsigned worker) + { return --itsLoad[worker]; } + /// @} + + /// Find the worker with the lowest load that can perform the given + /// work type for data on the given file system. + /// The file system can be empty indicating that any worker can do it. + /// It returns -1 if no suitable worker could be found. + int findWorker (int workType, const std::string& fileSystem) const; + + private: + /// Map giving the workers on each node. + typedef std::map<std::string,std::vector<unsigned> > MapN2W; + /// Map given the nodes with access to a file system. + typedef std::map<std::string,std::vector<std::string> > MapF2N; + + /// Find worker with lowest load. + /// @{ + int findLowest (const MapN2W& workMap) const; + int findLowest (const MapN2W& workMap, + const std::string& fileSystem) const; + /// @} + + MapF2N itsFS2Nodes; //# map FileSystem to nodes + std::map<int,MapN2W> itsMap; //# map worktype to node/worker + std::vector<int> itsLoad; //# load of each worker (#times used) + }; + +}} /// end namespaces + +#endif diff --git a/CEP/BB/MWCommon/include/Makefile.am b/CEP/BB/MWCommon/include/Makefile.am new file mode 100644 index 00000000000..baddbaf3fce --- /dev/null +++ b/CEP/BB/MWCommon/include/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = MWCommon + +include $(top_srcdir)/Makefile.common diff --git a/CEP/BB/MWCommon/src/Makefile.am b/CEP/BB/MWCommon/src/Makefile.am new file mode 100644 index 00000000000..874d7f42e23 --- /dev/null +++ b/CEP/BB/MWCommon/src/Makefile.am @@ -0,0 +1,51 @@ +lib_LTLIBRARIES = libmwcommon.la + +libmwcommon_la_SOURCES = Package__Version.cc \ +ClusterDesc.cc \ +ControllerBase.cc \ +DomainShape.cc \ +MPIConnection.cc \ +MPIConnectionSet.cc \ +MWBlobIO.cc \ +MWConnection.cc \ +MWConnectionSet.cc \ +MWError.cc \ +MWGlobalStep.cc \ +MWIos.cc \ +MWLocalStep.cc \ +MWMultiStep.cc \ +MWStep.cc \ +MWStepFactory.cc \ +MWStepVisitor.cc \ +MasterControl.cc \ +MemConnection.cc \ +MemConnectionSet.cc \ +NodeDesc.cc \ +ObsDomain.cc \ +ParameterHandler.cc \ +SocketConnection.cc \ +SocketConnectionSet.cc \ +SocketListener.cc \ +VdsDesc.cc \ +VdsPartDesc.cc \ +WorkDomainSpec.cc \ +WorkerControl.cc \ +WorkerFactory.cc \ +WorkerInfo.cc \ +WorkerProxy.cc \ +WorkersDesc.cc + +bin_PROGRAMS = versionmwcommon finddproc + +versionmwcommon_SOURCES = versionmwcommon.cc +versionmwcommon_LDADD = libmwcommon.la +versionmwcommon_DEPENDENCIES = libmwcommon.la $(LOFAR_DEPEND) + +finddproc_SOURCES = finddproc.cc +finddproc_LDADD = libmwcommon.la +finddproc_DEPENDENCIES = libmwcommon.la $(LOFAR_DEPEND) + +scriptdir = $(bindir) +dist_script_SCRIPTS = startdproc socketrun + +include $(top_srcdir)/Makefile.common diff --git a/CEP/BB/MWCommon/src/socketrun b/CEP/BB/MWCommon/src/socketrun new file mode 100755 index 00000000000..e42829af3cf --- /dev/null +++ b/CEP/BB/MWCommon/src/socketrun @@ -0,0 +1,40 @@ +#!/bin/sh + +# socketrun.cc: Start a distributed process using socket connections +# +# @copyright (c) 2008 ASKAP, All Rights Reserved. +# @author Ger van Diepen <diepen AT astron nl> +# +# $Id$ + + +# This script starts a distributed process to process a datasets. +# The processes will use sockets for communication. +# +# run as: socketrun dry hfn port program [arg1 arg2 ...] + +dry=$1 +shift +hfn=$1 +shift +port=$1 +shift +program=$1 +shift + +np=`wc -l $hfn | awk '{print $1}'` + +# The first host is the master one. +masterhost= +rank=0 +for host in `cat $hfn` +do + if test "$masterhost" = ""; then + masterhost=$host + fi + echo "ssh $host $program socket $masterhost $port $np $rank" + if test $dry = 0; then + ssh $host $program socket $masterhost $port $np $rank & + fi + rank=`expr $rank + 1` +done diff --git a/CEP/BB/MWCommon/src/startdistproc b/CEP/BB/MWCommon/src/startdistproc new file mode 100755 index 00000000000..a425bc07368 --- /dev/null +++ b/CEP/BB/MWCommon/src/startdistproc @@ -0,0 +1,170 @@ +#!/bin/sh + +# startdistproc: Start a distributed process +# +# @copyright (c) 2008 ASKAP, All Rights Reserved. +# @author Ger van Diepen <diepen AT astron nl> +# +# $Id$ + + +# This script starts a distributed process to process a datasets. +# It can use sockets or MPI. +# The VdsDesc and ClusterDesc files are used to determine on which nodes +# the various processes need to be started. + + +# Find the path used to start the script. +pgmpath=`dirname $0` +pgmpath=`cd $pgmpath > /dev/null 2>&1 && pwd` + +# Set default options. +modxx=single +dsn= +cdn= +hfn= +fdp= +master=localhost +extra= +dry=1 +help=0 +program= +if [ $# = 0 ]; then + help=1 +fi + +# Handle possible options. +while [ $# != 0 ] +do + if [ "$1" = "-h" -o "$1" = "-help" -o "$1" = "--help" ]; then + help=1 + break + elif [ "$1" = "-dsn" ]; then + shift + dsn="$1" + shift + elif [ "$1" = "-cdn" ]; then + shift + cdn="$1" + shift + elif [ "$1" = "-hfn" ]; then + shift + hfn="$1" + shift + elif [ "$1" = "-fdp" ]; then + shift + fdp="$1" + shift + elif [ "$1" = "-dry" ]; then + shift + dry=1 + elif [ "$1" = "-nodry" ]; then + shift + dry=0 + elif [ "$1" = "-mode" ]; then + shift + modxx="$1" + shift + elif [ "$1" = "-masterhost" ]; then + shift + master="$1" + shift + elif [ "$1" = "-extrahosts" ]; then + shift + extra="$1" + shift + elif [ "$1" = "-noextrahosts" ]; then + shift + extra= + else + case "$1" in + -*) + echo "$1 is an unknown startdistproc option" + exit 1 + ;; + *) + # A program name has been found, so stop parsing options. + program=$1 + shift + break + ;; + esac + fi +done + +if [ $help = 1 ]; then + echo '' + echo 'startdistproc starts a distributed process to process the given dataset.' + echo 'The mode tells if it is started through MPI, sockets, or as a single process.' + echo '' + echo 'Run as:' + echo ' startdistproc -dsn datasetname -cdn clusterdescname [-mode mode]' + echo ' [-hfn hostfilename] [-fdp finddproc-path] [-[no]dry]' + echo ' [-extrahosts "host1 host2 ..."] [-noextrahosts]' + echo ' [-masterhost host] program [arg1 arg2 ...]' + echo '' + echo ' -dsn datasetname The name of the file describing the distributed dataset.' + echo ' -cdn clusterdescname The name of the file describing the cluster.' + echo ' -hfn hostfilename The name of the generated hostfile.' + echo ' It defaults to /tmp/machinefile_$$' + echo ' -fdp finddproc-path Path where to find finddproc.' + echo ' Default is empty.' + echo ' -dry Do a dry run (default is -nodry).' + echo ' -mode mode mode can be mpi, single, or a number.' + echo ' A number denotes that sockets have to be used' + echo ' and defines the port to be used.' + echo ' -masterhost host The name of the host the master runs on.' + echo ' The default is localhost.' + echo ' -extrahosts hosts The hosts to be used for extra processes (e.g. solvers).' + echo ' Multiple hosts have to be separated by whitespace' + echo ' and enclosed in quotes.' + echo ' -h -help --help This help text.' + exit 0 +fi + +if [ "$program" = "" ]; then + echo "Error: no program name given ('startdistproc -h' gives help)" + exit 1 +fi +if test "$dsn" = ""; then + echo "Error: no dataset name given (use -dsn)" + exit 1 +fi +if test "$cdn" = ""; then + echo "Error: no clusterdesc name given (use -cdn)" + exit 1 +fi +if test ! -f "$dsn"; then + echo "Error: dataset $dsn not found" + exit 1 +fi +if test ! -f "$cdn"; then + echo "Error: clusterdesc $cdn not found" + exit 1 +fi +if test "$hfn" = ""; then + hfn="/tmp/machinefile_$$" +fi +if test "$fdp" != ""; then + fdp="$fdp/" +fi + +# Prepend the hosts with the master and extra hosts. +${fdp}finddproc $dsn $cdn $master $extra > $hfn +np=`wc -l $hfn | awk '{print $1}'` + +# Run the program as needed. +if test "$modxx" = "mpi"; then + echo "mpirun -np $np -machinefile $hfn $program $@" + if test $dry = 0; then + mpirun -np $np -machinefile $hfn $program "$@" + fi +elif test "$modxx" = "single"; then + echo "$program $@" + if test $dry = 0; then + $program "$@" + fi +else + echo "socketrun $hfn $program $@" + $pgmpath/socketrun $dry $hfn $modxx $program "$@" +fi diff --git a/CEP/BB/MWCommon/test/Makefile.am b/CEP/BB/MWCommon/test/Makefile.am new file mode 100644 index 00000000000..1affce34fc0 --- /dev/null +++ b/CEP/BB/MWCommon/test/Makefile.am @@ -0,0 +1,38 @@ +check_PROGRAMS = tClusterDesc tNodeDesc tVdsDesc tVdsPartDesc tWorkersDesc \ + tSocketConnection + +CHECKTOOLPROGS = $(CHECK_PROGRAMS) + +TESTS = tClusterDesc.sh tNodeDesc.sh tVdsDesc.sh tVdsPartDesc.sh \ + tWorkersDesc.sh \ + tSocketConnection.sh \ + tfinddproc.sh tstartdproc.sh + +LDADD = ../src/libmwcommon.la + +tClusterDesc_SOURCES = tClusterDesc.cc +tClusterDesc_DEPENDENCIES = ../src/libmwcommon.la $(LOFAR_DEPEND) + +tNodeDesc_SOURCES = tNodeDesc.cc +tNodeDesc_DEPENDENCIES = ../src/libmwcommon.la $(LOFAR_DEPEND) + +tVdsDesc_SOURCES = tVdsDesc.cc +tVdsDesc_DEPENDENCIES = ../src/libmwcommon.la $(LOFAR_DEPEND) + +tVdsPartDesc_SOURCES = tVdsPartDesc.cc +tVdsPartDesc_DEPENDENCIES = ../src/libmwcommon.la $(LOFAR_DEPEND) + +tWorkersDesc_SOURCES = tWorkersDesc.cc +tWorkersDesc_DEPENDENCIES = ../src/libmwcommon.la $(LOFAR_DEPEND) + +tSocketConnection_SOURCES = tSocketConnection.cc +tSocketConnection_DEPENDENCIES = ../src/libmwcommon.la $(LOFAR_DEPEND) + +EXTRA_DIST = tSocketConnection.run tSocketConnection.stdout \ + tfinddproc.in_vd tfinddproc.in_cd \ + tfinddproc.run tfinddproc.stdout + tstartdproc.in_vd tstartdproc.in_cd \ + tstartdproc.run tstartdproc.stdout + +include $(top_srcdir)/Makefile.common + diff --git a/CEP/BB/MWCommon/test/tClusterDesc.cc b/CEP/BB/MWCommon/test/tClusterDesc.cc new file mode 100644 index 00000000000..6c7799f342e --- /dev/null +++ b/CEP/BB/MWCommon/test/tClusterDesc.cc @@ -0,0 +1,76 @@ +//# tClusterDesc.cc: Test program for class ClusterDesc +//# +//# Copyright (C) 2007 +//# +//# $Id$ + +#include <MWCommon/ClusterDesc.h> +#include <Common/LofarLogger.h> +#include <ostream> +#include <fstream> + +using namespace LOFAR::CEP; +using namespace std; + +void check (const ClusterDesc& cl) +{ + ASSERT (cl.getName() == "cl"); + ASSERT (cl.getNodes().size() == 2); + const vector<NodeDesc>& nodes = cl.getNodes(); + ASSERT (nodes[0].getFileSys().size() == 2); + ASSERT (nodes[0].getFileSys()[0] == "fs0"); + ASSERT (nodes[0].getFileSys()[1] == "fs1"); + ASSERT (nodes[1].getFileSys().size() == 2); + ASSERT (nodes[1].getFileSys()[0] == "fs1"); + ASSERT (nodes[1].getFileSys()[1] == "fs2"); + ASSERT (cl.getMap().size() == 3); + map<string,vector<string> >::const_iterator fsmap; + fsmap = cl.getMap().find("fs0"); + ASSERT (fsmap->second.size() == 1); + ASSERT (fsmap->second[0] == "node1"); + fsmap = cl.getMap().find("fs1"); + ASSERT (fsmap->second.size() == 2); + ASSERT (fsmap->second[0] == "node1"); + ASSERT (fsmap->second[1] == "node2"); + fsmap = cl.getMap().find("fs2"); + ASSERT (fsmap->second.size() == 1); + ASSERT (fsmap->second[0] == "node2"); +} + +void doIt() +{ + ClusterDesc cl; + cl.setName ("cl"); + NodeDesc node1; + node1.setName ("node1"); + node1.addFileSys ("fs0"); + node1.addFileSys ("fs1"); + cl.addNode (node1); + NodeDesc node2; + node2.setName ("node2"); + node2.addFileSys ("fs1"); + node2.addFileSys ("fs2"); + cl.addNode (node2); + check(cl); + // Write into parset file. + ofstream fos("tClusterDesc_tmp.fil"); + cl.write (fos); + // Read back. + LOFAR::ACC::APS::ParameterSet parset("tClusterDesc_tmp.fil"); + ClusterDesc cl2(parset); + check(cl2); + cl = cl2; + check(cl); +} + +int main() +{ + try { + doIt(); + } catch (std::exception& x) { + cout << "Unexpected exception: " << x.what() << endl; + return 1; + } + cout << "OK" << endl; + return 0; +} diff --git a/CEP/BB/MWCommon/test/tClusterDesc.sh b/CEP/BB/MWCommon/test/tClusterDesc.sh new file mode 100755 index 00000000000..bf6ca826f61 --- /dev/null +++ b/CEP/BB/MWCommon/test/tClusterDesc.sh @@ -0,0 +1,2 @@ +#!/bin/sh +$lofar_sharedir/runtest.sh tClusterDesc > tClusterDesc.log 2>&1 diff --git a/CEP/BB/MWCommon/test/tNodeDesc.cc b/CEP/BB/MWCommon/test/tNodeDesc.cc new file mode 100644 index 00000000000..ceff7dc4476 --- /dev/null +++ b/CEP/BB/MWCommon/test/tNodeDesc.cc @@ -0,0 +1,51 @@ +//# tNodeDesc.cc: Test program for class NodeDesc +//# +//# Copyright (C) 2007 +//# +//# $Id$ + +#include <MWCommon/NodeDesc.h> +#include <Common/LofarLogger.h> +#include <ostream> +#include <fstream> + +using namespace LOFAR::CEP; +using namespace std; + +void check (const NodeDesc& node) +{ + ASSERT (node.getName() == "node1"); + ASSERT (node.getFileSys().size() == 2); + ASSERT (node.getFileSys()[0] == "fs0"); + ASSERT (node.getFileSys()[1] == "fs1"); +} + +void doIt() +{ + NodeDesc node; + node.setName ("node1"); + node.addFileSys ("fs0"); + node.addFileSys ("fs1"); + check(node); + // Write into parset file. + ofstream fos("tNodeDesc_tmp.fil"); + node.write (fos, ""); + // Read back. + LOFAR::ACC::APS::ParameterSet parset("tNodeDesc_tmp.fil"); + NodeDesc node2(parset); + check(node2); + node = node2; + check(node); +} + +int main() +{ + try { + doIt(); + } catch (std::exception& x) { + cout << "Unexpected exception: " << x.what() << endl; + return 1; + } + cout << "OK" << endl; + return 0; +} diff --git a/CEP/BB/MWCommon/test/tNodeDesc.sh b/CEP/BB/MWCommon/test/tNodeDesc.sh new file mode 100755 index 00000000000..b8947633d48 --- /dev/null +++ b/CEP/BB/MWCommon/test/tNodeDesc.sh @@ -0,0 +1,2 @@ +#!/bin/sh +$lofar_sharedir/runtest.sh tNodeDesc > tNodeDesc.log 2>&1 diff --git a/CEP/BB/MWCommon/test/tSocketConnection.cc b/CEP/BB/MWCommon/test/tSocketConnection.cc new file mode 100644 index 00000000000..b9c789307d2 --- /dev/null +++ b/CEP/BB/MWCommon/test/tSocketConnection.cc @@ -0,0 +1,76 @@ +//# tSocketConnection.cc: Program to test SocketConnection +//# +//# Copyright (C) 2007 +//# +//# $Id$ + +#include <MWCommon/SocketConnection.h> +#include <MWCommon/SocketListener.h> +#include <Common/LofarLogger.h> +#include <iostream> +#include <unistd.h> + +using namespace LOFAR::CEP; +using namespace std; + +void doClient (const string& host, const string& port) +{ + // sleep (2); + cout << "Client connection on host " << host + << ", port " << port << endl; + SocketConnection socket(host, port); + double dv = 1; + socket.send (&dv, sizeof(dv)); + cout << "sent " << dv << endl; + float fv = 0; + socket.receive (&fv, sizeof(fv)); + ASSERT (fv == 2); + cout << "received " << fv << endl; + sleep(2); + socket.receive (&fv, sizeof(fv)); + ASSERT (fv == 3); + cout << "received " << fv << endl; + dv = 2; + socket.send (&dv, sizeof(dv)); + cout << "sent " << dv << endl; +} + +void doServer (const string& port) +{ + cout << "Server connection on port " << port << endl; + SocketListener listener(port); + SocketConnection::ShPtr socket = listener.accept(); + double dv = 0; + socket->receive (&dv, sizeof(dv)); + ASSERT (dv == 1); + cout << "received " << dv << endl; + float fv = 2; + socket->send (&fv, sizeof(fv)); + cout << "sent " << fv << endl; + fv = 3; + socket->send (&fv, sizeof(fv)); + cout << "sent " << fv << endl; + socket->receive (&dv, sizeof(dv)); + ASSERT (dv == 2); + cout << "received " << dv << endl; +} + +int main(int argc, const char* argv[]) +{ + int status = 0; + try { + if (argc < 2) { + cerr << "Run as:" << endl; + cerr << " as server: tSocketConnection port" << endl; + cerr << " as client: tSocketConnection port host" << endl; + } else if (argc == 2) { + doServer (argv[1]); + } else { + doClient (argv[2], argv[1]); + } + } catch (std::exception& x) { + cout << "Unexpected exception in " << argv[0] << ": " << x.what() << endl; + status = 1; + } + exit(status); +} diff --git a/CEP/BB/MWCommon/test/tSocketConnection.run b/CEP/BB/MWCommon/test/tSocketConnection.run new file mode 100755 index 00000000000..1a566966f1a --- /dev/null +++ b/CEP/BB/MWCommon/test/tSocketConnection.run @@ -0,0 +1,40 @@ +#!/bin/sh + +# Run the tSocketConnection test program by starting it twice, +# once as server and once as client. +./tSocketConnection 3851 > tSocketConnection_tmp.outs1 & +./tSocketConnection 3851 localhost > tSocketConnection_tmp.outc1 + +# If something went wrong, kill the background process (if it's still running) +# and exit. +STATUS=$? +if [ $STATUS != 0 ]; then + if kill -0 $! 2>/dev/null; then + kill -9 $! 2>/dev/null + fi + exit $STATUS +fi + +# Output the result in order. +echo "Run1 ..." +cat tSocketConnection_tmp.outs1 tSocketConnection_tmp.outc1 + + +# Do another run, now starting the client before the server. +./tSocketConnection 3851 localhost > tSocketConnection_tmp.outc2 & +./tSocketConnection 3851 > tSocketConnection_tmp.outs2 + +# If something went wrong, kill the background process (if it's still running) +# and exit. +STATUS=$? +if [ $STATUS != 0 ]; then + if kill -0 $! 2>/dev/null; then + kill -9 $! 2>/dev/null + fi + exit $STATUS +fi + +# Output the result in order. +echo "Run2 ..." +cat tSocketConnection_tmp.outs2 tSocketConnection_tmp.outc2 +exit 0 diff --git a/CEP/BB/MWCommon/test/tSocketConnection.sh b/CEP/BB/MWCommon/test/tSocketConnection.sh new file mode 100755 index 00000000000..7ae851288b8 --- /dev/null +++ b/CEP/BB/MWCommon/test/tSocketConnection.sh @@ -0,0 +1,2 @@ +#!/bin/sh +$lofar_sharedir/runtest.sh tSocketConnection > tSocketConnection.log 2>&1 diff --git a/CEP/BB/MWCommon/test/tSocketConnection.stdout b/CEP/BB/MWCommon/test/tSocketConnection.stdout new file mode 100644 index 00000000000..d6a1f5e09bb --- /dev/null +++ b/CEP/BB/MWCommon/test/tSocketConnection.stdout @@ -0,0 +1,22 @@ +Run1 ... +Server connection on port 3851 +received 1 +sent 2 +sent 3 +received 2 +Client connection on host localhost, port 3851 +sent 1 +received 2 +received 3 +sent 2 +Run2 ... +Server connection on port 3851 +received 1 +sent 2 +sent 3 +received 2 +Client connection on host localhost, port 3851 +sent 1 +received 2 +received 3 +sent 2 diff --git a/CEP/BB/MWCommon/test/tVdsDesc.cc b/CEP/BB/MWCommon/test/tVdsDesc.cc new file mode 100644 index 00000000000..4cba7f69937 --- /dev/null +++ b/CEP/BB/MWCommon/test/tVdsDesc.cc @@ -0,0 +1,128 @@ +//# tVdsDesc.cc: Test program for class VdsDesc +//# +//# Copyright (C) 2007 +//# +//# $Id$ + +#include <MWCommon/VdsDesc.h> +#include <Common/LofarLogger.h> +#include <ostream> +#include <fstream> + +using namespace LOFAR::CEP; +using namespace casa; +using namespace std; + +void checkVds (const VdsPartDesc& vds, double endTime) +{ + ASSERT (vds.getName() == "/usr/local/xyx"); + ASSERT (vds.getFileSys() == "node1:/usr"); + ASSERT (vds.getStartTime() == 0); + ASSERT (vds.getEndTime() == endTime); + ASSERT (vds.getNChan().size() == 2); + ASSERT (vds.getNChan()[0] == 64); + ASSERT (vds.getNChan()[1] == 128); + ASSERT (vds.getStartFreqs().size() == 2); + ASSERT (vds.getStartFreqs()[0] == 20); + ASSERT (vds.getStartFreqs()[1] == 120); + ASSERT (vds.getEndFreqs().size() == 2); + ASSERT (vds.getEndFreqs()[0] == 100); + ASSERT (vds.getEndFreqs()[1] == 300); + ASSERT (vds.getAnt1().size() == 3); + ASSERT (vds.getAnt1()[0] == 0); + ASSERT (vds.getAnt1()[1] == 1); + ASSERT (vds.getAnt1()[2] == 2); + ASSERT (vds.getAnt2().size() == 3); + ASSERT (vds.getAnt2()[0] == 0); + ASSERT (vds.getAnt2()[1] == 1); + ASSERT (vds.getAnt2()[2] == 3); +} + +void check (const VdsDesc& vfds) +{ + checkVds (vfds.getDesc(), 1); + checkVds (vfds.getParts()[0], 2); + ASSERT (vfds.getAntNames().size() == 4); + ASSERT (vfds.getAntNames()[0] == "RT0"); + ASSERT (vfds.getAntNames()[1] == "RT1"); + ASSERT (vfds.getAntNames()[2] == "RT2"); + ASSERT (vfds.getAntNames()[3] == "RT3"); +} + +void tryAnt (const VdsDesc& vfds) +{ + ASSERT (vfds.antNr("RT0") == 0); + ASSERT (vfds.antNr("RT1") == 1); + ASSERT (vfds.antNr("RT2") == 2); + ASSERT (vfds.antNr("RT3") == 3); + ASSERT (vfds.antNr("RT4") == -1); + { + vector<int> antNrs = vfds.antNrs(Regex("RT.*")); + ASSERT (antNrs.size() == 4); + ASSERT (antNrs[0] == 0); + ASSERT (antNrs[1] == 1); + ASSERT (antNrs[2] == 2); + ASSERT (antNrs[3] == 3); + } + { + vector<int> antNrs = vfds.antNrs(Regex(".*0")); + ASSERT (antNrs.size() == 1); + ASSERT (antNrs[0] == 0); + } + { + vector<int> antNrs = vfds.antNrs(Regex("RT2")); + ASSERT (antNrs.size() == 1); + ASSERT (antNrs[0] == 2); + } + { + vector<int> antNrs = vfds.antNrs(Regex("RT*")); + ASSERT (antNrs.size() == 0); + } +} + +void doIt() +{ + VdsPartDesc vds; + vds.setName ("/usr/local/xyx", "node1:/usr"); + vds.setTimes (0, 1); + vds.addBand (64, 20, 100); + vds.addBand (128, 120, 300); + vector<int> ant1(3); + ant1[0] = 0; + ant1[1] = 1; + ant1[2] = 2; + vector<int> ant2(ant1); + ant2[2] = 3; + vds.setBaselines (ant1, ant2); + vector<string> antNames(4); + antNames[0] = "RT0"; + antNames[1] = "RT1"; + antNames[2] = "RT2"; + antNames[3] = "RT3"; + VdsDesc vfds(vds, antNames); + vds.setTimes(0,2); + vfds.addPart (vds); + check(vfds); + // Write into parset file. + ofstream fos("tVdsDesc_tmp.fil"); + vfds.write (fos); + // Read back. + LOFAR::ACC::APS::ParameterSet parset("tVdsDesc_tmp.fil"); + VdsDesc vfds2(parset); + check(vfds2); + vfds = vfds2; + check(vfds); + tryAnt (vfds); +} + +int main() +{ + try { + doIt(); + } catch (std::exception& x) { + cout << "Unexpected exception: " << x.what() << endl; + return 1; + } + cout << "OK" << endl; + return 0; +} diff --git a/CEP/BB/MWCommon/test/tVdsDesc.sh b/CEP/BB/MWCommon/test/tVdsDesc.sh new file mode 100755 index 00000000000..92e0501c032 --- /dev/null +++ b/CEP/BB/MWCommon/test/tVdsDesc.sh @@ -0,0 +1,2 @@ +#!/bin/sh +$lofar_sharedir/runtest.sh tVdsDesc > tVdsDesc.log 2>&1 diff --git a/CEP/BB/MWCommon/test/tVdsPartDesc.cc b/CEP/BB/MWCommon/test/tVdsPartDesc.cc new file mode 100644 index 00000000000..3e49b538b20 --- /dev/null +++ b/CEP/BB/MWCommon/test/tVdsPartDesc.cc @@ -0,0 +1,76 @@ +//# tVdsPartDesc.cc: Test program for class VdsPartDesc +//# +//# Copyright (C) 2007 +//# +//# $Id$ + +#include <MWCommon/VdsPartDesc.h> +#include <Common/LofarLogger.h> +#include <ostream> +#include <fstream> + +using namespace LOFAR::CEP; +using namespace std; + +void check (const VdsPartDesc& vds) +{ + ASSERT (vds.getName() == "/usr/local/xyx"); + ASSERT (vds.getFileSys() == "node1:/usr"); + ASSERT (vds.getStartTime() == 0); + ASSERT (vds.getEndTime() == 1); + ASSERT (vds.getNChan().size() == 2); + ASSERT (vds.getNChan()[0] == 64); + ASSERT (vds.getNChan()[1] == 128); + ASSERT (vds.getStartFreqs().size() == 2); + ASSERT (vds.getStartFreqs()[0] == 20); + ASSERT (vds.getStartFreqs()[1] == 120); + ASSERT (vds.getEndFreqs().size() == 2); + ASSERT (vds.getEndFreqs()[0] == 100); + ASSERT (vds.getEndFreqs()[1] == 300); + ASSERT (vds.getAnt1().size() == 3); + ASSERT (vds.getAnt1()[0] == 0); + ASSERT (vds.getAnt1()[1] == 1); + ASSERT (vds.getAnt1()[2] == 2); + ASSERT (vds.getAnt2().size() == 3); + ASSERT (vds.getAnt2()[0] == 0); + ASSERT (vds.getAnt2()[1] == 1); + ASSERT (vds.getAnt2()[2] == 3); +} + +void doIt() +{ + VdsPartDesc vds; + vds.setName ("/usr/local/xyx", "node1:/usr"); + vds.setTimes (0, 1); + vds.addBand (64, 20, 100); + vds.addBand (128, 120, 300); + vector<int> ant1(3); + ant1[0] = 0; + ant1[1] = 1; + ant1[2] = 2; + vector<int> ant2(ant1); + ant2[2] = 3; + vds.setBaselines (ant1, ant2); + check(vds); + // Write into parset file. + ofstream fos("tVdsPartDesc_tmp.fil"); + vds.write (fos, ""); + // Read back. + LOFAR::ACC::APS::ParameterSet parset("tVdsPartDesc_tmp.fil"); + VdsPartDesc vds2(parset); + check(vds2); + vds = vds2; + check(vds); +} + +int main() +{ + try { + doIt(); + } catch (std::exception& x) { + cout << "Unexpected exception: " << x.what() << endl; + return 1; + } + cout << "OK" << endl; + return 0; +} diff --git a/CEP/BB/MWCommon/test/tVdsPartDesc.sh b/CEP/BB/MWCommon/test/tVdsPartDesc.sh new file mode 100755 index 00000000000..692242ac7c6 --- /dev/null +++ b/CEP/BB/MWCommon/test/tVdsPartDesc.sh @@ -0,0 +1,2 @@ +#!/bin/sh +$lofar_sharedir/runtest.sh tVdsPartDesc > tVdsPartDesc.log 2>&1 diff --git a/CEP/BB/MWCommon/test/tWorkersDesc.cc b/CEP/BB/MWCommon/test/tWorkersDesc.cc new file mode 100644 index 00000000000..d41d8d7b5db --- /dev/null +++ b/CEP/BB/MWCommon/test/tWorkersDesc.cc @@ -0,0 +1,139 @@ +//# tWorkersDesc.cc: Test program for class WorkersDesc +//# +//# Copyright (C) 2007 +//# +//# $Id$ + +#include <MWCommon/WorkersDesc.h> +#include <Common/LofarLogger.h> +#include <ostream> + +using namespace LOFAR::CEP; +using namespace std; + +void doIt1() +{ + // First define the cluster. + // File systems can be accessed from multiple nodes. + ClusterDesc cl; + cl.setName ("cl"); + NodeDesc node0; + node0.setName ("node0"); + node0.addFileSys ("fs0"); + node0.addFileSys ("fs1"); + cl.addNode (node0); + NodeDesc node1; + node1.setName ("node1"); + node1.addFileSys ("fs1"); + node1.addFileSys ("fs2"); + cl.addNode (node1); + NodeDesc node2; + node2.setName ("node2"); + node2.addFileSys ("fs0"); + node2.addFileSys ("fs1"); + node2.addFileSys ("fs2"); + cl.addNode (node2); + WorkersDesc wdesc(cl); + // Now define all workers which can perform 2 work types. + vector<int> wtypes(2); + wtypes[0] = 0; + wtypes[1] = 1; + wdesc.addWorker (0, "node0", wtypes); + wdesc.addWorker (1, "node1", wtypes); + wdesc.addWorker (2, "node2", wtypes); + // Now find a worker for a specific task on a file system. + int worker; + worker = wdesc.findWorker (0, "fs0"); + ASSERT (worker == 0); + wdesc.incrLoad (worker); + worker = wdesc.findWorker (0, "fs2"); + ASSERT (worker == 1); + wdesc.incrLoad (worker); + worker = wdesc.findWorker (0, "fs1"); + ASSERT (worker == 2); + wdesc.incrLoad (worker); + worker = wdesc.findWorker (0, "fs2"); + ASSERT (worker == 1); + wdesc.incrLoad (worker); + worker = wdesc.findWorker (0, "fs1"); + ASSERT (worker == 0); + worker = wdesc.findWorker (0, "fs0"); + ASSERT (worker == 0); + wdesc.incrLoad (worker); + worker = wdesc.findWorker (0, "fs0"); + ASSERT (worker == 2); + wdesc.incrLoad (worker); + wdesc.incrLoad (0); + wdesc.incrLoad (1); + worker = wdesc.findWorker (1, ""); + ASSERT (worker == 2); + wdesc.incrLoad (worker); + ASSERT (wdesc.findWorker (2, "") == -1); + ASSERT (wdesc.findWorker (0, "fs3") == -1); +} + +void doIt2() +{ + // First define the cluster. + // FIle systems can be accessed from a single node. + ClusterDesc cl; + cl.setName ("cl"); + NodeDesc node0; + node0.setName ("node0"); + node0.addFileSys ("fs0"); + cl.addNode (node0); + NodeDesc node1; + node1.setName ("node1"); + node1.addFileSys ("fs1"); + cl.addNode (node1); + NodeDesc node2; + node2.setName ("node2"); + node2.addFileSys ("fs2"); + cl.addNode (node2); + WorkersDesc wdesc(cl); + // Now define all workers which can perform 2 work types. + vector<int> wtypes(2); + wtypes[0] = 0; + wtypes[1] = 1; + wdesc.addWorker (0, "node0", wtypes); + wdesc.addWorker (1, "node1", wtypes); + wdesc.addWorker (2, "node2", wtypes); + // Now find a worker for a specific task on a file system. + int worker; + worker = wdesc.findWorker (0, "fs0"); + ASSERT (worker == 0); + wdesc.incrLoad (worker); + worker = wdesc.findWorker (0, "fs0"); + ASSERT (worker == 0); + wdesc.incrLoad (worker); + worker = wdesc.findWorker (0, "fs2"); + ASSERT (worker == 2); + wdesc.incrLoad (worker); + worker = wdesc.findWorker (0, "fs1"); + ASSERT (worker == 1); + wdesc.incrLoad (worker); + worker = wdesc.findWorker (1, ""); + ASSERT (worker == 1); + wdesc.incrLoad (worker); + worker = wdesc.findWorker (1, ""); + ASSERT (worker == 2); + wdesc.incrLoad (worker); + worker = wdesc.findWorker (1, ""); + ASSERT (worker == 0); + wdesc.incrLoad (worker); + ASSERT (wdesc.findWorker (2, "") == -1); + ASSERT (wdesc.findWorker (0, "fs4") == -1); +} + +int main() +{ + try { + doIt1(); + doIt2(); + } catch (std::exception& x) { + cout << "Unexpected exception: " << x.what() << endl; + return 1; + } + cout << "OK" << endl; + return 0; +} diff --git a/CEP/BB/MWCommon/test/tWorkersDesc.sh b/CEP/BB/MWCommon/test/tWorkersDesc.sh new file mode 100755 index 00000000000..6d110d5be13 --- /dev/null +++ b/CEP/BB/MWCommon/test/tWorkersDesc.sh @@ -0,0 +1,2 @@ +#!/bin/sh +$lofar_sharedir/runtest.sh tWorkersDesc > tWorkersDesc.log 2>&1 diff --git a/CEP/BB/MWCommon/test/tfinddproc.in_cd b/CEP/BB/MWCommon/test/tfinddproc.in_cd new file mode 100644 index 00000000000..06f24c07a95 --- /dev/null +++ b/CEP/BB/MWCommon/test/tfinddproc.in_cd @@ -0,0 +1,6 @@ +ClusterName = cl +NNodes = 2 +Node0.NodeName = node1 +Node0.NodeFileSys = [node1:/usr] +Node1.NodeName = node2 +Node1.NodeFileSys = [node1:/usr] diff --git a/CEP/BB/MWCommon/test/tfinddproc.in_vd b/CEP/BB/MWCommon/test/tfinddproc.in_vd new file mode 100644 index 00000000000..1182808f4dd --- /dev/null +++ b/CEP/BB/MWCommon/test/tfinddproc.in_vd @@ -0,0 +1,41 @@ +Name = /usr/local/xyx +FileSys = +StartTime = 0 +EndTime = 2 +NChan = [64,128] +StartFreqs = [20,120] +EndFreqs = [100,300] +Ant1 = [0,1,2] +Ant2 = [0,1,3] +AntNames = [RT0,RT1,RT2,RT3] +NParts = 3 + +Part0.Name = /usr/local/xyx0 +Part0.FileSys = node1:/usr +Part0.StartTime = 0 +Part0.EndTime = 2 +Part0.NChan = [64,128] +Part0.StartFreqs = [20,120] +Part0.EndFreqs = [100,300] +Part0.Ant1 = [0,1,2] +Part0.Ant2 = [0,1,3] + +Part1.Name = /usr/local/xyx1 +Part1.FileSys = node1:/usr +Part1.StartTime = 0 +Part1.EndTime = 2 +Part1.NChan = [64,128] +Part1.StartFreqs = [20,120] +Part1.EndFreqs = [100,300] +Part1.Ant1 = [0,1,2] +Part1.Ant2 = [0,1,3] + +Part2.Name = /usr/local/xyx2 +Part2.FileSys = node1:/usr +Part2.StartTime = 0 +Part2.EndTime = 2 +Part2.NChan = [64,128] +Part2.StartFreqs = [20,120] +Part2.EndFreqs = [100,300] +Part2.Ant1 = [0,1,2] +Part2.Ant2 = [0,1,3] diff --git a/CEP/BB/MWCommon/test/tfinddproc.run b/CEP/BB/MWCommon/test/tfinddproc.run new file mode 100755 index 00000000000..a7f3a3ae82d --- /dev/null +++ b/CEP/BB/MWCommon/test/tfinddproc.run @@ -0,0 +1,3 @@ +#!/bin/sh + +../src/finddproc tfinddproc.in_vd tfinddproc.in_cd diff --git a/CEP/BB/MWCommon/test/tfinddproc.sh b/CEP/BB/MWCommon/test/tfinddproc.sh new file mode 100755 index 00000000000..f02961762f9 --- /dev/null +++ b/CEP/BB/MWCommon/test/tfinddproc.sh @@ -0,0 +1,2 @@ +#!/bin/sh +$lofar_sharedir/runtest.sh tfinddproc > tfinddproc.log 2>&1 diff --git a/CEP/BB/MWCommon/test/tfinddproc.stdout b/CEP/BB/MWCommon/test/tfinddproc.stdout new file mode 100644 index 00000000000..c3fa761a404 --- /dev/null +++ b/CEP/BB/MWCommon/test/tfinddproc.stdout @@ -0,0 +1,3 @@ +node1 +node2 +node1 diff --git a/CEP/BB/MWCommon/test/tstartdproc.in_cd b/CEP/BB/MWCommon/test/tstartdproc.in_cd new file mode 100644 index 00000000000..06f24c07a95 --- /dev/null +++ b/CEP/BB/MWCommon/test/tstartdproc.in_cd @@ -0,0 +1,6 @@ +ClusterName = cl +NNodes = 2 +Node0.NodeName = node1 +Node0.NodeFileSys = [node1:/usr] +Node1.NodeName = node2 +Node1.NodeFileSys = [node1:/usr] diff --git a/CEP/BB/MWCommon/test/tstartdproc.in_vd b/CEP/BB/MWCommon/test/tstartdproc.in_vd new file mode 100644 index 00000000000..1182808f4dd --- /dev/null +++ b/CEP/BB/MWCommon/test/tstartdproc.in_vd @@ -0,0 +1,41 @@ +Name = /usr/local/xyx +FileSys = +StartTime = 0 +EndTime = 2 +NChan = [64,128] +StartFreqs = [20,120] +EndFreqs = [100,300] +Ant1 = [0,1,2] +Ant2 = [0,1,3] +AntNames = [RT0,RT1,RT2,RT3] +NParts = 3 + +Part0.Name = /usr/local/xyx0 +Part0.FileSys = node1:/usr +Part0.StartTime = 0 +Part0.EndTime = 2 +Part0.NChan = [64,128] +Part0.StartFreqs = [20,120] +Part0.EndFreqs = [100,300] +Part0.Ant1 = [0,1,2] +Part0.Ant2 = [0,1,3] + +Part1.Name = /usr/local/xyx1 +Part1.FileSys = node1:/usr +Part1.StartTime = 0 +Part1.EndTime = 2 +Part1.NChan = [64,128] +Part1.StartFreqs = [20,120] +Part1.EndFreqs = [100,300] +Part1.Ant1 = [0,1,2] +Part1.Ant2 = [0,1,3] + +Part2.Name = /usr/local/xyx2 +Part2.FileSys = node1:/usr +Part2.StartTime = 0 +Part2.EndTime = 2 +Part2.NChan = [64,128] +Part2.StartFreqs = [20,120] +Part2.EndFreqs = [100,300] +Part2.Ant1 = [0,1,2] +Part2.Ant2 = [0,1,3] diff --git a/CEP/BB/MWCommon/test/tstartdproc.run b/CEP/BB/MWCommon/test/tstartdproc.run new file mode 100755 index 00000000000..4c6d344fb02 --- /dev/null +++ b/CEP/BB/MWCommon/test/tstartdproc.run @@ -0,0 +1,21 @@ +#!/bin/sh + +if test "$srcdir" = ""; then + srcdir=../../../src +fi + +echo '' +$srcdir/../src/startdproc -dsn tstartdproc.in_vd -cdn tstartdproc.in_cd \ +-mode single -dry -hfn tstartdproc_tmp.out1 -fdp ../src prog1 -arg arg2 +cat tstartdproc_tmp.out1 + +echo '' +$srcdir/../src/startdproc -dsn tstartdproc.in_vd -cdn tstartdproc.in_cd \ +-mode mpi -dry -hfn tstartdproc_tmp.out2 -fdp ../src prog1 -arg arg2 +cat tstartdproc_tmp.out2 + +echo '' +$srcdir/../src/startdproc -dsn tstartdproc.in_vd -cdn tstartdproc.in_cd \ +-mode 3851 -dry -hfn tstartdproc_tmp.out3 -fdp ../src prog1 -arg arg2 +cat tstartdproc_tmp.out3 + diff --git a/CEP/BB/MWCommon/test/tstartdproc.sh b/CEP/BB/MWCommon/test/tstartdproc.sh new file mode 100755 index 00000000000..9c7f2520872 --- /dev/null +++ b/CEP/BB/MWCommon/test/tstartdproc.sh @@ -0,0 +1,2 @@ +#!/bin/sh +$lofar_sharedir/runtest.sh tstartdproc > tstartdproc.log 2>&1 diff --git a/CEP/BB/MWCommon/test/tstartdproc.stdout b/CEP/BB/MWCommon/test/tstartdproc.stdout new file mode 100644 index 00000000000..51db9e42fd8 --- /dev/null +++ b/CEP/BB/MWCommon/test/tstartdproc.stdout @@ -0,0 +1,22 @@ + +prog1 -arg arg2 +localhost +node1 +node2 +node1 + +mpirun -np 4 -machinefile tstartdproc_tmp.out2 prog1 -arg arg2 +localhost +node1 +node2 +node1 + +socketrun tstartdproc_tmp.out3 prog1 -arg arg2 +ssh localhost prog1 socket localhost 3851 4 0 +ssh node1 prog1 socket localhost 3851 4 1 +ssh node2 prog1 socket localhost 3851 4 2 +ssh node1 prog1 socket localhost 3851 4 3 +localhost +node1 +node2 +node1 -- GitLab