diff --git a/CEP/Pipeline/framework/lofarpipe/support/control.py b/CEP/Pipeline/framework/lofarpipe/support/control.py index 63fecc8c3778553df859509e13f1b5d891e650be..5b3c2eb67fd8562193a72767adc4568635e83ac2 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/control.py +++ b/CEP/Pipeline/framework/lofarpipe/support/control.py @@ -15,7 +15,7 @@ from lofarpipe.support.stateful import StatefulRecipe from lofarpipe.support.lofarexceptions import PipelineException from lofarpipe.support.xmllogging import get_active_stack from lofar.parameterset import parameterset -from lofar.messagebus.msgbus import ToBus +import lofar.messagebus.msgbus from lofar.messagebus.protocols.taskfeedbackdataproducts import TaskFeedbackDataproducts from lofar.messagebus.protocols.taskfeedbackprocessing import TaskFeedbackProcessing from lofar.messagebus.protocols.taskfeedbackstate import TaskFeedbackState @@ -56,16 +56,17 @@ class control(StatefulRecipe): `feedback` must be a parameterset """ - bus = ToBus("lofar.task.feedback.processing") - msg = TaskFeedbackProcessing( - "lofarpipe.support.control", - "", - "Processing feedback from the pipeline framework", - self.momID, - self.sasID, - feedback) + if self.feedback_method == "messagebus": + bus = lofar.messagebus.msgbus.ToBus("lofar.task.feedback.processing") + msg = TaskFeedbackProcessing( + "lofarpipe.support.control", + "", + "Processing feedback from the pipeline framework", + self.momID, + self.sasID, + feedback) - bus.send(msg) + bus.send(msg) def send_feedback_dataproducts(self, feedback): """ @@ -74,16 +75,17 @@ class control(StatefulRecipe): `feedback` must be a parameterset """ - bus = ToBus("lofar.task.feedback.dataproduct") - msg = TaskFeedbackDataproducts( - "lofarpipe.support.control", - "", - "Dataproduct feedback from the pipeline framework", - self.momID, - self.sasID, - feedback) + if self.feedback_method == "messagebus": + bus = lofar.messagebus.msgbus.ToBus("lofar.task.feedback.dataproduct") + msg = TaskFeedbackDataproducts( + "lofarpipe.support.control", + "", + "Dataproduct feedback from the pipeline framework", + self.momID, + self.sasID, + feedback) - bus.send(msg) + bus.send(msg) def _send_feedback_status(self, status): """ @@ -93,16 +95,17 @@ class control(StatefulRecipe): indicates failure. """ - bus = ToBus("lofar.task.feedback.state") - msg = TaskFeedbackState( - "lofarpipe.support.control", - "", - "Status feedback from the pipeline framework", - self.momID, - self.sasID, - status == 0) + if self.feedback_method == "messagebus": + bus = lofar.messagebus.msgbus.ToBus("lofar.task.feedback.state") + msg = TaskFeedbackState( + "lofarpipe.support.control", + "", + "Status feedback from the pipeline framework", + self.momID, + self.sasID, + status == 0) - bus.send(msg) + bus.send(msg) def pipeline_logic(self): """ @@ -127,12 +130,23 @@ class control(StatefulRecipe): # we can call our parent now that we have a job_name super(control, self).go() + # we now have a self.config -- read our settings + try: + self.feedback_method = self.config.get('feedback', 'method') + except: + self.feedback_method = "messagebus" + + if self.feedback_method == "messagebus" and not lofar.messagebus.msgbus.enabled: + self.logger.error("Feedback over messagebus requested, but messagebus support is not enabled or functional") + return 1 + # Pull several parameters from the parset self.momID = self.parset.getString("ObsSW.Observation.momID", "") # Note: 0 if obs was copied in Scheduler self.sasID = self.parset.getString("ObsSW.Observation.otdbID", "") # SAS ID # Start the pipeline self.logger.info("LOFAR Pipeline (%s) starting." % self.name) + self.logger.info("SASID = %s, MOMID = %s, Feedback method = %s" % (self.sasID, self.momID, self.feedback_method)) try: self.pipeline_logic() diff --git a/CEP/Pipeline/recipes/sip/pipeline.cfg.in b/CEP/Pipeline/recipes/sip/pipeline.cfg.in index 8732c7a30820b5cd5c51e3140dc170f37a663c5d..2b311047dd96f45bd74332b997240099414d4d28 100644 --- a/CEP/Pipeline/recipes/sip/pipeline.cfg.in +++ b/CEP/Pipeline/recipes/sip/pipeline.cfg.in @@ -23,3 +23,10 @@ engine_lpath = %(lofarroot)s/lib:%(casaroot)s/lib:%(pyraproot)s/lib:%(hdf5root)s [logging] log_file = %(runtime_directory)s/%(job_name)s/logs/%(start_time)s/pipeline.log xml_stat_file = %(runtime_directory)s/%(job_name)s/logs/%(start_time)s/statistics.xml + +[feedback] +# Method of providing feedback to LOFAR. +# Valid options: +# messagebus Send feedback and status using LCS/MessageBus +# none Do NOT send feedback and status +method = messagebus diff --git a/CEP/Pipeline/test/regression_tests/regression_test_runner.sh b/CEP/Pipeline/test/regression_tests/regression_test_runner.sh index ea8eaeeb27e8c88a04eff526337a232af10e1d00..9836aec5f5729abc387425d7bb25c8cf2d433fed 100755 --- a/CEP/Pipeline/test/regression_tests/regression_test_runner.sh +++ b/CEP/Pipeline/test/regression_tests/regression_test_runner.sh @@ -188,6 +188,12 @@ while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symli done REGRESSION_TEST_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" +# setup the qpid environment (is a no-op if qpid is not installed) +source $WORKSPACE/bin/MessageFuncs.sh +create_queue lofar.task.feedback.dataproducts +create_queue lofar.task.feedback.processing +create_queue lofar.task.feedback.state + # run the regression test for the pipeline: provide all the files in the directory DELTA=0.0001 python $"$REGRESSION_TEST_DIR/$PIPELINE"_test.py $WORKING_DIR/target_data/host1/* $WORKING_DIR/output_data/host1/* $DELTA || { echo $"regressiontest failed on data in dir $WORKING_DIR/output_data/host1" ; exit 1; }