Skip to content
Snippets Groups Projects
imager_bbs.py 6.28 KiB
Newer Older
# imager_bbz BBS (BlackBoard Selfcal) recipe
# Wouter Klijn
# klijn@astron.nl
# ------------------------------------------------------------------------------
from __future__ import with_statement
import sys
import os

from lofarpipe.support.remotecommand import RemoteCommandRecipeMixIn
from lofarpipe.support.baserecipe import BaseRecipe
from lofarpipe.support.data_map import DataMap, MultiDataMap
import lofarpipe.support.lofaringredient as ingredient
from lofarpipe.support.remotecommand import ComputeJob

class imager_bbs(BaseRecipe, RemoteCommandRecipeMixIn):
    """
    Imager_bbs master performs a bbs run based on the supplied parset it is a
    shallow wrapper around bbs. Additional functionality compared to the default
    bbs recipe is the capability to add an id that allows multiple
    runs to have different output files
    
    1. Load and validates that the input mapfiles are correct 
    2. and then starts the node script, use indexed path names for the 
       communication
    3. Check if all nodes succeeded. If so return a mapfile with calibrated
       ms
       
    **Command line Arguments**
    
    1. Path to a mapfile with measurement sets to calibrate
    """
    inputs = {
        'parset': ingredient.FileField(
            '-p', '--parset',
        'nthreads': ingredient.IntField(
            '--nthreads',
            default=8,
            help="Number of threads per process"
        ),
        'bbs_executable': ingredient.StringField(
            '--bbs-executable',
            help="BBS standalone executable (bbs-reducer)"
        ),
        'instrument_mapfile': ingredient.FileField(
            '--instrument-mapfile',
            help="Full path to the mapfile containing the names of the "
                 "instrument model files generated by the `parmdb` recipe"
        ),
        'sourcedb_mapfile': ingredient.FileField(
            '--sourcedb-mapfile',
            help="Full path to the mapfile containing the names of the "
                 "sourcedbs generated by the `sourcedb` recipe"
        ),
        'id': ingredient.IntField(
            '--id',
            default=0,
            help="Optional integer id for distinguishing multiple runs"
        ),
        'mapfile': ingredient.StringField(
            '--mapfile',
            help="Full path to the file containing the output data products"
        ),
    }

    outputs = {
        'mapfile': ingredient.FileField(
            help="Full path to a mapfile describing the processed data"
        """
        imager_bbs functionality. Called by framework performing all the work
        """
        self.logger.info("Starting imager_bbs run")
        # ********************************************************************
        # 1. Load the and validate the data

        ms_map = MultiDataMap.load(self.inputs['args'][0])
        parmdb_map = MultiDataMap.load(self.inputs['instrument_mapfile'])
        sourcedb_map = DataMap.load(self.inputs['sourcedb_mapfile'])

        # TODO: DataMap extention
#        #Check if the input has equal length and on the same nodes
#        if not validate_data_maps(ms_map, parmdb_map):
#            self.logger.error("The combination of mapfiles failed validation:")
#            self.logger.error("ms_map: \n{0}".format(ms_map))
#            self.logger.error("parmdb_map: \n{0}".format(parmdb_map))
#            return 1
        # *********************************************************************
        # 2. Start the node scripts
        jobs = []
        node_command = " python %s" % (self.__file__.replace("master", "nodes"))
        map_dir = os.path.join(
                        self.config.get("layout", "job_directory"), "mapfiles")
        run_id = str(self.inputs.get("id"))

        # Update the skip fields of the four maps. If 'skip' is True in any of
        # these maps, then 'skip' must be set to True in all maps.
        for w, x, y in zip(ms_map, parmdb_map, sourcedb_map):
            w.skip = x.skip = y.skip = (
                w.skip or x.skip or y.skip
            )

        ms_map.iterator = parmdb_map.iterator = sourcedb_map.iterator = \
        for (idx, (ms, parmdb, sourcedb)) in enumerate(zip(ms_map, parmdb_map, sourcedb_map)):
            #host is same for each entry (validate_data_maps)
            host, ms_list = ms.host, ms.file
            # Write data maps to MultaDataMaps
                    map_dir, "%s-%s_map_%s.map" % (host, idx, run_id))
            MultiDataMap([tuple([host, ms_list, False])]).save(ms_list_path)

                    map_dir, "%s-%s_parmdb_%s.map" % (host, idx, run_id))
            MultiDataMap(
                [tuple([host, parmdb.file, False])]).save(parmdb_list_path)

                    map_dir, "%s-%s_sky_%s.map" % (host, idx, run_id))
            MultiDataMap(
                [tuple([host, [sourcedb.file], False])]).save(sourcedb_list_path)

            arguments = [self.inputs['bbs_executable'],
                         self.inputs['parset'],
                         ms_list_path, parmdb_list_path, sourcedb_list_path]
            jobs.append(ComputeJob(host, node_command, arguments,
                    resources={
                        "cores": self.inputs['nthreads']
                    }))

        # start and wait till all are finished
        self._schedule_jobs(jobs)

        # **********************************************************************
        # 3. validate the node output and construct the output mapfile.
        if self.error.isSet():   #if one of the nodes failed
            self.logger.error("One of the nodes failed while performing"
                              "a BBS run. Aborting: concat.ms corruption")
            return 1

        # return the output: The measurement set that are calibrated:
        # calibrated data is placed in the ms sets
        MultiDataMap(ms_map).save(self.inputs['mapfile'])
        self.logger.info("Wrote file with  calibrated data")

        self.outputs['mapfile'] = self.inputs['mapfile']
        return 0


if __name__ == '__main__':
    sys.exit(imager_bbs().main())