diff --git a/CEP/BB/BBSControl/include/BBSControl/BBSKernelProcessControl.h b/CEP/BB/BBSControl/include/BBSControl/BBSKernelProcessControl.h index c64ac9082658dce9eb5bb576791e436e23007aee..287566afbe67df3c507e7934c886333d7403dc51 100644 --- a/CEP/BB/BBSControl/include/BBSControl/BBSKernelProcessControl.h +++ b/CEP/BB/BBSControl/include/BBSControl/BBSKernelProcessControl.h @@ -22,8 +22,8 @@ //# //# $Id: -#ifndef __BBSKERNELPROCESSCONTROL_H__ -#define __BBSKERNELPROCESSCONTROL_H__ +#ifndef LOFAR_BBSCONTROL_BBSKERNELPROCESSCONTROL_H +#define LOFAR_BBSCONTROL_BBSKERNELPROCESSCONTROL_H //# Never #include <config.h> or #include <lofar_config.h> in a header file! @@ -33,115 +33,89 @@ namespace LOFAR { - -namespace ParmDB -{ - class ParmDB; -} // namespace ParmDB - -namespace BBS -{ - -class BBSStep; -class BBSStrategy; -class BBSPredictStep; -class BBSSubtractStep; -class BBSCorrectStep; -class BBSSolveStep; -class Prediffer; -struct Context; - -// \addtogroup BBS -// @{ - -//# Description of class. -// The ProcessControl class defines the command interface that can be used -// to control the processes of an application.<br> -// All functions in this class are abstract and need to be -// implemented on both the client and the server-side. On the client side -// the implementation will only forward the function-call, on the server -// side (= the application process) the real implementation must be done. - -class BBSKernelProcessControl: public LOFAR::ACC::PLC::ProcessControl -{ -public: - // Constructor - BBSKernelProcessControl(); - - // Destructor - ~BBSKernelProcessControl(); - - // \name Command to control the processes. - // There are a dozen commands that can be sent to a application process - // to control its flow. The return values for these command are:<br> - // - True - Command executed succesfully. - // - False - Command could not be executed. - // + //# Forward declations + namespace ParmDB { class ParmDB; } + class CSConnection; + class TH_Socket; + + namespace BBS + { + //# Forward declations + class BBSStep; + class BBSStrategy; + class BBSPredictStep; + class BBSSubtractStep; + class BBSCorrectStep; + class BBSSolveStep; + class Prediffer; + struct Context; + class DH_BlobStreamable; + + // \addtogroup BBS // @{ - // During the \c define state the process check the contents of the - // ParameterSet it received during start-up. When everthing seems ok the - // process constructs the communication channels for exchanging data - // with the other processes. The connection are NOT made in the stage. - boost::logic::tribool define(); - - // When a process receives an \c init command it allocates the buffers it - // needs an makes the connections with the other processes. When the - // process succeeds in this it is ready for dataprocessing (or whatever - // task the process has). - boost::logic::tribool init(); - - // During the \c run phase the process does the work it is designed for. - // The run phase stays active until another command is send. - boost::logic::tribool run(); - - // With the \c pause command the process stops its run phase and starts - // waiting for another command. The \c condition argument contains the - // contition the process should use for ending the run phase. This - // condition is a key-value pair that can eg. contain a timestamp or a - // number of a datasample. - boost::logic::tribool pause(const string& condition); - - // \c Quit stops the process. - // The process \b must call \c unregisterAtAC at ProcControlServer during - // the execution of this command to pass the final results to the - // Application Controller. - boost::logic::tribool quit(); - - // With the \c snapshot command the process is instructed to save itself - // in a database is such a way that on another moment in time it can - // be reconstructed and can continue it task.<br> - // The \c destination argument contains database info the process - // must use to save itself. - boost::logic::tribool snapshot(const string& destination); - - // \c Recover reconstructs the process as it was saved some time earlier. - // The \c source argument contains the database info the process must use - // to find the information it needs. - boost::logic::tribool recover(const string& source); - - // With \c reinit the process receives a new parameterset that it must use - // to reinitialize itself. - boost::logic::tribool reinit(const string& configID); - // @} - - // Define a generic way to exchange info between client and server. - std::string askInfo(const string& keylist); - - bool handle(const BBSStrategy *strategy); - bool handle(const BBSPredictStep *step); - bool handle(const BBSSubtractStep *step); - bool handle(const BBSCorrectStep *step); - bool handle(const BBSSolveStep *step); - -private: - void convertStepToContext(const BBSStep *step, Context &context); - - Prediffer *itsPrediffer; - LOFAR::ParmDB::ParmDB *itsHistory; -}; // class BBSKernelProcessControl - -} // namespace BBS + // Implementation of the ProcessControl interface for the local BBSKernel + // controller. + class BBSKernelProcessControl: public ACC::PLC::ProcessControl + { + public: + // Default constructor. + BBSKernelProcessControl(); + + // Destructor + virtual ~BBSKernelProcessControl(); + + // @name Implementation of PLC interface. + // @{ + virtual tribool define(); + virtual tribool init(); + virtual tribool run(); + virtual tribool pause(const string& condition); + virtual tribool quit(); + virtual tribool snapshot(const string& destination); + virtual tribool recover(const string& source); + virtual tribool reinit(const string& configID); + virtual string askInfo(const string& keylist); + // @} + + bool handle(const BBSStrategy *strategy); + bool handle(const BBSStep *step); + + private: + // @name Implementation of handle() for the different BBSStep types. + // @{ + bool doHandle(const BBSPredictStep *step); + bool doHandle(const BBSSubtractStep *step); + bool doHandle(const BBSCorrectStep *step); + bool doHandle(const BBSSolveStep *step); + // @} + + void convertStepToContext(const BBSStep *step, Context &context); + + // Parameter set for this process controller. + ACC::APS::ParameterSet itsParameterSet; + + // Prediffer + Prediffer* itsPrediffer; + + // History database. + LOFAR::ParmDB::ParmDB* itsHistory; + + // DataHolder for exchanging data between local (BBSKernel) and global + // (BBS) process control. + DH_BlobStreamable* itsDataHolder; + + // TransportHolder used to exchange DataHolders. The local controller + // will open a client connection to the global controller. + TH_Socket* itsTransportHolder; + + // Connection between the local (BBSKernel) process control and the + // global (BBS) process control. + CSConnection* itsConnection; + }; + + } // namespace BBS + } // namespace LOFAR #endif diff --git a/CEP/BB/BBSControl/include/BBSControl/BBSProcessControl.h b/CEP/BB/BBSControl/include/BBSControl/BBSProcessControl.h index 04d41b4891220edcc8e142bfafd89214339a12f8..f2944ac6c0acde0d15e48e772c0f0730bfec20ce 100644 --- a/CEP/BB/BBSControl/include/BBSControl/BBSProcessControl.h +++ b/CEP/BB/BBSControl/include/BBSControl/BBSProcessControl.h @@ -28,50 +28,81 @@ //# Includes #include <PLC/ProcessControl.h> -#include <Common/lofar_smartptr.h> -#include <boost/logic/tribool_fwd.hpp> namespace LOFAR { //# Forward Declarations. + class CSConnection; + class TH_Socket; namespace BBS { //# Forward Declarations. class BBSStrategy; class BBSStep; + class BlobStreamable; + class DH_BlobStreamable; // \addtogroup BBS // @{ + // Implementation of the ProcessControl interface for the global BBS + // controller. class BBSProcessControl : public ACC::PLC::ProcessControl { public: + // Default constructor. + BBSProcessControl(); + + // Destructor. virtual ~BBSProcessControl(); - virtual boost::logic::tribool define(); - virtual boost::logic::tribool init(); - virtual boost::logic::tribool run(); - virtual boost::logic::tribool quit(); - private: - // The operations below are not supported by BBSProcessControl. + + // @name Implementation of PLC interface. // @{ - virtual boost::logic::tribool pause(const string& condition); - virtual boost::logic::tribool snapshot(const string& destination); - virtual boost::logic::tribool recover(const string& source); - virtual boost::logic::tribool reinit(const string& configID); - virtual string askInfo(const string& keylist); + virtual tribool define(); + virtual tribool init(); + virtual tribool run(); + virtual tribool quit(); + virtual tribool pause(const string& condition); + virtual tribool snapshot(const string& destination); + virtual tribool recover(const string& source); + virtual tribool reinit(const string& configID); + virtual string askInfo(const string& keylist); // @} - // Our parameter set. - ACC::APS::ParameterSet itsParamSet; + private: + // Send the strategy or one of the steps across. + bool sendObject(const BlobStreamable& bs); - // The strategy that will be constructed from the parameter set. + // The strategy that will be executed by this controller. BBSStrategy* itsStrategy; - // Vector containing all the separate steps that this strategy consists - // of in sequential order. + // Vector containing all the separate steps, in sequential order, that + // the strategy consists of. vector<const BBSStep*> itsSteps; + // Iterator for keeping track where we left while traversing the vector + // \c itsSteps. We need this iterator, because the run() method will be + // invoked several times by ACCmain. In each call to run() we must + // execute one BBSStep. + vector<const BBSStep*>::const_iterator itsStepsIterator; + + // DataHolder for exchanging data between global (BBS) and local + // (BBSKernel) process control. + DH_BlobStreamable* itsDataHolder; + + // TransportHolder used to exchange DataHolders. The global controller + // will open a server connection, waiting for local controllers to + // connect. + TH_Socket* itsTransportHolder; + + // Connection between the global (BBS) process control and the local + // (BBSKernel) process control. + CSConnection* itsConnection; + + // Flag indicating whether we've sent \c itsStrategy. We only need to + // send it once, as the very first message. + bool itsStrategySent; }; // @} diff --git a/CEP/BB/BBSControl/include/BBSControl/BBSSingleStep.h b/CEP/BB/BBSControl/include/BBSControl/BBSSingleStep.h index 4390b2a3b63c142fde7e2775333189363c6ff09d..bb56144c7ea26a2407818c18c60061f1aae0712a 100644 --- a/CEP/BB/BBSControl/include/BBSControl/BBSSingleStep.h +++ b/CEP/BB/BBSControl/include/BBSControl/BBSSingleStep.h @@ -44,6 +44,16 @@ namespace LOFAR class BBSSingleStep : public BBSStep { public: + virtual ~BBSSingleStep(); + + // Print the contents of \c *this in human readable form into the output + // stream \a os. + virtual void print(ostream& os) const; + + // Return the name of the data column to write data to. + string outputData() const { return itsOutputData; } + + protected: // Default constructor. Construct an empty BBSSingleStep object and make // it a child of the BBSStep object \a parent. BBSSingleStep(const BBSStep* parent = 0); @@ -56,87 +66,132 @@ namespace LOFAR const ACC::APS::ParameterSet& parset, const BBSStep* parent); - virtual ~BBSSingleStep(); - - // Print the contents of \c *this in human readable form into the output - // stream \a os. - virtual void print(ostream& os) const; - - protected: // Write the contents of \c *this into the blob output stream \a bos. virtual void write(BlobOStream& bos) const; // Read the contents from the blob input stream \a bis into \c *this. virtual void read(BlobIStream& bis); - public: - // Return the type of \c *this as a string. - virtual const string& classType() const; - - // Name of the data column to write data to + // Name of the data column to write data to. string itsOutputData; }; - class BBSPredictStep: public BBSSingleStep + class BBSPredictStep : public BBSSingleStep { public: - BBSPredictStep(const BBSStep* parent = 0) - : BBSSingleStep(parent) - { - } + BBSPredictStep(const BBSStep* parent = 0) : + BBSSingleStep(parent) + { + } - BBSPredictStep(const string& name, - const ACC::APS::ParameterSet& parSet, - const BBSStep* parent) - :BBSSingleStep(name, parSet, parent) - { - } + BBSPredictStep(const string& name, + const ACC::APS::ParameterSet& parSet, + const BBSStep* parent) : + BBSSingleStep(name, parSet, parent) + { + } + + private: + // Return the type of \c *this as a string. + virtual const string& classType() const { + static string theType("BBSPredictStep"); + return theType; + } }; - class BBSSubtractStep: public BBSSingleStep + class BBSSubtractStep : public BBSSingleStep { public: - BBSSubtractStep(const BBSStep* parent = 0) - : BBSSingleStep(parent) - { - } + BBSSubtractStep(const BBSStep* parent = 0) : + BBSSingleStep(parent) + { + } - BBSSubtractStep(const string& name, - const ACC::APS::ParameterSet& parSet, - const BBSStep* parent) - : BBSSingleStep(name, parSet, parent) - { - } + BBSSubtractStep(const string& name, + const ACC::APS::ParameterSet& parSet, + const BBSStep* parent) : + BBSSingleStep(name, parSet, parent) + { + } + private: + // Return the type of \c *this as a string. + virtual const string& classType() const { + static string theType("BBSSubtractStep"); + return theType; + } }; - class BBSCorrectStep: public BBSSingleStep + class BBSCorrectStep : public BBSSingleStep { public: - BBSCorrectStep(const BBSStep* parent = 0) - : BBSSingleStep(parent) - { - } + BBSCorrectStep(const BBSStep* parent = 0) : + BBSSingleStep(parent) + { + } - BBSCorrectStep(const string& name, - const ACC::APS::ParameterSet& parSet, - const BBSStep* parent) - : BBSSingleStep(name, parSet, parent) - { - } + BBSCorrectStep(const string& name, + const ACC::APS::ParameterSet& parSet, + const BBSStep* parent) : + BBSSingleStep(name, parSet, parent) + { + } + private: + // Return the type of \c *this as a string. + virtual const string& classType() const { + static string theType("BBSCorrectStep"); + return theType; + } }; + + class BBSShiftStep : public BBSSingleStep + { + public: + BBSShiftStep(const BBSStep* parent = 0) : + BBSSingleStep(parent) + { + } + + BBSShiftStep(const string& name, + const ACC::APS::ParameterSet& parSet, + const BBSStep* parent) : + BBSSingleStep(name, parSet, parent) + { + } + private: + // Return the type of \c *this as a string. + virtual const string& classType() const { + static string theType("BBSShiftStep"); + return theType; + } + }; - // For the time being we'll define the following steps as typedefs. If, - // and when, they need to be "upgraded" to a real class, we can do so, - // without affecting code referring to these types. - // @{ - typedef BBSSingleStep BBSShiftStep; - typedef BBSSingleStep BBSRefitStep; - // @} + class BBSRefitStep : public BBSSingleStep + { + public: + BBSRefitStep(const BBSStep* parent = 0) : + BBSSingleStep(parent) + { + } + + BBSRefitStep(const string& name, + const ACC::APS::ParameterSet& parSet, + const BBSStep* parent) : + BBSSingleStep(name, parSet, parent) + { + } + private: + // Return the type of \c *this as a string. + virtual const string& classType() const { + static string theType("BBSRefitStep"); + return theType; + } + }; + // @} } // namespace BBS diff --git a/CEP/BB/BBSControl/include/BBSControl/BBSSolveStep.h b/CEP/BB/BBSControl/include/BBSControl/BBSSolveStep.h index e55f5fa64f85f7fbd75675f70b974c821fc6f8e2..cd8bfac7f5fa4c97cd6c2887078916c4cb17f92d 100644 --- a/CEP/BB/BBSControl/include/BBSControl/BBSSolveStep.h +++ b/CEP/BB/BBSControl/include/BBSControl/BBSSolveStep.h @@ -61,6 +61,16 @@ namespace LOFAR // stream \a os. virtual void print(ostream& os) const; + // @name Accessor methods + // @{ + uint32 maxIter() const { return itsMaxIter; } + double epsilon() const { return itsEpsilon; } + double minConverged() const { return itsMinConverged; } + vector<string> parms() const { return itsParms; } + vector<string> exclParms() const { return itsExclParms; } + DomainSize domainSize() const { return itsDomainSize; } + // @} + private: // Write the contents of \c *this into the blob output stream \a bos. virtual void write(BlobOStream& bos) const; diff --git a/CEP/BB/BBSControl/include/BBSControl/BBSStep.h b/CEP/BB/BBSControl/include/BBSControl/BBSStep.h index 1c7fc5ade54c7f05b24850461cf73f8989ec17b1..48a5e6322c1f00a05ddfb9c9b661ba00e40e1ab9 100644 --- a/CEP/BB/BBSControl/include/BBSControl/BBSStep.h +++ b/CEP/BB/BBSControl/include/BBSControl/BBSStep.h @@ -69,19 +69,10 @@ namespace LOFAR // Destructor. virtual ~BBSStep(); - // Return the name of this step. - const string& getName() const { return itsName; } - // Return the full name of this step. The full name consists of the name // of this step, preceeded by that of its parent, etc., separated by // dots. - string getFullName() const; - - // Return a pointer to the parent of this step. - const BBSStep* getParent() const { return itsParent; } - - // Make \a parent the parent of this step. - void setParent(const BBSStep* parent) { itsParent = parent; } + string fullName() const; // Get all steps that this step consists of. The result will be a vector // containing pointers to all steps, sorted pre-order depth-first. @@ -107,6 +98,38 @@ namespace LOFAR // stream \a os. virtual void print(ostream& os) const; + // @name Accessor methods + // @{ + + // Return the name of this step. + string getName() const { return itsName; } + + // Return a pointer to the parent of this step. + const BBSStep* getParent() const { return itsParent; } + + // Make \a parent the parent of this step. + void setParent(const BBSStep* parent) { itsParent = parent; } + + // Return the selection of baselines for this step. + Baselines baselines() const { return itsBaselines; } + + // Return which correlation products should be used for this step. + Correlation correlation() const { return itsCorrelation; } + + // Return the amount of integration that must be applied to the data. + Integration integration() const { return itsIntegration; } + + // Return the sources in the source model for the current patch. + vector<string> sources() const { return itsSources; } + + // Return the extra sources outside the current patch. + vector<string> extraSources() const { return itsExtraSources; } + + // Return a list of instrument models to be used for this step. + vector<string> instrumentModels() const { return itsInstrumentModels; } + + // @} + protected: // Default constructor. Construct an empty BBSStep object and make it a // child of the BBSStep object \a parent. @@ -136,7 +159,6 @@ namespace LOFAR // for those members that are specified in \a parSet. void setParms(const ACC::APS::ParameterSet& parSet); - // Name of this step. string itsName; diff --git a/CEP/BB/BBSControl/include/BBSControl/BBSStrategy.h b/CEP/BB/BBSControl/include/BBSControl/BBSStrategy.h index a29fa6f3ad2e9583e94f8347b8445b9fb1994bfe..b0d50bfc9897b7436e75513e2df38f69f50770f2 100644 --- a/CEP/BB/BBSControl/include/BBSControl/BBSStrategy.h +++ b/CEP/BB/BBSControl/include/BBSControl/BBSStrategy.h @@ -75,6 +75,18 @@ namespace LOFAR // written when write(BlobOStream&) is called. void shouldWriteSteps(bool doSteps) { itsWriteSteps = doSteps; } + // @name Accessor methods + // @{ + string dataSet() const { return itsDataSet; } + BBDB bbDB() const { return itsBBDB; } + ParmDB parmDB() const { return itsParmDB; } + vector<string> stations() const { return itsStations; } + string inputData() const { return itsInputData; } + DomainSize domainSize() const { return itsDomainSize; } + Correlation correlation() const { return itsCorrelation; } + Integration integration() const { return itsIntegration; } + // @} + private: // Read the contents from the blob input stream \a bis into \c *this. virtual void read(BlobIStream& bis); diff --git a/CEP/BB/BBSControl/include/BBSControl/GlobalProcessControl.h b/CEP/BB/BBSControl/include/BBSControl/GlobalProcessControl.h index 04d41b4891220edcc8e142bfafd89214339a12f8..f2944ac6c0acde0d15e48e772c0f0730bfec20ce 100644 --- a/CEP/BB/BBSControl/include/BBSControl/GlobalProcessControl.h +++ b/CEP/BB/BBSControl/include/BBSControl/GlobalProcessControl.h @@ -28,50 +28,81 @@ //# Includes #include <PLC/ProcessControl.h> -#include <Common/lofar_smartptr.h> -#include <boost/logic/tribool_fwd.hpp> namespace LOFAR { //# Forward Declarations. + class CSConnection; + class TH_Socket; namespace BBS { //# Forward Declarations. class BBSStrategy; class BBSStep; + class BlobStreamable; + class DH_BlobStreamable; // \addtogroup BBS // @{ + // Implementation of the ProcessControl interface for the global BBS + // controller. class BBSProcessControl : public ACC::PLC::ProcessControl { public: + // Default constructor. + BBSProcessControl(); + + // Destructor. virtual ~BBSProcessControl(); - virtual boost::logic::tribool define(); - virtual boost::logic::tribool init(); - virtual boost::logic::tribool run(); - virtual boost::logic::tribool quit(); - private: - // The operations below are not supported by BBSProcessControl. + + // @name Implementation of PLC interface. // @{ - virtual boost::logic::tribool pause(const string& condition); - virtual boost::logic::tribool snapshot(const string& destination); - virtual boost::logic::tribool recover(const string& source); - virtual boost::logic::tribool reinit(const string& configID); - virtual string askInfo(const string& keylist); + virtual tribool define(); + virtual tribool init(); + virtual tribool run(); + virtual tribool quit(); + virtual tribool pause(const string& condition); + virtual tribool snapshot(const string& destination); + virtual tribool recover(const string& source); + virtual tribool reinit(const string& configID); + virtual string askInfo(const string& keylist); // @} - // Our parameter set. - ACC::APS::ParameterSet itsParamSet; + private: + // Send the strategy or one of the steps across. + bool sendObject(const BlobStreamable& bs); - // The strategy that will be constructed from the parameter set. + // The strategy that will be executed by this controller. BBSStrategy* itsStrategy; - // Vector containing all the separate steps that this strategy consists - // of in sequential order. + // Vector containing all the separate steps, in sequential order, that + // the strategy consists of. vector<const BBSStep*> itsSteps; + // Iterator for keeping track where we left while traversing the vector + // \c itsSteps. We need this iterator, because the run() method will be + // invoked several times by ACCmain. In each call to run() we must + // execute one BBSStep. + vector<const BBSStep*>::const_iterator itsStepsIterator; + + // DataHolder for exchanging data between global (BBS) and local + // (BBSKernel) process control. + DH_BlobStreamable* itsDataHolder; + + // TransportHolder used to exchange DataHolders. The global controller + // will open a server connection, waiting for local controllers to + // connect. + TH_Socket* itsTransportHolder; + + // Connection between the global (BBS) process control and the local + // (BBSKernel) process control. + CSConnection* itsConnection; + + // Flag indicating whether we've sent \c itsStrategy. We only need to + // send it once, as the very first message. + bool itsStrategySent; }; // @} diff --git a/CEP/BB/BBSControl/include/BBSControl/KernelProcessControl.h b/CEP/BB/BBSControl/include/BBSControl/KernelProcessControl.h index c64ac9082658dce9eb5bb576791e436e23007aee..287566afbe67df3c507e7934c886333d7403dc51 100644 --- a/CEP/BB/BBSControl/include/BBSControl/KernelProcessControl.h +++ b/CEP/BB/BBSControl/include/BBSControl/KernelProcessControl.h @@ -22,8 +22,8 @@ //# //# $Id: -#ifndef __BBSKERNELPROCESSCONTROL_H__ -#define __BBSKERNELPROCESSCONTROL_H__ +#ifndef LOFAR_BBSCONTROL_BBSKERNELPROCESSCONTROL_H +#define LOFAR_BBSCONTROL_BBSKERNELPROCESSCONTROL_H //# Never #include <config.h> or #include <lofar_config.h> in a header file! @@ -33,115 +33,89 @@ namespace LOFAR { - -namespace ParmDB -{ - class ParmDB; -} // namespace ParmDB - -namespace BBS -{ - -class BBSStep; -class BBSStrategy; -class BBSPredictStep; -class BBSSubtractStep; -class BBSCorrectStep; -class BBSSolveStep; -class Prediffer; -struct Context; - -// \addtogroup BBS -// @{ - -//# Description of class. -// The ProcessControl class defines the command interface that can be used -// to control the processes of an application.<br> -// All functions in this class are abstract and need to be -// implemented on both the client and the server-side. On the client side -// the implementation will only forward the function-call, on the server -// side (= the application process) the real implementation must be done. - -class BBSKernelProcessControl: public LOFAR::ACC::PLC::ProcessControl -{ -public: - // Constructor - BBSKernelProcessControl(); - - // Destructor - ~BBSKernelProcessControl(); - - // \name Command to control the processes. - // There are a dozen commands that can be sent to a application process - // to control its flow. The return values for these command are:<br> - // - True - Command executed succesfully. - // - False - Command could not be executed. - // + //# Forward declations + namespace ParmDB { class ParmDB; } + class CSConnection; + class TH_Socket; + + namespace BBS + { + //# Forward declations + class BBSStep; + class BBSStrategy; + class BBSPredictStep; + class BBSSubtractStep; + class BBSCorrectStep; + class BBSSolveStep; + class Prediffer; + struct Context; + class DH_BlobStreamable; + + // \addtogroup BBS // @{ - // During the \c define state the process check the contents of the - // ParameterSet it received during start-up. When everthing seems ok the - // process constructs the communication channels for exchanging data - // with the other processes. The connection are NOT made in the stage. - boost::logic::tribool define(); - - // When a process receives an \c init command it allocates the buffers it - // needs an makes the connections with the other processes. When the - // process succeeds in this it is ready for dataprocessing (or whatever - // task the process has). - boost::logic::tribool init(); - - // During the \c run phase the process does the work it is designed for. - // The run phase stays active until another command is send. - boost::logic::tribool run(); - - // With the \c pause command the process stops its run phase and starts - // waiting for another command. The \c condition argument contains the - // contition the process should use for ending the run phase. This - // condition is a key-value pair that can eg. contain a timestamp or a - // number of a datasample. - boost::logic::tribool pause(const string& condition); - - // \c Quit stops the process. - // The process \b must call \c unregisterAtAC at ProcControlServer during - // the execution of this command to pass the final results to the - // Application Controller. - boost::logic::tribool quit(); - - // With the \c snapshot command the process is instructed to save itself - // in a database is such a way that on another moment in time it can - // be reconstructed and can continue it task.<br> - // The \c destination argument contains database info the process - // must use to save itself. - boost::logic::tribool snapshot(const string& destination); - - // \c Recover reconstructs the process as it was saved some time earlier. - // The \c source argument contains the database info the process must use - // to find the information it needs. - boost::logic::tribool recover(const string& source); - - // With \c reinit the process receives a new parameterset that it must use - // to reinitialize itself. - boost::logic::tribool reinit(const string& configID); - // @} - - // Define a generic way to exchange info between client and server. - std::string askInfo(const string& keylist); - - bool handle(const BBSStrategy *strategy); - bool handle(const BBSPredictStep *step); - bool handle(const BBSSubtractStep *step); - bool handle(const BBSCorrectStep *step); - bool handle(const BBSSolveStep *step); - -private: - void convertStepToContext(const BBSStep *step, Context &context); - - Prediffer *itsPrediffer; - LOFAR::ParmDB::ParmDB *itsHistory; -}; // class BBSKernelProcessControl - -} // namespace BBS + // Implementation of the ProcessControl interface for the local BBSKernel + // controller. + class BBSKernelProcessControl: public ACC::PLC::ProcessControl + { + public: + // Default constructor. + BBSKernelProcessControl(); + + // Destructor + virtual ~BBSKernelProcessControl(); + + // @name Implementation of PLC interface. + // @{ + virtual tribool define(); + virtual tribool init(); + virtual tribool run(); + virtual tribool pause(const string& condition); + virtual tribool quit(); + virtual tribool snapshot(const string& destination); + virtual tribool recover(const string& source); + virtual tribool reinit(const string& configID); + virtual string askInfo(const string& keylist); + // @} + + bool handle(const BBSStrategy *strategy); + bool handle(const BBSStep *step); + + private: + // @name Implementation of handle() for the different BBSStep types. + // @{ + bool doHandle(const BBSPredictStep *step); + bool doHandle(const BBSSubtractStep *step); + bool doHandle(const BBSCorrectStep *step); + bool doHandle(const BBSSolveStep *step); + // @} + + void convertStepToContext(const BBSStep *step, Context &context); + + // Parameter set for this process controller. + ACC::APS::ParameterSet itsParameterSet; + + // Prediffer + Prediffer* itsPrediffer; + + // History database. + LOFAR::ParmDB::ParmDB* itsHistory; + + // DataHolder for exchanging data between local (BBSKernel) and global + // (BBS) process control. + DH_BlobStreamable* itsDataHolder; + + // TransportHolder used to exchange DataHolders. The local controller + // will open a client connection to the global controller. + TH_Socket* itsTransportHolder; + + // Connection between the local (BBSKernel) process control and the + // global (BBS) process control. + CSConnection* itsConnection; + }; + + } // namespace BBS + } // namespace LOFAR #endif diff --git a/CEP/BB/BBSControl/src/BBSKernel.cc b/CEP/BB/BBSControl/src/BBSKernel.cc index b020fac1e37a528e6f7907a7207ef48c6f8c3421..db71568da025ba47116d58e0171c066e552032b1 100644 --- a/CEP/BB/BBSControl/src/BBSKernel.cc +++ b/CEP/BB/BBSControl/src/BBSKernel.cc @@ -1,192 +1,51 @@ -//# BBSKernel.cc: minimal control wrapper around the kernel +//# BBSKernel.cc: minimal control wrapper around the kernel //# -//# Copyright (C) 2004 -//# ASTRON (Netherlands Foundation for Research in Astronomy) -//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# Copyright (C) 2004 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl //# -//# This program is free software; you can redistribute it and/or modify -//# it under the terms of the GNU General Public License as published by -//# the Free Software Foundation; either version 2 of the License, or -//# (at your option) any later version. +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. //# -//# This program is distributed in the hope that it will be useful, -//# but WITHOUT ANY WARRANTY; without even the implied warranty of -//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -//# GNU General Public License for more details. +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. //# -//# You should have received a copy of the GNU General Public License -//# along with this program; if not, write to the Free Software -//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA //# -//# $Id: +//# $Id: -//# Always #include <lofar_config.h> first! #include <lofar_config.h> - -#include <APS/ParameterSet.h> -#include <PLC/ProcControlServer.h> -#include <Common/LofarLogger.h> -#include <BBSControl/BBSStrategy.h> -#include <BBSControl/BBSStep.h> -#include <BBSControl/BBSSingleStep.h> -#include <BBSControl/BBSSolveStep.h> +#include <libgen.h> #include <BBSControl/BBSKernelProcessControl.h> -#include <BBSKernel/Prediffer.h> -#include <BBSKernel/MNS/MeqDomain.h> - -#include <string> +#include <PLC/ACCmain.h> using namespace LOFAR; using namespace LOFAR::BBS; -using namespace std; -using LOFAR::ACC::APS::ParameterSet; -using LOFAR::ACC::PLC::ProcControlServer; -using LOFAR::ACC::PLC::DH_ProcControl; +using namespace LOFAR::ACC::PLC; int main(int argc, char **argv) { - BBSKernelProcessControl process; - - string programName = basename(argv[0]); - INIT_LOGGER(programName.c_str()); - - if((argc != 3) || (strncmp("ACC", argv[1], 3) != 0)) - { - // Not started by ACC. - LOG_TRACE_FLOW(programName + " not started by ACC."); - - if(argc != 2) - { - LOG_INFO("No parset file specified, usage: " + programName + " [ACC] <parset file>"); - return 1; - } - - try - { - ParameterSet parset(argv[1]); - BBSStrategy strategy(parset); - - process.handle(&strategy); - - vector<const BBSStep*> steps = strategy.getAllSteps(); - for(int i = 0; i < steps.size(); ++i) - { - cout << "step type: " << steps[i]->type() << endl; - - if(steps[i]->type() == "BBSPredictStep") - { - process.handle(dynamic_cast<const BBSPredictStep*>(steps[i])); - } - if(steps[i]->type() == "BBSSubtractStep") - { - process.handle(dynamic_cast<const BBSSubtractStep*>(steps[i])); - } - if(steps[i]->type() == "BBSCorrectStep") - { - process.handle(dynamic_cast<const BBSCorrectStep*>(steps[i])); - } - if(steps[i]->type() == "BBSSolveStep") - { - process.handle(dynamic_cast<const BBSSolveStep*>(steps[i])); - } - } - } - catch(...) - { - LOG_TRACE_FLOW_STR("Parset file " << argv[1] << " not found."); - return 1; - } - - /* - LOFAR::ACC::APS::globalParameterSet()->adoptCollection(parset); - - LOG_TRACE_FLOW("Calling define()..."); - if(!process->define()) - { - return 1; - } - - LOG_TRACE_FLOW("Calling init()..."); - if(!process->init()) - { - return 1; - } - - LOG_TRACE_FLOW("Calling run()..."); - if(!process->run()) - { - return 1; - } - - LOG_TRACE_FLOW("Calling quit()..."); - if (!process->quit()) - { - return 1; - } - */ - - LOG_TRACE_FLOW("Deleting process " + programName + "."); - } - else - { - // Started by ACC. - LOG_TRACE_FLOW(programName + " started by ACC."); - - // All ACC processes expect "ACC" as first argument, - // so the parameter file is the second argument. - - // Read in parameterfile and get my name - ParameterSet parset(argv[2]); - string processID = parset.getString("process.name"); - ParameterSet parsubset = parset.makeSubset(processID + "."); - - LOFAR::ACC::APS::globalParameterSet()->adoptCollection(parsubset); - - ProcControlServer controlServer(parsubset.getString("ACnode"), parsubset.getUint16("ACport"), &process); - LOG_TRACE_FLOW("Registering at ApplicationController"); - controlServer.registerAtAC(processID); - LOG_TRACE_FLOW("Registered at ApplicationController"); - - // Main processing loop - bool running = false; - bool quit = false; - while(!quit) - { - LOG_TRACE_FLOW("Polling ApplicationController for a new message."); - if(controlServer.pollForMessage()) - { - LOG_TRACE_FLOW("New message received from ApplicationController."); - - // Get pointer to message - DH_ProcControl* msg = controlServer.getDataHolder(); - controlServer.handleMessage(msg); - - switch(msg->getCommand()) - { - case LOFAR::ACC::PLC::PCCmdQuit: - quit = true; - break; - case LOFAR::ACC::PLC::PCCmdRun: - running = true; - break; - case LOFAR::ACC::PLC::PCCmdPause: - running = false; - break; - } - } - else - { - if(running) - { - process.run(); - } - } - } + const char* progName = basename(argv[0]); + INIT_LOGGER(progName); - LOG_INFO_STR("Unregistering at ApplicationController"); - controlServer.unregisterAtAC(""); - } - - LOG_INFO_STR(programName << " terminated normally."); - return 0; + LOG_INFO_STR(progName << " is starting up ..."); + try { + BBSKernelProcessControl myProcess; + return ACCmain(argc, argv, &myProcess); + } + + catch(Exception& e) + { + LOG_FATAL_STR(progName << " terminated due to fatal exception!\n" << e); + return 1; + } + + LOG_INFO_STR(progName << " terminated successfully."); + return 0; } diff --git a/CEP/BB/BBSControl/src/BBSKernelProcessControl.cc b/CEP/BB/BBSControl/src/BBSKernelProcessControl.cc index a68ec358acf133886e510eab863c56b392e960fd..aa6e0c4f60165c2fceb4194eabe87dcea8d6c800 100644 --- a/CEP/BB/BBSControl/src/BBSKernelProcessControl.cc +++ b/CEP/BB/BBSControl/src/BBSKernelProcessControl.cc @@ -26,427 +26,471 @@ #include <lofar_config.h> #include <BBSControl/BBSKernelProcessControl.h> - -#include <APS/ParameterSet.h> -#include <Common/LofarLogger.h> -#include <Common/StreamUtil.h> -#include <ParmDB/ParmDB.h> -#include <ParmDB/ParmDBMeta.h> +#include <BBSControl/DH_BlobStreamable.h> #include <BBSControl/BBSStrategy.h> #include <BBSControl/BBSSingleStep.h> #include <BBSControl/BBSSolveStep.h> #include <BBSKernel/BBSKernelStructs.h> #include <BBSKernel/Prediffer.h> #include <BBSKernel/Solver.h> +#include <ParmDB/ParmDB.h> +#include <ParmDB/ParmDBMeta.h> +#include <APS/ParameterSet.h> +#include <Transport/TH_Socket.h> +#include <Transport/CSConnection.h> +#include <Common/LofarLogger.h> +#include <Common/StreamUtil.h> +#include <Common/lofar_iomanip.h> -#include <iostream> -#include <iomanip> -using namespace std; +using namespace LOFAR::ACC::APS; namespace LOFAR { -namespace BBS -{ + namespace BBS + { using LOFAR::operator<<; - - BBSKernelProcessControl::BBSKernelProcessControl() - : ProcessControl(), - itsPrediffer(0), - itsHistory(0) + + + //##---- P u b l i c m e t h o d s ----##// + + BBSKernelProcessControl::BBSKernelProcessControl() : + ProcessControl(), + itsPrediffer(0), + itsHistory(0), + itsDataHolder(0), + itsTransportHolder(0), + itsConnection(0) { + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); } - - // Destructor + + BBSKernelProcessControl::~BBSKernelProcessControl() { - delete itsPrediffer; - delete itsHistory; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + delete itsPrediffer; + delete itsHistory; + delete itsDataHolder; + delete itsTransportHolder; + delete itsConnection; } - // \name Command to control the processes. - // There are a dozen commands that can be sent to a application process - // to control its flow. The return values for these command are:<br> - // - True - Command executed succesfully. - // - False - Command could not be executed. - // - // @{ - - // During the \c define state the process check the contents of the - // ParameterSet it received during start-up. When everthing seems ok the - // process constructs the communication channels for exchanging data - // with the other processes. The connection are NOT made in the stage. - boost::logic::tribool BBSKernelProcessControl::define() + + tribool BBSKernelProcessControl::define() { - LOG_TRACE_FLOW("define()"); - //LOFAR::ACC::APS::ParameterSet* parset = LOFAR::ACC::APS::globalParameterSet(); - - // Get BBSController IP-address? - // Initialize some data holders? - - return true; + LOG_INFO("BBSKernelProcessControl::define()"); + try { + // Create a new data holder. + itsDataHolder = new DH_BlobStreamable(); + + // Create a new client TH_Socket. Do not connect yet. + itsTransportHolder = + new TH_Socket(globalParameterSet()->getString("BBSControl.server"), + globalParameterSet()->getString("BBSControl.port"), + true, // sync + Socket::TCP, // protocol + false); // open socket now + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } + return true; } - // When a process receives an \c init command it allocates the buffers it - // needs and makes the connections with the other processes. When the - // process succeeds in this it is ready for dataprocessing (or whatever - // task the process has). - boost::logic::tribool BBSKernelProcessControl::init() + + tribool BBSKernelProcessControl::init() { - LOG_TRACE_FLOW("init()"); - - // Create connection with BBSController? - - return true; + LOG_INFO("BBSKernelProcessControl::init()"); + try { + // DH_BlobStreamable is initialized implicitly by its constructor. + ASSERT(itsDataHolder); + if (!itsDataHolder->isInitialized()) { + LOG_ERROR("Initialization of DataHolder failed"); + return false; + } + // Connect the client socket to the server. + ASSERT(itsTransportHolder); + if (!itsTransportHolder->init()) { + LOG_ERROR("Initialization of TransportHolder failed"); + return false; + } + // Create a new CSConnection object. + itsConnection = new CSConnection("RWConn", + itsDataHolder, + itsDataHolder, + itsTransportHolder); + if (!itsConnection || !itsConnection->isConnected()) { + LOG_ERROR("Creation of Connection failed"); + return false; + } + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } + // All went well. + return true; } - // During the \c run phase the process does the work it is designed for. - // The run phase stays active until another command is send. - boost::logic::tribool BBSKernelProcessControl::run() + + tribool BBSKernelProcessControl::run() { - LOG_TRACE_FLOW("run()"); - - // Non-blocking receive from BBSControl - - // If received msg - bool status = true; -/* - if(...) - { - if(msg->type() == BBSStrategy::type()) - { - status = handle(dynamic_cast<BBSStrategy*>(msg)); - } - if(msg->type() == BBSPredictStep::type()) - { - status = handle(dynamic_cast<BBSPredictStep*>(msg)); - } - else if(msg->type() == BBSSubtractStep::type()) - { - status = handle(dynamic_cast<BBSSubtractStep*>(msg)); - } - else if(msg->type() == BBSCorrectStep::type()) - { - status = handle(dynamic_cast<BBSCorrectStep*>(msg)); - } - else if(msg->type() == BBSSolveStep::type()) - { - status = handle(dynamic_cast<BBSSolveStep*>(msg)); - } - else - { - LOG_ERROR("Received message of unknown type, skipped.") - } - } -*/ - return status; + LOG_INFO("BBSKernelProcessControl::run()"); + + try { + // Blocking receive from BBSProcessControl + ASSERT(itsConnection); + if (itsConnection->read() == CSConnection::Error) { + LOG_ERROR("Connection::read() failed"); + return false; + } + // Deserialize the received message. + ASSERT(itsDataHolder); + BlobStreamable* msg = itsDataHolder->deserialize(); + if (!msg) { + LOG_ERROR("Failed to deserialize message"); + return false; + } + LOG_DEBUG_STR("Received a " << itsDataHolder->classType() + << " object"); + + // If the message contains a `strategy', handle the `strategy'. + BBSStrategy* strategy = dynamic_cast<BBSStrategy*>(msg); + if (strategy) { + return handle(strategy); + } + // If the message contains a `step', handle the `step'. + BBSStep* step = dynamic_cast<BBSStep*>(msg); + if (step) { + return handle(step); + } + // We received a message we can't handle + LOG_WARN_STR("Received message of unsupported type `" << + itsDataHolder->classType() << "'. Skipped."); + return false; + + // Should we send the result back? + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } } - // With the \c pause command the process stops its run phase and starts - // waiting for another command. The \c condition argument contains the - // contition the process should use for ending the run phase. This - // condition is a key-value pair that can eg. contain a timestamp or a - // number of a datasample. - boost::logic::tribool BBSKernelProcessControl::pause(const string& condition) + + tribool BBSKernelProcessControl::pause(const string& /*condition*/) { - return false; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + LOG_WARN("Not supported"); + return false; } - // \c Quit stops the process. - // The process \b must call \c unregisterAtAC at ProcControlServer during - // the execution of this command to pass the final results to the - // Application Controller. - boost::logic::tribool BBSKernelProcessControl::quit() + + tribool BBSKernelProcessControl::quit() { - LOG_TRACE_FLOW("quit()"); - return true; + LOG_INFO("BBSKernelProcessControl::quit()"); + return true; } - // With the \c snapshot command the process is instructed to save itself - // in a database is such a way that on another moment in time it can - // be reconstructed and can continue it task.<br> - // The \c destination argument contains database info the process - // must use to save itself. - boost::logic::tribool BBSKernelProcessControl::snapshot(const string& destination) + + tribool BBSKernelProcessControl::snapshot(const string& /*destination*/) { - return false; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + LOG_WARN("Not supported"); + return false; } - // \c Recover reconstructs the process as it was saved some time earlier. - // The \c source argument contains the database info the process must use - // to find the information it needs. - boost::logic::tribool BBSKernelProcessControl::recover(const string& source) + + tribool BBSKernelProcessControl::recover(const string& /*source*/) { - return false; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + LOG_WARN("Not supported"); + return false; } - // With \c reinit the process receives a new parameterset that it must use - // to reinitialize itself. - boost::logic::tribool BBSKernelProcessControl::reinit(const string& configID) + + tribool BBSKernelProcessControl::reinit(const string& /*configID*/) { - return false; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + LOG_WARN("Not supported"); + return false; } - // @} - // Define a generic way to exchange info between client and server. - std::string BBSKernelProcessControl::askInfo(const string& keylist) + + std::string BBSKernelProcessControl::askInfo(const string& /*keylist*/) { - return std::string(""); + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + return std::string(""); } - // This is somewhat hairy: convert from struct Correlation to struct CorrelationMask, which are - // basically the same. However, one is defined in the BBSControl package and the other in the - // BBSKernel package. It may be better to move the Correlation struct, and also the Baselines struct, - // to the BBSKernel package and then have BBSControl depend on that. - // - // Note: Correlation::NONE is meaningless; it is an error if this should occur. - // - /* - CorrelationMask BBSKernelProcessControl::convertCorrelationToMask(const Correlation &correlation) + + bool BBSKernelProcessControl::handle(const BBSStrategy *strategy) { - CorrelationMask mask; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + + try + { + // Pre-condition check + ASSERT(strategy); + + delete itsHistory; + itsHistory = 0; + + if(!strategy->parmDB().history.empty()) + { + LOFAR::ParmDB::ParmDBMeta historyDBMeta("aips", + strategy->parmDB().history); + itsHistory = new LOFAR::ParmDB::ParmDB(historyDBMeta); + } - switch(correlation.selection) - { - case Correlation::AUTO: - mask.selection = CorrelationMask::AUTO; - break; - case Correlation::CROSS: - mask.selection = CorrelationMask::CROSS; - break; - default: - mask.selection = CorrelationMask::ALL; - break; - } - mask.type = correlation.type; + // Create a new Prediffer. + delete itsPrediffer; + itsPrediffer = new Prediffer(strategy->dataSet(), + strategy->inputData(), + strategy->parmDB().localSky, + strategy->parmDB().instrument, + 0, + false); - return mask; + // Set data selection and work domain. + bool status; + status = itsPrediffer->setSelection(strategy->stations(), + strategy->correlation()) + && itsPrediffer->setWorkDomain(-1, -1, 0, 1e12); + return status; + } + + catch (Exception& e) + { + LOG_WARN_STR(e); + return false; + } } - */ - // This is somewhat hairy: convert a BBSStep to a context. If the issue mentioned above - // at convertCorrelationToMask() is resolved, this method will become simpler. The baselines - // and correlation could then be copied directly. - void BBSKernelProcessControl::convertStepToContext(const BBSStep *step, Context &context) + + bool BBSKernelProcessControl::handle(const BBSStep* bs) { - ASSERT(step->itsBaselines.station1.size() == step->itsBaselines.station2.size()); - - context.baselines = step->itsBaselines; - /* - vector<string>::const_iterator it1 = step->itsBaselines.station1.begin(); - vector<string>::const_iterator it2 = step->itsBaselines.station2.begin(); - while(it1 != step->itsBaselines.station1.end()) - { - context.baselines.push_back(pair<string, string>(*it1, *it2)); - ++it1; - ++it2; - } - */ - context.correlation = step->itsCorrelation; - context.sources = step->itsSources; - context.instrumentModel = step->itsInstrumentModels; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + { + const BBSPredictStep* step = dynamic_cast<const BBSPredictStep*>(bs); + if (step) return doHandle(step); + } + { + const BBSSubtractStep* step = dynamic_cast<const BBSSubtractStep*>(bs); + if (step) return doHandle(step); + } + { + const BBSCorrectStep* step = dynamic_cast<const BBSCorrectStep*>(bs); + if (step) return doHandle(step); + } + { + const BBSSolveStep* step = dynamic_cast<const BBSSolveStep*>(bs); + if (step) return doHandle(step); + } + return false; } + + + //##---- P r i v a t e m e t h o d s ----##// - - bool BBSKernelProcessControl::handle(const BBSStrategy *strategy) + void BBSKernelProcessControl::convertStepToContext(const BBSStep *step, Context &context) { - delete itsHistory; - itsHistory = 0; - - if(!strategy->itsParmDB.history.empty()) - { - LOFAR::ParmDB::ParmDBMeta historyDBMeta("aips", strategy->itsParmDB.history); - itsHistory = new LOFAR::ParmDB::ParmDB(historyDBMeta); - } - - // Create a new Prediffer. - delete itsPrediffer; - itsPrediffer = new Prediffer( strategy->itsDataSet, - strategy->itsInputData, - strategy->itsParmDB.localSky, - strategy->itsParmDB.instrument, - 0, - false); - - // Set data selection and work domain. - bool status; - status = itsPrediffer->setSelection(strategy->itsStations, strategy->itsCorrelation) - && itsPrediffer->setWorkDomain(-1, -1, 0, 1e12); - return status; + ASSERT(step->baselines().station1.size() == + step->baselines().station2.size()); + + context.baselines = step->baselines(); + /* + vector<string>::const_iterator it1 = step->itsBaselines.station1.begin(); + vector<string>::const_iterator it2 = step->itsBaselines.station2.begin(); + while(it1 != step->itsBaselines.station1.end()) + { + context.baselines.push_back(pair<string, string>(*it1, *it2)); + ++it1; + ++it2; + } + */ + context.correlation = step->correlation(); + context.sources = step->sources(); + context.instrumentModel = step->instrumentModels(); } - bool BBSKernelProcessControl::handle(const BBSPredictStep *step) + bool BBSKernelProcessControl::doHandle(const BBSPredictStep *step) { - ASSERTSTR(itsPrediffer, "No Prediffer available."); - ASSERTSTR(step, "Step corrupted."); - - PredictContext context; - convertStepToContext(step, context); - context.outputColumn = step->itsOutputData; - - // Set context. - if(!itsPrediffer->setContext(context)) - { - return false; - } - - // Execute predict. - itsPrediffer->predictVisibilities(); - return true; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + ASSERTSTR(itsPrediffer, "No Prediffer available."); + ASSERTSTR(step, "Step corrupted."); + + PredictContext context; + convertStepToContext(step, context); + context.outputColumn = step->outputData(); + + // Set context. + if(!itsPrediffer->setContext(context)) + { + return false; + } + + // Execute predict. + itsPrediffer->predictVisibilities(); + return true; } - bool BBSKernelProcessControl::handle(const BBSSubtractStep *step) + bool BBSKernelProcessControl::doHandle(const BBSSubtractStep *step) { - ASSERTSTR(itsPrediffer, "No Prediffer available."); - ASSERTSTR(step, "Step corrupted."); + ASSERTSTR(itsPrediffer, "No Prediffer available."); + ASSERTSTR(step, "Step corrupted."); - SubtractContext context; - convertStepToContext(step, context); - context.outputColumn = step->itsOutputData; + SubtractContext context; + convertStepToContext(step, context); + context.outputColumn = step->outputData(); - // Set context. - if(!itsPrediffer->setContext(context)) - { - return false; - } + // Set context. + if(!itsPrediffer->setContext(context)) + { + return false; + } - // Execute subtract. - itsPrediffer->subtractVisibilities(); - return true; + // Execute subtract. + itsPrediffer->subtractVisibilities(); + return true; } - bool BBSKernelProcessControl::handle(const BBSCorrectStep *step) + bool BBSKernelProcessControl::doHandle(const BBSCorrectStep *step) { - ASSERTSTR(itsPrediffer, "No Prediffer available."); - ASSERTSTR(step, "Step corrupted."); + ASSERTSTR(itsPrediffer, "No Prediffer available."); + ASSERTSTR(step, "Step corrupted."); - CorrectContext context; - convertStepToContext(step, context); - context.outputColumn = step->itsOutputData; + CorrectContext context; + convertStepToContext(step, context); + context.outputColumn = step->outputData(); - // Set context. - if(!itsPrediffer->setContext(context)) - { - return false; - } + // Set context. + if(!itsPrediffer->setContext(context)) + { + return false; + } - // Execute correct. - itsPrediffer->correctVisibilities(); - return true; + // Execute correct. + itsPrediffer->correctVisibilities(); + return true; } - bool BBSKernelProcessControl::handle(const BBSSolveStep *step) + bool BBSKernelProcessControl::doHandle(const BBSSolveStep *step) { - ASSERTSTR(itsPrediffer, "No Prediffer available."); - ASSERTSTR(step, "Step corrupted."); + ASSERTSTR(itsPrediffer, "No Prediffer available."); + ASSERTSTR(step, "Step corrupted."); - // Construct context. - GenerateContext context; - convertStepToContext(step, context); - context.unknowns = step->itsParms; - context.excludedUnknowns = step->itsExclParms; + // Construct context. + GenerateContext context; + convertStepToContext(step, context); + context.unknowns = step->parms(); + context.excludedUnknowns = step->exclParms(); - // Create solve domains. For now this just splits the work domain in a rectangular grid, - // with cells of size step->itsDomainSize. Should become more interesting in the near future. - const MeqDomain &workDomain = itsPrediffer->getWorkDomain(); + // Create solve domains. For now this just splits the work domain in a rectangular grid, + // with cells of size step->itsDomainSize. Should become more interesting in the near future. + const MeqDomain &workDomain = itsPrediffer->getWorkDomain(); - const int freqCount = (int) ceil((workDomain.endX() - workDomain.startX()) / step->itsDomainSize.bandWidth); - const int timeCount = (int) ceil((workDomain.endY() - workDomain.startY()) / step->itsDomainSize.timeInterval); + const int freqCount = (int) ceil((workDomain.endX() - workDomain.startX()) / step->domainSize().bandWidth); + const int timeCount = (int) ceil((workDomain.endY() - workDomain.startY()) / step->domainSize().timeInterval); - double timeOffset = workDomain.startY(); - double timeSize = step->itsDomainSize.timeInterval; + double timeOffset = workDomain.startY(); + double timeSize = step->domainSize().timeInterval; - int time = 0; - while(time < timeCount) - { - double freqOffset = workDomain.startX(); - double freqSize = step->itsDomainSize.bandWidth; - - if(timeOffset + timeSize > workDomain.endY()) - { - timeSize = workDomain.endY() - timeOffset; - } - - int freq = 0; - while(freq < freqCount) - { - if(freqOffset + freqSize > workDomain.endX()) - { - freqSize = workDomain.endX() - freqOffset; - } - - context.solveDomains.push_back(MeqDomain(freqOffset, freqOffset + freqSize, timeOffset, timeOffset + timeSize)); - - freqOffset += freqSize; - freq++; - } - - timeOffset += timeSize; - time++; - } + int time = 0; + while(time < timeCount) + { + double freqOffset = workDomain.startX(); + double freqSize = step->domainSize().bandWidth; + + if(timeOffset + timeSize > workDomain.endY()) + { + timeSize = workDomain.endY() - timeOffset; + } + + int freq = 0; + while(freq < freqCount) + { + if(freqOffset + freqSize > workDomain.endX()) + { + freqSize = workDomain.endX() - freqOffset; + } + + context.solveDomains.push_back(MeqDomain(freqOffset, freqOffset + freqSize, timeOffset, timeOffset + timeSize)); + + freqOffset += freqSize; + freq++; + } + + timeOffset += timeSize; + time++; + } - // Set context. - if(!itsPrediffer->setContext(context)) - { - return false; - } + // Set context. + if(!itsPrediffer->setContext(context)) + { + return false; + } - cout << "Solve domains:" << endl; - cout << itsPrediffer->getSolveDomains() << endl; + cout << "Solve domains:" << endl; + cout << itsPrediffer->getSolveDomains() << endl; - // Initialize the solver. - Solver solver; - solver.initSolvableParmData(1, itsPrediffer->getSolveDomains(), itsPrediffer->getWorkDomain()); - solver.setSolvableParmData(itsPrediffer->getSolvableParmData(), 0); - itsPrediffer->showSettings(); - - // Main iteration loop. - unsigned int iteration = 0; - bool converged = false; - while(iteration < step->itsMaxIter && !converged) - { - // Generate normal equations and pass them to the solver. - vector<casa::LSQFit> equations; - itsPrediffer->generateEquations(equations); - solver.mergeFitters(equations, 0); - - // Do one Levenberg-Maquardt step. - solver.solve(false); + // Initialize the solver. + Solver solver; + solver.initSolvableParmData(1, itsPrediffer->getSolveDomains(), itsPrediffer->getWorkDomain()); + solver.setSolvableParmData(itsPrediffer->getSolvableParmData(), 0); + itsPrediffer->showSettings(); + + // Main iteration loop. + unsigned int iteration = 0; + bool converged = false; + while(iteration < step->maxIter() && !converged) + { + // Generate normal equations and pass them to the solver. + vector<casa::LSQFit> equations; + itsPrediffer->generateEquations(equations); + solver.mergeFitters(equations, 0); + + // Do one Levenberg-Maquardt step. + solver.solve(false); - // Optionally log to history. - if(itsHistory) - { - solver.log(*itsHistory, step->itsName); - } - - // Check for convergence. - int convergedSolveDomains = 0; - for(unsigned int i = 0; i < context.solveDomains.size(); ++i) - { - Quality quality = solver.getQuality(i); - if(quality.itsChi < step->itsEpsilon) - { - convergedSolveDomains++; - } - } - converged = (((double) convergedSolveDomains) / context.solveDomains.size()) > step->itsMinConverged; + // Optionally log to history. + if(itsHistory) + { + solver.log(*itsHistory, step->getName()); + } + + // Check for convergence. + int convergedSolveDomains = 0; + for(unsigned int i = 0; i < context.solveDomains.size(); ++i) + { + Quality quality = solver.getQuality(i); + if(quality.itsChi < step->epsilon()) + { + convergedSolveDomains++; + } + } + converged = (((double) convergedSolveDomains) / context.solveDomains.size()) > step->minConverged(); - cout << "iteration " << iteration << ": " << setprecision(10) << solver.getSolvableValues(0) << endl; - cout << "solve domains converged: " << convergedSolveDomains << "/" << context.solveDomains.size() << - " (" << (((double) convergedSolveDomains) / context.solveDomains.size() * 100.0) << "%)" << endl; + cout << "iteration " << iteration << ": " << setprecision(10) << solver.getSolvableValues(0) << endl; + cout << "solve domains converged: " << convergedSolveDomains << "/" << context.solveDomains.size() << + " (" << (((double) convergedSolveDomains) / context.solveDomains.size() * 100.0) << "%)" << endl; - // Send updates back to the Prediffer. - itsPrediffer->updateSolvableParms(solver.getSolvableParmData()); + // Send updates back to the Prediffer. + itsPrediffer->updateSolvableParms(solver.getSolvableParmData()); - iteration++; - } + iteration++; + } - //cout << "Writing solutions into ParmDB ..." << endl; - //itsPrediffer->writeParms(); - return true; + //cout << "Writing solutions into ParmDB ..." << endl; + //itsPrediffer->writeParms(); + return true; } -} // namespace BBS + + } // namespace BBS + } // namespace LOFAR diff --git a/CEP/BB/BBSControl/src/BBSProcessControl.cc b/CEP/BB/BBSControl/src/BBSProcessControl.cc index dd46994973ac0a887b0e8feb2902c26fc5368aec..d1d2b675f9e4d82fc8a7130406c441e29d1c25c7 100644 --- a/CEP/BB/BBSControl/src/BBSProcessControl.cc +++ b/CEP/BB/BBSControl/src/BBSProcessControl.cc @@ -24,35 +24,65 @@ #include <BBSControl/BBSProcessControl.h> #include <BBSControl/BBSStrategy.h> +#include <BBSControl/BBSStep.h> +#include <BBSControl/DH_BlobStreamable.h> +#include <Transport/TH_Socket.h> +#include <Transport/CSConnection.h> #include <Common/LofarLogger.h> -#include <boost/logic/tribool.hpp> -using namespace boost::logic; using namespace LOFAR::ACC::APS; namespace LOFAR { namespace BBS { - //##-------- P u b l i c m e t h o d s --------##// + BBSProcessControl::BBSProcessControl() : + ProcessControl(), + itsStrategy(0), + itsDataHolder(0), + itsTransportHolder(0), + itsConnection(0), + itsStrategySent(false) + { + LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); + } + + BBSProcessControl::~BBSProcessControl() { LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); + delete itsStrategy; + delete itsDataHolder; + delete itsTransportHolder; + delete itsConnection; } tribool BBSProcessControl::define() { - LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); + LOG_INFO("BBSProcessControl::define()"); try { - itsParamSet = *globalParameterSet(); - itsStrategy = new BBSStrategy(itsParamSet); + // Retrieve the strategy from the parameter set. + itsStrategy = new BBSStrategy(*globalParameterSet()); + + // Retrieve the steps in the strategy in sequential order. itsSteps = itsStrategy->getAllSteps(); - } catch(Exception& e) { - LOG_WARN_STR(e); - return false; + + // Create a new data holder. + itsDataHolder = new DH_BlobStreamable(); + + // Create a new server TH_Socket. Do not open the socket yet. + itsTransportHolder = + new TH_Socket(globalParameterSet()->getString("BBSControl.port"), + true, // sync + Socket::TCP, // protocol + false); // open socket now + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; } return true; } @@ -60,61 +90,145 @@ namespace LOFAR tribool BBSProcessControl::init() { - LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_INFO("BBSProcessControl::init()"); + try { + // We need to send the strategy first. + itsStrategySent = false; + + // Set the step iterator at the start of the vector of steps. + itsStepsIterator = itsSteps.begin(); + + // DH_BlobStreamable is initialized implicitly by its constructor. + ASSERT(itsDataHolder); + if (!itsDataHolder->isInitialized()) { + LOG_ERROR("Initialization of DataHolder failed"); + return false; + } + // Connect the client socket to the server. + ASSERT(itsTransportHolder); + if (!itsTransportHolder->init()) { + LOG_ERROR("Initialization of TransportHolder failed"); + return false; + } + // Create a new CSConnection object. + itsConnection = new CSConnection("RWConn", + itsDataHolder, + itsDataHolder, + itsTransportHolder); + if (!itsConnection || !itsConnection->isConnected()) { + LOG_ERROR("Creation of Connection failed"); + return false; + } + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } + // All went well. + return true; } tribool BBSProcessControl::run() { - LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_INFO("BBSProcessControl::run()"); + + try { + // Check pre-conditions + ASSERT(itsDataHolder); + ASSERT(itsConnection); + ASSERT(itsStrategy); + + // If we have not sent the strategy yet. We should do so now. + if (!itsStrategySent) { + return sendObject(*itsStrategy); + } + // Else, we should send the next step, unless we're at the end of the + // vector of steps. + else { + if (itsStepsIterator == itsSteps.end()) { + LOG_TRACE_COND("Reached end of vector of steps"); + return false; + } + // Send the next step and increment the iterator. + return sendObject(**itsStepsIterator++); + } + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } } - + + bool BBSProcessControl::sendObject(const BlobStreamable& bs) + { + try { + // Serialize the object + itsDataHolder->serialize(bs); + LOG_DEBUG_STR("Sending a " << itsDataHolder->classType() << " object"); + + // Do a blocking send + if (itsConnection->write() == CSConnection::Error) { + LOG_ERROR("Connection::write() failed"); + return false; + } + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } + // All went well. + return true; + } + + tribool BBSProcessControl::quit() { - LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_INFO("BBSProcessControl::quit()"); + return true; } //##-------- P r i v a t e m e t h o d s --------##// - tribool BBSProcessControl::pause(const string& condition) + tribool BBSProcessControl::pause(const string& /*condition*/) { LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_WARN("Not supported"); + return false; } - tribool BBSProcessControl::snapshot(const string& destination) + tribool BBSProcessControl::snapshot(const string& /*destination*/) { LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_WARN("Not supported"); + return false; } - tribool BBSProcessControl::recover(const string& source) + tribool BBSProcessControl::recover(const string& /*source*/) { LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_WARN("Not supported"); + return false; } - tribool BBSProcessControl::reinit(const string& configID) + tribool BBSProcessControl::reinit(const string& /*configID*/) { LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_WARN("Not supported"); + return false; } - string BBSProcessControl::askInfo(const string& keylist) + string BBSProcessControl::askInfo(const string& /*keylist*/) { + LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); return string(); } - //##-------- G l o b a l m e t h o d s --------##// - } // namespace BBS diff --git a/CEP/BB/BBSControl/src/BBSSingleStep.cc b/CEP/BB/BBSControl/src/BBSSingleStep.cc index b5f36c39d55c73e94eb5e45ca75624f6eebb02b4..079420368991421f2f6ffeda07bf3025f0b48cfd 100644 --- a/CEP/BB/BBSControl/src/BBSSingleStep.cc +++ b/CEP/BB/BBSControl/src/BBSSingleStep.cc @@ -91,14 +91,6 @@ namespace LOFAR } - const string& BBSSingleStep::classType() const - { - LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - static const string theType("BBSSingleStep"); - return theType; - } - - } // namespace BBS } // namespace LOFAR diff --git a/CEP/BB/BBSControl/src/BBSStep.cc b/CEP/BB/BBSControl/src/BBSStep.cc index 0e9bd397e105447dce03241bd70c7578c91e8e48..77aadd5e2ab37e273d2b1739aa576eb9f4abab33 100644 --- a/CEP/BB/BBSControl/src/BBSStep.cc +++ b/CEP/BB/BBSControl/src/BBSStep.cc @@ -52,11 +52,11 @@ namespace LOFAR } - string BBSStep::getFullName() const + string BBSStep::fullName() const { LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); string name; - if (itsParent) name = itsParent->getFullName() + "."; + if (itsParent) name = itsParent->fullName() + "."; name += itsName; return name; } @@ -161,7 +161,7 @@ namespace LOFAR LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); os << "Step: " << itsName; Indent id; // add an extra indentation level - os << endl << indent << "Full name: " << getFullName() + os << endl << indent << "Full name: " << fullName() << endl << indent << itsBaselines << endl << indent << itsCorrelation << endl << indent << itsIntegration diff --git a/CEP/BB/BBSControl/src/BlobStreamable.cc b/CEP/BB/BBSControl/src/BlobStreamable.cc index 9c33c6b1e615f263f390ef572d39fbf8005493b1..f429f33294cc96bcf3b976012f2bb74340a2e579 100644 --- a/CEP/BB/BBSControl/src/BlobStreamable.cc +++ b/CEP/BB/BBSControl/src/BlobStreamable.cc @@ -85,10 +85,14 @@ namespace LOFAR // a proper object factory. BlobStreamable* BlobStreamable::create(const string& typeName) { - if (typeName == "BBSMultiStep") return new BBSMultiStep(); - else if (typeName == "BBSSingleStep") return new BBSSingleStep(); - else if (typeName == "BBSSolveStep") return new BBSSolveStep(); - else if (typeName == "BBSStrategy") return new BBSStrategy(); + if (typeName == "BBSStrategy") return new BBSStrategy(); + else if (typeName == "BBSMultiStep") return new BBSMultiStep(); + else if (typeName == "BBSSolveStep") return new BBSSolveStep(); + else if (typeName == "BBSPredictStep") return new BBSPredictStep(); + else if (typeName == "BBSSubtractStep") return new BBSSubtractStep(); + else if (typeName == "BBSCorrectStep") return new BBSCorrectStep(); + else if (typeName == "BBSShiftStep") return new BBSShiftStep(); + else if (typeName == "BBSRefitStep") return new BBSRefitStep(); else return 0; } diff --git a/CEP/BB/BBSControl/src/GlobalProcessControl.cc b/CEP/BB/BBSControl/src/GlobalProcessControl.cc index dd46994973ac0a887b0e8feb2902c26fc5368aec..d1d2b675f9e4d82fc8a7130406c441e29d1c25c7 100644 --- a/CEP/BB/BBSControl/src/GlobalProcessControl.cc +++ b/CEP/BB/BBSControl/src/GlobalProcessControl.cc @@ -24,35 +24,65 @@ #include <BBSControl/BBSProcessControl.h> #include <BBSControl/BBSStrategy.h> +#include <BBSControl/BBSStep.h> +#include <BBSControl/DH_BlobStreamable.h> +#include <Transport/TH_Socket.h> +#include <Transport/CSConnection.h> #include <Common/LofarLogger.h> -#include <boost/logic/tribool.hpp> -using namespace boost::logic; using namespace LOFAR::ACC::APS; namespace LOFAR { namespace BBS { - //##-------- P u b l i c m e t h o d s --------##// + BBSProcessControl::BBSProcessControl() : + ProcessControl(), + itsStrategy(0), + itsDataHolder(0), + itsTransportHolder(0), + itsConnection(0), + itsStrategySent(false) + { + LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); + } + + BBSProcessControl::~BBSProcessControl() { LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); + delete itsStrategy; + delete itsDataHolder; + delete itsTransportHolder; + delete itsConnection; } tribool BBSProcessControl::define() { - LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); + LOG_INFO("BBSProcessControl::define()"); try { - itsParamSet = *globalParameterSet(); - itsStrategy = new BBSStrategy(itsParamSet); + // Retrieve the strategy from the parameter set. + itsStrategy = new BBSStrategy(*globalParameterSet()); + + // Retrieve the steps in the strategy in sequential order. itsSteps = itsStrategy->getAllSteps(); - } catch(Exception& e) { - LOG_WARN_STR(e); - return false; + + // Create a new data holder. + itsDataHolder = new DH_BlobStreamable(); + + // Create a new server TH_Socket. Do not open the socket yet. + itsTransportHolder = + new TH_Socket(globalParameterSet()->getString("BBSControl.port"), + true, // sync + Socket::TCP, // protocol + false); // open socket now + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; } return true; } @@ -60,61 +90,145 @@ namespace LOFAR tribool BBSProcessControl::init() { - LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_INFO("BBSProcessControl::init()"); + try { + // We need to send the strategy first. + itsStrategySent = false; + + // Set the step iterator at the start of the vector of steps. + itsStepsIterator = itsSteps.begin(); + + // DH_BlobStreamable is initialized implicitly by its constructor. + ASSERT(itsDataHolder); + if (!itsDataHolder->isInitialized()) { + LOG_ERROR("Initialization of DataHolder failed"); + return false; + } + // Connect the client socket to the server. + ASSERT(itsTransportHolder); + if (!itsTransportHolder->init()) { + LOG_ERROR("Initialization of TransportHolder failed"); + return false; + } + // Create a new CSConnection object. + itsConnection = new CSConnection("RWConn", + itsDataHolder, + itsDataHolder, + itsTransportHolder); + if (!itsConnection || !itsConnection->isConnected()) { + LOG_ERROR("Creation of Connection failed"); + return false; + } + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } + // All went well. + return true; } tribool BBSProcessControl::run() { - LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_INFO("BBSProcessControl::run()"); + + try { + // Check pre-conditions + ASSERT(itsDataHolder); + ASSERT(itsConnection); + ASSERT(itsStrategy); + + // If we have not sent the strategy yet. We should do so now. + if (!itsStrategySent) { + return sendObject(*itsStrategy); + } + // Else, we should send the next step, unless we're at the end of the + // vector of steps. + else { + if (itsStepsIterator == itsSteps.end()) { + LOG_TRACE_COND("Reached end of vector of steps"); + return false; + } + // Send the next step and increment the iterator. + return sendObject(**itsStepsIterator++); + } + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } } - + + bool BBSProcessControl::sendObject(const BlobStreamable& bs) + { + try { + // Serialize the object + itsDataHolder->serialize(bs); + LOG_DEBUG_STR("Sending a " << itsDataHolder->classType() << " object"); + + // Do a blocking send + if (itsConnection->write() == CSConnection::Error) { + LOG_ERROR("Connection::write() failed"); + return false; + } + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } + // All went well. + return true; + } + + tribool BBSProcessControl::quit() { - LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_INFO("BBSProcessControl::quit()"); + return true; } //##-------- P r i v a t e m e t h o d s --------##// - tribool BBSProcessControl::pause(const string& condition) + tribool BBSProcessControl::pause(const string& /*condition*/) { LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_WARN("Not supported"); + return false; } - tribool BBSProcessControl::snapshot(const string& destination) + tribool BBSProcessControl::snapshot(const string& /*destination*/) { LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_WARN("Not supported"); + return false; } - tribool BBSProcessControl::recover(const string& source) + tribool BBSProcessControl::recover(const string& /*source*/) { LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_WARN("Not supported"); + return false; } - tribool BBSProcessControl::reinit(const string& configID) + tribool BBSProcessControl::reinit(const string& /*configID*/) { LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); - return indeterminate; + LOG_WARN("Not supported"); + return false; } - string BBSProcessControl::askInfo(const string& keylist) + string BBSProcessControl::askInfo(const string& /*keylist*/) { + LOG_TRACE_LIFETIME(TRACE_LEVEL_FLOW, ""); return string(); } - //##-------- G l o b a l m e t h o d s --------##// - } // namespace BBS diff --git a/CEP/BB/BBSControl/src/KernelControl.cc b/CEP/BB/BBSControl/src/KernelControl.cc index b020fac1e37a528e6f7907a7207ef48c6f8c3421..db71568da025ba47116d58e0171c066e552032b1 100644 --- a/CEP/BB/BBSControl/src/KernelControl.cc +++ b/CEP/BB/BBSControl/src/KernelControl.cc @@ -1,192 +1,51 @@ -//# BBSKernel.cc: minimal control wrapper around the kernel +//# BBSKernel.cc: minimal control wrapper around the kernel //# -//# Copyright (C) 2004 -//# ASTRON (Netherlands Foundation for Research in Astronomy) -//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# Copyright (C) 2004 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl //# -//# This program is free software; you can redistribute it and/or modify -//# it under the terms of the GNU General Public License as published by -//# the Free Software Foundation; either version 2 of the License, or -//# (at your option) any later version. +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. //# -//# This program is distributed in the hope that it will be useful, -//# but WITHOUT ANY WARRANTY; without even the implied warranty of -//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -//# GNU General Public License for more details. +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. //# -//# You should have received a copy of the GNU General Public License -//# along with this program; if not, write to the Free Software -//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA //# -//# $Id: +//# $Id: -//# Always #include <lofar_config.h> first! #include <lofar_config.h> - -#include <APS/ParameterSet.h> -#include <PLC/ProcControlServer.h> -#include <Common/LofarLogger.h> -#include <BBSControl/BBSStrategy.h> -#include <BBSControl/BBSStep.h> -#include <BBSControl/BBSSingleStep.h> -#include <BBSControl/BBSSolveStep.h> +#include <libgen.h> #include <BBSControl/BBSKernelProcessControl.h> -#include <BBSKernel/Prediffer.h> -#include <BBSKernel/MNS/MeqDomain.h> - -#include <string> +#include <PLC/ACCmain.h> using namespace LOFAR; using namespace LOFAR::BBS; -using namespace std; -using LOFAR::ACC::APS::ParameterSet; -using LOFAR::ACC::PLC::ProcControlServer; -using LOFAR::ACC::PLC::DH_ProcControl; +using namespace LOFAR::ACC::PLC; int main(int argc, char **argv) { - BBSKernelProcessControl process; - - string programName = basename(argv[0]); - INIT_LOGGER(programName.c_str()); - - if((argc != 3) || (strncmp("ACC", argv[1], 3) != 0)) - { - // Not started by ACC. - LOG_TRACE_FLOW(programName + " not started by ACC."); - - if(argc != 2) - { - LOG_INFO("No parset file specified, usage: " + programName + " [ACC] <parset file>"); - return 1; - } - - try - { - ParameterSet parset(argv[1]); - BBSStrategy strategy(parset); - - process.handle(&strategy); - - vector<const BBSStep*> steps = strategy.getAllSteps(); - for(int i = 0; i < steps.size(); ++i) - { - cout << "step type: " << steps[i]->type() << endl; - - if(steps[i]->type() == "BBSPredictStep") - { - process.handle(dynamic_cast<const BBSPredictStep*>(steps[i])); - } - if(steps[i]->type() == "BBSSubtractStep") - { - process.handle(dynamic_cast<const BBSSubtractStep*>(steps[i])); - } - if(steps[i]->type() == "BBSCorrectStep") - { - process.handle(dynamic_cast<const BBSCorrectStep*>(steps[i])); - } - if(steps[i]->type() == "BBSSolveStep") - { - process.handle(dynamic_cast<const BBSSolveStep*>(steps[i])); - } - } - } - catch(...) - { - LOG_TRACE_FLOW_STR("Parset file " << argv[1] << " not found."); - return 1; - } - - /* - LOFAR::ACC::APS::globalParameterSet()->adoptCollection(parset); - - LOG_TRACE_FLOW("Calling define()..."); - if(!process->define()) - { - return 1; - } - - LOG_TRACE_FLOW("Calling init()..."); - if(!process->init()) - { - return 1; - } - - LOG_TRACE_FLOW("Calling run()..."); - if(!process->run()) - { - return 1; - } - - LOG_TRACE_FLOW("Calling quit()..."); - if (!process->quit()) - { - return 1; - } - */ - - LOG_TRACE_FLOW("Deleting process " + programName + "."); - } - else - { - // Started by ACC. - LOG_TRACE_FLOW(programName + " started by ACC."); - - // All ACC processes expect "ACC" as first argument, - // so the parameter file is the second argument. - - // Read in parameterfile and get my name - ParameterSet parset(argv[2]); - string processID = parset.getString("process.name"); - ParameterSet parsubset = parset.makeSubset(processID + "."); - - LOFAR::ACC::APS::globalParameterSet()->adoptCollection(parsubset); - - ProcControlServer controlServer(parsubset.getString("ACnode"), parsubset.getUint16("ACport"), &process); - LOG_TRACE_FLOW("Registering at ApplicationController"); - controlServer.registerAtAC(processID); - LOG_TRACE_FLOW("Registered at ApplicationController"); - - // Main processing loop - bool running = false; - bool quit = false; - while(!quit) - { - LOG_TRACE_FLOW("Polling ApplicationController for a new message."); - if(controlServer.pollForMessage()) - { - LOG_TRACE_FLOW("New message received from ApplicationController."); - - // Get pointer to message - DH_ProcControl* msg = controlServer.getDataHolder(); - controlServer.handleMessage(msg); - - switch(msg->getCommand()) - { - case LOFAR::ACC::PLC::PCCmdQuit: - quit = true; - break; - case LOFAR::ACC::PLC::PCCmdRun: - running = true; - break; - case LOFAR::ACC::PLC::PCCmdPause: - running = false; - break; - } - } - else - { - if(running) - { - process.run(); - } - } - } + const char* progName = basename(argv[0]); + INIT_LOGGER(progName); - LOG_INFO_STR("Unregistering at ApplicationController"); - controlServer.unregisterAtAC(""); - } - - LOG_INFO_STR(programName << " terminated normally."); - return 0; + LOG_INFO_STR(progName << " is starting up ..."); + try { + BBSKernelProcessControl myProcess; + return ACCmain(argc, argv, &myProcess); + } + + catch(Exception& e) + { + LOG_FATAL_STR(progName << " terminated due to fatal exception!\n" << e); + return 1; + } + + LOG_INFO_STR(progName << " terminated successfully."); + return 0; } diff --git a/CEP/BB/BBSControl/src/KernelProcessControl.cc b/CEP/BB/BBSControl/src/KernelProcessControl.cc index a68ec358acf133886e510eab863c56b392e960fd..aa6e0c4f60165c2fceb4194eabe87dcea8d6c800 100644 --- a/CEP/BB/BBSControl/src/KernelProcessControl.cc +++ b/CEP/BB/BBSControl/src/KernelProcessControl.cc @@ -26,427 +26,471 @@ #include <lofar_config.h> #include <BBSControl/BBSKernelProcessControl.h> - -#include <APS/ParameterSet.h> -#include <Common/LofarLogger.h> -#include <Common/StreamUtil.h> -#include <ParmDB/ParmDB.h> -#include <ParmDB/ParmDBMeta.h> +#include <BBSControl/DH_BlobStreamable.h> #include <BBSControl/BBSStrategy.h> #include <BBSControl/BBSSingleStep.h> #include <BBSControl/BBSSolveStep.h> #include <BBSKernel/BBSKernelStructs.h> #include <BBSKernel/Prediffer.h> #include <BBSKernel/Solver.h> +#include <ParmDB/ParmDB.h> +#include <ParmDB/ParmDBMeta.h> +#include <APS/ParameterSet.h> +#include <Transport/TH_Socket.h> +#include <Transport/CSConnection.h> +#include <Common/LofarLogger.h> +#include <Common/StreamUtil.h> +#include <Common/lofar_iomanip.h> -#include <iostream> -#include <iomanip> -using namespace std; +using namespace LOFAR::ACC::APS; namespace LOFAR { -namespace BBS -{ + namespace BBS + { using LOFAR::operator<<; - - BBSKernelProcessControl::BBSKernelProcessControl() - : ProcessControl(), - itsPrediffer(0), - itsHistory(0) + + + //##---- P u b l i c m e t h o d s ----##// + + BBSKernelProcessControl::BBSKernelProcessControl() : + ProcessControl(), + itsPrediffer(0), + itsHistory(0), + itsDataHolder(0), + itsTransportHolder(0), + itsConnection(0) { + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); } - - // Destructor + + BBSKernelProcessControl::~BBSKernelProcessControl() { - delete itsPrediffer; - delete itsHistory; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + delete itsPrediffer; + delete itsHistory; + delete itsDataHolder; + delete itsTransportHolder; + delete itsConnection; } - // \name Command to control the processes. - // There are a dozen commands that can be sent to a application process - // to control its flow. The return values for these command are:<br> - // - True - Command executed succesfully. - // - False - Command could not be executed. - // - // @{ - - // During the \c define state the process check the contents of the - // ParameterSet it received during start-up. When everthing seems ok the - // process constructs the communication channels for exchanging data - // with the other processes. The connection are NOT made in the stage. - boost::logic::tribool BBSKernelProcessControl::define() + + tribool BBSKernelProcessControl::define() { - LOG_TRACE_FLOW("define()"); - //LOFAR::ACC::APS::ParameterSet* parset = LOFAR::ACC::APS::globalParameterSet(); - - // Get BBSController IP-address? - // Initialize some data holders? - - return true; + LOG_INFO("BBSKernelProcessControl::define()"); + try { + // Create a new data holder. + itsDataHolder = new DH_BlobStreamable(); + + // Create a new client TH_Socket. Do not connect yet. + itsTransportHolder = + new TH_Socket(globalParameterSet()->getString("BBSControl.server"), + globalParameterSet()->getString("BBSControl.port"), + true, // sync + Socket::TCP, // protocol + false); // open socket now + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } + return true; } - // When a process receives an \c init command it allocates the buffers it - // needs and makes the connections with the other processes. When the - // process succeeds in this it is ready for dataprocessing (or whatever - // task the process has). - boost::logic::tribool BBSKernelProcessControl::init() + + tribool BBSKernelProcessControl::init() { - LOG_TRACE_FLOW("init()"); - - // Create connection with BBSController? - - return true; + LOG_INFO("BBSKernelProcessControl::init()"); + try { + // DH_BlobStreamable is initialized implicitly by its constructor. + ASSERT(itsDataHolder); + if (!itsDataHolder->isInitialized()) { + LOG_ERROR("Initialization of DataHolder failed"); + return false; + } + // Connect the client socket to the server. + ASSERT(itsTransportHolder); + if (!itsTransportHolder->init()) { + LOG_ERROR("Initialization of TransportHolder failed"); + return false; + } + // Create a new CSConnection object. + itsConnection = new CSConnection("RWConn", + itsDataHolder, + itsDataHolder, + itsTransportHolder); + if (!itsConnection || !itsConnection->isConnected()) { + LOG_ERROR("Creation of Connection failed"); + return false; + } + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } + // All went well. + return true; } - // During the \c run phase the process does the work it is designed for. - // The run phase stays active until another command is send. - boost::logic::tribool BBSKernelProcessControl::run() + + tribool BBSKernelProcessControl::run() { - LOG_TRACE_FLOW("run()"); - - // Non-blocking receive from BBSControl - - // If received msg - bool status = true; -/* - if(...) - { - if(msg->type() == BBSStrategy::type()) - { - status = handle(dynamic_cast<BBSStrategy*>(msg)); - } - if(msg->type() == BBSPredictStep::type()) - { - status = handle(dynamic_cast<BBSPredictStep*>(msg)); - } - else if(msg->type() == BBSSubtractStep::type()) - { - status = handle(dynamic_cast<BBSSubtractStep*>(msg)); - } - else if(msg->type() == BBSCorrectStep::type()) - { - status = handle(dynamic_cast<BBSCorrectStep*>(msg)); - } - else if(msg->type() == BBSSolveStep::type()) - { - status = handle(dynamic_cast<BBSSolveStep*>(msg)); - } - else - { - LOG_ERROR("Received message of unknown type, skipped.") - } - } -*/ - return status; + LOG_INFO("BBSKernelProcessControl::run()"); + + try { + // Blocking receive from BBSProcessControl + ASSERT(itsConnection); + if (itsConnection->read() == CSConnection::Error) { + LOG_ERROR("Connection::read() failed"); + return false; + } + // Deserialize the received message. + ASSERT(itsDataHolder); + BlobStreamable* msg = itsDataHolder->deserialize(); + if (!msg) { + LOG_ERROR("Failed to deserialize message"); + return false; + } + LOG_DEBUG_STR("Received a " << itsDataHolder->classType() + << " object"); + + // If the message contains a `strategy', handle the `strategy'. + BBSStrategy* strategy = dynamic_cast<BBSStrategy*>(msg); + if (strategy) { + return handle(strategy); + } + // If the message contains a `step', handle the `step'. + BBSStep* step = dynamic_cast<BBSStep*>(msg); + if (step) { + return handle(step); + } + // We received a message we can't handle + LOG_WARN_STR("Received message of unsupported type `" << + itsDataHolder->classType() << "'. Skipped."); + return false; + + // Should we send the result back? + } + catch (Exception& e) { + LOG_ERROR_STR(e); + return false; + } } - // With the \c pause command the process stops its run phase and starts - // waiting for another command. The \c condition argument contains the - // contition the process should use for ending the run phase. This - // condition is a key-value pair that can eg. contain a timestamp or a - // number of a datasample. - boost::logic::tribool BBSKernelProcessControl::pause(const string& condition) + + tribool BBSKernelProcessControl::pause(const string& /*condition*/) { - return false; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + LOG_WARN("Not supported"); + return false; } - // \c Quit stops the process. - // The process \b must call \c unregisterAtAC at ProcControlServer during - // the execution of this command to pass the final results to the - // Application Controller. - boost::logic::tribool BBSKernelProcessControl::quit() + + tribool BBSKernelProcessControl::quit() { - LOG_TRACE_FLOW("quit()"); - return true; + LOG_INFO("BBSKernelProcessControl::quit()"); + return true; } - // With the \c snapshot command the process is instructed to save itself - // in a database is such a way that on another moment in time it can - // be reconstructed and can continue it task.<br> - // The \c destination argument contains database info the process - // must use to save itself. - boost::logic::tribool BBSKernelProcessControl::snapshot(const string& destination) + + tribool BBSKernelProcessControl::snapshot(const string& /*destination*/) { - return false; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + LOG_WARN("Not supported"); + return false; } - // \c Recover reconstructs the process as it was saved some time earlier. - // The \c source argument contains the database info the process must use - // to find the information it needs. - boost::logic::tribool BBSKernelProcessControl::recover(const string& source) + + tribool BBSKernelProcessControl::recover(const string& /*source*/) { - return false; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + LOG_WARN("Not supported"); + return false; } - // With \c reinit the process receives a new parameterset that it must use - // to reinitialize itself. - boost::logic::tribool BBSKernelProcessControl::reinit(const string& configID) + + tribool BBSKernelProcessControl::reinit(const string& /*configID*/) { - return false; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + LOG_WARN("Not supported"); + return false; } - // @} - // Define a generic way to exchange info between client and server. - std::string BBSKernelProcessControl::askInfo(const string& keylist) + + std::string BBSKernelProcessControl::askInfo(const string& /*keylist*/) { - return std::string(""); + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + return std::string(""); } - // This is somewhat hairy: convert from struct Correlation to struct CorrelationMask, which are - // basically the same. However, one is defined in the BBSControl package and the other in the - // BBSKernel package. It may be better to move the Correlation struct, and also the Baselines struct, - // to the BBSKernel package and then have BBSControl depend on that. - // - // Note: Correlation::NONE is meaningless; it is an error if this should occur. - // - /* - CorrelationMask BBSKernelProcessControl::convertCorrelationToMask(const Correlation &correlation) + + bool BBSKernelProcessControl::handle(const BBSStrategy *strategy) { - CorrelationMask mask; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + + try + { + // Pre-condition check + ASSERT(strategy); + + delete itsHistory; + itsHistory = 0; + + if(!strategy->parmDB().history.empty()) + { + LOFAR::ParmDB::ParmDBMeta historyDBMeta("aips", + strategy->parmDB().history); + itsHistory = new LOFAR::ParmDB::ParmDB(historyDBMeta); + } - switch(correlation.selection) - { - case Correlation::AUTO: - mask.selection = CorrelationMask::AUTO; - break; - case Correlation::CROSS: - mask.selection = CorrelationMask::CROSS; - break; - default: - mask.selection = CorrelationMask::ALL; - break; - } - mask.type = correlation.type; + // Create a new Prediffer. + delete itsPrediffer; + itsPrediffer = new Prediffer(strategy->dataSet(), + strategy->inputData(), + strategy->parmDB().localSky, + strategy->parmDB().instrument, + 0, + false); - return mask; + // Set data selection and work domain. + bool status; + status = itsPrediffer->setSelection(strategy->stations(), + strategy->correlation()) + && itsPrediffer->setWorkDomain(-1, -1, 0, 1e12); + return status; + } + + catch (Exception& e) + { + LOG_WARN_STR(e); + return false; + } } - */ - // This is somewhat hairy: convert a BBSStep to a context. If the issue mentioned above - // at convertCorrelationToMask() is resolved, this method will become simpler. The baselines - // and correlation could then be copied directly. - void BBSKernelProcessControl::convertStepToContext(const BBSStep *step, Context &context) + + bool BBSKernelProcessControl::handle(const BBSStep* bs) { - ASSERT(step->itsBaselines.station1.size() == step->itsBaselines.station2.size()); - - context.baselines = step->itsBaselines; - /* - vector<string>::const_iterator it1 = step->itsBaselines.station1.begin(); - vector<string>::const_iterator it2 = step->itsBaselines.station2.begin(); - while(it1 != step->itsBaselines.station1.end()) - { - context.baselines.push_back(pair<string, string>(*it1, *it2)); - ++it1; - ++it2; - } - */ - context.correlation = step->itsCorrelation; - context.sources = step->itsSources; - context.instrumentModel = step->itsInstrumentModels; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + { + const BBSPredictStep* step = dynamic_cast<const BBSPredictStep*>(bs); + if (step) return doHandle(step); + } + { + const BBSSubtractStep* step = dynamic_cast<const BBSSubtractStep*>(bs); + if (step) return doHandle(step); + } + { + const BBSCorrectStep* step = dynamic_cast<const BBSCorrectStep*>(bs); + if (step) return doHandle(step); + } + { + const BBSSolveStep* step = dynamic_cast<const BBSSolveStep*>(bs); + if (step) return doHandle(step); + } + return false; } + + + //##---- P r i v a t e m e t h o d s ----##// - - bool BBSKernelProcessControl::handle(const BBSStrategy *strategy) + void BBSKernelProcessControl::convertStepToContext(const BBSStep *step, Context &context) { - delete itsHistory; - itsHistory = 0; - - if(!strategy->itsParmDB.history.empty()) - { - LOFAR::ParmDB::ParmDBMeta historyDBMeta("aips", strategy->itsParmDB.history); - itsHistory = new LOFAR::ParmDB::ParmDB(historyDBMeta); - } - - // Create a new Prediffer. - delete itsPrediffer; - itsPrediffer = new Prediffer( strategy->itsDataSet, - strategy->itsInputData, - strategy->itsParmDB.localSky, - strategy->itsParmDB.instrument, - 0, - false); - - // Set data selection and work domain. - bool status; - status = itsPrediffer->setSelection(strategy->itsStations, strategy->itsCorrelation) - && itsPrediffer->setWorkDomain(-1, -1, 0, 1e12); - return status; + ASSERT(step->baselines().station1.size() == + step->baselines().station2.size()); + + context.baselines = step->baselines(); + /* + vector<string>::const_iterator it1 = step->itsBaselines.station1.begin(); + vector<string>::const_iterator it2 = step->itsBaselines.station2.begin(); + while(it1 != step->itsBaselines.station1.end()) + { + context.baselines.push_back(pair<string, string>(*it1, *it2)); + ++it1; + ++it2; + } + */ + context.correlation = step->correlation(); + context.sources = step->sources(); + context.instrumentModel = step->instrumentModels(); } - bool BBSKernelProcessControl::handle(const BBSPredictStep *step) + bool BBSKernelProcessControl::doHandle(const BBSPredictStep *step) { - ASSERTSTR(itsPrediffer, "No Prediffer available."); - ASSERTSTR(step, "Step corrupted."); - - PredictContext context; - convertStepToContext(step, context); - context.outputColumn = step->itsOutputData; - - // Set context. - if(!itsPrediffer->setContext(context)) - { - return false; - } - - // Execute predict. - itsPrediffer->predictVisibilities(); - return true; + LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); + ASSERTSTR(itsPrediffer, "No Prediffer available."); + ASSERTSTR(step, "Step corrupted."); + + PredictContext context; + convertStepToContext(step, context); + context.outputColumn = step->outputData(); + + // Set context. + if(!itsPrediffer->setContext(context)) + { + return false; + } + + // Execute predict. + itsPrediffer->predictVisibilities(); + return true; } - bool BBSKernelProcessControl::handle(const BBSSubtractStep *step) + bool BBSKernelProcessControl::doHandle(const BBSSubtractStep *step) { - ASSERTSTR(itsPrediffer, "No Prediffer available."); - ASSERTSTR(step, "Step corrupted."); + ASSERTSTR(itsPrediffer, "No Prediffer available."); + ASSERTSTR(step, "Step corrupted."); - SubtractContext context; - convertStepToContext(step, context); - context.outputColumn = step->itsOutputData; + SubtractContext context; + convertStepToContext(step, context); + context.outputColumn = step->outputData(); - // Set context. - if(!itsPrediffer->setContext(context)) - { - return false; - } + // Set context. + if(!itsPrediffer->setContext(context)) + { + return false; + } - // Execute subtract. - itsPrediffer->subtractVisibilities(); - return true; + // Execute subtract. + itsPrediffer->subtractVisibilities(); + return true; } - bool BBSKernelProcessControl::handle(const BBSCorrectStep *step) + bool BBSKernelProcessControl::doHandle(const BBSCorrectStep *step) { - ASSERTSTR(itsPrediffer, "No Prediffer available."); - ASSERTSTR(step, "Step corrupted."); + ASSERTSTR(itsPrediffer, "No Prediffer available."); + ASSERTSTR(step, "Step corrupted."); - CorrectContext context; - convertStepToContext(step, context); - context.outputColumn = step->itsOutputData; + CorrectContext context; + convertStepToContext(step, context); + context.outputColumn = step->outputData(); - // Set context. - if(!itsPrediffer->setContext(context)) - { - return false; - } + // Set context. + if(!itsPrediffer->setContext(context)) + { + return false; + } - // Execute correct. - itsPrediffer->correctVisibilities(); - return true; + // Execute correct. + itsPrediffer->correctVisibilities(); + return true; } - bool BBSKernelProcessControl::handle(const BBSSolveStep *step) + bool BBSKernelProcessControl::doHandle(const BBSSolveStep *step) { - ASSERTSTR(itsPrediffer, "No Prediffer available."); - ASSERTSTR(step, "Step corrupted."); + ASSERTSTR(itsPrediffer, "No Prediffer available."); + ASSERTSTR(step, "Step corrupted."); - // Construct context. - GenerateContext context; - convertStepToContext(step, context); - context.unknowns = step->itsParms; - context.excludedUnknowns = step->itsExclParms; + // Construct context. + GenerateContext context; + convertStepToContext(step, context); + context.unknowns = step->parms(); + context.excludedUnknowns = step->exclParms(); - // Create solve domains. For now this just splits the work domain in a rectangular grid, - // with cells of size step->itsDomainSize. Should become more interesting in the near future. - const MeqDomain &workDomain = itsPrediffer->getWorkDomain(); + // Create solve domains. For now this just splits the work domain in a rectangular grid, + // with cells of size step->itsDomainSize. Should become more interesting in the near future. + const MeqDomain &workDomain = itsPrediffer->getWorkDomain(); - const int freqCount = (int) ceil((workDomain.endX() - workDomain.startX()) / step->itsDomainSize.bandWidth); - const int timeCount = (int) ceil((workDomain.endY() - workDomain.startY()) / step->itsDomainSize.timeInterval); + const int freqCount = (int) ceil((workDomain.endX() - workDomain.startX()) / step->domainSize().bandWidth); + const int timeCount = (int) ceil((workDomain.endY() - workDomain.startY()) / step->domainSize().timeInterval); - double timeOffset = workDomain.startY(); - double timeSize = step->itsDomainSize.timeInterval; + double timeOffset = workDomain.startY(); + double timeSize = step->domainSize().timeInterval; - int time = 0; - while(time < timeCount) - { - double freqOffset = workDomain.startX(); - double freqSize = step->itsDomainSize.bandWidth; - - if(timeOffset + timeSize > workDomain.endY()) - { - timeSize = workDomain.endY() - timeOffset; - } - - int freq = 0; - while(freq < freqCount) - { - if(freqOffset + freqSize > workDomain.endX()) - { - freqSize = workDomain.endX() - freqOffset; - } - - context.solveDomains.push_back(MeqDomain(freqOffset, freqOffset + freqSize, timeOffset, timeOffset + timeSize)); - - freqOffset += freqSize; - freq++; - } - - timeOffset += timeSize; - time++; - } + int time = 0; + while(time < timeCount) + { + double freqOffset = workDomain.startX(); + double freqSize = step->domainSize().bandWidth; + + if(timeOffset + timeSize > workDomain.endY()) + { + timeSize = workDomain.endY() - timeOffset; + } + + int freq = 0; + while(freq < freqCount) + { + if(freqOffset + freqSize > workDomain.endX()) + { + freqSize = workDomain.endX() - freqOffset; + } + + context.solveDomains.push_back(MeqDomain(freqOffset, freqOffset + freqSize, timeOffset, timeOffset + timeSize)); + + freqOffset += freqSize; + freq++; + } + + timeOffset += timeSize; + time++; + } - // Set context. - if(!itsPrediffer->setContext(context)) - { - return false; - } + // Set context. + if(!itsPrediffer->setContext(context)) + { + return false; + } - cout << "Solve domains:" << endl; - cout << itsPrediffer->getSolveDomains() << endl; + cout << "Solve domains:" << endl; + cout << itsPrediffer->getSolveDomains() << endl; - // Initialize the solver. - Solver solver; - solver.initSolvableParmData(1, itsPrediffer->getSolveDomains(), itsPrediffer->getWorkDomain()); - solver.setSolvableParmData(itsPrediffer->getSolvableParmData(), 0); - itsPrediffer->showSettings(); - - // Main iteration loop. - unsigned int iteration = 0; - bool converged = false; - while(iteration < step->itsMaxIter && !converged) - { - // Generate normal equations and pass them to the solver. - vector<casa::LSQFit> equations; - itsPrediffer->generateEquations(equations); - solver.mergeFitters(equations, 0); - - // Do one Levenberg-Maquardt step. - solver.solve(false); + // Initialize the solver. + Solver solver; + solver.initSolvableParmData(1, itsPrediffer->getSolveDomains(), itsPrediffer->getWorkDomain()); + solver.setSolvableParmData(itsPrediffer->getSolvableParmData(), 0); + itsPrediffer->showSettings(); + + // Main iteration loop. + unsigned int iteration = 0; + bool converged = false; + while(iteration < step->maxIter() && !converged) + { + // Generate normal equations and pass them to the solver. + vector<casa::LSQFit> equations; + itsPrediffer->generateEquations(equations); + solver.mergeFitters(equations, 0); + + // Do one Levenberg-Maquardt step. + solver.solve(false); - // Optionally log to history. - if(itsHistory) - { - solver.log(*itsHistory, step->itsName); - } - - // Check for convergence. - int convergedSolveDomains = 0; - for(unsigned int i = 0; i < context.solveDomains.size(); ++i) - { - Quality quality = solver.getQuality(i); - if(quality.itsChi < step->itsEpsilon) - { - convergedSolveDomains++; - } - } - converged = (((double) convergedSolveDomains) / context.solveDomains.size()) > step->itsMinConverged; + // Optionally log to history. + if(itsHistory) + { + solver.log(*itsHistory, step->getName()); + } + + // Check for convergence. + int convergedSolveDomains = 0; + for(unsigned int i = 0; i < context.solveDomains.size(); ++i) + { + Quality quality = solver.getQuality(i); + if(quality.itsChi < step->epsilon()) + { + convergedSolveDomains++; + } + } + converged = (((double) convergedSolveDomains) / context.solveDomains.size()) > step->minConverged(); - cout << "iteration " << iteration << ": " << setprecision(10) << solver.getSolvableValues(0) << endl; - cout << "solve domains converged: " << convergedSolveDomains << "/" << context.solveDomains.size() << - " (" << (((double) convergedSolveDomains) / context.solveDomains.size() * 100.0) << "%)" << endl; + cout << "iteration " << iteration << ": " << setprecision(10) << solver.getSolvableValues(0) << endl; + cout << "solve domains converged: " << convergedSolveDomains << "/" << context.solveDomains.size() << + " (" << (((double) convergedSolveDomains) / context.solveDomains.size() * 100.0) << "%)" << endl; - // Send updates back to the Prediffer. - itsPrediffer->updateSolvableParms(solver.getSolvableParmData()); + // Send updates back to the Prediffer. + itsPrediffer->updateSolvableParms(solver.getSolvableParmData()); - iteration++; - } + iteration++; + } - //cout << "Writing solutions into ParmDB ..." << endl; - //itsPrediffer->writeParms(); - return true; + //cout << "Writing solutions into ParmDB ..." << endl; + //itsPrediffer->writeParms(); + return true; } -} // namespace BBS + + } // namespace BBS + } // namespace LOFAR diff --git a/CEP/BB/BBSControl/src/Makefile.am b/CEP/BB/BBSControl/src/Makefile.am index 910add121d0802b2b627aad090d78f8b8fdc0974..a1a93b2b5afdeeb05a45e38a612ad9620d731e22 100644 --- a/CEP/BB/BBSControl/src/Makefile.am +++ b/CEP/BB/BBSControl/src/Makefile.am @@ -6,16 +6,21 @@ libbbscontrol_la_SOURCES= \ BBSSolveStep.cc \ BBSMultiStep.cc \ BBSProcessControl.cc \ + BBSKernelProcessControl.cc \ BBSStrategy.cc \ BBSStructs.cc \ BlobStreamable.cc \ DH_BlobStreamable.cc \ ParameterSetReader.cc -bin_PROGRAMS = BBSControl +bin_PROGRAMS = BBSControl BBSKernel BBSControl_SOURCES = BBSControl.cc BBSControl_LDADD = libbbscontrol.la BBSControl_DEPENDENCIES = libbbscontrol.la $(LOFAR_DEPEND) +BBSKernel_SOURCES = BBSKernel.cc +BBSKernel_LDADD = libbbscontrol.la +BBSKernel_DEPENDENCIES = libbbscontrol.la $(LOFAR_DEPEND) + include $(top_srcdir)/Makefile.common