diff --git a/.gitattributes b/.gitattributes index d65f947a0f6e02dbcb8c97472f3bb6bd1998754c..a095ab5c6422052ff3e3bcbfcc7a8e34e9c5d410 100644 --- a/.gitattributes +++ b/.gitattributes @@ -697,7 +697,6 @@ CEP/DP3/AOFlagger/src/strategy/control/actionblock.cpp -text CEP/DP3/AOFlagger/src/strategy/control/actionfactory.cpp -text CEP/DP3/AOFlagger/src/strategy/control/strategyreader.cpp -text CEP/DP3/AOFlagger/src/strategy/control/strategywriter.cpp -text -CEP/DP3/AOFlagger/src/strategy/imagesets/fitsimageset.cpp -text CEP/DP3/AOFlagger/src/strategy/imagesets/imageset.cpp -text CEP/DP3/AOFlagger/src/strategy/imagesets/msimageset.cpp -text CEP/DP3/AOFlagger/src/strategy/imagesets/parmimageset.cpp -text diff --git a/CEP/Pipeline/framework/lofarpipe/support/baserecipe.py b/CEP/Pipeline/framework/lofarpipe/support/baserecipe.py index c2ef0e3e0cc17779cc5f064b3555274064fbecd7..3b43a90c4864da9ab9b3f5cc6885d8723b31fab8 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/baserecipe.py +++ b/CEP/Pipeline/framework/lofarpipe/support/baserecipe.py @@ -8,20 +8,16 @@ from ConfigParser import NoOptionError, NoSectionError from ConfigParser import SafeConfigParser as ConfigParser from threading import Event -from functools import partial import os import sys -import inspect import logging import errno import lofarpipe.support.utilities as utilities -import lofarpipe.support.lofaringredient as ingredient -from lofarpipe.support.lofarexceptions import PipelineException +from lofarpipe.support.lofarexceptions import PipelineException, PipelineRecipeFailed from lofarpipe.cuisine.WSRTrecipe import WSRTrecipe from lofarpipe.support.lofaringredient import RecipeIngredients, LOFARinput, LOFARoutput -from lofarpipe.support.remotecommand import run_remote_command from lofarpipe.support.group_data import store_data_map class BaseRecipe(RecipeIngredients, WSRTrecipe): @@ -42,6 +38,11 @@ class BaseRecipe(RecipeIngredients, WSRTrecipe): super(BaseRecipe, self).__init__() self.error = Event() self.error.clear() + # Environment variables we like to pass on to the node script. + self.environment = dict( + (k,v) for (k,v) in os.environ.iteritems() + if k.endswith('PATH') or k.endswith('ROOT') + ) @property def __file__(self): diff --git a/CEP/Pipeline/framework/lofarpipe/support/lofarnode.py b/CEP/Pipeline/framework/lofarpipe/support/lofarnode.py index 24c3cc79bc9e4d028eb251661f43be418a39112f..4d74f4d278a420d2406a8a1cd1d23e8acb5e911a 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/lofarnode.py +++ b/CEP/Pipeline/framework/lofarpipe/support/lofarnode.py @@ -45,6 +45,7 @@ class LOFARnode(object): self.loghost = loghost self.logport = int(logport) self.outputs = {} + self.environment = os.environ def run_with_logging(self, *args): """ diff --git a/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py b/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py index c468d54741afafef62c2a40ca76e4aab9987dbdf..efe57b50e86c9cf3c98b41781b673c820e36fd21 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py +++ b/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py @@ -204,8 +204,8 @@ class ComputeJob(object): self.host, self.command, { - "PYTHONPATH": config.get('deploy', 'engine_ppath'), - "LD_LIBRARY_PATH": config.get('deploy', 'engine_lpath') + "PYTHONPATH": os.environ.get('PYTHONPATH'), + "LD_LIBRARY_PATH": os.environ.get('LD_LIBRARY_PATH') }, arguments = [id, jobhost, jobport] ) diff --git a/CEP/Pipeline/recipes/sip/master/dppp.py b/CEP/Pipeline/recipes/sip/master/dppp.py index 6335f37d66e52f9432ed97632dbe2705ff5a8f67..4066980ba5d6612602db5d59228b78bbe0f3d607 100644 --- a/CEP/Pipeline/recipes/sip/master/dppp.py +++ b/CEP/Pipeline/recipes/sip/master/dppp.py @@ -38,11 +38,6 @@ class dppp(BaseRecipe, RemoteCommandRecipeMixIn): '--executable', help="The full path to the relevant DPPP executable" ), - 'initscript': ingredient.FileField( - '--initscript', - help="The full path to an (Bourne) shell script which will " - "intialise the environment (ie, ``lofarinit.sh``)" - ), 'suffix': ingredient.StringField( '--suffix', default=".dppp", @@ -210,7 +205,7 @@ class dppp(BaseRecipe, RemoteCommandRecipeMixIn): sourcedb, self.inputs['parset'], self.inputs['executable'], - self.inputs['initscript'], + self.environment, self.inputs['demix_always'], self.inputs['demix_if_needed'], self.inputs['data_start_time'], diff --git a/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py b/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py index 05a82866e7fa727c920d39ccd915a9d84d3fb99c..ab883b0f47cc957db021ef1947ddf2b5d23943c6 100644 --- a/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py +++ b/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py @@ -36,11 +36,6 @@ class gainoutliercorrection(BaseRecipe, RemoteCommandRecipeMixIn): help="Full path to the `parmexportcal` executable, not settings this" " results in edit_parmdb behaviour" ), - 'initscript' : ingredient.FileField( - '--initscript', - help="The full path to an (Bourne) shell script which will " - "intialise the environment (i.e., ``lofarinit.sh``)" - ), 'suffix': ingredient.StringField( '--suffix', help="Suffix of the table name of the instrument model", @@ -119,7 +114,7 @@ class gainoutliercorrection(BaseRecipe, RemoteCommandRecipeMixIn): infile, outfile, self.inputs['executable'], - self.inputs['initscript'], + self.environment, self.inputs['sigma'] ] ) diff --git a/CEP/Pipeline/recipes/sip/master/imager_awimager.py b/CEP/Pipeline/recipes/sip/master/imager_awimager.py index e269c2d919e52b115c58d336c156c7a3c545b35d..1394e2f58d4cd4bac69aba9d46282b1cc2e0934a 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_awimager.py +++ b/CEP/Pipeline/recipes/sip/master/imager_awimager.py @@ -33,11 +33,6 @@ class imager_awimager(BaseRecipe, RemoteCommandRecipeMixIn): '--executable', help="The full path to the awimager executable" ), - 'initscript': ingredient.FileField( - '--initscript', - help='''The full path to an (Bourne) shell script which will\ - intialise the environment (ie, ``lofarinit.sh``)''' - ), 'parset': ingredient.FileField( '-p', '--parset', help="The full path to a awimager configuration parset." @@ -103,7 +98,7 @@ class imager_awimager(BaseRecipe, RemoteCommandRecipeMixIn): #construct and save the output name arguments = [self.inputs['executable'], - self.inputs['initscript'], + self.environment, self.inputs['parset'], self.inputs['working_directory'], self.inputs['output_image'], diff --git a/CEP/Pipeline/recipes/sip/master/imager_create_dbs.py b/CEP/Pipeline/recipes/sip/master/imager_create_dbs.py index 46501855d5056dd015c8f1eeaef03e2096dd0c45..b5afaec1b66f0b139d36ccd76a168198c3d6f76b 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_create_dbs.py +++ b/CEP/Pipeline/recipes/sip/master/imager_create_dbs.py @@ -36,11 +36,6 @@ class imager_create_dbs(BaseRecipe, RemoteCommandRecipeMixIn): 'working_directory': ingredient.StringField( '-w', '--working-directory', help="Working directory used on nodes. Results location" - ), - 'initscript': ingredient.FileField( - '--initscript', - help='''The full path to an (Bourne) shell script which will\ - initialise the environment (ie, ``lofarinit.sh``)''' ), 'sourcedb_suffix': ingredient.StringField( '--sourcedb-suffix', @@ -195,7 +190,7 @@ class imager_create_dbs(BaseRecipe, RemoteCommandRecipeMixIn): self.inputs["parmdb_executable"], slice_paths, self.inputs["parmdb_suffix"], - self.inputs["initscript"], + self.environment, self.inputs["working_directory"], self.inputs["makesourcedb_path"], self.inputs["source_list_path"]] diff --git a/CEP/Pipeline/recipes/sip/master/imager_finalize.py b/CEP/Pipeline/recipes/sip/master/imager_finalize.py index 152046ceb244d16d9c1c1a3438477d9b321cbdb7..c43c2b908c995b774ae6308f8086bd85df91a1f0 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_finalize.py +++ b/CEP/Pipeline/recipes/sip/master/imager_finalize.py @@ -19,11 +19,6 @@ class imager_finalize(BaseRecipe, RemoteCommandRecipeMixIn): This recipe does not have positional commandline arguments """ inputs = { - 'initscript': ingredient.FileField( - '--initscript', - help='''The full path to an (Bourne) shell script which will\ - intialise the environment (ie, ``lofarinit.sh``)''' - ), 'awimager_output_map': ingredient.FileField( '--awimager-output-mapfile', help="""Mapfile containing (host, path) pairs of created sky diff --git a/CEP/Pipeline/recipes/sip/master/imager_prepare.py b/CEP/Pipeline/recipes/sip/master/imager_prepare.py index 1f50c41ac140e88e3c2990b8441150ee18f05d36..db3384e60a6fe3ddeb9f65472df9f8642cb5c659 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_prepare.py +++ b/CEP/Pipeline/recipes/sip/master/imager_prepare.py @@ -48,11 +48,6 @@ class imager_prepare(BaseRecipe, RemoteCommandRecipeMixIn): '--ndppp-exec', help="The full path to the ndppp executable" ), - 'initscript': ingredient.FileField( - '--initscript', - help='''The full path to a shell script which will\ - intialise the environment (ie, ``lofarinit.sh``)''' - ), 'parset': ingredient.FileField( '-p', '--parset', help="The full path to a prepare parset" @@ -176,7 +171,7 @@ class imager_prepare(BaseRecipe, RemoteCommandRecipeMixIn): paths_to_image_mapfiles.append((host, inputs_for_image_mapfile_path)) - arguments = [self.inputs['initscript'], + arguments = [self.environment, self.inputs['parset'], self.inputs['working_directory'], self.inputs['processed_ms_dir'], diff --git a/CEP/Pipeline/recipes/sip/master/imager_source_finding.py b/CEP/Pipeline/recipes/sip/master/imager_source_finding.py index f16d4025d51b6789bf749fb15acc7cd376d201d5..66654b97c80d295aae400a5377a315f714583daa 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_source_finding.py +++ b/CEP/Pipeline/recipes/sip/master/imager_source_finding.py @@ -24,10 +24,6 @@ class imager_source_finding(BaseRecipe, RemoteCommandRecipeMixIn): in. """ inputs = { - 'initscript': ingredient.FileField( - '--initscript', - help="Initscript to source (ie, lofarinit.sh)" - ), 'bdsm_parset_file_run1': ingredient.FileField( '--bdsm-parset-file-run1', help="Path to bdsm parameter set for the first sourcefinding run" @@ -103,7 +99,7 @@ class imager_source_finding(BaseRecipe, RemoteCommandRecipeMixIn): self.inputs["working_directory"], "bdsm_output.img"), self.inputs['sourcedb_target_path'], - self.inputs['initscript'], + self.environment, self.inputs['working_directory'], self.inputs['makesourcedb_path'] ] diff --git a/CEP/Pipeline/recipes/sip/master/new_bbs.py b/CEP/Pipeline/recipes/sip/master/new_bbs.py index 9d53190c2e5d248bedae1ee5928894bfcfc48569..3eb5407f07ba793d34c5868796cbc7a7ff2c2bb7 100644 --- a/CEP/Pipeline/recipes/sip/master/new_bbs.py +++ b/CEP/Pipeline/recipes/sip/master/new_bbs.py @@ -51,11 +51,6 @@ class new_bbs(BaseRecipe): dest="kernel_exec", help="BBS Kernel executable" ), - 'initscript': ingredient.FileField( - '--initscript', - dest="initscript", - help="Initscript to source (ie, lofarinit.sh)" - ), 'parset': ingredient.FileField( '-p', '--parset', dest="parset", @@ -262,11 +257,6 @@ class new_bbs(BaseRecipe): # with our own threads. # -------------------------------------------------------------- command = "python %s" % (self.__file__.replace('master', 'nodes')) - env = { - "LOFARROOT": utilities.read_initscript(self.logger, self.inputs['initscript'])["LOFARROOT"], - "PYTHONPATH": self.config.get('deploy', 'engine_ppath'), - "LD_LIBRARY_PATH": self.config.get('deploy', 'engine_lpath') - } jobpool = {} bbs_kernels = [] with job_server(self.logger, jobpool, self.error) as (jobhost, jobport): @@ -277,7 +267,6 @@ class new_bbs(BaseRecipe): host, command, arguments=[ self.inputs['kernel_exec'], - self.inputs['initscript'], files, self.inputs['db_key'], self.inputs['db_name'], @@ -288,9 +277,7 @@ class new_bbs(BaseRecipe): bbs_kernels.append( threading.Thread( target=self._run_bbs_kernel, - args=(host, command, env, job_id, - jobhost, str(jobport) - ) + args=(host, command, job_id, jobhost, str(jobport)) ) ) self.logger.info("Starting %d threads" % len(bbs_kernels)) @@ -314,7 +301,7 @@ class new_bbs(BaseRecipe): self.outputs['mapfile'] = self.inputs['data_mapfile'] return 0 - def _run_bbs_kernel(self, host, command, env, *arguments): + def _run_bbs_kernel(self, host, command, *arguments): """ Run command with arguments on the specified host using ssh. Return its return code. @@ -328,7 +315,7 @@ class new_bbs(BaseRecipe): self.logger, host, command, - env, + self.environment, arguments=arguments ) except Exception, e: @@ -346,7 +333,6 @@ class new_bbs(BaseRecipe): Run BBS Global Control and wait for it to finish. Return its return code. """ - env = utilities.read_initscript(self.logger, self.inputs['initscript']) self.logger.info("Running BBS GlobalControl") working_dir = tempfile.mkdtemp() with CatchLog4CPlus( @@ -364,7 +350,7 @@ class new_bbs(BaseRecipe): ], self.logger, cwd=working_dir, - env=env + env=self.environment ) # _monitor_process() needs a convenient kill() method. bbs_control_process.kill = lambda : os.kill(bbs_control_process.pid, signal.SIGKILL) diff --git a/CEP/Pipeline/recipes/sip/nodes/dppp.py b/CEP/Pipeline/recipes/sip/nodes/dppp.py index 67d2e13357718921816e9c2295c2c8eef05b0aec..1043612ad18f7101d1dec4498f4d1b30552d5ac9 100644 --- a/CEP/Pipeline/recipes/sip/nodes/dppp.py +++ b/CEP/Pipeline/recipes/sip/nodes/dppp.py @@ -16,7 +16,6 @@ import sys from lofarpipe.support.pipelinelogging import CatchLog4CPlus from lofarpipe.support.pipelinelogging import log_time from lofarpipe.support.parset import patched_parset -from lofarpipe.support.utilities import read_initscript from lofarpipe.support.utilities import create_directory from lofarpipe.support.utilities import catch_segfaults from lofarpipe.support.lofarnode import LOFARnodeTCP @@ -26,7 +25,7 @@ class dppp(LOFARnodeTCP): def run( self, infile, outfile, parmdb, sourcedb, - parsetfile, executable, initscript, demix_always, demix_if_needed, + parsetfile, executable, environment, demix_always, demix_if_needed, start_time, end_time, nthreads, clobber ): # Debugging info @@ -36,7 +35,7 @@ class dppp(LOFARnodeTCP): self.logger.debug("sourcedb = %s" % sourcedb) self.logger.debug("parsetfile = %s" % parsetfile) self.logger.debug("executable = %s" % executable) - self.logger.debug("initscript = %s" % initscript) + self.logger.debug("environment = %s" % environment) self.logger.debug("demix_always = %s" % demix_always) self.logger.debug("demix_if_needed = %s" % demix_if_needed) self.logger.debug("start_time = %s" % start_time) @@ -44,6 +43,8 @@ class dppp(LOFARnodeTCP): self.logger.debug("nthreads = %s" % nthreads) self.logger.debug("clobber = %s" % clobber) + self.environment.update(environment) + if not nthreads: nthreads = 1 if not outfile: @@ -92,9 +93,8 @@ class dppp(LOFARnodeTCP): ) shutil.copytree(infile, tmpfile) - # Initialise environment. Limit number of threads used. - env = read_initscript(self.logger, initscript) - env['OMP_NUM_THREADS'] = str(nthreads) + # Limit number of threads used. + self.environment['OMP_NUM_THREADS'] = str(nthreads) self.logger.debug("Using %s threads for NDPPP" % nthreads) # Put arguments we need to pass to some private methods in a dict @@ -129,7 +129,7 @@ class dppp(LOFARnodeTCP): ) as logger: # Catch NDPPP segfaults (a regular occurance), and retry catch_segfaults( - cmd, working_dir, env, logger, + cmd, working_dir, self.environment, logger, cleanup = lambda : shutil.rmtree(tmpfile, ignore_errors=True) ) # Replace outfile with the updated working copy diff --git a/CEP/Pipeline/recipes/sip/nodes/gainoutliercorrection.py b/CEP/Pipeline/recipes/sip/nodes/gainoutliercorrection.py index 85d216c51e8bf35d874ec7662ae8e940dca1b891..127862b3d538c7ff469a0086b6302a39fe035e0f 100644 --- a/CEP/Pipeline/recipes/sip/nodes/gainoutliercorrection.py +++ b/CEP/Pipeline/recipes/sip/nodes/gainoutliercorrection.py @@ -16,7 +16,7 @@ import errno from lofarpipe.support.lofarnode import LOFARnodeTCP from lofarpipe.support.pipelinelogging import CatchLog4CPlus from lofarpipe.support.pipelinelogging import log_time -from lofarpipe.support.utilities import read_initscript, create_directory, delete_directory +from lofarpipe.support.utilities import create_directory, delete_directory from lofarpipe.support.utilities import catch_segfaults from lofarpipe.support.lofarexceptions import PipelineRecipeFailed @@ -24,8 +24,10 @@ from lofarpipe.recipes.helpers.WritableParmDB import WritableParmDB, list_statio from lofarpipe.recipes.helpers.ComplexArray import ComplexArray, RealImagArray, AmplPhaseArray class GainOutlierCorrection(LOFARnodeTCP): - def run(self, infile, outfile, executable, initscript, sigma): + def run(self, infile, outfile, executable, environment, sigma): + self.environment.update(environment) + # Time execution of this job with log_time(self.logger): if os.path.exists(infile): @@ -49,9 +51,6 @@ class GainOutlierCorrection(LOFARnodeTCP): self.logger.error("Executable %s not found" % executable) return 1 - # Initialize environment - env = read_initscript(self.logger, initscript) - try: temp_dir = tempfile.mkdtemp() with CatchLog4CPlus( @@ -62,7 +61,7 @@ class GainOutlierCorrection(LOFARnodeTCP): catch_segfaults( [executable, '-in', infile, '-out', outfile], temp_dir, - env, + self.environment, logger ) except Exception, excp: diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py b/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py index 13ceaffcb1788aabb602a7f4bb4006c276c0ea46..75c68fe76cab00486ef04cd2d64b6da32edc4cf3 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py @@ -24,7 +24,6 @@ from lofarpipe.support.pipelinelogging import CatchLog4CPlus from lofarpipe.support.pipelinelogging import log_time from lofarpipe.support.utilities import patch_parset from lofarpipe.support.utilities import get_parset -from lofarpipe.support.utilities import read_initscript from lofarpipe.support.utilities import catch_segfaults from lofarpipe.support.lofarexceptions import PipelineException import pyrap.tables as pt #@UnresolvedImport @@ -36,27 +35,12 @@ import lofar.parmdb #@UnresolvedImport import numpy as np class imager_awimager(LOFARnodeTCP): - """ - The imager_awimager creates from a measurement set and a sourcedb an image. - The sources in the database are use to create a mask which can be used in - the awimager to improve the clean function. - - 1. Calculate / retrieve a number of parameters from the measurement set - Cell_size, npix, wmax and W projection planes - 2. Create the needed directorys for the output image - 3. Create a mask (casa image), - 4. extend the parset with calculate parameters and the mask location - 5. Run the Awimager with the extended parset - 6. Return the created image to signal a successfull run - - | **Member functions:** - """ - def run(self, executable, init_script, parset, working_directory, + def run(self, executable, environment, parset, working_directory, output_image, concatenated_measurement_set, sourcedb_path, mask_patch_size): """ :param executable: Path to awimager executable - :param init_script: initscript for catch_segfaults (executable runner) + :param environment: environment for catch_segfaults (executable runner) :param parset: parameters for the awimager, :param working_directory: directory the place temporary files :param output_image: location and filesname to story the output images @@ -71,6 +55,8 @@ class imager_awimager(LOFARnodeTCP): """ self.logger.info("Start imager_awimager node run:") log4_cplus_name = "imager_awimager" + self.environment.update(environment) + with log_time(self.logger): # **************************************************************** # 1. Calculate awimager parameters that depend on measurement set @@ -89,7 +75,7 @@ class imager_awimager(LOFARnodeTCP): # **************************************************************** # 3. Create the mask mask_file_path = self._create_mask(npix, cell_size, output_image, - concatenated_measurement_set, init_script, executable, + concatenated_measurement_set, executable, working_directory, log4_cplus_name, sourcedb_path, mask_patch_size, image_path_head) @@ -120,13 +106,12 @@ class imager_awimager(LOFARnodeTCP): # 5. Run the awimager with the updated parameterset cmd = [executable, calculated_parset_path] try: - environment = read_initscript(self.logger, init_script) with CatchLog4CPlus(working_directory, self.logger.name + "." + os.path.basename(log4_cplus_name), os.path.basename(executable) ) as logger: - catch_segfaults(cmd, working_directory, environment, + catch_segfaults(cmd, working_directory, self.environment, logger) # Thrown by catch_segfault @@ -308,7 +293,7 @@ class imager_awimager(LOFARnodeTCP): return fov, station_diameter def _create_mask(self, npix, cell_size, output_image, - concatenated_measurement_set, init_script, executable, + concatenated_measurement_set, executable, working_directory, log4_cplus_name, sourcedb_path, mask_patch_size, image_path_directory): """ @@ -355,12 +340,11 @@ class imager_awimager(LOFARnodeTCP): cmd = [executable, mask_parset_path] self.logger.info(" ".join(cmd)) try: - environment = read_initscript(self.logger, init_script) with CatchLog4CPlus(working_directory, self.logger.name + "." + os.path.basename(log4_cplus_name), os.path.basename(executable) ) as logger: - catch_segfaults(cmd, working_directory, environment, + catch_segfaults(cmd, working_directory, self.environment, logger) # Thrown by catch_segfault except CalledProcessError, exception: diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_create_dbs.py b/CEP/Pipeline/recipes/sip/nodes/imager_create_dbs.py index 8bccd4591e105c9911ccbb3872e22575e93cc25e..c9ac3acc05125e41977fbc334cc6ce3453f4ff07 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_create_dbs.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_create_dbs.py @@ -15,7 +15,6 @@ import os from lofarpipe.support.lofarnode import LOFARnodeTCP from lofarpipe.support.pipelinelogging import log_process_output from lofarpipe.support.pipelinelogging import CatchLog4CPlus -from lofarpipe.support.utilities import read_initscript from lofarpipe.support.utilities import catch_segfaults import monetdb.sql as db @@ -58,10 +57,11 @@ class imager_create_dbs(LOFARnodeTCP): def run(self, concatenated_measurement_set, sourcedb_target_path, monet_db_hostname, monet_db_port, monet_db_name, monet_db_user, monet_db_password, assoc_theta, parmdb_executable, slice_paths, - parmdb_suffix, init_script, working_directory, makesourcedb_path, + parmdb_suffix, environment, working_directory, makesourcedb_path, source_list_path_extern): self.logger.info("Starting imager_create_dbs Node") + self.environment.update(environment) #******************************************************************* # 1. get a sourcelist: from gsm or from file @@ -73,8 +73,8 @@ class imager_create_dbs(LOFARnodeTCP): #******************************************************************* # 2convert it to a sourcedb (casa table) if self._create_source_db(source_list, sourcedb_target_path, - init_script, working_directory, - makesourcedb_path, append) == None: + working_directory, makesourcedb_path, + append) == None: self.logger.error("failed creating sourcedb") return 1 @@ -122,7 +122,7 @@ class imager_create_dbs(LOFARnodeTCP): return source_list, append - def _create_source_db(self, source_list, sourcedb_target_path, init_script, + def _create_source_db(self, source_list, sourcedb_target_path, working_directory, executable, append=False): """ _create_source_db consumes a sourcelist text file and produces a @@ -146,13 +146,12 @@ class imager_create_dbs(LOFARnodeTCP): # db try: - environment = read_initscript(self.logger, init_script) with CatchLog4CPlus(working_directory, self.logger.name + "." + os.path.basename("makesourcedb"), os.path.basename(executable) ) as logger: - catch_segfaults(cmd, working_directory, environment, - logger, cleanup=None) + catch_segfaults(cmd, working_directory, self.environment, + logger, cleanup = None) except subprocess.CalledProcessError, called_proc_error: self.logger.error("Execution of external failed:") diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py index 7f766feed8b9297bc56c1774cacd6cd4b337243f..85a6da5c56b085deb128b6121daaefefd8ec5f93 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py @@ -13,7 +13,6 @@ import subprocess from lofarpipe.support.pipelinelogging import CatchLog4CPlus from lofarpipe.support.pipelinelogging import log_time from lofarpipe.support.utilities import patch_parset -from lofarpipe.support.utilities import read_initscript from lofarpipe.support.utilities import catch_segfaults from lofarpipe.support.lofarnode import LOFARnodeTCP from lofarpipe.support.utilities import create_directory @@ -41,7 +40,7 @@ class imager_prepare(LOFARnodeTCP): **Members:** """ - def run(self, init_script, parset, working_dir, processed_ms_dir, + def run(self, environment, parset, working_dir, processed_ms_dir, ndppp_executable, output_measurement_set, time_slices_per_image, subbands_per_group, raw_ms_mapfile, asciistat_executable, statplot_executable, msselect_executable, @@ -49,6 +48,7 @@ class imager_prepare(LOFARnodeTCP): """ Entry point for the node recipe """ + self.environment.update(environment) with log_time(self.logger): input_map = load_data_map(raw_ms_mapfile) @@ -80,9 +80,9 @@ class imager_prepare(LOFARnodeTCP): #****************************************************************** # 2. run dppp: collect frequencies into larger group time_slices = \ - self._run_dppp(working_dir, time_slice_dir, - time_slices_per_image, input_map, subbands_per_group, - processed_ms_dir, parset, ndppp_executable, init_script) + self._run_dppp(working_dir, time_slice_dir, + time_slices_per_image, input_map, subbands_per_group, + processed_ms_dir, parset, ndppp_executable) self.logger.debug("Produced time slices: {0}".format(time_slices)) #*********************************************************** @@ -198,7 +198,7 @@ class imager_prepare(LOFARnodeTCP): def _run_dppp(self, working_dir, time_slice_dir_path, slices_per_image, input_map, subbands_per_image, collected_ms_dir_name, parset, - ndppp, init_script): + ndppp): """ Run NDPPP: Create dir for grouped measurements, assure clean workspace @@ -254,10 +254,8 @@ class imager_prepare(LOFARnodeTCP): cmd = [ndppp, nddd_parset_path] try: - environment = read_initscript(self.logger, init_script) - # Actual dppp call to externals (allows mucking) - self._dppp_call(working_dir, ndppp, cmd, environment) + self._dppp_call(working_dir, ndppp, cmd, self.environment) except subprocess.CalledProcessError, exception: self.logger.error(str(exception)) diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py b/CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py index 92ef614513c9789976779e7b516ab6d0432d6bc7..1a794d7ae96fde62e3a687057c43a46bacd3bf9a 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py @@ -7,7 +7,6 @@ from lofar.parameterset import parameterset from lofarpipe.support.lofarnode import LOFARnodeTCP import lofar.bdsm as bdsm#@UnresolvedImport -from lofarpipe.support.utilities import read_initscript from lofarpipe.support.pipelinelogging import CatchLog4CPlus from lofarpipe.support.utilities import catch_segfaults @@ -35,7 +34,7 @@ class imager_source_finding(LOFARnodeTCP): """ def run(self, input_image, bdsm_parameter_run1_path, bdsm_parameter_run2x_path, catalog_output_path, image_output_path, - sourcedb_target_path, init_script, working_directory, + sourcedb_target_path, environment, working_directory, create_sourcdb_exec): """ :param input_image: image to look for sources in @@ -47,7 +46,7 @@ class imager_source_finding(LOFARnodeTCP): substracted :param sourcedb_target_path: Path to store the sourcedb created from containing all the found sources - :param init_script: Initscript for runwithlog4cplus + :param environment: environment for runwithlog4cplus :param working_directory: Working dir :param create_sourcdb_exec: Path to create sourcedb executable @@ -55,6 +54,7 @@ class imager_source_finding(LOFARnodeTCP): """ self.logger.info("Starting imager_source_finding") + self.environment.update(environment) # default frequency is None (read from image), save for later cycles. # output of pybdsm forgets freq of source image frequency = None @@ -149,7 +149,7 @@ class imager_source_finding(LOFARnodeTCP): # ********************************************************************* # 6. Convert sourcelist to sourcedb self._create_source_db(catalog_output_path, sourcedb_target_path, - init_script, working_directory, create_sourcdb_exec, False) + working_directory, create_sourcdb_exec, False) # Assign the outputs self.outputs["catalog_output_path"] = catalog_output_path self.outputs["source_db"] = sourcedb_target_path @@ -213,7 +213,7 @@ class imager_source_finding(LOFARnodeTCP): catalog_output_path)) - def _create_source_db(self, source_list, sourcedb_target_path, init_script, + def _create_source_db(self, source_list, sourcedb_target_path, working_directory, create_sourcdb_exec, append=False): """ Convert a sourcelist to a sourcedb: @@ -238,13 +238,12 @@ class imager_source_finding(LOFARnodeTCP): self.logger.info(' '.join(cmd)) try: - environment = read_initscript(self.logger, init_script) with CatchLog4CPlus(working_directory, self.logger.name + "." + os.path.basename("makesourcedb"), os.path.basename(create_sourcdb_exec) ) as logger: - catch_segfaults(cmd, working_directory, environment, - logger, cleanup=None) + catch_segfaults(cmd, working_directory, self.environment, + logger, cleanup = None) except Exception, exception: self.logger.error("Execution of external failed:") diff --git a/CEP/Pipeline/recipes/sip/nodes/new_bbs.py b/CEP/Pipeline/recipes/sip/nodes/new_bbs.py index c1d118f5a32fe90f1b9c170f216aa6b8f3a7df12..7cc95be6efc33f56a8ebc2e1a646e758ef1db2b4 100644 --- a/CEP/Pipeline/recipes/sip/nodes/new_bbs.py +++ b/CEP/Pipeline/recipes/sip/nodes/new_bbs.py @@ -14,7 +14,6 @@ import shutil from lofarpipe.support.pipelinelogging import CatchLog4CPlus from lofarpipe.support.lofarnode import LOFARnodeTCP -from lofarpipe.support.utilities import read_initscript from lofarpipe.support.utilities import get_mountpoint from lofarpipe.support.utilities import log_time from lofarpipe.support.pipelinelogging import log_process_output @@ -25,17 +24,12 @@ from lofar.parameterset import parameterset class new_bbs(LOFARnodeTCP): # Handles running a single BBS kernel on a compute node # -------------------------------------------------------------------------- - def run( - self, executable, initscript, infiles, - db_key, db_name, db_user, db_host - ): + def run(self, executable, infiles, db_key, db_name, db_user, db_host): # executable : path to KernelControl executable - # initscript : path to lofarinit.sh # infiles : tuple of MS, instrument- and sky-model files # db_* : database connection parameters # ---------------------------------------------------------------------- self.logger.debug("executable = %s" % executable) - self.logger.debug("initscript = %s" % initscript) self.logger.debug("infiles = %s" % str(infiles)) self.logger.debug("db_key = %s" % db_key) self.logger.debug("db_name = %s" % db_name) @@ -79,7 +73,6 @@ class new_bbs(LOFARnodeTCP): # Catch & log output from the kernel logger and stdout # ------------------------------------------------------------------ working_dir = mkdtemp() - env = read_initscript(self.logger, initscript) try: self.logger.info("******** {0}".format(open(parset_file).read())) cmd = [executable, parset_file, "0"] diff --git a/CEP/Pipeline/recipes/sip/tasks.cfg.in b/CEP/Pipeline/recipes/sip/tasks.cfg.in index 997488c5784897403a93ec1198036ae1ec807d3d..b64c51546a7c77a4a3793f9c7744b0cf9651d3f2 100644 --- a/CEP/Pipeline/recipes/sip/tasks.cfg.in +++ b/CEP/Pipeline/recipes/sip/tasks.cfg.in @@ -9,7 +9,6 @@ mapfile = %(runtime_directory)s/jobs/%(job_name)s/mapfiles/data.mapfile [ndppp] recipe = dppp executable = %(lofarroot)s/bin/NDPPP -initscript = %(lofarroot)s/lofarinit.sh dry_run = False mapfile = %(runtime_directory)s/jobs/%(job_name)s/mapfiles/dppp.mapfile parset = %(runtime_directory)s/jobs/%(job_name)s/parsets/NDPPP.parset @@ -21,7 +20,6 @@ clobber = False [bbs] recipe = bbs -initscript = %(lofarroot)s/lofarinit.sh control_exec = %(lofarroot)s/bin/GlobalControl kernel_exec = %(lofarroot)s/bin/KernelControl parset = %(runtime_directory)s/jobs/%(job_name)s/parsets/bbs.parset @@ -72,7 +70,6 @@ recipe = flag_baseline [demixing] recipe = demixing -initscript = %(lofarroot)s/lofarinit.sh demix_parset_dir = %(lofarroot)s/share/pipeline/demixing db_host = ldb001 skymodel = %(lofarroot)s/share/pipeline/skymodels/Ateam_LBA_CC.skymodel @@ -84,7 +81,6 @@ nproc=1 recipe = new_bbs control_exec = %(lofarroot)s/bin/GlobalControl kernel_exec = %(lofarroot)s/bin/KernelControl -initscript = %(lofarroot)s/lofarinit.sh parset = %(runtime_directory)s/jobs/%(job_name)s/parsets/BBS.parset gvds = %(runtime_directory)s/jobs/%(job_name)s/vds/%(job_name)s.gvds db_key = %(job_name)s @@ -98,13 +94,11 @@ data_mapfile = %(runtime_directory)s/jobs/%(job_name)s/mapfiles/bbs.mapfile [gainoutliercorrection] recipe = gainoutliercorrection executable = %(lofarroot)s/bin/parmexportcal -initscript = %(lofarroot)s/lofarinit.sh mapfile = %(runtime_directory)s/jobs/%(job_name)s/mapfiles/instrument.mapfile [parmexportcal] recipe = parmexportcal executable = %(lofarroot)s/bin/parmexportcal -initscript = %(lofarroot)s/lofarinit.sh mapfile = %(runtime_directory)s/jobs/%(job_name)s/mapfiles/instrument.mapfile [rficonsole] @@ -116,7 +110,6 @@ recipe = get_metadata [imager_prepare] recipe = imager_prepare -initscript = %(lofarroot)s/lofarinit.sh ndppp_exec = %(lofarroot)s/bin/NDPPP asciistat_executable = %(lofarroot)s/bin/asciistats.py statplot_executable = %(lofarroot)s/bin/statsplot.py @@ -125,12 +118,10 @@ rficonsole_executable = %(lofarroot)s/bin/rficonsole [imager_awimager] recipe = imager_awimager -initscript = %(lofarroot)s/lofarinit.sh executable = %(lofarroot)s/bin/awimager [imager_create_dbs] recipe = imager_create_dbs -initscript = %(lofarroot)s/lofarinit.sh parmdb_executable = %(lofarroot)s/bin/parmdbm makesourcedb_path = %(lofarroot)s/bin/makesourcedb @@ -140,12 +131,10 @@ bbs_executable = %(lofarroot)s/bin/bbs-reducer [imager_source_finding] recipe = imager_source_finding -initscript = %(lofarroot)s/lofarinit.sh makesourcedb_path = %(lofarroot)s/bin/makesourcedb [imager_finalize] recipe = imager_finalize -initscript = %(lofarroot)s/lofarinit.sh fillrootimagegroup_exec = %(lofarroot)s/bin/fillRootImageGroup [copier]