diff --git a/CEP/BB/BBSControl/include/BBSControl/CommandExecutor.h b/CEP/BB/BBSControl/include/BBSControl/CommandExecutor.h index 69b70bb06b085e78bea849ce764a92968edf5ab0..4c491bfe26b1cf759e404369bb876849db4dae1a 100644 --- a/CEP/BB/BBSControl/include/BBSControl/CommandExecutor.h +++ b/CEP/BB/BBSControl/include/BBSControl/CommandExecutor.h @@ -27,6 +27,7 @@ // Concrete viistor class for concrete Command classes #include <BBSControl/CommandVisitor.h> +#include <BBSControl/CommandResult.h> #include <BBSKernel/Prediffer.h> #include <Common/lofar_smartptr.h> #include <Common/lofar_string.h> @@ -76,9 +77,15 @@ namespace LOFAR virtual void visit(const BBSRefitStep &command); // @} + // Get result of the last executed command. + const CommandResult &getResult() + { + return itsResult; + } + private: bool convertTime(string in, double &out); - + // Kernel. scoped_ptr<Prediffer> itsKernel; @@ -87,7 +94,7 @@ namespace LOFAR // Connection to the solver. shared_ptr<BlobStreamableConnection> itsSolverConnection; - + // Region of interest. int32 itsROIFrequency[2]; double itsROITime[2]; @@ -95,6 +102,9 @@ namespace LOFAR // Chunk size and position (in time). double itsChunkSize; double itsChunkPosition; + + // Result of the last executed command. + CommandResult itsResult; }; // @} diff --git a/CEP/BB/BBSControl/sql/create_blackboard_functions.sql b/CEP/BB/BBSControl/sql/create_blackboard_functions.sql index 6d41fc973b213e8f6f1ca726c4f8edaf40387526..5f8d6db44442c74ec5f95e1a3772bbd3436cd09f 100644 --- a/CEP/BB/BBSControl/sql/create_blackboard_functions.sql +++ b/CEP/BB/BBSControl/sql/create_blackboard_functions.sql @@ -132,14 +132,14 @@ $$ "ParmDB.LocalSky", "ParmDB.Instrument", "ParmDB.History", - "Stations", - "InputData", - "RegionOfInterest.Freq", - "RegionOfInterest.Time", - "WorkDomainSize.Freq", - "WorkDomainSize.Time", - "Correlation.Selection", - "Correlation.Type") + "Strategy.Stations", + "Strategy.InputData", + "Strategy.RegionOfInterest.Freq", + "Strategy.RegionOfInterest.Time", + "Strategy.WorkDomainSize.Freq", + "Strategy.WorkDomainSize.Time", + "Strategy.Correlation.Selection", + "Strategy.Correlation.Type") VALUES ('ACTIVE', data_set, diff --git a/CEP/BB/BBSControl/sql/create_blackboard_tables.sql b/CEP/BB/BBSControl/sql/create_blackboard_tables.sql index 2f4d0c9ad7ad30f0d10cc2fffc9e9524ea05bda1..02c57faca677f5f2a1b0a00b7eba0ef04320d42f 100644 --- a/CEP/BB/BBSControl/sql/create_blackboard_tables.sql +++ b/CEP/BB/BBSControl/sql/create_blackboard_tables.sql @@ -1,24 +1,27 @@ CREATE TABLE blackboard.strategy ( - state TEXT DEFAULT 'UNDEFINED', + state TEXT DEFAULT 'UNDEFINED', - "DataSet" TEXT NOT NULL, + "DataSet" TEXT NOT NULL, - "ParmDB.LocalSky" TEXT NOT NULL, - "ParmDB.Instrument" TEXT NOT NULL, - "ParmDB.History" TEXT NOT NULL, + "ParmDB.LocalSky" TEXT NOT NULL, + "ParmDB.Instrument" TEXT NOT NULL, + "ParmDB.History" TEXT NOT NULL, - "Stations" TEXT DEFAULT '[]', - "InputData" TEXT DEFAULT 'DATA', + "Strategy.Stations" TEXT DEFAULT '[]', + "Strategy.InputData" TEXT DEFAULT 'DATA', - "RegionOfInterest.Freq" TEXT DEFAULT '[]', - "RegionOfInterest.Time" TEXT DEFAULT '[]', + "Strategy.RegionOfInterest.Freq" TEXT DEFAULT '[]', + "Strategy.RegionOfInterest.Time" TEXT DEFAULT '[]', - "WorkDomainSize.Freq" DOUBLE PRECISION NOT NULL, - "WorkDomainSize.Time" DOUBLE PRECISION NOT NULL, + "Strategy.WorkDomainSize.Freq" DOUBLE PRECISION NOT NULL, + "Strategy.WorkDomainSize.Time" DOUBLE PRECISION NOT NULL, - "Correlation.Selection" TEXT DEFAULT 'CROSS', - "Correlation.Type" TEXT DEFAULT '[]' + "Strategy.Correlation.Selection" TEXT DEFAULT 'CROSS', + "Strategy.Correlation.Type" TEXT DEFAULT '[]', + + "Strategy.Integration.Freq" DOUBLE PRECISION DEFAULT 1.0, + "Strategy.Integration.Time" DOUBLE PRECISION DEFAULT 1.0 ); diff --git a/CEP/BB/BBSControl/src/BBSStrategy.cc b/CEP/BB/BBSControl/src/BBSStrategy.cc index 6db38a0d58696dc3f0a7676e23659191b23d3412..491228a1d6ea8e0561a7903a3ec3b8f088443855 100644 --- a/CEP/BB/BBSControl/src/BBSStrategy.cc +++ b/CEP/BB/BBSControl/src/BBSStrategy.cc @@ -83,11 +83,11 @@ namespace LOFAR itsRegionOfInterest.time = ps.getStringVector("RegionOfInterest.Time"); } catch (APSException&) {} - + // Get the work domain size for this strategy itsDomainSize.bandWidth = ps.getDouble("WorkDomainSize.Freq"); itsDomainSize.timeInterval = ps.getDouble("WorkDomainSize.Time"); - + // Get the correlation product selection (ALL, AUTO, or CROSS) itsCorrelation.selection = ps.getString("Correlation.Selection"); itsCorrelation.type = ps.getStringVector("Correlation.Type"); @@ -178,12 +178,14 @@ namespace LOFAR << itsStations << endl << "Strategy.InputData = " << itsInputData - // << endl << "Strategy.RegionOfInterest = " - // << itsRegionOfInterest - << endl << "Strategy.WorkDomainSize.Freq = " + << endl << "Strategy.WorkDomainSize.Freq = " << itsDomainSize.bandWidth << endl << "Strategy.WorkDomainSize.Time = " << itsDomainSize.timeInterval + << endl << "Strategy.RegionOfInterest.Freq = " + << itsRegionOfInterest.frequency + << endl << "Strategy.RegionOfInterest.Time = " + << itsRegionOfInterest.time << endl << "Strategy.Correlation.Selection = " << itsCorrelation.selection << endl << "Strategy.Correlation.Type = " @@ -215,7 +217,11 @@ namespace LOFAR ps.getDouble("Strategy.WorkDomainSize.Freq"); itsDomainSize.timeInterval = ps.getDouble("Strategy.WorkDomainSize.Time"); - itsCorrelation.selection = + itsRegionOfInterest.frequency = + ps.getInt32Vector("Strategy.RegionOfInterest.Freq"); + itsRegionOfInterest.time = + ps.getStringVector("Strategy.RegionOfInterest.Time"); + itsCorrelation.selection = ps.getString("Strategy.Correlation.Selection"); itsCorrelation.type = ps.getStringVector("Strategy.Correlation.Type"); diff --git a/CEP/BB/BBSControl/src/CommandExecutor.cc b/CEP/BB/BBSControl/src/CommandExecutor.cc index 8d74993846e2dee2720e5f1c831b866cc8cae559..fd8baac2f7592d3616ca7d39bd78e2699fa7a68f 100644 --- a/CEP/BB/BBSControl/src/CommandExecutor.cc +++ b/CEP/BB/BBSControl/src/CommandExecutor.cc @@ -78,10 +78,10 @@ bool CommandExecutor::convertTime(string in, double &out) //# TODO: Convert from default epoch to MS epoch (as it may differ from //# the default!) casa::Quantum<casa::Double> time; - + if(in.empty() || !casa::MVTime::read(time, in)) return false; - + out = static_cast<double>(time.getValue("s")); return true; } @@ -113,16 +113,21 @@ void CommandExecutor::visit(const InitializeCommand &/*command*/) } //# Select stations and correlations. - itsKernel->setSelection(strategy->stations(), strategy->correlation()); + if(!itsKernel->setSelection(strategy->stations(), strategy->correlation())) + { + itsResult = CommandResult(CommandResult::ERROR, + "Data selection is empty."); + return; + } //# Store chunk size. itsChunkSize = strategy->domainSize().timeInterval; - + LOG_DEBUG_STR("ChunkSize: " << itsChunkSize); //# Get Region Of Interest. RegionOfInterest roi = strategy->regionOfInterest(); - + //# Get Local Data Domain. MeqDomain ldd = itsKernel->getLocalDataDomain(); @@ -151,14 +156,16 @@ void CommandExecutor::visit(const InitializeCommand &/*command*/) itsROIFrequency[0] = roi.frequency[0]; if(roi.frequency.size() >= 2) itsROIFrequency[1] = roi.frequency[1]; - + //# Reset chunk iteration. itsChunkPosition = itsROITime[0]; - + LOG_DEBUG_STR("Region Of Interest:" << endl << " + Channels: " << itsROIFrequency[0] << " - " << itsROIFrequency[1] << endl << " + Time: " << itsROITime[0] << " - " << itsROITime[1] << endl); + + itsResult = CommandResult(CommandResult::OK, "Ok."); } @@ -167,6 +174,9 @@ void CommandExecutor::visit(const FinalizeCommand &/*command*/) LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); LOG_DEBUG("Handling a FinalizeCommand"); + + //# How to notify KernelProcessControl of this? + itsResult = CommandResult(CommandResult::OK, "Ok."); } @@ -180,16 +190,30 @@ void CommandExecutor::visit(const NextChunkCommand &/*command*/) //# issue an itsKernel->nextChunk() here. However, as this is entangled with //# the new MS interface, we'll emulate it by setting a new local work //# domain for now. - - bool result = itsKernel->setWorkDomain(static_cast<int>(itsROIFrequency[0]), + if(itsChunkPosition >= itsROITime[1]) + { + LOG_DEBUG_STR("NextChunk: OUT_OF_DATA"); + itsResult = CommandResult(CommandResult::OUT_OF_DATA); + return; + } + + double size = itsChunkSize; + if(itsChunkPosition + itsChunkSize > itsROITime[1]) + size = itsROITime[1] - itsChunkPosition; + + if(!itsKernel->setWorkDomain(static_cast<int>(itsROIFrequency[0]), static_cast<int>(itsROIFrequency[1]), itsChunkPosition, - itsChunkSize); - - itsChunkPosition += itsChunkSize; - - if(!result) - LOG_DEBUG_STR("NextChunk: OUT_OF_DATA"); + size)) + { + itsResult = CommandResult(CommandResult::ERROR, + "Could not set work domain."); + } + else + { + itsChunkPosition += itsChunkSize; + itsResult = CommandResult(CommandResult::OK, "Ok."); + } } @@ -198,6 +222,7 @@ void CommandExecutor::visit(const RecoverCommand &/*command*/) LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); LOG_DEBUG("Handling a RecoverCommand"); + itsResult = CommandResult(CommandResult::ERROR, "Not yet implemented."); } @@ -206,6 +231,7 @@ void CommandExecutor::visit(const SynchronizeCommand &/*command*/) LOG_TRACE_FLOW(AUTO_FUNCTION_NAME); LOG_DEBUG("Handling a SynchronizeCommand"); + itsResult = CommandResult(CommandResult::ERROR, "Not yet implemented."); } @@ -224,6 +250,8 @@ void CommandExecutor::visit(const BBSMultiStep &command) { LOG_DEBUG("Handling a BBSMultiStep"); LOG_DEBUG_STR("Command: " << endl << command); + + ASSERTSTR(false, "Should not get here..."); } @@ -245,12 +273,15 @@ void CommandExecutor::visit(const BBSPredictStep &command) // Set context. if(!itsKernel->setContext(context)) -// return false; + { + itsResult = CommandResult(CommandResult::ERROR, + "Could not set context."); return; + } // Execute predict. itsKernel->predictVisibilities(); -// return true; + itsResult = CommandResult(CommandResult::OK, "Ok."); } @@ -272,12 +303,15 @@ void CommandExecutor::visit(const BBSSubtractStep &command) // Set context. if(!itsKernel->setContext(context)) -// return false; + { + itsResult = CommandResult(CommandResult::ERROR, + "Could not set context."); return; + } // Execute subtract. itsKernel->subtractVisibilities(); -// return true; + itsResult = CommandResult(CommandResult::OK, "Ok."); } @@ -299,12 +333,15 @@ void CommandExecutor::visit(const BBSCorrectStep &command) // Set context. if(!itsKernel->setContext(context)) -// return false; + { + itsResult = CommandResult(CommandResult::ERROR, + "Could not set context."); return; + } // Execute correct. itsKernel->correctVisibilities(); - //return true; + itsResult = CommandResult(CommandResult::OK, "Ok."); } @@ -376,6 +413,8 @@ void CommandExecutor::visit(const BBSSolveStep &command) // Set context. if(!itsKernel->setContext(context)) { + itsResult = CommandResult(CommandResult::ERROR, + "Could not set context."); return; } @@ -527,7 +566,8 @@ void CommandExecutor::visit(const BBSSolveStep &command) timer.stop(); LOG_DEBUG_STR("[END ] Writing solutions; " << timer); - //return true; + + itsResult = CommandResult(CommandResult::OK, "Ok."); } @@ -535,6 +575,8 @@ void CommandExecutor::visit(const BBSShiftStep &command) { LOG_DEBUG("Handling a BBSShiftStep"); LOG_DEBUG_STR("Command: " << endl << command); + + itsResult = CommandResult(CommandResult::ERROR, "Not yet implemented."); } @@ -542,6 +584,8 @@ void CommandExecutor::visit(const BBSRefitStep &command) { LOG_DEBUG("Handling a BBSRefitStep"); LOG_DEBUG_STR("Command: " << endl << command); + + itsResult = CommandResult(CommandResult::ERROR, "Not yet implemented."); } } //# namespace BBS diff --git a/CEP/BB/BBSControl/src/KernelProcessControl.cc b/CEP/BB/BBSControl/src/KernelProcessControl.cc index 66956e655fe48f562b60d3f91ef71428ea684690..61f5f91fccfe2cbf389ed2cd8447b734af304f26 100644 --- a/CEP/BB/BBSControl/src/KernelProcessControl.cc +++ b/CEP/BB/BBSControl/src/KernelProcessControl.cc @@ -31,6 +31,7 @@ #include <BBSControl/CommandId.h> #include <BBSControl/BBSStep.h> #include <BBSControl/InitializeCommand.h> +#include <BBSControl/FinalizeCommand.h> #include <BBSControl/NextChunkCommand.h> #include <BBSControl/BBSStrategy.h> /* @@ -66,6 +67,14 @@ namespace LOFAR { namespace BBS { + namespace + { + InitializeCommand cmd1; + FinalizeCommand cmd2; + NextChunkCommand cmd3; + } + + //##---- P u b l i c m e t h o d s ----##// KernelProcessControl::KernelProcessControl() : ProcessControl(), @@ -91,7 +100,6 @@ namespace BBS globalParameterSet()->getString("Controller.Host"), globalParameterSet()->getString("Controller.Port"))); */ - char *user = getenv("USER"); ASSERT(user); string userString(user); @@ -115,23 +123,16 @@ namespace BBS LOG_DEBUG("KernelProcessControl::init()"); try { -/* - LOG_DEBUG_STR("Trying to connect to controller@" - << globalParameterSet()->getString("Controller.Host") << ":" - << globalParameterSet()->getString("Controller.Port" ) - << "..."); - - if(!itsControllerConnection->connect()) - { - LOG_ERROR("+ could not connect to controller"); - return false; - } - LOG_DEBUG("+ ok"); -*/ - + // Create a new CommandQueue. This will open a connection to the + // blackboard database. itsCommandQueue.reset(new CommandQueue( globalParameterSet()->getString("BBDB.DBName"))); + // Register for the "command" trigger, which fires when a new + // command is posted to the blackboard database. + itsCommandQueue->registerTrigger(CommandQueue::Trigger::Command); + + LOG_DEBUG("Trying to connect to solver@localhost"); if(!itsSolverConnection->connect()) { @@ -139,7 +140,7 @@ namespace BBS return false; } LOG_DEBUG("+ ok"); - + itsCommandExecutor.reset(new CommandExecutor(itsCommandQueue, itsSolverConnection)); } @@ -170,10 +171,12 @@ namespace BBS } LOG_DEBUG("New run, entering RUN state."); - itsState = KernelProcessControl::FIRST_RUN; +// itsState = KernelProcessControl::FIRST_RUN; + itsState = KernelProcessControl::RUN; break; } +/* case KernelProcessControl::FIRST_RUN: { shared_ptr<const BBSStrategy>strategy( @@ -198,36 +201,40 @@ namespace BBS itsState = KernelProcessControl::RUN; break; } +*/ case KernelProcessControl::RUN: { - NextCommandType cmd(itsCommandQueue->getNextCommand()); if(cmd.first) { - LOG_DEBUG("Received command, remaining in RUN state."); - cmd.first->accept(*itsCommandExecutor); + itsCommandQueue->addResult(cmd.second, + itsCommandExecutor->getResult()); + + if(cmd.first->type() == "Finalize") + return false; } else - { - LOG_DEBUG("Command Queue empty, entering WAIT state."); itsState = KernelProcessControl::WAIT; - } + break; } case KernelProcessControl::WAIT: { - /* await notification here */ - LOG_DEBUG("WAIT state not yet implemented, returning to \ - RUN state."); - sleep(3); - itsState = KernelProcessControl::RUN; + LOG_DEBUG("Waiting from next Command..."); + + if(itsCommandQueue-> + waitForTrigger(CommandQueue::Trigger::Command)) + { + itsState = KernelProcessControl::RUN; + } break; } + default: { THROW(LocalControlException, "Unknown state identifier \