From 7c3411b82fba52a37f721cc7cc3b00bc2d97c079 Mon Sep 17 00:00:00 2001 From: Marcel Loose <loose@astron.nl> Date: Wed, 19 Oct 2011 11:39:55 +0000 Subject: [PATCH] Fixes #2109: Merged task branch CEP-Pipeline-Bug1713-task-branch with the trunk. --- .../docs/examples/definition/sip2/tasks.cfg | 4 +- .../source/pipelines/sip/recipes/dppp.rst | 2 +- .../source/pipelines/sip/recipes/vdsmaker.rst | 2 +- .../framework/lofarpipe/support/baserecipe.py | 14 +- .../framework/lofarpipe/support/parset.py | 44 +- .../framework/lofarpipe/support/utilities.py | 3 +- CEP/Pipeline/recipes/sip/CMakeLists.txt | 8 +- CEP/Pipeline/recipes/sip/master/bbs.py | 4 +- .../recipes/sip/master/calibrator_pipeline.py | 120 +++++ .../recipes/sip/master/cep2_datamapper.py | 73 ++- CEP/Pipeline/recipes/sip/master/cimager.py | 4 +- .../sip/master/{new_dppp.py => dppp.py} | 13 +- CEP/Pipeline/recipes/sip/master/new_bbs.py | 423 ++++++++++++++++++ CEP/Pipeline/recipes/sip/master/parmdb.py | 42 +- CEP/Pipeline/recipes/sip/master/sourcedb.py | 40 +- .../recipes/sip/master/target_pipeline.py | 128 ++++++ .../master/{new_vdsmaker.py => vdsmaker.py} | 10 +- CEP/Pipeline/recipes/sip/master/vdsreader.py | 4 +- .../sip/nodes/demix/shiftphasecenter.py | 5 +- CEP/Pipeline/recipes/sip/nodes/new_bbs.py | 113 +++++ CEP/Pipeline/recipes/sip/nodes/parmdb.py | 14 +- CEP/Pipeline/recipes/sip/nodes/sourcedb.py | 42 +- CEP/Pipeline/recipes/sip/tasks.cfg | 23 +- 23 files changed, 1043 insertions(+), 92 deletions(-) create mode 100644 CEP/Pipeline/recipes/sip/master/calibrator_pipeline.py rename CEP/Pipeline/recipes/sip/master/{new_dppp.py => dppp.py} (94%) create mode 100644 CEP/Pipeline/recipes/sip/master/new_bbs.py create mode 100644 CEP/Pipeline/recipes/sip/master/target_pipeline.py rename CEP/Pipeline/recipes/sip/master/{new_vdsmaker.py => vdsmaker.py} (94%) create mode 100644 CEP/Pipeline/recipes/sip/nodes/new_bbs.py diff --git a/CEP/Pipeline/docs/examples/definition/sip2/tasks.cfg b/CEP/Pipeline/docs/examples/definition/sip2/tasks.cfg index 0e652aa54ae..25398551cf0 100644 --- a/CEP/Pipeline/docs/examples/definition/sip2/tasks.cfg +++ b/CEP/Pipeline/docs/examples/definition/sip2/tasks.cfg @@ -3,7 +3,7 @@ recipe = datamapper mapfile = %(runtime_directory)s/jobs/%(job_name)s/parsets/storage_mapfile [vdsmaker] -recipe = new_vdsmaker +recipe = vdsmaker directory = %(runtime_directory)s/jobs/%(job_name)s/vds gvds = %(runtime_directory)s/jobs/%(job_name)s/vds/inputs.gvds makevds = %(lofarroot)s/bin/makevds @@ -19,7 +19,7 @@ min_flux = 0.5 skymodel_file = %(runtime_directory)s/jobs/%(job_name)s/parsets/bbs.skymodel [ndppp] -recipe = new_dppp +recipe = dppp executable = %(lofarroot)s/bin/NDPPP initscript = %(lofarroot)s/lofarinit.sh working_directory = /data/scratch/swinbank diff --git a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/dppp.rst b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/dppp.rst index 2fc26c6a45d..37814fe44ef 100644 --- a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/dppp.rst +++ b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/dppp.rst @@ -4,5 +4,5 @@ DPPP ==== -.. autoclass:: new_dppp.new_dppp +.. 
autoclass:: dppp.dppp :show-inheritance: diff --git a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsmaker.rst b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsmaker.rst index ce13707016b..1a55828d179 100644 --- a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsmaker.rst +++ b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsmaker.rst @@ -2,5 +2,5 @@ vdsmaker ======== -.. autoclass:: new_vdsmaker.new_vdsmaker +.. autoclass:: vdsmaker.vdsmaker :show-inheritance: diff --git a/CEP/Pipeline/framework/lofarpipe/support/baserecipe.py b/CEP/Pipeline/framework/lofarpipe/support/baserecipe.py index 2cf61345d0b..554592fcfa6 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/baserecipe.py +++ b/CEP/Pipeline/framework/lofarpipe/support/baserecipe.py @@ -183,13 +183,18 @@ class BaseRecipe(RecipeIngredients, WSRTrecipe): import datetime self.inputs["start_time"] = datetime.datetime.utcnow().replace(microsecond=0).isoformat() - self.logger.debug("Pipeline start time: %s" % self.inputs['start_time']) - # Config is passed in from spawning recipe. But if this is the start # of a pipeline, it won't have one. if not hasattr(self, "config"): self.config = self._read_config() + # Only configure handlers if our parent is the root logger. + # Otherwise, our parent should have done it for us. + if isinstance(self.logger.parent, logging.RootLogger): + self._setup_logging() + + self.logger.debug("Pipeline start time: %s" % self.inputs['start_time']) + # Ensure we have a runtime directory if not self.inputs.has_key('runtime_directory'): self.inputs["runtime_directory"] = self.config.get( @@ -228,8 +233,3 @@ class BaseRecipe(RecipeIngredients, WSRTrecipe): "Required inputs not available: %s" % " ".join(self.inputs.missing()) ) - - # Only configure handlers if our parent is the root logger. - # Otherwise, our parent should have done it for us. - if isinstance(self.logger.parent, logging.RootLogger): - self._setup_logging() diff --git a/CEP/Pipeline/framework/lofarpipe/support/parset.py b/CEP/Pipeline/framework/lofarpipe/support/parset.py index dee9ae3643f..f6ee294c99f 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/parset.py +++ b/CEP/Pipeline/framework/lofarpipe/support/parset.py @@ -53,16 +53,16 @@ class Parset(parameterset): self.keys ) - def makeSubset(self, baseKey, prefix=None): - newps = Parset() - for key in self.keys: - if key[:len(baseKey)] == baseKey: - if prefix: - newkey = key.replace(baseKey, prefix) - else: - newkey = key - newps.add(newkey, self[key].get()) - return newps + #def makeSubset(self, baseKey, prefix=None): + #newps = Parset() + #for key in self.keys: + #if key[:len(baseKey)] == baseKey: + #if prefix: + #newkey = key.replace(baseKey, prefix) + #else: + #newkey = key + #newps.add(newkey, self[key].get()) + #return newps def addStringVector(self, key, vector): super(Parset, self).add(key, "[ %s ]" % ", ".join(vector)) @@ -79,6 +79,22 @@ class Parset(parameterset): def __iter__(self): return iter(self.keys) + @classmethod + def fromDict(cls, kvm): + """ + Create a parameterset object from the given dict `kvm`. + + Caution: although any value that can be converted to a string will be + written to the Parset, some values cannot be interpreted correctly by + the C++ Parameterset class (e.g., a python dict). 
+ """ + if not isinstance(kvm, dict): + raise TypeError("Input argument must be a dictionary") + obj = Parset() + for k in kvm: + obj.add(k, str(kvm[k])) + return obj + def get_parset(parset_filename): """ Returns an instance of Parset with the given file loaded. @@ -89,8 +105,14 @@ def patch_parset(parset, data, output_dir=None): """ Generate a parset file by adding the contents of the data dictionary to the specified parset object. Write it to file, and return the filename. + + `parset` may either be the filename of a parset-file or an instance of + `lofar.parameterset.parameterset`. """ - temp_parset = get_parset(parset) + if isinstance(parset, str): + temp_parset = parameterset(parset) + else: + temp_parset = parset.makeSubset('') # a sneaky way to copy the parset for key, value in data.iteritems(): temp_parset.replace(key, value) fd, output = mkstemp(dir=output_dir) diff --git a/CEP/Pipeline/framework/lofarpipe/support/utilities.py b/CEP/Pipeline/framework/lofarpipe/support/utilities.py index c040b5144e2..8474c8ceaa3 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/utilities.py +++ b/CEP/Pipeline/framework/lofarpipe/support/utilities.py @@ -45,7 +45,8 @@ def create_directory(dirname): Recursively create a directory, without failing if it already exists. """ try: - os.makedirs(dirname) + if dirname: + os.makedirs(dirname) except OSError, failure: if failure.errno != errno.EEXIST: raise diff --git a/CEP/Pipeline/recipes/sip/CMakeLists.txt b/CEP/Pipeline/recipes/sip/CMakeLists.txt index 097b0ce0ffb..f9252bfcc5f 100644 --- a/CEP/Pipeline/recipes/sip/CMakeLists.txt +++ b/CEP/Pipeline/recipes/sip/CMakeLists.txt @@ -6,19 +6,22 @@ python_install( master/bbs.py master/cimager.py master/cep2_datamapper.py + master/calibrator_pipeline.py master/compression_pipeline.py master/count_timesteps.py master/datamapper.py master/demixing.py + master/dppp.py master/flag_baseline.py master/make_flaggable.py - master/new_dppp.py - master/new_vdsmaker.py + master/new_bbs.py master/parmdb.py master/rficonsole.py master/skymodel.py master/sourcedb.py master/storagemapper.py + master/target_pipeline.py + master/vdsmaker.py master/vdsreader.py nodes/bbs.py nodes/cimager.py @@ -27,6 +30,7 @@ python_install( nodes/dppp.py nodes/flag_baseline.py nodes/make_flaggable.py + nodes/new_bbs.py nodes/parmdb.py nodes/rficonsole.py nodes/sourcedb.py diff --git a/CEP/Pipeline/recipes/sip/master/bbs.py b/CEP/Pipeline/recipes/sip/master/bbs.py index 773b2eb6f7c..b0ed9c6709a 100644 --- a/CEP/Pipeline/recipes/sip/master/bbs.py +++ b/CEP/Pipeline/recipes/sip/master/bbs.py @@ -149,8 +149,8 @@ class bbs(BaseRecipe): inputs['nproc'] = self.inputs['nproc'] inputs['directory'] = os.path.dirname(vds_file) outputs = LOFARoutput(self.inputs) - if self.cook_recipe('new_vdsmaker', inputs, outputs): - self.logger.warn("new_vdsmaker reports failure") + if self.cook_recipe('vdsmaker', inputs, outputs): + self.logger.warn("vdsmaker reports failure") return 1 self.logger.debug("BBS GVDS is %s" % (vds_file,)) diff --git a/CEP/Pipeline/recipes/sip/master/calibrator_pipeline.py b/CEP/Pipeline/recipes/sip/master/calibrator_pipeline.py new file mode 100644 index 00000000000..af868da47b8 --- /dev/null +++ b/CEP/Pipeline/recipes/sip/master/calibrator_pipeline.py @@ -0,0 +1,120 @@ +# LOFAR IMAGING PIPELINE +# +# Calibrator Pipeline recipe +# Marcel Loose, 2011 +# loose@astron.nl +# ------------------------------------------------------------------------------ + +import os +import sys + +from lofarpipe.support.control import control +from 
lofar.parameterset import parameterset + +class calibrator_pipeline(control): + """ + The calibrator pipeline can be used to determine the instrument database + (parmdb) from the observation of a known "calibrator" source. + + This pipeline will perform the following operations: + - Create a empty parmdb for BBS + - Run makesourcedb on skymodel files for calibrator source(s) and the + Ateam, which are to be stored in a standard place ($LOFARROOT/share) + - DPPP: flagging, using standard parset + - Demix the relevant A-team sources (for now using python script, later + to use DPPP), using the A-team sourcedb. + - Run BBS to calibrate the calibrator source(s), again using standard + parset, and the sourcedb made earlier + """ + + def __init__(self): + control.__init__(self) + self.parset = parameterset() + + + def usage(self): + print >> sys.stderr, "Usage: %s [options] <parset-file>" % sys.argv[0] + return 1 + + + def go(self): + """ + Read the parset-file that was given as input argument, and set the + jobname before calling the base-class's `go()` method. + """ + try: + parset_file = self.inputs['args'][0] + except IndexError: + return self.usage() + self.parset.adoptFile(parset_file) + # Set job-name to basename of parset-file w/o extension, if it's not + # set on the command-line with '-j' or '--job-name' + if not self.inputs.has_key('job_name'): + self.inputs['job_name'] = ( + os.path.splitext(os.path.basename(parset_file))[0] + ) + super(calibrator_pipeline, self).go() + + + def pipeline_logic(self): + """ + Define the individual tasks that comprise the current pipeline. + This method will be invoked by the base-class's `go()` method. + """ + + # Create a parameter-subset containing only python-control stuff. + py_parset = self.parset.makeSubset( + 'ObsSW.Observation.ObservationControl.PythonControl.') + + # Generate a datamap-file, which is a parset-file containing + # key/value pairs of hostname and list of MS-files. + data_mapfile = self.run_task( + "cep2_datamapper", + observation_dir=py_parset.getString('observationDirectory') +# parset=self.inputs['args'][0] + )['mapfile'] + + # Create an empty parmdb for DPPP + parmdb_mapfile = self.run_task("parmdb", data_mapfile)['mapfile'] + + # Create a sourcedb based on sourcedb's input argument "skymodel" + sourcedb_mapfile = self.run_task("sourcedb", data_mapfile)['mapfile'] + + # Produce a GVDS file describing the data on the compute nodes. + gvds_file = self.run_task("vdsmaker", data_mapfile)['gvds'] + + # Read metadata (start, end times, pointing direction) from GVDS. + vdsinfo = self.run_task("vdsreader", gvds=gvds_file) + + # Create a parameter-subset for DPPP and write it to file. + ndppp_parset = os.path.join( + self.config.get("layout", "job_directory"), + "parsets", "NDPPP.parset") + py_parset.makeSubset('DPPP.').writeFile(ndppp_parset) + + # Run the Default Pre-Processing Pipeline (DPPP); + dppp_mapfile = self.run_task( + "ndppp", data_mapfile, + data_start_time=vdsinfo['start_time'], + data_end_time=vdsinfo['end_time'], + parset=ndppp_parset + )['mapfile'] + + # Demix the relevant A-team sources + demix_mapfile = self.run_task("demixing", dppp_mapfile)['mapfile'] + + # Create a parameter-subset for BBS and write it to file. + bbs_parset = os.path.join( + self.config.get("layout", "job_directory"), + "parsets", "BBS.parset") + py_parset.makeSubset('BBS.').writeFile(bbs_parset) + + # Run BBS to calibrate the calibrator source(s). 
+ self.run_task( + "new_bbs", demix_mapfile, + parset=bbs_parset, + instrument_mapfile=parmdb_mapfile, + sky_mapfile=sourcedb_mapfile) + +if __name__ == '__main__': + sys.exit(calibrator_pipeline().main()) diff --git a/CEP/Pipeline/recipes/sip/master/cep2_datamapper.py b/CEP/Pipeline/recipes/sip/master/cep2_datamapper.py index 15e8edf9c5b..2adfb984be1 100644 --- a/CEP/Pipeline/recipes/sip/master/cep2_datamapper.py +++ b/CEP/Pipeline/recipes/sip/master/cep2_datamapper.py @@ -9,6 +9,7 @@ import os.path import sys from lofar.mstools import findFiles +from lofar.parameterset import parameterset from lofarpipe.support.baserecipe import BaseRecipe from lofarpipe.support.group_data import store_data_map @@ -33,7 +34,19 @@ class cep2_datamapper(BaseRecipe): ), 'observation_dir': ingredient.StringField( '--observation-dir', - help="Full path to the directory to search for MS files" + help="Full path to the directory to search for MS files " + "(deprecated)", + default="" + ), + 'observation_sap': ingredient.IntField( + '--observation-sap', + help="Sub-Array Pointing (deprecated)", + default=0 + ), + 'parset': ingredient.StringField( + '--parset', + help="Full path to the parset-file provided by MAC/SAS", + default="" ) } @@ -43,15 +56,60 @@ class cep2_datamapper(BaseRecipe): ) } + + def _read_files(self): + """Read data file locations from parset-file""" + self.logger.debug("Reading data file locations from parset-file: %s" % + self.inputs['parset']) + parset = parameterset(self.inputs['parset']) + filenames = parset.getStringVector( + 'ObsSW.Observation.DataProducts.Input_Correlated.filenames') + locations = parset.getStringVector( + 'ObsSW.Observation.DataProducts.Input_Correlated.locations') + return [''.join(x).split(':') for x in zip(locations, filenames)] + + + def _search_files(self): + """ + Search for the data-files. The value of `self.inputs['job_name']` is + used to compose the glob search pattern. It is split into parts + separated by '_'. The first part should (in principle) be identical to + the MAC/SAS observation ID (e.g., L29066). The second (optional) part + specifies the sub-array-pointing(e.g., 1); it defaults to 0. + """ + job_name_parts = self.inputs['job_name'].split('_') + job = job_name_parts[0] + sap = 0 + try: + errmsg = ( + "Job-name part indicating sub-array-pointing index is %s, " + "defaulting to 0" + ) + sap = int(job_name_parts[1]) + except IndexError: + self.logger.debug(errmsg % "missing") + except ValueError: + self.logger.warn(errmsg % "non-numeric") + ms_pattern = os.path.join( + self.inputs['observation_dir'], + '%s_SAP%03d_SB???_uv.MS{,.dppp}' % (job, sap) + ) + self.logger.debug("Searching for data files: %s" % ms_pattern) + data = findFiles(ms_pattern, '-1d') + return zip(data[0], data[1]) + + def go(self): self.logger.info("Starting CEP-II datamapper run") super(cep2_datamapper, self).go() - # Search for the data-files - data = findFiles(os.path.join(self.inputs['observation_dir'], - '*.{dppp,MS,dp3}'), - '-1d') - datamap = zip(data[0], data[1]) + if self.inputs['parset']: + datamap = self._read_files() + elif self.inputs['observation_dir']: + datamap = self._search_files() + else: + self.logger.error("Either observation_dir or parset must be given") + return 1 self.logger.info("Found %i datasets to process." 
% len(datamap)) self.logger.debug("datamap = %s" % datamap) @@ -59,10 +117,11 @@ class cep2_datamapper(BaseRecipe): # Write datamap-file create_directory(os.path.dirname(self.inputs['mapfile'])) store_data_map(self.inputs['mapfile'], datamap) - self.logger.debug("Wrote mapfile %s" % self.inputs['mapfile']) + self.logger.debug("Wrote mapfile: %s" % self.inputs['mapfile']) self.outputs['mapfile'] = self.inputs['mapfile'] return 0 + if __name__ == '__main__': sys.exit(cep2_datamapper().main()) diff --git a/CEP/Pipeline/recipes/sip/master/cimager.py b/CEP/Pipeline/recipes/sip/master/cimager.py index 9a04f9aecd0..6f541e9f4ff 100644 --- a/CEP/Pipeline/recipes/sip/master/cimager.py +++ b/CEP/Pipeline/recipes/sip/master/cimager.py @@ -131,8 +131,8 @@ class cimager(BaseRecipe, RemoteCommandRecipeMixIn): inputs['nproc'] = self.inputs['nproc'] inputs['directory'] = os.path.dirname(gvds_file) outputs = LOFARoutput(self.inputs) - if self.cook_recipe('new_vdsmaker', inputs, outputs): - self.logger.warn("new_vdsmaker reports failure") + if self.cook_recipe('vdsmaker', inputs, outputs): + self.logger.warn("vdsmaker reports failure") return 1 self.logger.debug("cimager GVDS is %s" % (gvds_file,)) diff --git a/CEP/Pipeline/recipes/sip/master/new_dppp.py b/CEP/Pipeline/recipes/sip/master/dppp.py similarity index 94% rename from CEP/Pipeline/recipes/sip/master/new_dppp.py rename to CEP/Pipeline/recipes/sip/master/dppp.py index 0fb9148975e..fdc5a5fda8f 100644 --- a/CEP/Pipeline/recipes/sip/master/new_dppp.py +++ b/CEP/Pipeline/recipes/sip/master/dppp.py @@ -23,7 +23,7 @@ from lofarpipe.support.remotecommand import ComputeJob from lofarpipe.support.group_data import load_data_map from lofarpipe.support.parset import Parset -class new_dppp(BaseRecipe, RemoteCommandRecipeMixIn): +class dppp(BaseRecipe, RemoteCommandRecipeMixIn): """ Runs DPPP (either ``NDPPP`` or -- in the unlikely event it's required -- ``IDPPP``) on a number of MeasurementSets. 
This is used for compressing @@ -99,7 +99,7 @@ class new_dppp(BaseRecipe, RemoteCommandRecipeMixIn): def go(self): self.logger.info("Starting DPPP run") - super(new_dppp, self).go() + super(dppp, self).go() # Keep track of "Total flagged" messages in the DPPP logs # ---------------------------------------------------------------------- @@ -110,12 +110,7 @@ class new_dppp(BaseRecipe, RemoteCommandRecipeMixIn): self.logger.debug("Loading map from %s" % self.inputs['args']) data = load_data_map(self.inputs['args'][0]) - - # We can use the same node script as the "old" IPython dppp recipe - # ---------------------------------------------------------------------- - command = "python %s" % ( - self.__file__.replace('master', 'nodes').replace('new_dppp', 'dppp') - ) + command = "python %s" % (self.__file__.replace('master', 'nodes')) outnames = collections.defaultdict(list) jobs = [] for host, ms in data: @@ -169,4 +164,4 @@ class new_dppp(BaseRecipe, RemoteCommandRecipeMixIn): return 0 if __name__ == '__main__': - sys.exit(new_dppp().main()) + sys.exit(dppp().main()) diff --git a/CEP/Pipeline/recipes/sip/master/new_bbs.py b/CEP/Pipeline/recipes/sip/master/new_bbs.py new file mode 100644 index 00000000000..6db41854cc6 --- /dev/null +++ b/CEP/Pipeline/recipes/sip/master/new_bbs.py @@ -0,0 +1,423 @@ +# LOFAR IMAGING PIPELINE +# +# BBS (BlackBoard Selfcal) recipe +# John Swinbank, 2009-10 +# swinbank@transientskp.org +# ------------------------------------------------------------------------------ + +from __future__ import with_statement +import subprocess +import sys +import os +import threading +import tempfile +import shutil +import time +import signal + +from lofar.parameterset import parameterset + +from lofarpipe.support.baserecipe import BaseRecipe +from lofarpipe.support.group_data import load_data_map, store_data_map +from lofarpipe.support.lofarexceptions import PipelineException +from lofarpipe.support.pipelinelogging import CatchLog4CPlus +from lofarpipe.support.pipelinelogging import log_process_output +from lofarpipe.support.remotecommand import run_remote_command +from lofarpipe.support.remotecommand import ComputeJob +from lofarpipe.support.jobserver import job_server +import lofarpipe.support.utilities as utilities +import lofarpipe.support.lofaringredient as ingredient + + +class new_bbs(BaseRecipe): + """ + The bbs recipe coordinates running BBS on a group of MeasurementSets. It + runs both GlobalControl and KernelControl; as yet, SolverControl has not + been integrated. + + The recipe will also run the sourcedb and parmdb recipes on each of the + input MeasuementSets. + + **Arguments** + + A mapfile describing the data to be processed. + """ + inputs = { + 'control_exec': ingredient.ExecField( + '--control-exec', + dest="control_exec", + help="BBS Control executable" + ), + 'kernel_exec': ingredient.ExecField( + '--kernel-exec', + dest="kernel_exec", + help="BBS Kernel executable" + ), + 'initscript': ingredient.FileField( + '--initscript', + dest="initscript", + help="Initscript to source (ie, lofarinit.sh)" + ), + 'parset': ingredient.FileField( + '-p', '--parset', + dest="parset", + help="BBS configuration parset" + ), + 'db_key': ingredient.StringField( + '--db-key', + dest="db_key", + help="Key to identify BBS session" + ), + 'db_host': ingredient.StringField( + '--db-host', + dest="db_host", + help="Database host with optional port (e.g. 
ldb001:5432)" + ), + 'db_user': ingredient.StringField( + '--db-user', + dest="db_user", + help="Database user" + ), + 'db_name': ingredient.StringField( + '--db-name', + dest="db_name", + help="Database name" + ), + 'instrument_mapfile': ingredient.FileField( + '--instrument-mapfile', + help="Full path to the mapfile containing the names of the " + "instrument model files generated by the `parmdb` recipe" + ), + 'sky_mapfile': ingredient.FileField( + '--sky-mapfile', + help="Full path to the mapfile containing the names of the " + "sky model files generated by the `sourcedb` recipe" + ), + 'data_mapfile': ingredient.StringField( + '--data-mapfile', + help="Full path to the mapfile containing the names of the " + "data files that were processed by BBS (clobbered if exists)" + ) + } + + def __init__(self): + super(new_bbs, self).__init__() + self.parset = parameterset() + self.killswitch = threading.Event() + + + def _set_input(self, in_key, ps_key): + """ + Set the input-key `in_key` to the value of `ps_key` in the parset, if + that is defined. + """ + try: + self.inputs[in_key] = self.parset.getString(ps_key) + except RuntimeError, e: + self.logger.warn(str(e)) + + + def _make_bbs_map(self): + """ + This method bundles the contents of three different map-files. + All three map-files contain a per-node list of filenames as parset. + The contents of these files are related. The elements of the lists + form triplets of MS-file, its associated instrument model and its + associated sky model. + + The returned data structure `bbs_map` is a list of tuples, where + each tuple is a pair of hostname and the aforementioned triplet. + + For example: + bbs_map[0] = ('locus001', + ('/data/L29697/L29697_SAP000_SB000_uv.MS', + '/data/scratch/loose/L29697/L29697_SAP000_SB000_uv.MS.instrument', + '/data/scratch/loose/L29697/L29697_SAP000_SB000_uv.MS.sky') + ) + """ + self.logger.debug("Creating BBS map-file using: %s, %s, %s" % + (self.inputs['args'][0], + self.inputs['instrument_mapfile'], + self.inputs['sky_mapfile'])) + data_map = parameterset(self.inputs['args'][0]) + instrument_map = parameterset(self.inputs['instrument_mapfile']) + sky_map = parameterset(self.inputs['sky_mapfile']) + bbs_map = [] + for host in data_map.keys(): + data = data_map.getStringVector(host, []) + instrument = instrument_map.getStringVector(host, []) + sky = sky_map.getStringVector(host, []) + triplets = zip(data, instrument, sky) + for triplet in triplets: + bbs_map.append((host, triplet)) + # Error handling and reporting + if not len(data) == len(instrument) == len(sky): + self.logger.warn( + "Number of data files (%d) does not match with number of " + "instrument files (%d) or number of skymodel files (%d) " + "on %s" % (len(data), len(instrument), len(sky), host)) + if len(triplets) > 0: + msg = "The following triplets will be used: " + msg += ", ".join([str(t) for t in triplets]) + self.logger.warn(msg) + if len(triplets) < len(data): + msg = "The following data files will not be processed: " + msg += ", ".join([str(t) for t in data[len(triplets):]]) + self.logger.warn(msg) + # Store data mapfile containing list of files to be processed by BBS. 
+ data_map = [] + for (host, triplet) in bbs_map: + data_map.append((host, triplet[0])) + store_data_map(self.inputs['data_mapfile'], data_map) + + return bbs_map + + + def go(self): + self.logger.info("Starting BBS run") + super(new_bbs, self).go() + + # Check for relevant input parameters in the parset-file + # ---------------------------------------------------------------------- + self.logger.debug("Reading parset from %s" % self.inputs['parset']) + self.parset = parameterset(self.inputs['parset']) + + self._set_input('db_host', 'BBSControl.BBDB.Host') + self._set_input('db_user', 'BBSControl.BBDB.User') + self._set_input('db_name', 'BBSControl.BBDB.Name') + self._set_input('db_key', 'BBSControl.BBDB.Key') + + #self.logger.debug("self.inputs = %s" % self.inputs) + + # Clean the blackboard database + # ---------------------------------------------------------------------- + self.logger.info( + "Cleaning BBS database for key '%s'" % (self.inputs['db_key']) + ) + command = ["psql", + "-h", self.inputs['db_host'], + "-U", self.inputs['db_user'], + "-d", self.inputs['db_name'], + "-c", "DELETE FROM blackboard.session WHERE key='%s';" % + self.inputs['db_key'] + ] + self.logger.debug(command) + if subprocess.call(command) != 0: + self.logger.warning( + "Failed to clean BBS database for key '%s'" % + self.inputs['db_key'] + ) + + # Create a bbs_map describing the file mapping on disk + # ---------------------------------------------------------------------- + bbs_map = self._make_bbs_map() + + # Produce a GVDS file, describing the data that must be processed. + gvds_file = self.run_task( + "vdsmaker", + self.inputs['data_mapfile'] + )['gvds'] + + # Construct a parset for BBS GlobalControl by patching the GVDS + # file and database information into the supplied template + # ------------------------------------------------------------------ + self.logger.debug("Building parset for BBS control") + bbs_parset = utilities.patch_parset( + self.parset.makeSubset("BBSControl."), + { + 'Observation': gvds_file, + 'BBDB.Key': self.inputs['db_key'], + 'BBDB.Name': self.inputs['db_name'], + 'BBDB.User': self.inputs['db_user'], + 'BBDB.Host': self.inputs['db_host'], + #'BBDB.Port': self.inputs['db_name'], + } + ) + self.logger.debug("BBS control parset is %s" % (bbs_parset,)) + + try: + # When one of our processes fails, we set the killswitch. + # Everything else will then come crashing down, rather than + # hanging about forever. + # -------------------------------------------------------------- + self.killswitch = threading.Event() + self.killswitch.clear() + signal.signal(signal.SIGTERM, self.killswitch.set) + + # GlobalControl runs in its own thread + # -------------------------------------------------------------- + run_flag = threading.Event() + run_flag.clear() + bbs_control = threading.Thread( + target=self._run_bbs_control, + args=(bbs_parset, run_flag) + ) + bbs_control.start() + run_flag.wait() # Wait for control to start before proceeding + + # We run BBS KernelControl on each compute node by directly + # invoking the node script using SSH + # Note that we use a job_server to send out job details and + # collect logging information, so we define a bunch of + # ComputeJobs. However, we need more control than the generic + # ComputeJob.dispatch method supplies, so we'll control them + # with our own threads. 
+ # -------------------------------------------------------------- + command = "python %s" % (self.__file__.replace('master', 'nodes')) + env = { + "LOFARROOT": utilities.read_initscript(self.logger, self.inputs['initscript'])["LOFARROOT"], + "PYTHONPATH": self.config.get('deploy', 'engine_ppath'), + "LD_LIBRARY_PATH": self.config.get('deploy', 'engine_lpath') + } + jobpool = {} + bbs_kernels = [] + with job_server(self.logger, jobpool, self.error) as (jobhost, jobport): + self.logger.debug("Job server at %s:%d" % (jobhost, jobport)) + for job_id, details in enumerate(bbs_map): + host, files = details + jobpool[job_id] = ComputeJob( + host, command, + arguments=[ + self.inputs['kernel_exec'], + self.inputs['initscript'], + files, + self.inputs['db_key'], + self.inputs['db_name'], + self.inputs['db_user'], + self.inputs['db_host'] + ] + ) + bbs_kernels.append( + threading.Thread( + target=self._run_bbs_kernel, + args=(host, command, env, job_id, + jobhost, str(jobport) + ) + ) + ) + self.logger.info("Starting %d threads" % len(bbs_kernels)) + [thread.start() for thread in bbs_kernels] + self.logger.debug("Waiting for all kernels to complete") + [thread.join() for thread in bbs_kernels] + + + # When GlobalControl finishes, our work here is done + # ---------------------------------------------------------- + self.logger.info("Waiting for GlobalControl thread") + bbs_control.join() + finally: + os.unlink(bbs_parset) + if self.killswitch.isSet(): + # If killswitch is set, then one of our processes failed so + # the whole run is invalid + # ---------------------------------------------------------- + return 1 + + return 0 + + def _run_bbs_kernel(self, host, command, env, *arguments): + """ + Run command with arguments on the specified host using ssh. Return its + return code. + + The resultant process is monitored for failure; see + _monitor_process() for details. + """ + try: + bbs_kernel_process = run_remote_command( + self.config, + self.logger, + host, + command, + env, + arguments=arguments + ) + except Exception, e: + self.logger.exception("BBS Kernel failed to start") + self.killswitch.set() + return 1 + result = self._monitor_process(bbs_kernel_process, "BBS Kernel on %s" % host) + sout, serr = bbs_kernel_process.communicate() + serr = serr.replace("Connection to %s closed.\r\n" % host, "") + log_process_output("SSH session (BBS kernel)", sout, serr, self.logger) + return result + + def _run_bbs_control(self, bbs_parset, run_flag): + """ + Run BBS Global Control and wait for it to finish. Return its return + code. + """ + env = utilities.read_initscript(self.logger, self.inputs['initscript']) + self.logger.info("Running BBS GlobalControl") + working_dir = tempfile.mkdtemp() + with CatchLog4CPlus( + working_dir, + self.logger.name + ".GlobalControl", + os.path.basename(self.inputs['control_exec']) + ): + with utilities.log_time(self.logger): + try: + bbs_control_process = utilities.spawn_process( + [ + self.inputs['control_exec'], + bbs_parset, + "0" + ], + self.logger, + cwd=working_dir, + env=env + ) + # _monitor_process() needs a convenient kill() method. 
+ bbs_control_process.kill = lambda : os.kill(bbs_control_process.pid, signal.SIGKILL) + except OSError, e: + self.logger.error("Failed to spawn BBS Control (%s)" % str(e)) + self.killswitch.set() + return 1 + finally: + run_flag.set() + + returncode = self._monitor_process( + bbs_control_process, "BBS Control" + ) + sout, serr = bbs_control_process.communicate() + shutil.rmtree(working_dir) + log_process_output( + self.inputs['control_exec'], sout, serr, self.logger + ) + return returncode + + def _monitor_process(self, process, name="Monitored process"): + """ + Monitor a process for successful exit. If it fails, set the kill + switch, so everything else gets killed too. If the kill switch is set, + then kill this process off. + + Name is an optional parameter used only for identification in logs. + """ + while True: + try: + returncode = process.poll() + if returncode == None: # Process still running + time.sleep(1) + elif returncode != 0: # Process broke! + self.logger.warn( + "%s returned code %d; aborting run" % (name, returncode) + ) + self.killswitch.set() + break + else: # Process exited cleanly + self.logger.info("%s clean shutdown" % (name)) + break + if self.killswitch.isSet(): # Other process failed; abort + self.logger.warn("Killing %s" % (name)) + process.kill() + returncode = process.wait() + break + except: + # An exception here is likely a ctrl-c or similar. Whatever it + # is, we bail out. + self.killswitch.set() + return returncode + +if __name__ == '__main__': + sys.exit(new_bbs().main()) diff --git a/CEP/Pipeline/recipes/sip/master/parmdb.py b/CEP/Pipeline/recipes/sip/master/parmdb.py index bcd8d3aa930..b2e9041af1b 100644 --- a/CEP/Pipeline/recipes/sip/master/parmdb.py +++ b/CEP/Pipeline/recipes/sip/master/parmdb.py @@ -10,6 +10,7 @@ import os import subprocess import shutil import tempfile +import collections from lofarpipe.support.baserecipe import BaseRecipe from lofarpipe.support.remotecommand import RemoteCommandRecipeMixIn @@ -18,6 +19,7 @@ from lofarpipe.support.group_data import load_data_map from lofarpipe.support.pipelinelogging import log_process_output import lofarpipe.support.utilities as utilities import lofarpipe.support.lofaringredient as ingredient +from lofarpipe.support.parset import Parset template = """ create tablename="%s" @@ -54,6 +56,21 @@ class parmdb(BaseRecipe, RemoteCommandRecipeMixIn): '--nproc', help="Maximum number of simultaneous processes per compute node", default=8 + ), + 'suffix': ingredient.StringField( + '--suffix', + help="Suffix of the table name of the instrument model", + default=".instrument" + ), + 'working_directory': ingredient.StringField( + '-w', '--working-directory', + help="Working directory used on output nodes. " + "Results will be written here." 
+ ), + 'mapfile': ingredient.StringField( + '--mapfile', + help="Full path of mapfile to produce; it will contain " + "a list of the generated instrument-model files" ) } @@ -69,7 +86,7 @@ class parmdb(BaseRecipe, RemoteCommandRecipeMixIn): pdbdir = tempfile.mkdtemp( dir=self.config.get("layout", "job_directory") ) - pdbfile = os.path.join(pdbdir, 'instrument') + pdbfile = os.path.join(pdbdir, self.inputs['suffix']) try: parmdbm_process = subprocess.Popen( @@ -93,10 +110,25 @@ class parmdb(BaseRecipe, RemoteCommandRecipeMixIn): data = load_data_map(self.inputs['args'][0]) command = "python %s" % (self.__file__.replace('master', 'nodes')) + outnames = collections.defaultdict(list) jobs = [] for host, ms in data: + outnames[host].append( + os.path.join( + self.inputs['working_directory'], + self.inputs['job_name'], + os.path.basename(ms) + self.inputs['suffix'] + ) + ) jobs.append( - ComputeJob(host, command, arguments=[ms, pdbfile]) + ComputeJob( + host, + command, + arguments=[ + pdbfile, + outnames[host][-1] + ] + ) ) self._schedule_jobs(jobs, max_per_node=self.inputs['nproc']) @@ -108,8 +140,12 @@ class parmdb(BaseRecipe, RemoteCommandRecipeMixIn): self.logger.warn("Detected failed parmdb job") return 1 else: - self.outputs['mapfile'] = self.inputs['args'][0] + self.logger.debug("Writing instrument map file: %s" % + self.inputs['mapfile']) + Parset.fromDict(outnames).writeFile(self.inputs['mapfile']) + self.outputs['mapfile'] = self.inputs['mapfile'] return 0 + if __name__ == '__main__': sys.exit(parmdb().main()) diff --git a/CEP/Pipeline/recipes/sip/master/sourcedb.py b/CEP/Pipeline/recipes/sip/master/sourcedb.py index b75da9c7b5c..a196c0c7f46 100644 --- a/CEP/Pipeline/recipes/sip/master/sourcedb.py +++ b/CEP/Pipeline/recipes/sip/master/sourcedb.py @@ -7,6 +7,7 @@ from __future__ import with_statement import os +import collections import lofarpipe.support.utilities as utilities import lofarpipe.support.lofaringredient as ingredient @@ -15,6 +16,7 @@ from lofarpipe.support.remotecommand import RemoteCommandRecipeMixIn from lofarpipe.support.clusterlogger import clusterlogger from lofarpipe.support.group_data import load_data_map from lofarpipe.support.remotecommand import ComputeJob +from lofarpipe.support.parset import Parset class sourcedb(BaseRecipe, RemoteCommandRecipeMixIn): """ @@ -31,17 +33,30 @@ class sourcedb(BaseRecipe, RemoteCommandRecipeMixIn): 'executable': ingredient.ExecField( '--executable', help="Full path to makesourcedb executable", - default="/opt/LofIm/daily/lofar/bin/makesourcedb" ), 'skymodel': ingredient.FileField( '-s', '--skymodel', - dest="skymodel", help="Input sky catalogue" ), + 'mapfile': ingredient.StringField( + '--mapfile', + help="Full path of mapfile to produce; it will contain " + "a list of the generated sky-model files" + ), 'nproc': ingredient.IntField( '--nproc', help="Maximum number of simultaneous processes per compute node", default=8 + ), + 'suffix': ingredient.StringField( + '--suffix', + help="Suffix of the table name of the sky model", + default=".sky" + ), + 'working_directory': ingredient.StringField( + '-w', '--working-directory', + help="Working directory used on output nodes. " + "Results will be written here." 
) } @@ -59,12 +74,24 @@ class sourcedb(BaseRecipe, RemoteCommandRecipeMixIn): data = load_data_map(self.inputs['args'][0]) command = "python %s" % (self.__file__.replace('master', 'nodes')) + outnames = collections.defaultdict(list) jobs = [] for host, ms in data: + outnames[host].append( + os.path.join( + self.inputs['working_directory'], + self.inputs['job_name'], + os.path.basename(ms) + self.inputs['suffix'] + ) + ) jobs.append( ComputeJob( - host, command, arguments=[ - self.inputs['executable'], ms, self.inputs['skymodel'] + host, + command, + arguments=[ + self.inputs['executable'], + self.inputs['skymodel'], + outnames[host][-1] ] ) ) @@ -73,7 +100,10 @@ class sourcedb(BaseRecipe, RemoteCommandRecipeMixIn): if self.error.isSet(): return 1 else: - self.outputs['mapfile'] = self.inputs['args'][0] + self.logger.debug("Writing sky map file: %s" % + self.inputs['mapfile']) + Parset.fromDict(outnames).writeFile(self.inputs['mapfile']) + self.outputs['mapfile'] = self.inputs['mapfile'] return 0 if __name__ == '__main__': diff --git a/CEP/Pipeline/recipes/sip/master/target_pipeline.py b/CEP/Pipeline/recipes/sip/master/target_pipeline.py new file mode 100644 index 00000000000..ce584aeb459 --- /dev/null +++ b/CEP/Pipeline/recipes/sip/master/target_pipeline.py @@ -0,0 +1,128 @@ +# LOFAR IMAGING PIPELINE +# +# Calibrator Pipeline recipe +# Marcel Loose, 2011 +# loose@astron.nl +# ------------------------------------------------------------------------------ + +import os +import sys + +from lofarpipe.support.control import control +from lofar.parameterset import parameterset +import lofarpipe.support.lofaringredient as ingredient + +class target_pipeline(control): + """ + The target pipeline can be used to calibrate the observation of a "target" + source using an instrument database that was previously determined using + the calibrator_pipeline. + + This pipeline will perform the following operations: + - DPPP: flagging, using standard parset + - Demix the relevant A-team sources (for now using python script, later + to use DPPP), using the A-team sourcedb. + - Run BBS to correct for instrumental effects using the instrument database + from an earlier calibrator_pipeline run. + """ + + # These input arguments should ultimately be provided by the parameterset + # file, once the scheduler will fill in the appropriate keys. For now, + # let the user specify the path to the instrument mapfile. + inputs = { + 'instrument_mapfile': ingredient.FileField( + '--instrument-mapfile', + help="Full path to the mapfile containing the names of the " + "instrument model files generated by the `parmdb` recipe" + ) + } + + def __init__(self): + control.__init__(self) + self.parset = parameterset() + + + def usage(self): + print >> sys.stderr, "Usage: %s [options] <parset-file>" % sys.argv[0] + return 1 + + + def go(self): + """ + Read the parset-file that was given as input argument, and set the + jobname before calling the base-class's `go()` method. + """ + try: + parset_file = self.inputs['args'][0] + except IndexError: + return self.usage() + self.parset.adoptFile(parset_file) + # Set job-name to basename of parset-file w/o extension, if it's not + # set on the command-line with '-j' or '--job-name' + if not self.inputs.has_key('job_name'): + self.inputs['job_name'] = ( + os.path.splitext(os.path.basename(parset_file))[0] + ) + super(target_pipeline, self).go() + + + def pipeline_logic(self): + """ + Define the individual tasks that comprise the current pipeline. 
+ This method will be invoked by the base-class's `go()` method. + """ + + # Create a parameter-subset containing only python-control stuff. + py_parset = self.parset.makeSubset( + 'ObsSW.Observation.ObservationControl.PythonControl.') + + # Generate a datamap-file, which is a parset-file containing + # key/value pairs of hostname and list of MS-files. + data_mapfile = self.run_task( + "cep2_datamapper", + observation_dir=py_parset.getString('observationDirectory') +# parset=self.inputs['args'][0] + )['mapfile'] + + # Create a sourcedb based on sourcedb's input argument "skymodel" + # (see, e.g., tasks.cfg file). + sourcedb_mapfile = self.run_task("sourcedb", data_mapfile)['mapfile'] + + # Produce a GVDS file describing the data on the compute nodes. + gvds_file = self.run_task("vdsmaker", data_mapfile)['gvds'] + + # Read metadata (e.g., start- and end-time) from the GVDS file. + vdsinfo = self.run_task("vdsreader", gvds=gvds_file) + + # Create a parameter-subset for DPPP and write it to file. + ndppp_parset = os.path.join( + self.config.get("layout", "job_directory"), + "parsets", "NDPPP.parset") + py_parset.makeSubset('DPPP.').writeFile(ndppp_parset) + + # Run the Default Pre-Processing Pipeline (DPPP); + dppp_mapfile = self.run_task( + "ndppp", data_mapfile, + data_start_time=vdsinfo['start_time'], + data_end_time=vdsinfo['end_time'], + parset=ndppp_parset + )['mapfile'] + + # Demix the relevant A-team sources + demix_mapfile = self.run_task("demixing", dppp_mapfile)['mapfile'] + + # Create a parameter-subset for BBS and write it to file. + bbs_parset = os.path.join( + self.config.get("layout", "job_directory"), + "parsets", "BBS.parset") + py_parset.makeSubset('BBS.').writeFile(bbs_parset) + + # Run BBS to calibrate the target source(s). + self.run_task( + "new_bbs", demix_mapfile, + parset=bbs_parset, + instrument_mapfile=self.inputs['instrument_mapfile'], + sky_mapfile=sourcedb_mapfile) + +if __name__ == '__main__': + sys.exit(target_pipeline().main()) diff --git a/CEP/Pipeline/recipes/sip/master/new_vdsmaker.py b/CEP/Pipeline/recipes/sip/master/vdsmaker.py similarity index 94% rename from CEP/Pipeline/recipes/sip/master/new_vdsmaker.py rename to CEP/Pipeline/recipes/sip/master/vdsmaker.py index 3f593a9de6d..6ca1ca56efd 100644 --- a/CEP/Pipeline/recipes/sip/master/new_vdsmaker.py +++ b/CEP/Pipeline/recipes/sip/master/vdsmaker.py @@ -21,7 +21,7 @@ from lofarpipe.support.remotecommand import ComputeJob from lofarpipe.support.group_data import load_data_map from lofarpipe.support.pipelinelogging import log_process_output -class new_vdsmaker(BaseRecipe, RemoteCommandRecipeMixIn): +class vdsmaker(BaseRecipe, RemoteCommandRecipeMixIn): """ Generate a GVDS file (and, optionally, individual VDS files per subband; see the ``unlink`` input parameter) describing a collection of @@ -65,16 +65,14 @@ class new_vdsmaker(BaseRecipe, RemoteCommandRecipeMixIn): } def go(self): - super(new_vdsmaker, self).go() + super(vdsmaker, self).go() # Load file <-> compute node mapping from disk # ---------------------------------------------------------------------- self.logger.debug("Loading map from %s" % self.inputs['args'][0]) data = load_data_map(self.inputs['args'][0]) - command = "python %s" % ( - self.__file__.replace('master', 'nodes').replace('new_vdsmaker', 'vdsmaker') - ) + command = "python %s" % (self.__file__.replace('master', 'nodes')) jobs = [] vdsnames = [] for host, ms in data: @@ -137,4 +135,4 @@ class new_vdsmaker(BaseRecipe, RemoteCommandRecipeMixIn): return 0 if __name__ == '__main__': - 
sys.exit(new_vdsmaker().main()) + sys.exit(vdsmaker().main()) diff --git a/CEP/Pipeline/recipes/sip/master/vdsreader.py b/CEP/Pipeline/recipes/sip/master/vdsreader.py index 968c62231fb..8ba0bb9fcd4 100644 --- a/CEP/Pipeline/recipes/sip/master/vdsreader.py +++ b/CEP/Pipeline/recipes/sip/master/vdsreader.py @@ -9,7 +9,7 @@ import lofarpipe.support.utilities as utilities import lofarpipe.support.lofaringredient as ingredient from lofarpipe.support.baserecipe import BaseRecipe -from lofarpipe.support.utilities import get_parset +from lofar.parameterset import parameterset class vdsreader(BaseRecipe): @@ -40,7 +40,7 @@ class vdsreader(BaseRecipe): super(vdsreader, self).go() try: - gvds = get_parset(self.inputs['gvds']) + gvds = parameterset(self.inputs['gvds']) except: self.logger.error("Unable to read G(V)DS file") raise diff --git a/CEP/Pipeline/recipes/sip/nodes/demix/shiftphasecenter.py b/CEP/Pipeline/recipes/sip/nodes/demix/shiftphasecenter.py index f12ddf397da..001c4dd6591 100644 --- a/CEP/Pipeline/recipes/sip/nodes/demix/shiftphasecenter.py +++ b/CEP/Pipeline/recipes/sip/nodes/demix/shiftphasecenter.py @@ -125,7 +125,8 @@ def shiftphasecenter (msname, targets, N_channel_per_cell, N_time_per_cell): time_stop = time.time() #print time_stop - time_start #sys.stdout.flush() - + + t_field1.close() t1.close() + t.close() - t_field1.close() diff --git a/CEP/Pipeline/recipes/sip/nodes/new_bbs.py b/CEP/Pipeline/recipes/sip/nodes/new_bbs.py new file mode 100644 index 00000000000..d99d66b578a --- /dev/null +++ b/CEP/Pipeline/recipes/sip/nodes/new_bbs.py @@ -0,0 +1,113 @@ +# LOFAR IMAGING PIPELINE +# +# BBS (BlackBoard Selfcal) node +# John Swinbank, 2010 +# swinbank@transientskp.org +# ------------------------------------------------------------------------------ + +from __future__ import with_statement +from subprocess import Popen, CalledProcessError, PIPE, STDOUT +from tempfile import mkstemp, mkdtemp +import os +import sys +import shutil + +from lofarpipe.support.pipelinelogging import CatchLog4CPlus +from lofarpipe.support.lofarnode import LOFARnodeTCP +from lofarpipe.support.utilities import read_initscript +from lofarpipe.support.utilities import get_mountpoint +from lofarpipe.support.utilities import log_time +from lofarpipe.support.pipelinelogging import log_process_output + +from lofar.parameterset import parameterset + + +class new_bbs(LOFARnodeTCP): + # Handles running a single BBS kernel on a compute node + # -------------------------------------------------------------------------- + def run( + self, executable, initscript, infiles, + db_key, db_name, db_user, db_host + ): + # executable : path to KernelControl executable + # initscript : path to lofarinit.sh + # infiles : tuple of MS, instrument- and sky-model files + # db_* : database connection parameters + # ---------------------------------------------------------------------- + self.logger.debug("executable = %s" % executable) + self.logger.debug("initscript = %s" % initscript) + self.logger.debug("infiles = %s" % str(infiles)) + self.logger.debug("db_key = %s" % db_key) + self.logger.debug("db_name = %s" % db_name) + self.logger.debug("db_user = %s" % db_user) + self.logger.debug("db_host = %s" % db_host) + + (ms, parmdb_instrument, parmdb_sky) = infiles + + with log_time(self.logger): + if os.path.exists(ms): + self.logger.info("Processing %s" % (ms)) + else: + self.logger.error("Dataset %s does not exist" % (ms)) + return 1 + + # Build a configuration parset specifying database parameters + # for the kernel + # 
------------------------------------------------------------------ + self.logger.debug("Setting up BBSKernel parset") + # Getting the filesystem must be done differently, using the + # DataProduct keys in the parset provided by the scheduler. + filesystem = "%s:%s" % (os.uname()[1], get_mountpoint(ms)) + fd, parset_file = mkstemp() + kernel_parset = parameterset() + for key, value in { + "ObservationPart.Filesystem": filesystem, + "ObservationPart.Path": ms, + "BBDB.Key": db_key, + "BBDB.Name": db_name, + "BBDB.User": db_user, + "BBDB.Host": db_host, + "ParmDB.Sky": parmdb_sky, + "ParmDB.Instrument": parmdb_instrument + }.iteritems(): + kernel_parset.add(key, value) + kernel_parset.writeFile(parset_file) + os.close(fd) + self.logger.debug("BBSKernel parset written to %s" % parset_file) + + # Run the kernel + # Catch & log output from the kernel logger and stdout + # ------------------------------------------------------------------ + working_dir = mkdtemp() + env = read_initscript(self.logger, initscript) + try: + cmd = [executable, parset_file, "0"] + self.logger.debug("Executing BBS kernel") + with CatchLog4CPlus( + working_dir, + self.logger.name + "." + os.path.basename(ms), + os.path.basename(executable), + ): + bbs_kernel_process = Popen( + cmd, stdout=PIPE, stderr=PIPE, cwd=working_dir + ) + sout, serr = bbs_kernel_process.communicate() + log_process_output("BBS kernel", sout, serr, self.logger) + if bbs_kernel_process.returncode != 0: + raise CalledProcessError( + bbs_kernel_process.returncode, executable + ) + except CalledProcessError, e: + self.logger.error(str(e)) + return 1 + finally: + os.unlink(parset_file) + shutil.rmtree(working_dir) + return 0 + +if __name__ == "__main__": + # If invoked directly, parse command line arguments for logger information + # and pass the rest to the run() method defined above + # -------------------------------------------------------------------------- + jobid, jobhost, jobport = sys.argv[1:4] + sys.exit(new_bbs(jobid, jobhost, jobport).run_with_stored_arguments()) diff --git a/CEP/Pipeline/recipes/sip/nodes/parmdb.py b/CEP/Pipeline/recipes/sip/nodes/parmdb.py index 1f65a80392b..ae6ca5a9779 100644 --- a/CEP/Pipeline/recipes/sip/nodes/parmdb.py +++ b/CEP/Pipeline/recipes/sip/nodes/parmdb.py @@ -5,21 +5,15 @@ import shutil, os.path import sys class parmdb(LOFARnodeTCP): - def run(self, infile, pdb): + def run(self, pdb_in, pdb_out): with log_time(self.logger): - if os.path.exists(infile): - self.logger.info("Processing %s" % (infile)) - else: - self.logger.error("Dataset %s does not exist" % (infile)) - return 1 - - output = os.path.join(infile, os.path.basename(pdb)) + self.logger.debug("Copying parmdb: %s --> %s" % (pdb_in, pdb_out)) # Remove any old parmdb database - shutil.rmtree(output, ignore_errors=True) + shutil.rmtree(pdb_out, ignore_errors=True) # And copy the new one into place - shutil.copytree(pdb, output) + shutil.copytree(pdb_in, pdb_out) return 0 diff --git a/CEP/Pipeline/recipes/sip/nodes/sourcedb.py b/CEP/Pipeline/recipes/sip/nodes/sourcedb.py index 220fdb5f594..98751299964 100644 --- a/CEP/Pipeline/recipes/sip/nodes/sourcedb.py +++ b/CEP/Pipeline/recipes/sip/nodes/sourcedb.py @@ -1,8 +1,9 @@ from __future__ import with_statement from subprocess import Popen, CalledProcessError, PIPE, STDOUT -import shutil -import os.path +import errno +import os import tempfile +import shutil import sys from lofarpipe.support.lofarnode import LOFARnodeTCP @@ -12,35 +13,42 @@ from lofarpipe.support.utilities import catch_segfaults class 
sourcedb(LOFARnodeTCP): - def run(self, executable, infile, catalogue): + def run(self, executable, catalogue, skydb): with log_time(self.logger): - if os.path.exists(infile): - self.logger.info("Processing %s" % (infile)) - else: - self.logger.error("Dataset %s does not exist" % (infile)) - return 1 - - output = os.path.join(infile, "sky") + # Create output directory if it does not yet exist. + skydb_dir = os.path.dirname(skydb) + try: + os.makedirs(skydb_dir) + self.logger.debug("Created output directory %s" % skydb_dir) + except OSError, err: + # Ignore error if directory already exists, otherwise re-raise + if err[0] != errno.EEXIST: + raise # Remove any old sky database - shutil.rmtree(output, ignore_errors=True) + shutil.rmtree(skydb, ignore_errors=True) - working_dir = tempfile.mkdtemp() + self.logger.info("Creating skymodel: %s" % (skydb)) + scratch_dir = tempfile.mkdtemp() try: - cmd = [executable, "format=<", "in=%s" % (catalogue), "out=%s" % (output)] + cmd = [executable, + "format=<", + "in=%s" % (catalogue), + "out=%s" % (skydb) + ] with CatchLog4CPlus( - working_dir, - self.logger.name + "." + os.path.basename(infile), + scratch_dir, + self.logger.name + "." + os.path.basename(skydb), os.path.basename(executable) ) as logger: - catch_segfaults(cmd, working_dir, None, logger) + catch_segfaults(cmd, scratch_dir, None, logger) except CalledProcessError, e: # For CalledProcessError isn't properly propagated by IPython # Temporary workaround... self.logger.error(str(e)) return 1 finally: - shutil.rmtree(working_dir) + shutil.rmtree(scratch_dir) return 0 diff --git a/CEP/Pipeline/recipes/sip/tasks.cfg b/CEP/Pipeline/recipes/sip/tasks.cfg index cb6a2c04faa..68ad2bb2d40 100644 --- a/CEP/Pipeline/recipes/sip/tasks.cfg +++ b/CEP/Pipeline/recipes/sip/tasks.cfg @@ -7,13 +7,14 @@ recipe = datamapper mapfile = %(runtime_directory)s/jobs/%(job_name)s/parsets/datamapfile [ndppp] -recipe = new_dppp +recipe = dppp executable = %(lofarroot)s/bin/NDPPP initscript = %(lofarroot)s/lofarinit.sh working_directory = %(default_working_directory)s dry_run = False mapfile = %(runtime_directory)s/jobs/%(job_name)s/parsets/compute_mapfile parset = %(runtime_directory)s/jobs/%(job_name)s/parsets/NDPPP.parset +nproc = 1 clobber = True [bbs] @@ -38,11 +39,15 @@ gvds = %(runtime_directory)s/jobs/%(job_name)s/vds/%(job_name)s.gvds [parmdb] recipe = parmdb executable = %(lofarroot)s/bin/parmdbm +working_directory = %(default_working_directory)s +mapfile = %(runtime_directory)s/jobs/%(job_name)s/parsets/instrument_mapfile [sourcedb] recipe = sourcedb executable = %(lofarroot)s/bin/makesourcedb skymodel = %(runtime_directory)s/jobs/%(job_name)s/parsets/bbs.skymodel +mapfile = %(runtime_directory)s/jobs/%(job_name)s/parsets/sky_mapfile +working_directory = %(default_working_directory)s [skymodel] recipe = skymodel @@ -50,7 +55,7 @@ min_flux = 0.5 skymodel_file = %(runtime_directory)s/jobs/%(job_name)s/parsets/bbs.skymodel [vdsmaker] -recipe = new_vdsmaker +recipe = vdsmaker directory = %(runtime_directory)s/jobs/%(job_name)s/vds gvds = %(runtime_directory)s/jobs/%(job_name)s/vds/%(job_name)s.gvds makevds = %(lofarroot)s/bin/makevds @@ -73,4 +78,18 @@ demix_parset_dir = /home/weeren/scripts/demixingfast skymodel = /home/weeren/scripts/Ateam_LBA_CC.skymodel working_directory = %(default_working_directory)s mapfile = %(runtime_directory)s/jobs/%(job_name)s/parsets/compute_mapfile +nproc=1 +[new_bbs] +recipe = new_bbs +control_exec = %(lofarroot)s/bin/GlobalControl +kernel_exec = %(lofarroot)s/bin/KernelControl 
+initscript = %(lofarroot)s/lofarinit.sh +parset = %(runtime_directory)s/jobs/%(job_name)s/parsets/BBS.parset +db_key = %(job_name)s +db_host = ldb001 +db_user = postgres +db_name = loose +instrument_mapfile = %(runtime_directory)s/jobs/%(job_name)s/parsets/instrument_mapfile +sky_mapfile = %(runtime_directory)s/jobs/%(job_name)s/parsets/sky_mapfile +data_mapfile = %(runtime_directory)s/jobs/%(job_name)s/parsets/bbs_mapfile -- GitLab
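As an aside to the recipes added above: the way the per-host output lists written by the new `parmdb`/`sourcedb` masters line up with the data mapfile inside `new_bbs._make_bbs_map` can be summarised with a small standalone sketch. This is a simplified illustration only; `output_names`, `make_bbs_map`, and the example host and paths below are hypothetical stand-ins, and the real recipes read and write the mapfiles with `lofar.parameterset` (via `Parset.fromDict(...).writeFile(...)`) rather than plain Python dicts.

    # Minimal sketch, assuming a mapfile is just "hostname -> list of file names".
    from collections import defaultdict
    import os

    def output_names(data_map, working_directory, job_name, suffix):
        """Mirror how parmdb/sourcedb derive per-host output names from the data map."""
        outnames = defaultdict(list)
        for host, ms in data_map:
            outnames[host].append(
                os.path.join(working_directory, job_name,
                             os.path.basename(ms) + suffix))
        return outnames

    def make_bbs_map(data, instrument, sky):
        """Mirror new_bbs._make_bbs_map: zip per-host lists into (host, triplet) pairs."""
        bbs_map = []
        for host, ms_list in data.items():
            for triplet in zip(ms_list, instrument.get(host, []), sky.get(host, [])):
                bbs_map.append((host, triplet))
        return bbs_map

    # Hypothetical single-subband example.
    data_map = [('locus001', '/data/L29697/L29697_SAP000_SB000_uv.MS')]
    instrument = output_names(data_map, '/data/scratch/loose', 'L29697', '.instrument')
    sky = output_names(data_map, '/data/scratch/loose', 'L29697', '.sky')
    data = defaultdict(list)
    for host, ms in data_map:
        data[host].append(ms)
    print(make_bbs_map(data, instrument, sky))

Keeping the three mapfiles aligned per host is what allows `_make_bbs_map` in the recipe above to detect, and warn about, mismatched counts of data, instrument, and sky-model files before handing the surviving triplets to the BBS kernels.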