Commit 2f84ba8d authored by Jan David Mol

Task #7352: Use qpid to send status, dataproduct, and processing feedback to MAC/MoM -- initial implementation
parent aa107164
Showing 149 additions and 224 deletions
@@ -26,7 +26,6 @@ python_install(
   support/lofarnode.py
   support/loggingdecorators.py
   support/mac.py
-  support/mac_feedback.py
   support/parset.py
   support/pipelinelogging.py
   support/remotecommand.py
@@ -13,7 +13,11 @@ import traceback
 from lofarpipe.support.stateful import StatefulRecipe
 from lofarpipe.support.lofarexceptions import PipelineException
 from lofarpipe.support.xmllogging import get_active_stack
-import lofarpipe.support.mac_feedback as mac_feedback
+from lofar.parameterset import parameterset
+from lofar.messagebus import ToBus
+from lofar.messagebus.protocols.taskfeedbackdataproducts import TaskFeedbackDataproducts
+from lofar.messagebus.protocols.taskfeedbackprocessing import TaskFeedbackProcessing
+from lofar.messagebus.protocols.taskfeedbackstatus import TaskFeedbackStatus

 # Standalone Pipeline Control System
 # ------------------------------------------------------------------------------

@@ -22,7 +26,7 @@ class control(StatefulRecipe):
     """
     Basic pipeline control framework.

-    Define a pipeline by subclassing and provding a body for the
+    Define a pipeline by subclassing and providing a body for the
     :meth:`pipeline_logic`.

     This class provides little, but can be specialised to eg provide a

@@ -30,46 +34,60 @@ class control(StatefulRecipe):
     """
     inputs = {}

-    def _send_mac_feedback(self, status):
-        """
-        Send status information back to MAC, but only if the Python controller
-        host to send this information to was given as input argument.
-
-        `status` must be an integer; 0 indicates success, any other value
-        indicates failure.
-
-        The port number is calculated as 22000 + observationNr%1000.
-        We need to extract this number from the job-name, which should be equal
-        to "Observation" + str(observationNr).
-        """
-        try:
-            host = self.inputs['args'][1]
-        except IndexError:
-            self.logger.warn(
-                "No MAC Python controller host specified. "
-                "Not sending status feedback to MAC"
-            )
-            return
-        # Determine port number to use.
-        match = re.findall(r'^Observation(\d+)$', self.inputs['job_name'])
-        if match:
-            port = 25000 + int(match[0]) % 7000
-            self.logger.info(
-                "Sending status feedback to MAC [%s:%s] (status: %s)" %
-                (host, port, status)
-            )
-        else:
-            self.logger.warn(
-                r"Job-name does not match with pattern '^Observation(\d+)$'. "
-                "Not sending status feedback to MAC"
-            )
-            return
-        # Send feedback information
-        try:
-            mac_feedback.send_status(host, port, status)
-        except IOError, error:
-            self.logger.warn(
-                "Failed to send status feedback to MAC [%s:%s]: %s" %
-                (host, port, error)
-            )
+    def send_feedback_processing(self, feedback):
+        """
+        Send processing feedback information back to LOFAR.
+
+        `feedback` must be a parameterset
+        """
+        bus = ToBus("lofar.task.feedback.processing")
+        msg = TaskFeedbackProcessing(
+            "lofarpipe.support.control",
+            "",
+            "Processing feedback from the pipeline framework",
+            momID,
+            sasID,
+            feedback)
+        bus.sendmsg(msg.qpidMsg())
+
+    def send_feedback_dataproducts(self, feedback):
+        """
+        Send dataproduct feedback information back to LOFAR.
+
+        `feedback` must be a parameterset
+        """
+        bus = ToBus("lofar.task.feedback.dataproduct")
+        msg = TaskFeedbackDataproducts(
+            "lofarpipe.support.control",
+            "",
+            "Dataproduct feedback from the pipeline framework",
+            momID,
+            sasID,
+            feedback)
+        bus.sendmsg(msg.qpidMsg())
+
+    def _send_feedback_status(self, status):
+        """
+        Send status information back to LOFAR.
+
+        `status` must be an integer; 0 indicates success, any other value
+        indicates failure.
+        """
+        bus = ToBus("lofar.task.feedback.status")
+        msg = TaskFeedbackStatus(
+            "lofarpipe.support.control",
+            "",
+            "Status feedback from the pipeline framework",
+            momID,
+            sasID,
+            status == 0)
+        bus.sendmsg(msg.qpidMsg())

     def pipeline_logic(self):
         """

@@ -98,10 +116,12 @@ class control(StatefulRecipe):
             self.logger.error("*******************************************")
-            self._send_mac_feedback(1)
+            # Emit process status
+            self._send_feedback_status(1)
             return 1
         else:
-            self._send_mac_feedback(0)
+            # Emit process status
+            self._send_feedback_status(0)
             return 0
         finally:
             # always print a xml stats file
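Note: the three methods added above reference momID and sasID, which are never defined in this initial implementation, so they would raise a NameError when called. A hedged sketch of one way a pipeline could supply them, assuming the identifiers are available in the pipeline parset (both key names below are assumptions, not confirmed by this commit, and self.parset is only defined by the concrete pipeline subclasses shown further down):

    def _get_feedback_ids(self):
        # Hypothetical helper, not part of this commit: look up the MoM and
        # SAS (OTDB) identifiers in the parset, falling back to empty strings
        # when the keys are absent. The key names are assumptions.
        momID = self.parset.getString("Observation.momID", "")
        sasID = self.parset.getString("Observation.otdbID", "")
        return momID, sasID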
@@ -11,6 +11,7 @@ from UserDict import DictMixin
 from lofarpipe.cuisine.ingredient import WSRTingredient
 from lofarpipe.support.utilities import string_to_list, is_iterable
+from lofar.parameterset import parameterset

 # These are currently only used by lofarrecipe.run_task to provide default
 # input and output dicts based on copying metadata from the parent.

@@ -194,6 +195,13 @@ class DictField(Field):
     def is_valid(self, value):
         return isinstance(value, dict)

+class ParsetField(Field):
+    """
+    A Field which accepts a parameterset object.
+    """
+    def is_valid(self, value):
+        return isinstance(value, parameterset)
+
 class FileList(ListField):
     """
     A Field which accepts a list of extant filenames.
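For illustration, a hedged sketch (not part of this commit) of how a recipe declares and fills the new ParsetField output; the recipe name is made up, while the get_metadata recipe below does this for real:

    from lofarpipe.support.baserecipe import BaseRecipe
    import lofarpipe.support.lofaringredient as ingredient
    from lofar.parameterset import parameterset

    class example_recipe(BaseRecipe):
        # Declare a parset-valued output; assignments to it are checked
        # against ParsetField.is_valid().
        outputs = {
            'metadata': ingredient.ParsetField(
                help="parset containing obtained metadata"
            )
        }

        def go(self):
            parset = parameterset()
            parset.replace("nrOfOutputs", "1")
            self.outputs["metadata"] = parset
            return 0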
deleted file: support/mac_feedback.py

-# LOFAR IMAGING PIPELINE
-#
-# MAC Status Feedback QuickFix
-# Marcel Loose, 2012
-# loose@astron.nl
-# ------------------------------------------------------------------------------
-
-"""
-This module implements the quick fix (Redmine issue #3633) for the pipeline
-status feedback to MAC.
-"""
-
-import random
-import socket
-import time
-
-def __try_connect(host, port, tries=5, min_timeout=1.0, max_timeout=5.0):
-    """
-    Try to connect to `host`:`port` up to `tries` times, using a random
-    timeout interval ([`min_timeout` .. `max_timeout`) seconds) between
-    retries.
-    Return a socket object.
-    Raises `socket.error` if all connect tries fail.
-    """
-    # Create a socket (SOCK_STREAM means a TCP socket)
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    while True:
-        tries -= 1
-        try:
-            sock.connect((host, port))
-        except socket.error, err:
-            print("Could not connect to %s:%s (got %s)" %
-                  (host, str(port), str(err)))
-            if tries > 0:
-                timeout = random.uniform(min_timeout, max_timeout)
-                print("Retrying in %f seconds (%d more %s)." %
-                      (timeout, tries, "try" if tries == 1 else "tries"))
-                time.sleep(timeout)
-            else:
-                raise IOError(err)
-        else:
-            return sock
-
-def send_status(host, port, status):
-    """
-    Send the pipeline status to a TCP listener at `host`:`port`. If `status` is
-    non-zero, send the string 'ABORT'; otherwise send the string 'FINISHED'.
-    """
-    message = "FINISHED" if status == 0 else "ABORT"
-    sock = __try_connect(host, port)
-    sock.sendall(message)
-    sock.close()
-
-if __name__ == "__main__":
-    """
-    Simple command line test.
-    Usage: python mac_feedback.py [<host>] [<port>] [<status>]
-    """
-    import sys
-    host = str(sys.argv[1]) if len(sys.argv) > 1 else "localhost"
-    port = int(sys.argv[2]) if len(sys.argv) > 2 else 9999
-    stat = int(sys.argv[3]) if len(sys.argv) > 3 else 0
-    send_status(host, port, stat)
@@ -31,7 +31,7 @@ class calibration_pipeline(control):
     4. Create a sourcedb from the user-supplied sky model, and an empty parmdb.
     5. Run BBS to calibrate the data.
     6. Copy the MS's to their final output destination.
-    7. Create feedback file for further processing by the LOFAR framework (MAC)
+    7. Create feedback for further processing by the LOFAR framework
     """

     def __init__(self):

@@ -39,7 +39,6 @@ class calibration_pipeline(control):
         self.parset = parameterset()
         self.input_data = {}
         self.output_data = {}
-        self.parset_feedback_file = None

     def usage(self):

@@ -108,7 +107,6 @@ class calibration_pipeline(control):
         except IndexError:
             return self.usage()
         self.parset.adoptFile(parset_file)
-        self.parset_feedback_file = parset_file + "_feedback"

         # Set job-name to basename of parset-file w/o extension, if it's not
         # set on the command-line with '-j' or '--job-name'

@@ -294,31 +292,27 @@ class calibration_pipeline(control):
         )

         # *********************************************************************
-        # 7. Create feedback file for further processing by the LOFAR framework
+        # 7. Create feedback for further processing by the LOFAR framework
         #    a. get metadata of the measurement sets
         #    b. get metadata of the instrument models
-        #    c. join the two files and write the final feedback file
-        correlated_metadata = os.path.join(parset_dir, "correlated.metadata")
-        instrument_metadata = os.path.join(parset_dir, "instrument.metadata")
+        #    c. join the two and write the final feedback
         with duration(self, "get_metadata"):
-            self.run_task("get_metadata", output_correlated_mapfile,
-                parset_file=correlated_metadata,
+            correlated_metadata = self.run_task("get_metadata", output_correlated_mapfile,
                 parset_prefix=(
                     self.parset.getString('prefix') +
                     self.parset.fullModuleName('DataProducts')),
-                product_type="Correlated")
+                product_type="Correlated")["metadata"]

         with duration(self, "get_metadata"):
-            self.run_task("get_metadata", output_instrument_mapfile,
-                parset_file=instrument_metadata,
+            instrument_metadata = self.run_task("get_metadata", output_instrument_mapfile,
                 parset_prefix=(
                     self.parset.getString('prefix') +
                     self.parset.fullModuleName('DataProducts')),
-                product_type="InstrumentModel")
+                product_type="InstrumentModel")["metadata"]

-        parset = parameterset(correlated_metadata)
-        parset.adoptFile(instrument_metadata)
-        parset.writeFile(self.parset_feedback_file)
+        self.send_feedback_processing(parameterset())
+        self.send_feedback_dataproducts(correlated_metadata)
+        self.send_feedback_dataproducts(instrument_metadata)

         return 0
@@ -81,7 +81,7 @@ class imaging_pipeline(control):
        and results are collected an added to the casa image. The images created
        are converted from casa to HDF5 and copied to the correct output
        location.
-    7. Export meta data: An outputfile with meta data is generated ready for
+    7. Export meta data: meta data is generated ready for
        consumption by the LTA and/or the LOFAR framework.

@@ -102,7 +102,6 @@ class imaging_pipeline(control):
         self.target_data = DataMap()
         self.output_data = DataMap()
         self.scratch_directory = None
-        self.parset_feedback_file = None
         self.parset_dir = None
         self.mapfile_dir = None

@@ -123,7 +122,6 @@ class imaging_pipeline(control):
         except IndexError:
             return self.usage()
         self.parset.adoptFile(parset_file)
-        self.parset_feedback_file = parset_file + "_feedback"

         # Set job-name to basename of parset-file w/o extension, if it's not
         # set on the command-line with '-j' or '--job-name'
         if not 'job_name' in self.inputs:

@@ -231,14 +229,16 @@ class imaging_pipeline(control):
         # *********************************************************************
         # (7) Get metadata
-        # Create a parset-file containing the metadata for MAC/SAS
-        self.run_task("get_metadata", placed_data_image_map,
-            parset_file = self.parset_feedback_file,
+        # Create a parset containing the metadata for MAC/SAS
+        metadata = self.run_task("get_metadata", placed_data_image_map,
             parset_prefix = (
                 full_parset.getString('prefix') +
                 full_parset.fullModuleName('DataProducts')
             ),
-            product_type = "SkyImage")
+            product_type = "SkyImage")["metadata"]
+
+        self.send_feedback_processing(parameterset())
+        self.send_feedback_dataproducts(metadata)

         return 0
@@ -52,7 +52,7 @@ class msss_imager_pipeline(control):
        single large measurement set and perform flagging, RFI and bad station
        exclusion.
-    2. Generate meta information feedback files based on dataproduct information
+    2. Generate meta information feedback based on dataproduct information
        and parset/configuration data

     **Per subband-group, the following output products will be delivered:**

@@ -69,7 +69,6 @@ class msss_imager_pipeline(control):
         self.target_data = DataMap()
         self.output_data = DataMap()
         self.scratch_directory = None
-        self.parset_feedback_file = None
         self.parset_dir = None
         self.mapfile_dir = None

@@ -90,7 +89,6 @@ class msss_imager_pipeline(control):
         except IndexError:
             return self.usage()
         self.parset.adoptFile(parset_file)
-        self.parset_feedback_file = parset_file + "_feedback"

         # Set job-name to basename of parset-file w/o extension, if it's not
         # set on the command-line with '-j' or '--job-name'
         if not 'job_name' in self.inputs:

@@ -194,14 +192,16 @@ class msss_imager_pipeline(control):
         # Create a parset-file containing the metadata for MAC/SAS at nodes
-        self.run_task("get_metadata", output_ms_mapfile,
-            parset_file = self.parset_feedback_file,
+        metadata = self.run_task("get_metadata", output_ms_mapfile,
             parset_prefix = (
                 full_parset.getString('prefix') +
                 full_parset.fullModuleName('DataProducts')
             ),
-            toplevel_meta_data_path=toplevel_meta_data_path,
-            product_type = "Correlated")
+            product_type = "Correlated")["metadata"]
+
+        self.send_feedback_processing(parameterset())
+        self.send_feedback_dataproducts(metadata)

         return 0
@@ -37,7 +37,7 @@ class msss_calibrator_pipeline(control):
        parset, and the sourcedb made earlier
     5. Perform gain correction on the created instrument table
     6. Copy corrected MS's to their final output destination
-    7. Create output for consumption by the LOFAR framework
+    7. Create metadata for consumption by the LOFAR framework

     **Per subband-group, the following output products will be delivered:**

@@ -51,7 +51,6 @@ class msss_calibrator_pipeline(control):
         self.parset = parameterset()
         self.input_data = {}
         self.output_data = {}
-        self.parset_feedback_file = None

     def usage(self):

@@ -120,7 +119,6 @@ class msss_calibrator_pipeline(control):
         except IndexError:
             return self.usage()
         self.parset.adoptFile(parset_file)
-        self.parset_feedback_file = parset_file + "_feedback"

         # Set job-name to basename of parset-file w/o extension, if it's not
         # set on the command-line with '-j' or '--job-name'

@@ -309,31 +307,27 @@ class msss_calibrator_pipeline(control):
         )

         # *********************************************************************
-        # 7. Create feedback file for further processing by the LOFAR framework
+        # 7. Create feedback for further processing by the LOFAR framework
         #    a. get metadata of the measurement sets
         #    b. get metadata of the instrument models
-        #    c. join the two files and write the final feedback file
-        correlated_metadata = os.path.join(parset_dir, "correlated.metadata")
-        instrument_metadata = os.path.join(parset_dir, "instrument.metadata")
+        #    c. join the two and write the final feedback
         with duration(self, "get_metadata"):
-            self.run_task("get_metadata", output_correlated_mapfile,
-                parset_file=correlated_metadata,
+            correlated_metadata = self.run_task("get_metadata", output_correlated_mapfile,
                 parset_prefix=(
                     self.parset.getString('prefix') +
                     self.parset.fullModuleName('DataProducts')),
-                product_type="Correlated")
+                product_type="Correlated")["metadata"]

         with duration(self, "get_metadata"):
-            self.run_task("get_metadata", output_instrument_mapfile,
-                parset_file=instrument_metadata,
+            instrument_metadata = self.run_task("get_metadata", output_instrument_mapfile,
                 parset_prefix=(
                     self.parset.getString('prefix') +
                     self.parset.fullModuleName('DataProducts')),
-                product_type="InstrumentModel")
+                product_type="InstrumentModel")["metadata"]

-        parset = parameterset(correlated_metadata)
-        parset.adoptFile(instrument_metadata)
-        parset.writeFile(self.parset_feedback_file)
+        self.send_feedback_processing(parameterset())
+        self.send_feedback_dataproducts(correlated_metadata)
+        self.send_feedback_dataproducts(instrument_metadata)

         return 0
@@ -77,7 +77,7 @@ class msss_imager_pipeline(control):
        and results are collected an added to the casa image. The images created
        are converted from casa to HDF5 and copied to the correct output
        location.
-    7. Export meta data: An outputfile with meta data is generated ready for
+    7. Export meta data: meta data is generated ready for
        consumption by the LTA and/or the LOFAR framework.

@@ -98,7 +98,6 @@ class msss_imager_pipeline(control):
         self.target_data = DataMap()
         self.output_data = DataMap()
         self.scratch_directory = None
-        self.parset_feedback_file = None
         self.parset_dir = None
         self.mapfile_dir = None

@@ -119,7 +118,6 @@ class msss_imager_pipeline(control):
         except IndexError:
             return self.usage()
         self.parset.adoptFile(parset_file)
-        self.parset_feedback_file = parset_file + "_feedback"

         # Set job-name to basename of parset-file w/o extension, if it's not
         # set on the command-line with '-j' or '--job-name'
         if not 'job_name' in self.inputs:

@@ -239,29 +237,17 @@ class msss_imager_pipeline(control):
         toplevel_meta_data = parameterset()
         toplevel_meta_data.replace("numberOfMajorCycles",
                                    str(number_of_major_cycles))
-        toplevel_meta_data_path = os.path.join(
-            self.parset_dir, "toplevel_meta_data.parset")
-        try:
-            toplevel_meta_data.writeFile(toplevel_meta_data_path)
-            self.logger.info("Wrote meta data to: " +
-                             toplevel_meta_data_path)
-        except RuntimeError, err:
-            self.logger.error(
-                "Failed to write toplevel meta information parset: %s" % str(
-                    toplevel_meta_data_path))
-            return 1

-        # Create a parset-file containing the metadata for MAC/SAS at nodes
-        self.run_task("get_metadata", placed_data_image_map,
-            parset_file = self.parset_feedback_file,
+        # Create a parset containing the metadata for MAC/SAS at nodes
+        metadata = self.run_task("get_metadata", placed_data_image_map,
             parset_prefix = (
                 full_parset.getString('prefix') +
                 full_parset.fullModuleName('DataProducts')
             ),
-            toplevel_meta_data_path=toplevel_meta_data_path,
-            product_type = "SkyImage")
+            product_type = "SkyImage")["metadata"]
+
+        self.send_feedback_processing(toplevel_meta_data)
+        self.send_feedback_dataproducts(metadata)

         return 0
@@ -38,7 +38,7 @@ class msss_target_pipeline(control):
     5. Run BBS using the instrument file from the target observation, to
        correct for instrumental effects
     6. Copy the MS's to their final output destination.
-    7. Create feedback file for further processing by the LOFAR framework (MAC)
+    7. Create feedback for further processing by the LOFAR framework

     **Per subband-group, the following output products will be delivered:**

@@ -51,7 +51,6 @@ class msss_target_pipeline(control):
         self.parset = parameterset()
         self.input_data = {}
         self.output_data = {}
-        self.parset_feedback_file = None

     def usage(self):

@@ -181,7 +180,6 @@ class msss_target_pipeline(control):
         except IndexError:
             return self.usage()
         self.parset.adoptFile(parset_file)
-        self.parset_feedback_file = parset_file + "_feedback"

         # Set job-name to basename of parset-file w/o extension, if it's not
         # set on the command-line with '-j' or '--job-name'

@@ -345,17 +343,17 @@ class msss_target_pipeline(control):
         )

         # *********************************************************************
-        # 7. Create feedback file for further processing by the LOFAR framework
-        #    (MAC)
-        # Create a parset-file containing the metadata for MAC/SAS
+        # 7. Create feedback for further processing by the LOFAR framework
         with duration(self, "get_metadata"):
-            self.run_task("get_metadata", corrected_mapfile,
-                parset_file=self.parset_feedback_file,
+            metadata = self.run_task("get_metadata", corrected_mapfile,
                 parset_prefix=(
                     self.parset.getString('prefix') +
                     self.parset.fullModuleName('DataProducts')
                 ),
-                product_type="Correlated")
+                product_type="Correlated")["metadata"]
+
+        self.send_feedback_processing(parameterset())
+        self.send_feedback_dataproducts(metadata)

         return 0
@@ -34,7 +34,6 @@ class preprocessing_pipeline(control):
         self.input_data = []
         self.output_data = []
         self.io_data_mask = []
-        self.parset_feedback_file = None

     def usage(self):

@@ -120,7 +119,6 @@ class preprocessing_pipeline(control):
         except IndexError:
             return self.usage()
         self.parset.adoptFile(parset_file)
-        self.parset_feedback_file = parset_file + "_feedback"

         # Set job-name to basename of parset-file w/o extension, if it's not
         # set on the command-line with '-j' or '--job-name'

@@ -232,15 +230,15 @@ class preprocessing_pipeline(control):
         # *********************************************************************
         # 6. Create feedback file for further processing by the LOFAR framework
-        #    (MAC)
-        # Create a parset-file containing the metadata for MAC/SAS
+        # Create a parset containing the metadata
         with duration(self, "get_metadata"):
-            self.run_task("get_metadata", output_data_mapfile,
-                parset_file=self.parset_feedback_file,
+            metadata = self.run_task("get_metadata", output_data_mapfile,
                 parset_prefix=(
                     self.parset.getString('prefix') +
                     self.parset.fullModuleName('DataProducts')),
-                product_type="Correlated")
+                product_type="Correlated")["metadata"]
+
+        self.send_feedback_dataproducts(metadata)

         return 0
@@ -37,7 +37,6 @@ class pulsar_pipeline(control):
         self.parset = parameterset()
         self.input_data = {}
         self.output_data = {}
-        self.parset_feedback_file = None

     def go(self):

@@ -51,7 +50,6 @@ class pulsar_pipeline(control):
         except IndexError:
             return self.usage()
         self.parset.adoptFile(parset_file)
-        self.parset_feedback_file = parset_file + "_feedback"

         # Set job-name to basename of parset-file w/o extension, if it's not
         # set on the command-line with '-j' or '--job-name'

@@ -168,7 +166,25 @@ class pulsar_pipeline(control):
         # Run the pulsar pipeline
         self.logger.debug("Starting pulp with: " + join(sys.argv))
         p = pulp.pulp(self)
-        return p.go()
+
+        if not p.go():
+            self.logger.error("PULP did not succeed. Bailing out!")
+            return 1
+
+        # PULP puts the feedback in ObservationXXXXX_feedback, where XXXXX is
+        # the SAS ID. Note that the input parset is called ObservationXXXXX.
+        metadata_file = "%s_feedback" % (parset_file,)
+
+        # Read and forward the feedback
+        try:
+            metadata = parameterset(metadata_file)
+        except IOError, e:
+            self.logger.error("Could not read feedback from %s: %s" % (metadata_file, e))
+            return 1
+
+        self.send_feedback_dataproducts(metadata)
+
+        return 0

 if __name__ == '__main__':
@@ -41,10 +41,9 @@ usage()
 pythonProgram="${1}"
 parsetFile="${2}"
-controlHost="${3}"

 echo "**** $(date) ****" >> ${logFile}
-echo "Executing: $0 ${pythonProgram} ${parsetFile} ${controlHost}" >> ${logFile}
+echo "Executing: $0 ${pythonProgram} ${parsetFile}" >> ${logFile}

 use_pulp="$(getparsetvalue $parsetFile "ObsSW.Observation.processSubtype")"
 if [ "${use_pulp}" == "Pulsar Pipeline" ]; then

@@ -81,14 +80,14 @@ if [ -n "$debug" ]; then
   echo "PATH=${PATH}" >> ${logFile}
   echo "PYTHONPATH=${PYTHONPATH}" >> ${logFile}
   echo "LD_LIBRARY_PATH=${LD_LIBRARY_PATH}" >> ${logFile}
-  echo "${pythonProgram} ${programOptions} ${parsetFile} ${controlHost}" \
+  echo "${pythonProgram} ${programOptions} ${parsetFile}" \
     >> ${logFile}
 fi

 # Start the Python program in the background.
 # This script should return ASAP so that MAC can set the task to ACTIVE.
 # STDERR will be redirected to the log-file.
-${pythonProgram} ${programOptions} ${parsetFile} ${controlHost} \
+${pythonProgram} ${programOptions} ${parsetFile} \
   1> /dev/null 2>> ${logFile} &

 # Check if the Python program died early. If so, this indicates an error.
@@ -25,7 +25,7 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn):
     2. Load mapfiles
     3. call node side of the recipe
     4. validate performance
-    5. Create the parset-file and write it to disk.
+    5. Create the parset and return it.

     **Command line arguments**

@@ -36,23 +36,17 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn):
             '--product-type',
             help="Data product type",
         ),
-        'parset_file': ingredient.StringField(
-            '--parset-file',
-            help="Path to the output parset file"
-        ),
         'parset_prefix': ingredient.StringField(
             '--parset-prefix',
             help="Prefix for each key in the output parset file",
             default=''
         ),
-        'toplevel_meta_data_path': ingredient.StringField(
-            '--toplevel-meta-data',
-            help="Path to parset with toplevel meta information, default = ''",
-            default=''
-        )
     }

     outputs = {
+        'metadata': ingredient.ParsetField(
+            help="parset containing obtained metadata"
+        )
     }

     # List of valid data product types.

@@ -117,17 +111,11 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn):
         data.save(args[0])

         # ********************************************************************
-        # 5. Create the parset-file and write it to disk.
+        # 5. Create the parset-file and return it to the caller
         parset = parameterset()
         prefix = "Output_%s_" % product_type
         parset.replace('%snrOf%s' % (global_prefix, prefix), str(len(jobs)))

-        # If there is meta data to add from the toplevel script
-        pipeline_meta_parset_path = self.inputs['toplevel_meta_data_path']
-        if pipeline_meta_parset_path != "":
-            pipeline_meta_parset = parameterset(pipeline_meta_parset_path)
-            parset.adoptCollection(pipeline_meta_parset)
-
         prefix = global_prefix + prefix
         for idx, job in enumerate(jobs):
             self.logger.debug("job[%d].results = %s" % (idx, job.results))

@@ -140,16 +128,8 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn):
             parset.adoptCollection(meta_data_parset,
                                    '%s[%d].' % (prefix, idx))

-        try:
-            create_directory(os.path.dirname(self.inputs['parset_file']))
-            parset.writeFile(self.inputs['parset_file'])
-            self.logger.info("Wrote meta data to: " +
-                             self.inputs['parset_file'])
-        except RuntimeError, err:
-            self.logger.error("Failed to write meta-data: %s" % str(err))
-            return 1
+        # Return result to caller
+        self.outputs["metadata"] = parset

         return 0
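With this change the caller receives the metadata as a parameterset through run_task's result dictionary instead of reading a parset file back from disk. A minimal usage sketch, mirroring the pipeline changes above (the mapfile name and parset prefix are illustrative, not from this commit):

        metadata = self.run_task("get_metadata", "output_data.mapfile",
            parset_prefix="ObsSW.Observation.DataProducts",
            product_type="Correlated")["metadata"]
        self.send_feedback_dataproducts(metadata)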