# LOFAR IMAGING PIPELINE
# imager_bbs BBS (BlackBoard Selfcal) recipe
# Wouter Klijn
# klijn@astron.nl
# ------------------------------------------------------------------------------
from __future__ import with_statement
import sys
import os
from lofarpipe.support.remotecommand import RemoteCommandRecipeMixIn
from lofarpipe.support.baserecipe import BaseRecipe
from lofarpipe.support.data_map import DataMap, MultiDataMap
import lofarpipe.support.lofaringredient as ingredient
from lofarpipe.support.remotecommand import ComputeJob
class imager_bbs(BaseRecipe, RemoteCommandRecipeMixIn):
"""
Imager_bbs master performs a bbs run based on the supplied parset it is a
shallow wrapper around bbs. Additional functionality compared to the default
bbs recipe is the capability to add an id that allows multiple
runs to have different output files
1. Load and validates that the input mapfiles are correct
2. and then starts the node script, use indexed path names for the
communication
3. Check if all nodes succeeded. If so return a mapfile with calibrated
ms
**Command line Arguments**
1. Path to a mapfile with measurement sets to calibrate
Wouter Klijn
committed
"""
    inputs = {
        'parset': ingredient.FileField(
            '-p', '--parset',
            help="BBS configuration parset"
        ),
        'nthreads': ingredient.IntField(
            '--nthreads',
            default=8,
            help="Number of threads per process"
        ),
        'bbs_executable': ingredient.StringField(
            '--bbs-executable',
            help="BBS standalone executable (bbs-reducer)"
        ),
        'instrument_mapfile': ingredient.FileField(
            '--instrument-mapfile',
            help="Full path to the mapfile containing the names of the "
                 "instrument model files generated by the `parmdb` recipe"
        ),
        'sourcedb_mapfile': ingredient.FileField(
            '--sourcedb-mapfile',
            help="Full path to the mapfile containing the names of the "
                 "sourcedbs generated by the `sourcedb` recipe"
        ),
        'id': ingredient.IntField(
            '--id',
            default=0,
            help="Optional integer id for distinguishing multiple runs"
        ),
        'mapfile': ingredient.StringField(
            '--mapfile',
            help="Full path to the file containing the output data products"
        ),
    }

    outputs = {
        'mapfile': ingredient.FileField(
            help="Full path to a mapfile describing the processed data"
        )
    }
    def go(self):
        """
        imager_bbs functionality. Called by the framework, performing all the work.
        """
        super(imager_bbs, self).go()
        self.logger.info("Starting imager_bbs run")

        # ********************************************************************
        # 1. Load and validate the data
        ms_map = MultiDataMap.load(self.inputs['args'][0])
        parmdb_map = MultiDataMap.load(self.inputs['instrument_mapfile'])
        sourcedb_map = DataMap.load(self.inputs['sourcedb_mapfile'])

        # TODO: DataMap extension
        # # Check if the inputs have equal length and are on the same nodes
        # if not validate_data_maps(ms_map, parmdb_map):
        #     self.logger.error("The combination of mapfiles failed validation:")
        #     self.logger.error("ms_map: \n{0}".format(ms_map))
        #     self.logger.error("parmdb_map: \n{0}".format(parmdb_map))
        #     return 1
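        # A possible shape for that check once MultiDataMap is supported,
        # assuming validate_data_maps() from lofarpipe.support.data_map; kept
        # commented out in line with the TODO above:
        # if not validate_data_maps(ms_map, parmdb_map, sourcedb_map):
        #     self.logger.error("Input mapfiles are misaligned, aborting")
        #     return 1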
        # *********************************************************************
        # 2. Start the node scripts
        jobs = []
        node_command = " python %s" % (self.__file__.replace("master", "nodes"))
        map_dir = os.path.join(
            self.config.get("layout", "job_directory"), "mapfiles")
        run_id = str(self.inputs.get("id"))

        # Update the skip fields of the three maps. If 'skip' is True in any
        # of these maps, then 'skip' must be set to True in all maps.
        for w, x, y in zip(ms_map, parmdb_map, sourcedb_map):
            w.skip = x.skip = y.skip = (
                w.skip or x.skip or y.skip
            )

        ms_map.iterator = parmdb_map.iterator = sourcedb_map.iterator = \
            DataMap.SkipIterator
        for (idx, (ms, parmdb, sourcedb)) in enumerate(
                zip(ms_map, parmdb_map, sourcedb_map)):
            # host is the same for each entry (validate_data_maps)
            host, ms_list = ms.host, ms.file

            # Write a single-entry mapfile for each of the three inputs, using
            # indexed path names so that concurrent runs do not collide.
            ms_list_path = os.path.join(
                map_dir, "%s-%s_map_%s.map" % (host, idx, run_id))
            MultiDataMap([tuple([host, ms_list, False])]).save(ms_list_path)
            parmdb_list_path = os.path.join(
                map_dir, "%s-%s_parmdb_%s.map" % (host, idx, run_id))
            MultiDataMap(
                [tuple([host, parmdb.file, False])]).save(parmdb_list_path)
            sourcedb_list_path = os.path.join(
                map_dir, "%s-%s_sky_%s.map" % (host, idx, run_id))
            MultiDataMap(
                [tuple([host, [sourcedb.file], False])]).save(sourcedb_list_path)

            arguments = [self.inputs['bbs_executable'],
                         self.inputs['parset'],
                         ms_list_path, parmdb_list_path, sourcedb_list_path]
            jobs.append(ComputeJob(host, node_command, arguments,
                                   resources={
                                       "cores": self.inputs['nthreads']
                                   }))

        # Start the jobs and wait until all are finished
        self._schedule_jobs(jobs)
        # *********************************************************************
        # 3. Validate the node output and construct the output mapfile.
        if self.error.isSet():   # if one of the nodes failed
            self.logger.error("One of the nodes failed while performing "
                              "a BBS run. Aborting: concat.ms corruption")
            return 1

        # Return the output: the measurement sets that are calibrated;
        # the calibrated data is placed in the ms sets
        MultiDataMap(ms_map).save(self.inputs['mapfile'])
        self.logger.info("Wrote file with calibrated data")
        self.outputs['mapfile'] = self.inputs['mapfile']
        return 0
if __name__ == '__main__':
    sys.exit(imager_bbs().main())