diff --git a/CEP/Pipeline/framework/lofarpipe/CMakeLists.txt b/CEP/Pipeline/framework/lofarpipe/CMakeLists.txt index e5a6089bcab31fcf053d6ef9c623afae58f8353b..6ecc2a1b27e09bb7958fe47506ac0453f8c557b4 100644 --- a/CEP/Pipeline/framework/lofarpipe/CMakeLists.txt +++ b/CEP/Pipeline/framework/lofarpipe/CMakeLists.txt @@ -26,7 +26,6 @@ python_install( support/lofarnode.py support/loggingdecorators.py support/mac.py - support/mac_feedback.py support/parset.py support/pipelinelogging.py support/remotecommand.py diff --git a/CEP/Pipeline/framework/lofarpipe/support/control.py b/CEP/Pipeline/framework/lofarpipe/support/control.py index 2d85dc179f7f20dba54e881a69533a797e8b2bae..60e62d9144ca44e5329a6c9e021a36e881f1477a 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/control.py +++ b/CEP/Pipeline/framework/lofarpipe/support/control.py @@ -13,7 +13,11 @@ import traceback from lofarpipe.support.stateful import StatefulRecipe from lofarpipe.support.lofarexceptions import PipelineException from lofarpipe.support.xmllogging import get_active_stack -import lofarpipe.support.mac_feedback as mac_feedback +from lofar.parameterset import parameterset +from lofar.messagebus import ToBus +from lofar.messagebus.protocols.taskfeedbackdataproducts import TaskFeedbackDataproducts +from lofar.messagebus.protocols.taskfeedbackprocessing import TaskFeedbackProcessing +from lofar.messagebus.protocols.taskfeedbackstatus import TaskFeedbackStatus # Standalone Pipeline Control System # ------------------------------------------------------------------------------ @@ -22,7 +26,7 @@ class control(StatefulRecipe): """ Basic pipeline control framework. - Define a pipeline by subclassing and provding a body for the + Define a pipeline by subclassing and providing a body for the :meth:`pipeline_logic`. This class provides little, but can be specialised to eg provide a @@ -30,46 +34,60 @@ class control(StatefulRecipe): """ inputs = {} - def _send_mac_feedback(self, status): + def send_feedback_processing(self, feedback): """ - Send status information back to MAC, but only if the Python controller - host to send this information to was given as input argument. + Send processing feedback information back to LOFAR. + + `feedback` must be a parameterset + """ + + bus = ToBus("lofar.task.feedback.processing") + msg = TaskFeedbackProcessing( + "lofarpipe.support.control", + "", + "Processing feedback from the pipeline framework", + momID, + sasID, + feedback) + + bus.sendmsg(msg.qpidMsg()) + + def send_feedback_dataproducts(self, feedback): + """ + Send dataproduct feedback information back to LOFAR. + + `feedback` must be a parameterset + """ + + bus = ToBus("lofar.task.feedback.dataproduct") + msg = TaskFeedbackDataproducts( + "lofarpipe.support.control", + "", + "Dataproduct feedback from the pipeline framework", + momID, + sasID, + feedback) + + bus.sendmsg(msg.qpidMsg()) + + def _send_feedback_status(self, status): + """ + Send status information back to LOFAR. + `status` must be an integer; 0 indicates success, any other value indicates failure. - The port number is calculated as 22000 + observationNr%1000. - We need to extract this number from the job-name, which should be equal - to "Observation" + str(observationNr). """ - try: - host = self.inputs['args'][1] - except IndexError: - self.logger.warn( - "No MAC Python controller host specified. " - "Not sending status feedback to MAC" - ) - return - # Determine port number to use. - match = re.findall(r'^Observation(\d+)$', self.inputs['job_name']) - if match: - port = 25000 + int(match[0]) % 7000 - self.logger.info( - "Sending status feedback to MAC [%s:%s] (status: %s)" % - (host, port, status) - ) - else: - self.logger.warn( - r"Job-name does not match with pattern '^Observation(\d+)$'. " - "Not sending status feedback to MAC" - ) - return - # Send feedback information - try: - mac_feedback.send_status(host, port, status) - except IOError, error: - self.logger.warn( - "Failed to send status feedback to MAC [%s:%s]: %s" % - (host, port, error) - ) + + bus = ToBus("lofar.task.feedback.status") + msg = TaskFeedbackStatus( + "lofarpipe.support.control", + "", + "Status feedback from the pipeline framework", + momID, + sasID, + status == 0) + + bus.sendmsg(msg.qpidMsg()) def pipeline_logic(self): """ @@ -98,10 +116,12 @@ class control(StatefulRecipe): self.logger.error("*******************************************") - self._send_mac_feedback(1) + # Emit process status + self._send_feedback_status(1) return 1 else: - self._send_mac_feedback(0) + # Emit process status + self._send_feedback_status(0) return 0 finally: # always print a xml stats file diff --git a/CEP/Pipeline/framework/lofarpipe/support/lofaringredient.py b/CEP/Pipeline/framework/lofarpipe/support/lofaringredient.py index df9ca7e230b961d5bc195211f8613c8106c06771..e6f6006242c8920664c82e7ffadcf37c0e6b16b5 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/lofaringredient.py +++ b/CEP/Pipeline/framework/lofarpipe/support/lofaringredient.py @@ -11,6 +11,7 @@ from UserDict import DictMixin from lofarpipe.cuisine.ingredient import WSRTingredient from lofarpipe.support.utilities import string_to_list, is_iterable +from lofar.parameterset import parameterset # These are currently only used by lofarrecipe.run_task to provide default # input and output dicts based on copying metadata from the parent. @@ -194,6 +195,13 @@ class DictField(Field): def is_valid(self, value): return isinstance(value, dict) +class ParsetField(Field): + """ + A Field which accepts a parameterset object. + """ + def is_valid(self, value): + return isinstance(value, parameterset) + class FileList(ListField): """ A Field which accepts a list of extant filenames. diff --git a/CEP/Pipeline/framework/lofarpipe/support/mac_feedback.py b/CEP/Pipeline/framework/lofarpipe/support/mac_feedback.py deleted file mode 100644 index baa2561991687ac2da905590a2871bd8ae0b3e7f..0000000000000000000000000000000000000000 --- a/CEP/Pipeline/framework/lofarpipe/support/mac_feedback.py +++ /dev/null @@ -1,67 +0,0 @@ -# LOFAR IMAGING PIPELINE -# -# MAC Status Feedback QuickFix -# Marcel Loose, 2012 -# loose@astron.nl -# ------------------------------------------------------------------------------ - -""" -This module implements the quick fix (Redmine issue #3633) for the pipeline -status feedback to MAC. -""" - -import random -import socket -import time - -def __try_connect(host, port, tries=5, min_timeout=1.0, max_timeout=5.0): - """ - Try to connect to `host`:`port` up time `tries` times, using a random - timeout interval ([`min_timeout` .. `max_timeout`) seconds ) between - retries. - Return a socket object. - Raises `socket.error` if all connect tries fail. - """ - # Create a socket (SOCK_STREAM means a TCP socket) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - while True: - tries -= 1 - try: - sock.connect((host, port)) - except socket.error, err: - print("Could not connect to %s:%s (got %s)" % - (host, str(port), str(err))) - if tries > 0: - timeout = random.uniform(min_timeout, max_timeout) - print("Retrying in %f seconds (%d more %s)." % - (timeout, tries, "try" if tries==1 else "tries")) - time.sleep(timeout) - else: - raise IOError(err) - else: - return sock - - -def send_status(host, port, status): - """ - Send the pipeline status to a TCP listener at `host`:`port`. If `status` is - non-zero, send the string 'ABORT'; otherwise send the string 'FINISHED'. - """ - message = "FINISHED" if status == 0 else "ABORT" - sock = __try_connect(host, port) - sock.sendall(message) - sock.close() - - -if __name__ == "__main__": - """ - Simple command line test. - Usage: python mac_feedback.py [<host>] [<port>] [<status>] - """ - import sys - host = str(sys.argv[1]) if len(sys.argv) > 1 else "localhost" - port = int(sys.argv[2]) if len(sys.argv) > 2 else 9999 - stat = int(sys.argv[3]) if len(sys.argv) > 3 else 0 - send_status(host, port, stat) - - diff --git a/CEP/Pipeline/recipes/sip/bin/calibration_pipeline.py b/CEP/Pipeline/recipes/sip/bin/calibration_pipeline.py index a6d35a7bd03fe2cdb010e3ec5f0645b515b58885..96adf1a214663d2050e7cf8018b555690f29ed79 100755 --- a/CEP/Pipeline/recipes/sip/bin/calibration_pipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/calibration_pipeline.py @@ -31,7 +31,7 @@ class calibration_pipeline(control): 4. Create a sourcedb from the user-supplied sky model, and an empty parmdb. 5. Run BBS to calibrate the data. 6. Copy the MS's to their final output destination. - 7. Create feedback file for further processing by the LOFAR framework (MAC) + 7. Create feedback for further processing by the LOFAR framework """ def __init__(self): @@ -39,7 +39,6 @@ class calibration_pipeline(control): self.parset = parameterset() self.input_data = {} self.output_data = {} - self.parset_feedback_file = None def usage(self): @@ -108,7 +107,6 @@ class calibration_pipeline(control): except IndexError: return self.usage() self.parset.adoptFile(parset_file) - self.parset_feedback_file = parset_file + "_feedback" # Set job-name to basename of parset-file w/o extension, if it's not # set on the command-line with '-j' or '--job-name' @@ -294,31 +292,27 @@ class calibration_pipeline(control): ) # ********************************************************************* - # 7. Create feedback file for further processing by the LOFAR framework + # 7. Create feedback for further processing by the LOFAR framework # a. get metadata of the measurement sets # b. get metadata of the instrument models - # c. join the two files and write the final feedback file - correlated_metadata = os.path.join(parset_dir, "correlated.metadata") - instrument_metadata = os.path.join(parset_dir, "instrument.metadata") + # c. join the two and write the final feedback with duration(self, "get_metadata"): - self.run_task("get_metadata", output_correlated_mapfile, - parset_file=correlated_metadata, + correlated_metadata = self.run_task("get_metadata", output_correlated_mapfile, parset_prefix=( self.parset.getString('prefix') + self.parset.fullModuleName('DataProducts')), - product_type="Correlated") + product_type="Correlated")["metadata"] with duration(self, "get_metadata"): - self.run_task("get_metadata", output_instrument_mapfile, - parset_file=instrument_metadata, + instrument_metadata = self.run_task("get_metadata", output_instrument_mapfile, parset_prefix=( self.parset.getString('prefix') + self.parset.fullModuleName('DataProducts')), - product_type="InstrumentModel") + product_type="InstrumentModel")["metadata"] - parset = parameterset(correlated_metadata) - parset.adoptFile(instrument_metadata) - parset.writeFile(self.parset_feedback_file) + self.send_feedback_processing(parameterset()) + self.send_feedback_dataproducts(correlated_metadata) + self.send_feedback_dataproducts(instrument_metadata) return 0 diff --git a/CEP/Pipeline/recipes/sip/bin/imaging_pipeline.py b/CEP/Pipeline/recipes/sip/bin/imaging_pipeline.py index fa2f6e071981bfae11fb21ac6e4932c6995d753f..7301851da03089f6b6f67b7c076a53984ba3108f 100755 --- a/CEP/Pipeline/recipes/sip/bin/imaging_pipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/imaging_pipeline.py @@ -81,7 +81,7 @@ class imaging_pipeline(control): and results are collected an added to the casa image. The images created are converted from casa to HDF5 and copied to the correct output location. - 7. Export meta data: An outputfile with meta data is generated ready for + 7. Export meta data: meta data is generated ready for consumption by the LTA and/or the LOFAR framework. @@ -102,7 +102,6 @@ class imaging_pipeline(control): self.target_data = DataMap() self.output_data = DataMap() self.scratch_directory = None - self.parset_feedback_file = None self.parset_dir = None self.mapfile_dir = None @@ -123,7 +122,6 @@ class imaging_pipeline(control): except IndexError: return self.usage() self.parset.adoptFile(parset_file) - self.parset_feedback_file = parset_file + "_feedback" # Set job-name to basename of parset-file w/o extension, if it's not # set on the command-line with '-j' or '--job-name' if not 'job_name' in self.inputs: @@ -231,14 +229,16 @@ class imaging_pipeline(control): # ********************************************************************* # (7) Get metadata - # Create a parset-file containing the metadata for MAC/SAS - self.run_task("get_metadata", placed_data_image_map, - parset_file = self.parset_feedback_file, + # Create a parset containing the metadata for MAC/SAS + metadata = self.run_task("get_metadata", placed_data_image_map, parset_prefix = ( full_parset.getString('prefix') + full_parset.fullModuleName('DataProducts') ), - product_type = "SkyImage") + product_type = "SkyImage")["metadata"] + + self.send_feedback_processing(parameterset()) + self.send_feedback_dataproducts(metadata) return 0 diff --git a/CEP/Pipeline/recipes/sip/bin/long_baseline_pipeline.py b/CEP/Pipeline/recipes/sip/bin/long_baseline_pipeline.py index 8ccd77e892d4c817047dbe0ebc8ee22e78110ce7..c082eeb4b79dbb9ea2393a33229f02b356d0efad 100644 --- a/CEP/Pipeline/recipes/sip/bin/long_baseline_pipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/long_baseline_pipeline.py @@ -52,7 +52,7 @@ class msss_imager_pipeline(control): single large measurement set and perform flagging, RFI and bad station exclusion. - 2. Generate meta information feedback files based on dataproduct information + 2. Generate meta information feedback based on dataproduct information and parset/configuration data **Per subband-group, the following output products will be delivered:** @@ -69,7 +69,6 @@ class msss_imager_pipeline(control): self.target_data = DataMap() self.output_data = DataMap() self.scratch_directory = None - self.parset_feedback_file = None self.parset_dir = None self.mapfile_dir = None @@ -90,7 +89,6 @@ class msss_imager_pipeline(control): except IndexError: return self.usage() self.parset.adoptFile(parset_file) - self.parset_feedback_file = parset_file + "_feedback" # Set job-name to basename of parset-file w/o extension, if it's not # set on the command-line with '-j' or '--job-name' if not 'job_name' in self.inputs: @@ -194,14 +192,16 @@ class msss_imager_pipeline(control): # Create a parset-file containing the metadata for MAC/SAS at nodes - self.run_task("get_metadata", output_ms_mapfile, - parset_file = self.parset_feedback_file, + metadata = self.run_task("get_metadata", output_ms_mapfile, parset_prefix = ( full_parset.getString('prefix') + full_parset.fullModuleName('DataProducts') ), toplevel_meta_data_path=toplevel_meta_data_path, - product_type = "Correlated") + product_type = "Correlated")["metadata"] + + self.send_feedback_processing(parameterset()) + self.send_feedback_dataproducts(metadata) return 0 diff --git a/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py b/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py index 15d9632db40dc08ab1454e498d88a3c59c7cb2fa..20f192fe6ccd65cb4bb5febb7cf4980cfad9cb7f 100755 --- a/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py @@ -37,7 +37,7 @@ class msss_calibrator_pipeline(control): parset, and the sourcedb made earlier 5. Perform gain correction on the created instrument table 6. Copy corrected MS's to their final output destination - 7. Create output for consumption by the LOFAR framework + 7. Create metadata for consumption by the LOFAR framework **Per subband-group, the following output products will be delivered:** @@ -51,7 +51,6 @@ class msss_calibrator_pipeline(control): self.parset = parameterset() self.input_data = {} self.output_data = {} - self.parset_feedback_file = None def usage(self): @@ -120,7 +119,6 @@ class msss_calibrator_pipeline(control): except IndexError: return self.usage() self.parset.adoptFile(parset_file) - self.parset_feedback_file = parset_file + "_feedback" # Set job-name to basename of parset-file w/o extension, if it's not # set on the command-line with '-j' or '--job-name' @@ -309,31 +307,27 @@ class msss_calibrator_pipeline(control): ) # ********************************************************************* - # 7. Create feedback file for further processing by the LOFAR framework + # 7. Create feedback for further processing by the LOFAR framework # a. get metadata of the measurement sets # b. get metadata of the instrument models - # c. join the two files and write the final feedback file - correlated_metadata = os.path.join(parset_dir, "correlated.metadata") - instrument_metadata = os.path.join(parset_dir, "instrument.metadata") + # c. join the two and write the final feedback with duration(self, "get_metadata"): - self.run_task("get_metadata", output_correlated_mapfile, - parset_file=correlated_metadata, + correlated_metadata = self.run_task("get_metadata", output_correlated_mapfile, parset_prefix=( self.parset.getString('prefix') + self.parset.fullModuleName('DataProducts')), - product_type="Correlated") + product_type="Correlated")["metadata"] with duration(self, "get_metadata"): - self.run_task("get_metadata", output_instrument_mapfile, - parset_file=instrument_metadata, + instrument_metadata = self.run_task("get_metadata", output_instrument_mapfile, parset_prefix=( self.parset.getString('prefix') + self.parset.fullModuleName('DataProducts')), - product_type="InstrumentModel") + product_type="InstrumentModel")["metadata"] - parset = parameterset(correlated_metadata) - parset.adoptFile(instrument_metadata) - parset.writeFile(self.parset_feedback_file) + self.send_feedback_processing(parameterset()) + self.send_feedback_dataproducts(correlated_metadata) + self.send_feedback_dataproducts(instrument_metadata) return 0 diff --git a/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py b/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py index 42c6a13db83c7ebdcef935928ccf85699fb2d4bd..111d759f1f45bb0fc5a93a5742b02b9deb13c39e 100755 --- a/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py @@ -77,7 +77,7 @@ class msss_imager_pipeline(control): and results are collected an added to the casa image. The images created are converted from casa to HDF5 and copied to the correct output location. - 7. Export meta data: An outputfile with meta data is generated ready for + 7. Export meta data: meta data is generated ready for consumption by the LTA and/or the LOFAR framework. @@ -98,7 +98,6 @@ class msss_imager_pipeline(control): self.target_data = DataMap() self.output_data = DataMap() self.scratch_directory = None - self.parset_feedback_file = None self.parset_dir = None self.mapfile_dir = None @@ -119,7 +118,6 @@ class msss_imager_pipeline(control): except IndexError: return self.usage() self.parset.adoptFile(parset_file) - self.parset_feedback_file = parset_file + "_feedback" # Set job-name to basename of parset-file w/o extension, if it's not # set on the command-line with '-j' or '--job-name' if not 'job_name' in self.inputs: @@ -239,29 +237,17 @@ class msss_imager_pipeline(control): toplevel_meta_data = parameterset() toplevel_meta_data.replace("numberOfMajorCycles", str(number_of_major_cycles)) - toplevel_meta_data_path = os.path.join( - self.parset_dir, "toplevel_meta_data.parset") - try: - toplevel_meta_data.writeFile(toplevel_meta_data_path) - self.logger.info("Wrote meta data to: " + - toplevel_meta_data_path) - except RuntimeError, err: - self.logger.error( - "Failed to write toplevel meta information parset: %s" % str( - toplevel_meta_data_path)) - return 1 - - - # Create a parset-file containing the metadata for MAC/SAS at nodes - self.run_task("get_metadata", placed_data_image_map, - parset_file = self.parset_feedback_file, + # Create a parset containing the metadata for MAC/SAS at nodes + metadata = self.run_task("get_metadata", placed_data_image_map, parset_prefix = ( full_parset.getString('prefix') + full_parset.fullModuleName('DataProducts') ), - toplevel_meta_data_path=toplevel_meta_data_path, - product_type = "SkyImage") + product_type = "SkyImage")["metadata"] + + self.send_feedback_processing(toplevel_meta_data) + self.send_feedback_dataproducts(metadata) return 0 diff --git a/CEP/Pipeline/recipes/sip/bin/msss_target_pipeline.py b/CEP/Pipeline/recipes/sip/bin/msss_target_pipeline.py index 59c853ad7843cddbc1823d0e2b4e5cce98a1da4f..57911053e746da725158f4b17f358d0a8494f8e6 100755 --- a/CEP/Pipeline/recipes/sip/bin/msss_target_pipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/msss_target_pipeline.py @@ -38,7 +38,7 @@ class msss_target_pipeline(control): 5. Run BBS using the instrument file from the target observation, to correct for instrumental effects 6. Copy the MS's to their final output destination. - 7. Create feedback file for further processing by the LOFAR framework (MAC) + 7. Create feedback for further processing by the LOFAR framework **Per subband-group, the following output products will be delivered:** @@ -51,7 +51,6 @@ class msss_target_pipeline(control): self.parset = parameterset() self.input_data = {} self.output_data = {} - self.parset_feedback_file = None def usage(self): @@ -181,7 +180,6 @@ class msss_target_pipeline(control): except IndexError: return self.usage() self.parset.adoptFile(parset_file) - self.parset_feedback_file = parset_file + "_feedback" # Set job-name to basename of parset-file w/o extension, if it's not # set on the command-line with '-j' or '--job-name' @@ -345,17 +343,17 @@ class msss_target_pipeline(control): ) # ********************************************************************* - # 7. Create feedback file for further processing by the LOFAR framework - # (MAC) - # Create a parset-file containing the metadata for MAC/SAS + # 7. Create feedback for further processing by the LOFAR framework with duration(self, "get_metadata"): - self.run_task("get_metadata", corrected_mapfile, - parset_file=self.parset_feedback_file, + metadata = self.run_task("get_metadata", corrected_mapfile, parset_prefix=( self.parset.getString('prefix') + self.parset.fullModuleName('DataProducts') ), - product_type="Correlated") + product_type="Correlated")["metadata"] + + self.send_feedback_processing(parameterset()) + self.send_feedback_dataproducts(metadata) return 0 diff --git a/CEP/Pipeline/recipes/sip/bin/preprocessing_pipeline.py b/CEP/Pipeline/recipes/sip/bin/preprocessing_pipeline.py index 6039d9211d34f4a2baa6de011af3f909a57be235..3d6f02ff011d15009bb88befaa107e1787cb1b7e 100755 --- a/CEP/Pipeline/recipes/sip/bin/preprocessing_pipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/preprocessing_pipeline.py @@ -34,7 +34,6 @@ class preprocessing_pipeline(control): self.input_data = [] self.output_data = [] self.io_data_mask = [] - self.parset_feedback_file = None def usage(self): @@ -120,7 +119,6 @@ class preprocessing_pipeline(control): except IndexError: return self.usage() self.parset.adoptFile(parset_file) - self.parset_feedback_file = parset_file + "_feedback" # Set job-name to basename of parset-file w/o extension, if it's not # set on the command-line with '-j' or '--job-name' @@ -232,15 +230,15 @@ class preprocessing_pipeline(control): # ********************************************************************* # 6. Create feedback file for further processing by the LOFAR framework - # (MAC) - # Create a parset-file containing the metadata for MAC/SAS + # Create a parset containing the metadata with duration(self, "get_metadata"): - self.run_task("get_metadata", output_data_mapfile, - parset_file=self.parset_feedback_file, + metadata = self.run_task("get_metadata", output_data_mapfile, parset_prefix=( self.parset.getString('prefix') + self.parset.fullModuleName('DataProducts')), - product_type="Correlated") + product_type="Correlated")["metadata"] + + self.send_feedback_dataproducts(metadata) return 0 diff --git a/CEP/Pipeline/recipes/sip/bin/pulsar_pipeline.py b/CEP/Pipeline/recipes/sip/bin/pulsar_pipeline.py index 6b54ec10b66b0de8d2fbc95cad197564c7e8246f..98c9e0ca78d8daae1fd0c2ab5a19073fb6ff60f6 100755 --- a/CEP/Pipeline/recipes/sip/bin/pulsar_pipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/pulsar_pipeline.py @@ -37,7 +37,6 @@ class pulsar_pipeline(control): self.parset = parameterset() self.input_data = {} self.output_data = {} - self.parset_feedback_file = None def go(self): @@ -51,7 +50,6 @@ class pulsar_pipeline(control): except IndexError: return self.usage() self.parset.adoptFile(parset_file) - self.parset_feedback_file = parset_file + "_feedback" # Set job-name to basename of parset-file w/o extension, if it's not # set on the command-line with '-j' or '--job-name' @@ -168,7 +166,25 @@ class pulsar_pipeline(control): # Run the pulsar pipeline self.logger.debug("Starting pulp with: " + join(sys.argv)) p = pulp.pulp(self) - return p.go() + + if not p.go() + self.logger.error("PULP did not succeed. Bailing out!") + return 0 + + # PULP puts the feedback in ObservationXXXXX_feedback, where XXXXX is the SAS ID. + # Note that the input parset is called ObservationXXXXX + metadata_file = "%s_feedback" % (parset_file,) + + # Read and forward the feedback + try: + metadata = parameterset(metadata_file) + except IOError, e: + self.logger.error("Could not read feedback from %s: %s" % (metadata_file,e)) + return 0 + + send_feedback_dataproduct(metadata) + + return 1 if __name__ == '__main__': diff --git a/CEP/Pipeline/recipes/sip/bin/startPython.sh b/CEP/Pipeline/recipes/sip/bin/startPython.sh index dd1fb545237af6c2f1fad935094c38b6bd4e9b45..ca3a0327cca5ec24b7cf17bbab11ce7c559c92ec 100755 --- a/CEP/Pipeline/recipes/sip/bin/startPython.sh +++ b/CEP/Pipeline/recipes/sip/bin/startPython.sh @@ -41,10 +41,9 @@ usage() pythonProgram="${1}" parsetFile="${2}" -controlHost="${3}" echo "**** $(date) ****" >> ${logFile} -echo "Executing: $0 ${pythonProgram} ${parsetFile} ${controlHost}" >> ${logFile} +echo "Executing: $0 ${pythonProgram} ${parsetFile}" >> ${logFile} use_pulp="$(getparsetvalue $parsetFile "ObsSW.Observation.processSubtype")" if [ "${use_pulp}" == "Pulsar Pipeline" ]; then @@ -81,14 +80,14 @@ if [ -n "$debug" ]; then echo "PATH=${PATH}" >> ${logFile} echo "PYTHONPATH=${PYTHONPATH}" >> ${logFile} echo "LD_LIBRARY_PATH=${LD_LIBRARY_PATH}" >> ${logFile} - echo "${pythonProgram} ${programOptions} ${parsetFile} ${controlHost}" \ + echo "${pythonProgram} ${programOptions} ${parsetFile}" \ >> ${logFile} fi # Start the Python program in the background. # This script should return ASAP so that MAC can set the task to ACTIVE. # STDERR will be redirected to the log-file. -${pythonProgram} ${programOptions} ${parsetFile} ${controlHost} \ +${pythonProgram} ${programOptions} ${parsetFile} \ 1> /dev/null 2>> ${logFile} & # Check if the Python program died early. If so, this indicates an error. diff --git a/CEP/Pipeline/recipes/sip/master/get_metadata.py b/CEP/Pipeline/recipes/sip/master/get_metadata.py index e34b94e0fef44d725d7a0f97daf1881352bc72b0..8367f97d54f085013864e8636b025c914d41d3c6 100644 --- a/CEP/Pipeline/recipes/sip/master/get_metadata.py +++ b/CEP/Pipeline/recipes/sip/master/get_metadata.py @@ -25,7 +25,7 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn): 2. Load mapfiles 3. call node side of the recipe 4. validate performance - 5. Create the parset-file and write it to disk. + 5. Create the parset and return it. **Command line arguments** @@ -36,23 +36,17 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn): '--product-type', help="Data product type", ), - 'parset_file': ingredient.StringField( - '--parset-file', - help="Path to the output parset file" - ), 'parset_prefix': ingredient.StringField( '--parset-prefix', help="Prefix for each key in the output parset file", default='' ), - 'toplevel_meta_data_path': ingredient.StringField( - '--toplevel-meta-data', - help="Path to parset with toplevel meta information, default = ''", - default='' - ) } outputs = { + 'metadata': ingredient.ParsetField( + help="parset containing obtained metadata" + ) } # List of valid data product types. @@ -117,17 +111,11 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn): data.save(args[0]) # ******************************************************************** - # 5. Create the parset-file and write it to disk. + # 5. Create the parset-file and return it to the caller parset = parameterset() prefix = "Output_%s_" % product_type parset.replace('%snrOf%s' % (global_prefix, prefix), str(len(jobs))) - # If there is meta data to add from the toplevel script - pipeline_meta_parset_path = self.inputs['toplevel_meta_data_path'] - if pipeline_meta_parset_path != "": - pipeline_meta_parset = parameterset(pipeline_meta_parset_path) - parset.adoptCollection(pipeline_meta_parset) - prefix = global_prefix + prefix for idx, job in enumerate(jobs): self.logger.debug("job[%d].results = %s" % (idx, job.results)) @@ -140,16 +128,8 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn): parset.adoptCollection(meta_data_parset, '%s[%d].' % (prefix, idx)) - try: - - create_directory(os.path.dirname(self.inputs['parset_file'])) - parset.writeFile(self.inputs['parset_file']) - self.logger.info("Wrote meta data to: " + - self.inputs['parset_file']) - except RuntimeError, err: - self.logger.error("Failed to write meta-data: %s" % str(err)) - return 1 - + # Return result to caller + self.outputs["metadata"] = parset return 0