Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
pulsar_pipeline.py 7.36 KiB
#!/usr/bin/env python
#
# pulsar_pipeline.py is a wrapper around the actual pulsar pipeline pulp.py
# It is supplied with a pipeline parset which it will digest.
# It reads data input and output locations which are written to mapfiles
# These mapfiles can then be read by pulp to know which input data
# it should use and which output locations and filenames it should write to.
#
# Alwin de Jong (jong@astron.nl) March 2014
###################################################################

#import os, sys, re
#import optparse as opt
#from subprocess import PIPE, STDOUT, Popen

import os
import sys
import pulp

from string import join
from lofarpipe.support.control import control
from lofarpipe.support.data_map import DataMap, validate_data_maps
from lofarpipe.support.lofarexceptions import PipelineException
from lofarpipe.support.utilities import create_directory
from lofar.parameterset import parameterset
from lofarpipe.support.loggingdecorators import mail_log_on_exception, duration


class pulsar_pipeline(control):
    """
    This is the main wrapper class around the pulsar pipeline for
    integration of the pulsar pipelines as a 'black-box'
    """

    def __init__(self):
        super(pulsar_pipeline, self).__init__()
        self.input_data = {}
        self.output_data = {}
  

    def _get_io_product_specs(self):
        """
        Get input- and output-data product specifications from the
        parset-file, and do some sanity checks.
        """
        dps = self.parset.makeSubset(
            self.parset.fullModuleName('DataProducts') + '.'
        )
        # Coherent Stokes input data
        self.coherentStokesEnabled = dps.getBool('Input_CoherentStokes.enabled', False)
        self.input_data['coherent'] = DataMap([
            tuple(os.path.join(location, filename).split(':')) + (skip,)
                for location, filename, skip in zip(
                    dps.getStringVector('Input_CoherentStokes.locations'),
                    dps.getStringVector('Input_CoherentStokes.filenames'),
                    dps.getBoolVector('Input_CoherentStokes.skip'))
        ])
        self.logger.debug("%d Input_CoherentStokes data products specified" %
                    len(self.input_data['coherent']))
        # Incoherent Stokes input data
        self.incoherentStokesEnabled = dps.getBool('Input_IncoherentStokes.enabled', False)
        self.input_data['incoherent'] = DataMap([
            tuple(os.path.join(location, filename).split(':')) + (skip,)
                for location, filename, skip in zip(
                    dps.getStringVector('Input_IncoherentStokes.locations'),
                    dps.getStringVector('Input_IncoherentStokes.filenames'),
                    dps.getBoolVector('Input_IncoherentStokes.skip'))
        ])
        self.logger.debug("%d Input_IncoherentStokes data products specified" %
                            len(self.input_data['incoherent']))
        self.output_data['data'] = DataMap([
            tuple(os.path.join(location, filename).split(':')) + (skip,)
                for location, filename, skip in zip(
                    dps.getStringVector('Output_Pulsar.locations'),
                    dps.getStringVector('Output_Pulsar.filenames'),
                    dps.getBoolVector('Output_Pulsar.skip'))
        ])
        self.logger.debug("%d Output_Pulsar data products specified" %
                            len(self.output_data['data']))
        
        # Sanity checks on input- and output data product specifications
        # the existing validate_data_maps will probably not work for pulsar pipeline,
        # were length of output equals length of input coherent + length of input incoherent arrays
        # in_len = len(self.input_data['coherent']) + len(self.input_data['incoherent'])
        # out_len = len(self.output_data['data'])
        # if (in_len != out_len):
        #     raise DataMapError("number of enabled input data products %s does not match the total number of output files %s" % 
        #                     (in_len, out_len))
        
    @mail_log_on_exception
    def pipeline_logic(self):
        """
        Define the individual tasks that comprise the current pipeline.
        This method will be invoked by the base-class's `go()` method.
        """
        # *********************************************************************
        # 1. Prepare phase, collect data from parset and input mapfiles.
        #
        # Note that PULP will read many of these fields directly. That makes
        # the following fields, and possibly others, part of the API towards
        # PULP:
        #
        # self.config
        # self.logger
        # self.input_data
        # self.output_data
        # self.parset_feedback_file
        # self.job_dir

        # Get input/output-data products specifications.
        self._get_io_product_specs()

        self.job_dir = self.config.get("layout", "job_directory")
        parset_dir = os.path.join(self.job_dir, "parsets")
        mapfile_dir = os.path.join(self.job_dir, "mapfiles")
        
        # Create directories for temporary parset- and map files
        create_directory(parset_dir)
        create_directory(mapfile_dir)

        # Write input- and output data map-files
        # Coherent Stokes
        self.input_CS_mapfile = os.path.join(mapfile_dir, "input_CS_data.mapfile")
        self.input_data['coherent'].save(self.input_CS_mapfile)
        # Incoherent Stokes
        self.input_IS_mapfile = os.path.join(mapfile_dir, "input_IS_data.mapfile")
        self.input_data['incoherent'].save(self.input_IS_mapfile)
        # Output data
        self.output_data_mapfile = os.path.join(mapfile_dir, "output_data.mapfile")
        self.output_data['data'].save(self.output_data_mapfile)

        if len(self.input_data) == 0:
            self.logger.warn("No input data files to process. Bailing out!")
            return 0

        self.pulsar_parms = self.parset.makeSubset(self.parset.fullModuleName('Pulsar') + '.')
        pulsar_parset = os.path.join(parset_dir, "Pulsar.parset")
        self.pulsar_parms.writeFile(pulsar_parset)
            
        self.logger.debug("Processing: %s" %
          ', '.join(str(f) for f in self.input_data))
        
        # Rebuilding sys.argv without the options given automatically by framework
        # --auto = automatic run from framework
        # -q = quiet mode, no user interaction
        sys.argv = ['pulp.py', '--auto', '-q']
      
        if (not self.coherentStokesEnabled):
          sys.argv.extend(["--noCS", "--noCV", "--noFE"])
          
        if (not self.incoherentStokesEnabled):
          sys.argv.append("--noIS")       

        # Tell PULP where to write the feedback to
        self.parset_feedback_file =  "%s_feedback" % (self.parset_file,)
       
        # Run the pulsar pipeline
        self.logger.debug("Starting pulp with: " + join(sys.argv))
        p = pulp.pulp(self) # TODO: MUCK self to capture the API

	# NOTE: PULP returns 0 on SUCCESS!!
        if p.go():
          self.logger.error("PULP did not succeed. Bailing out!")
          return 0

        # Read and forward the feedback
        try:
          metadata = parameterset(self.parset_feedback_file)
        except IOError, e:
          self.logger.error("Could not read feedback from %s: %s" % (metadata_file,e))
          return 0

        self.send_feedback_dataproducts(metadata)

        return 1

    
if __name__ == '__main__':
    sys.exit(pulsar_pipeline().main())