From 26a922d57a8048dbc5e9b6506430707a1819f555 Mon Sep 17 00:00:00 2001 From: Wouter Klijn <klijn@astron.nl> Date: Fri, 4 May 2012 15:23:00 +0000 Subject: [PATCH] Task #3194: Added clear and copious logging at debug level. Added a hard break when a recipe failed at all nodes. Partially successful runs will continue. --- .../recipes/sip/bin/msss_imager_pipeline.py | 170 +++++++++++------- .../recipes/sip/master/imager_awimager.py | 4 +- CEP/Pipeline/recipes/sip/master/imager_bbs.py | 59 +++--- .../recipes/sip/master/imager_create_dbs.py | 120 ++++++++----- .../recipes/sip/master/imager_finalize.py | 42 +++-- .../recipes/sip/master/imager_prepare.py | 21 ++- .../sip/master/imager_source_finding.py | 3 + .../recipes/sip/nodes/imager_awimager.py | 30 +++- CEP/Pipeline/recipes/sip/nodes/imager_bbs.py | 17 +- .../recipes/sip/nodes/imager_create_dbs.py | 83 ++++----- .../recipes/sip/nodes/imager_finalize.py | 9 +- .../recipes/sip/nodes/imager_prepare.py | 50 ++++-- .../sip/nodes/imager_source_finding.py | 28 +-- 13 files changed, 379 insertions(+), 257 deletions(-) diff --git a/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py b/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py index 771fba9ddac..e832819fdc8 100644 --- a/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py @@ -96,9 +96,7 @@ class msss_imager_pipeline(control): self.logger.info("Starting imager pipeline") # Define scratch directory to be used by the compute nodes. - self.scratch_directory = os.path.join( - self.inputs['working_directory'], self.inputs['job_name'] - ) + self.scratch_directory = self.inputs['working_directory'] # Get input/output-data products specifications. self._get_io_product_specs() @@ -118,21 +116,20 @@ class msss_imager_pipeline(control): input_mapfile = os.path.join(mapfile_dir, "uvdata.mapfile") store_data_map(input_mapfile, self.input_data) self.logger.debug( - "Wrote input UV-data mapfile: %s" % input_mapfile - ) + "Wrote input UV-data mapfile: {0}".format(input_mapfile)) + target_mapfile = os.path.join(mapfile_dir, "target.mapfile") store_data_map(target_mapfile, self.target_data) self.logger.debug( - "Wrote target mapfile: %s" % target_mapfile - ) + "Wrote target mapfile: {0}".format(target_mapfile)) + output_image_mapfile = os.path.join(mapfile_dir, "images.mapfile") store_data_map(output_image_mapfile, self.output_data) self.logger.debug( - "Wrote output sky-image mapfile: %s" % output_image_mapfile - ) + "Wrote output sky-image mapfile: {0}".format(output_image_mapfile)) - # TODO: quick and dirty way to get the proper self.parset by just - # re-assigning it the subset of all PythonControl keys. + # reset the parset to a 'parset' subset containing only leaves, without + # the prepended node names self.parset = self.parset.makeSubset( self.parset.fullModuleName('PythonControl') + '.' ) @@ -143,42 +140,38 @@ class msss_imager_pipeline(control): processed_ms_dir = os.path.join(self.scratch_directory, "subbands") concat_ms_map_path, timeslice_map_path, raw_ms_per_image_map_path = \ self._prepare_phase(input_mapfile, target_mapfile, processed_ms_dir, - skip = False) + skip = True) #We start with an empty source_list - sourcelist_map = None - source_list = "" + source_list = "" #This variable contains possible 'new' source locations + # found in the pipeline or supplied externally, for use in the calibration + # and imaging; filled at the end of each major cycle.
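+ # Outline of the major cycle implemented below (steps 2 to 5): create + # dbs -> bbs selfcal -> awimager -> source finding; the sourcelist found + # in one cycle seeds the calibration and imaging of the next.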
+ number_of_major_cycles = self.parset.getInt("number_of_major_cycles") for idx_loop in range(number_of_major_cycles): - # TODO: Remove debugging skip code - if idx_loop == 1: - skip_current_loop = False - else: - skip_current_loop = False - # ****************************************************************** # (2) Create dbs and sky model - parmdbs_path, sky_path = self._create_dbs( + parmdbs_path, sourcedb_map_path = self._create_dbs( concat_ms_map_path, timeslice_map_path, source_list = source_list, - skip_create_dbs = False) + skip_create_dbs = True) # ***************************************************************** - # (3) bbs_imager recipe - bbs_output = self._bbs(timeslice_map_path, parmdbs_path, sky_path, - skip = False) + # (3) bbs_imager recipe. + bbs_output = self._bbs(timeslice_map_path, parmdbs_path, sourcedb_map_path, + skip = True) # ****************************************************************** # (4) Get parameters awimager from the prepare_parset and inputs aw_image_mapfile, maxbaseline = self._aw_imager(concat_ms_map_path, - idx_loop, sky_path, - skip = False) + idx_loop, sourcedb_map_path, + skip = True) # ***************************************************************** # (5) Source finding sourcelist_map = self._source_finding(aw_image_mapfile, - idx_loop, skip = False) + idx_loop, skip = True) #should the output be a sourcedb? instead of a sourcelist @@ -245,16 +238,24 @@ class msss_imager_pipeline(control): output_image_mapfile = output_image_mapfile, processed_ms_dir = processed_ms_dir) + return 0 def _source_finding(self, image_map_path, major_cycle, skip = True): bdsm_parset_pass_1 = self.parset.makeSubset("BDSM[0].") - parset_path_pass_1 = self._write_parset_to_file(bdsm_parset_pass_1, "pybdsm_first_pass.par") + parset_path_pass_1 = self._write_parset_to_file(bdsm_parset_pass_1, + "pybdsm_first_pass.par") + self.logger.debug("Wrote sourcefinder first pass parset: {0}".format( + parset_path_pass_1)) bdsm_parset_pass_2 = self.parset.makeSubset("BDSM[1].") - parset_path_pass_2 = self._write_parset_to_file(bdsm_parset_pass_2, "pybdsm_second_pass.par") - + parset_path_pass_2 = self._write_parset_to_file(bdsm_parset_pass_2, + "pybdsm_second_pass.par") + self.logger.debug("Wrote sourcefinder second pass parset: {0}".format( + parset_path_pass_2)) # touch a mapfile to be filled with created sourcelists source_list_map = self._write_datamap_to_file(None, "source_finding_outputs") + self.logger.debug("Touched mapfile for sourcefinding output: {0}".format( + source_list_map)) catalog_path = os.path.join( self.scratch_directory, @@ -277,57 +278,73 @@ class msss_imager_pipeline(control): return source_list_map - def _bbs(self, timeslice_map_path, parmdbs_map_path, sky_path, skip = False): + def _bbs(self, timeslice_map_path, parmdbs_map_path, sourcedb_map_path, skip = False): #create parset for recipe parset = self.parset.makeSubset("BBS.") parset_path = self._write_parset_to_file(parset, "bbs") - + self.logger.debug( + "Wrote parset for bbs: {0}".format(parset_path)) # create the output file path output_mapfile = self._write_datamap_to_file(None, "bbs_output") - sky_parmdb_map_path = self._write_datamap_to_file(None, "sky_parmdb") + self.logger.debug( + "Touched mapfile for bbs output: {0}".format(output_mapfile)) + converted_sourcedb_map_path = self._write_datamap_to_file(None, "parmdb") + self.logger.debug( + "Touched correctly shaped mapfile for input sourcedbs : {0}".format( + converted_sourcedb_map_path)) + if skip: return output_mapfile - # The sky map contains a 
single sky file while imager_bbs expects a sky + # The sourcedb map contains a single sourcedb entry while imager_bbs expects a sourcedb # file for each 'pardbm ms set combination'. - sky_map = load_data_map(sky_path) + sourcedb_map = load_data_map(sourcedb_map_path) parmdbs_map = load_data_map(parmdbs_map_path) - sky_parmdb_map = [] - for (sky, parmdbs) in zip(sky_map, parmdbs_map): - (host_sky, sky_entry) = sky + converted_sourcedb_map = [] + for (source_db_pair, parmdbs) in zip(sourcedb_map, parmdbs_map): + (host_sourcedb, sourcedb_path) = source_db_pair (host_parmdbs, parmdbs_entries) = parmdbs # sanity check: host should be the same - if host_parmdbs != host_sky: + if host_parmdbs != host_sourcedb: self.logger.error("The input files for bbs do not contain " "matching host names for each entry") - self.logger.error(repr(sky_map)) + self.logger.error(repr(sourcedb_map)) self.logger.error(repr(parmdbs_map_path)) #add the entries but with skymap multiplied with len (parmds list) - sky_parmdb_map.append((host_sky, [sky_entry] * len(parmdbs_entries))) + converted_sourcedb_map.append((host_sourcedb, [sourcedb_path] * len(parmdbs_entries))) #save the new mapfile - store_data_map(sky_parmdb_map_path, sky_parmdb_map) + store_data_map(converted_sourcedb_map_path, converted_sourcedb_map) + self.logger.debug("Wrote converted sourcedb datamap to: {0}".format( + converted_sourcedb_map_path)) self.run_task("imager_bbs", timeslice_map_path, parset = parset_path, instrument_mapfile = parmdbs_map_path, - sky_mapfile = sky_parmdb_map_path, + sourcedb_mapfile = converted_sourcedb_map_path, mapfile = output_mapfile, working_directory = self.scratch_directory) return output_mapfile def _aw_imager(self, prepare_phase_output, major_cycle, sky_path, skip = False): + """ + Run the awimager recipe to create images for the current major cycle. + """ parset = self.parset.makeSubset("AWimager.") mask_patch_size = self.parset.getInt("mask_patch_size") parset_path = self._write_parset_to_file(parset, "awimager_cycle_{0}".format(major_cycle)) + self.logger.debug("Wrote parset for awimager: {0}".format( + parset_path)) image_path = os.path.join( - self.scratch_directory, "awimage_cycle_{0}".format(major_cycle), "image" - ) + self.scratch_directory, "awimage_cycle_{0}".format(major_cycle), + "image") output_mapfile = self._write_datamap_to_file(None, "awimager") + self.logger.debug("Touched output map for awimager recipe: {0}".format( + output_mapfile)) if skip: pass else: @@ -350,13 +367,18 @@ class msss_imager_pipeline(control): ndppp_parset = self.parset.makeSubset("DPPP.") ndppp_parset_path = self._write_parset_to_file(ndppp_parset, "prepare_imager_ndppp") - + self.logger.debug( + "Wrote parset for ndppp: {0}".format(ndppp_parset_path)) # create the output file paths #[1] output -> prepare_output output_mapfile = self._write_datamap_to_file(None, "prepare_output") - time_slices_mapfile = self._write_datamap_to_file(None, "prepare_time_slices") - raw_ms_per_image_mapfile = self._write_datamap_to_file(None, "raw_ms_per_image") - + time_slices_mapfile = self._write_datamap_to_file(None, + "prepare_time_slices") + raw_ms_per_image_mapfile = self._write_datamap_to_file(None, + "raw_ms_per_image") + self.logger.debug( + "Touched the following map files used for output: {0}, {1}, {2}".format( + output_mapfile, time_slices_mapfile, raw_ms_per_image_mapfile)) # Run the prepare phase script if skip: pass else: @@ -372,25 +394,45 @@ class msss_imager_pipeline(control): working_directory = self.scratch_directory, processed_ms_dir = processed_ms_dir) - - - - # If all is ok 
output_mapfile == target_mapfile + # validate that the prepare phase produced the correct data + output_keys = outputs.keys() + if not ('mapfile' in output_keys): + error_msg = "The imager_prepare master script did not "\ "return the correct data; missing: {0}".format('mapfile') + self.logger.error(error_msg) + raise PipelineException(error_msg) + if not ('slices_mapfile' in output_keys): + error_msg = "The imager_prepare master script did not "\ "return the correct data; missing: {0}".format( 'slices_mapfile') + self.logger.error(error_msg) + raise PipelineException(error_msg) + if not ('raw_ms_per_image_mapfile' in output_keys): + error_msg = "The imager_prepare master script did not "\ "return the correct data; missing: {0}".format( 'raw_ms_per_image_mapfile') + self.logger.error(error_msg) + raise PipelineException(error_msg) + + # Return the mapfile paths with processed data return output_mapfile, time_slices_mapfile, raw_ms_per_image_mapfile - def _create_dbs(self, input_map_path, timeslice_map_path, source_list = "" , skip_create_dbs = False): + def _create_dbs(self, input_map_path, timeslice_map_path, source_list = "", + skip_create_dbs = False): """ Create for each of the concatenated input measurement sets """ # Create the parameters set parset = self.parset.makeSubset("GSM.") - parset_path = self._write_parset_to_file(parset, "create_dbs") # create the files that will contain the output of the recipe - parmdbs_path = self._write_datamap_to_file(None, "parmdbs") - sky_path = self._write_datamap_to_file(None, "sky_files") - + parmdbs_map_path = self._write_datamap_to_file(None, "parmdbs") + self.logger.debug( + "touched parmdbs output mapfile: {0}".format(parmdbs_map_path)) + sourcedb_map_path = self._write_datamap_to_file(None, "sky_files") + self.logger.debug( + "touched source db output mapfile: {0}".format(sourcedb_map_path)) #run the master script if skip_create_dbs: pass @@ -402,15 +444,15 @@ class msss_imager_pipeline(control): monetdb_user = parset.getString("monetdb_user"), monetdb_password = parset.getString("monetdb_password"), assoc_theta = parset.getString("assoc_theta"), - sourcedb_suffix = ".sky", + sourcedb_suffix = ".sourcedb", slice_paths_mapfile = timeslice_map_path, - parmdb_suffix = ".parmdbm", - parmdbs_path = parmdbs_path, - sky_path = sky_path, + parmdb_suffix = ".parmdb", + parmdbs_map_path = parmdbs_map_path, + sourcedb_map_path = sourcedb_map_path, source_list_path = source_list, working_directory = self.scratch_directory) - return parmdbs_path, sky_path + return parmdbs_map_path, sourcedb_map_path def _write_parset_to_file(self, parset, parset_name): diff --git a/CEP/Pipeline/recipes/sip/master/imager_awimager.py b/CEP/Pipeline/recipes/sip/master/imager_awimager.py index cd437048ffc..ec43129d0b4 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_awimager.py +++ b/CEP/Pipeline/recipes/sip/master/imager_awimager.py @@ -103,8 +103,6 @@ class imager_awimager(BaseRecipe, RemoteCommandRecipeMixIn): measurement_set, sourcedb_path, mask_patch_size] jobs.append(ComputeJob(host, node_command, arguments)) - - # Hand over the job(s) to the pipeline scheduler self._schedule_jobs(jobs) created_awimages = [] @@ -120,6 +118,8 @@ class imager_awimager(BaseRecipe, RemoteCommandRecipeMixIn): return 1 store_data_map(self.inputs['mapfile'], created_awimages) + self.logger.debug("Wrote mapfile containing produced awimages: {0}".format( + self.inputs['mapfile'])) self.outputs["mapfile"] = self.inputs['mapfile'] return 0 diff --git a/CEP/Pipeline/recipes/sip/master/imager_bbs.py 
b/CEP/Pipeline/recipes/sip/master/imager_bbs.py index ef25c2cfd13..069e0c7fa58 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_bbs.py +++ b/CEP/Pipeline/recipes/sip/master/imager_bbs.py @@ -1,11 +1,7 @@ # LOFAR IMAGING PIPELINE -# imager_bbs: start isolated master/node new_bss runs on compute nodes -# For seperate processing of timeslices -# LOFAR IMAGING PIPELINE -# -# BBS (BlackBoard Selfcal) recipe -# Wouter Klijn -# klijn@astron.nl +# imager_bbs BBS (BlackBoard Selfcal) recipe +# Wouter Klijn +# klijn@astron.nl # ------------------------------------------------------------------------------ from __future__ import with_statement import sys @@ -20,10 +16,12 @@ from lofarpipe.support.remotecommand import ComputeJob class imager_bbs(BaseRecipe, RemoteCommandRecipeMixIn): """ - + imager_bbs master performs a bbs run based on the supplied parset; it is a + shallow wrapper around bbs. + It validates that the input mapfiles are correct and then starts the node + script + **Arguments** - - A mapfile describing the data to be processed. """ inputs = { 'initscript': ingredient.FileField( @@ -43,10 +41,10 @@ class imager_bbs(BaseRecipe, RemoteCommandRecipeMixIn): help = "Full path to the mapfile containing the names of the " "instrument model files generated by the `parmdb` recipe" ), - 'sky_mapfile': ingredient.FileField( - '--sky-mapfile', + 'sourcedb_mapfile': ingredient.FileField( + '--sourcedb-mapfile', help = "Full path to the mapfile containing the names of the " - "sky model files generated by the `sourcedb` recipe" + "sourcedbs generated by the `sourcedb` recipe" ), 'id': ingredient.IntField( '--id', @@ -65,64 +63,71 @@ ) } - def go(self): - self.logger.info("Starting imager_bbs run") super(imager_bbs, self).go() + self.logger.info("Starting imager_bbs run") # Load the data ms_map = load_data_map(self.inputs['args'][0]) parmdb_map = load_data_map(self.inputs['instrument_mapfile']) - sky_map = load_data_map(self.inputs['sky_mapfile']) + sourcedb_map = load_data_map(self.inputs['sourcedb_mapfile']) #Check if the input has equal length and on the same nodes - if not validate_data_maps(ms_map, parmdb_map, sky_map): + if not validate_data_maps(ms_map, parmdb_map, sourcedb_map): self.logger.error("The combination of mapfiles failed validation:") self.logger.error(ms_map) self.logger.error(parmdb_map) - self.logger.error(sky_map) + self.logger.error(sourcedb_map) return 1 # Create the jobs jobs = [] outnames = collections.defaultdict(list) node_command = " python %s" % (self.__file__.replace("master", "nodes")) + + for (ms, parmdb, sourcedb) in zip(ms_map, parmdb_map, sourcedb_map): #host is same for each entry (validate_data_maps) (host, ms_list) = ms - (host, parmdb_list) = parmdb - (host, sky_list) = sky -  #Write data maps to mapfiles + # Write data maps to mapfiles: The (node, data) pairs are inserted + # into an array to allow writing of the mapfiles using the default + # functions map_dir = os.path.join( self.config.get("layout", "job_directory"), "mapfiles") run_id = str(self.inputs.get("id")) ms_list_path = os.path.join(map_dir, host + "_ms_" + run_id + ".map") store_data_map(ms_list_path, [ms]) + self.logger.debug( + "Wrote mapfile with ms: {0}".format(ms_list_path)) parmdb_list_path = os.path.join(map_dir, host + "_parmdb_" + run_id + ".map") store_data_map(parmdb_list_path, [parmdb]) - sky_list_path = os.path.join(map_dir, host + "_sky_" + run_id + ".map") - 
store_data_map(sky_list_path, [sky]) + self.logger.debug( + "Wrote mapfile with parmdb: {0}".format(parmdb_list_path)) + sourcedb_list_path = os.path.join(map_dir, host + "_sky_" + run_id + ".map") + store_data_map(sourcedb_list_path, [sourcedb]) + self.logger.debug( + "Wrote mapfile with sourcedbs: {0}".format(sourcedb_list_path)) outnames[host].extend(ms_list) arguments = [self.inputs['bbs_executable'], self.inputs['parset'], - ms_list_path, parmdb_list_path, sky_list_path] - self.logger.info(arguments) + ms_list_path, parmdb_list_path, sourcedb_list_path] jobs.append(ComputeJob(host, node_command, arguments)) # start and wait till all are finished self._schedule_jobs(jobs) if self.error.isSet(): #if one of the nodes failed - self.logger.error("One of the nodes reporting while performing" + self.logger.error("One of the nodes failed while performing " "a BBS run. Aborting") return 1 # return the output: The measurement set that are calibrated: # calibrated data is placed in the ms sets store_data_map(self.inputs['mapfile'], ms_map) + self.logger.debug("Wrote datamap with calibrated data: {0}".format( + self.inputs['mapfile'])) self.outputs['mapfile'] = self.inputs['mapfile'] return 0 diff --git a/CEP/Pipeline/recipes/sip/master/imager_create_dbs.py index c5b519ceac3..a3e00e470b6 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_create_dbs.py +++ b/CEP/Pipeline/recipes/sip/master/imager_create_dbs.py @@ -1,33 +1,43 @@ -# LOFAR IMAGING PIPELINE -# -# imager_create_dbs (master) -# -# Wouter Klijn, 2012 -# klijn@astron.nl +# LOFAR AUTOMATIC IMAGING PIPELINE +# imager_create_dbs (master) +# Wouter Klijn, 2012 +# klijn@astron.nl # ------------------------------------------------------------------------------ import os import sys -import collections + import lofarpipe.support.lofaringredient as ingredient from lofarpipe.support.baserecipe import BaseRecipe from lofarpipe.support.remotecommand import RemoteCommandRecipeMixIn from lofarpipe.support.remotecommand import ComputeJob -from lofarpipe.support.group_data import load_data_map, store_data_map - +from lofarpipe.support.group_data import load_data_map, store_data_map, validate_data_maps +from lofarpipe.support.lofarexceptions import PipelineException class imager_create_dbs(BaseRecipe, RemoteCommandRecipeMixIn): """ - + imager_create_dbs (master) is the script responsible for creating a number + of databases needed by the imaging pipeline. + 1. Using the pointing extracted from the input measurement set, a database + of sources is created based on information in the global sky model (gsm). + One source db is created for each image/node + a. The pointing is supplied to the GSM database, resulting in a sourcelist + b. This sourcelist is converted into a source db + c. Possible additional sourcelists from external sources are added to this + source list + 2. For each of the timeslices in the image a parmdb is created. Each timeslice + is recorded at a different time and therefore needs its own calibration and + instrument parameters. """ + inputs = { 'working_directory': ingredient.StringField( '-w', '--working-directory', - help = "Working directory used on outpuconfigt nodes. Results location" + help = "Working directory used on nodes. 
Results location" ), 'initscript': ingredient.FileField( '--initscript', help = '''The full path to an (Bourne) shell script which will\ - intialise the environment (ie, ``lofarinit.sh``)''' + initialise the environment (ie, ``lofarinit.sh``)''' ), 'sourcedb_suffix': ingredient.StringField( '--sourcedb-suffix', @@ -80,23 +90,25 @@ class imager_create_dbs(BaseRecipe, RemoteCommandRecipeMixIn): help = "Path to sourcelist from external source (eg. bdsm) "\ "use an empty string for gsm generated data" ), - 'parmdbs_path': ingredient.StringField( - '--parmdbs-path', - help = "path to mapfile containing produced sky files" + 'parmdbs_map_path': ingredient.StringField( + '--parmdbs-map-path', + help = "path to mapfile containing produced parmdb files" ), - 'sky_path': ingredient.StringField( - '--sky-path', - help = "path to mapfile containing produced sky files" + 'sourcedb_map_path': ingredient.StringField( + '--sourcedb-map-path', + help = "path to mapfile containing produced sourcedb files" ), - } outputs = { - 'sky_path': ingredient.FileField(), - 'parmdbs_path': ingredient.FileField() + 'sourcedb_map_path': ingredient.FileField( + help = "On succes contains path to mapfile containing produced " + "sourcedb files"), + 'parmdbs_map_path': ingredient.FileField( + help = "On succes contains path to mapfile containing produced" + "parmdb files") } - def __init__(self): super(imager_create_dbs, self).__init__() @@ -126,54 +138,55 @@ class imager_create_dbs(BaseRecipe, RemoteCommandRecipeMixIn): # Get the input data slice_paths_map = load_data_map(self.inputs["slice_paths_mapfile"]) input_map = load_data_map(self.inputs['args'][0]) + try: + if not validate_data_maps(slice_paths_map, input_map): + raise PipelineException("Mapfile Validation failed") + except (PipelineException, AssertionError), e : + self.logger.error(str(e)) + self.logger.error("Incorrect data specification:") + self.logger.error("The supplied input datamaps are {0} and {1}".format( + self.inputs["slice_paths_mapfile"], self.inputs['args'][0])) + self.logger.error("content input_map:") + self.logger.error(input_map) + self.logger.error("content slice_paths_map:") + self.logger.error(slice_paths_map) + return 1 # Compile the command to be executed on the remote machine node_command = " python %s" % (self.__file__.replace("master", "nodes")) # create jobs jobs = [] for (input_ms, slice_paths) in zip(input_map, slice_paths_map): - self.logger.info(slice_paths) host_ms, concatenated_measurement_set = input_ms host_slice, slice_paths = slice_paths - - #Check if inputs are correct. 
- if host_ms != host_slice: - self.logger.error("Host found in the timeslice and input ms are different") - self.logger.error(input_map) - self.logger.error(slice_paths_map) - return 1 - - host_slice_map = [("paths", slice_paths)] host = host_ms - #Create the parameters depending on the input_map + # Create the parameters depending on the input_map sourcedb_target_path = os.path.join( concatenated_measurement_set + self.inputs["sourcedb_suffix"]) - #The actual call for the node script - arguments = [ concatenated_measurement_set, sourcedb_target_path, + # The actual call for the node script + arguments = [concatenated_measurement_set, sourcedb_target_path, monetdb_hostname, monetdb_port, monetdb_name, monetdb_user, monetdb_password, assoc_theta, parmdb_executable, slice_paths, parmdb_suffix, init_script, working_directory, makesourcedb_path, source_list_path] jobs.append(ComputeJob(host, node_command, arguments)) - - # Hand over the job(s) to the pipeline scheduler self._schedule_jobs(jobs) - #Collect the output of the node scripts write to (map) files - sky_files = [] + # Collect the output of the node scripts write to (map) files + sourcedb_files = [] parmdbs = [] - # now parse the node output append to list for job in jobs: host = job.host - if job.results.has_key("sky"): - sky_files.append((host, job.results["sky"])) + if job.results.has_key("sourcedb"): + sourcedb_files.append((host, job.results["sourcedb"])) else: - self.logger.warn("Warning failed ImagerCreateDBs run detected: No " - "skymap file created, {0} continue".format(host)) + self.logger.warn("Warning failed ImagerCreateDBs run " + "detected: No sourcedb file created, {0} continue".format( + host)) if job.results.has_key("parmdbms"): parmdbs.append((host, job.results["parmdbms"])) @@ -181,13 +194,24 @@ class imager_create_dbs(BaseRecipe, RemoteCommandRecipeMixIn): self.logger.warn("Failed ImagerCreateDBs run detected: No " "parmdbms created{0} continue".format(host)) + # Fail if none of the nodes returned all data + if len(sourcedb_files) == 0 or len(parmdbs) == 0: + self.logger.error("The creation of dbs on the nodes failed:") + self.logger.error("Not a single node produces all needed data") + self.logger.error("products. 
sourcedb_files: {0} ".format( + sourcedb_files)) + self.logger.error("parameter dbs: {0}".format(parmdbs)) + return 1 + # write the mapfiles - store_data_map(self.inputs["sky_path"], sky_files) - store_data_map(self.inputs["parmdbs_path"], parmdbs) + store_data_map(self.inputs["sourcedb_map_path"], sourcedb_files) + store_data_map(self.inputs["parmdbs_map_path"], parmdbs) + self.logger.debug("Wrote sourcedb dataproducts: {0} \n {1}".format( + self.inputs["sourcedb_map_path"], self.inputs["parmdbs_map_path"])) # Set the outputs - self.outputs['sky_path'] = self.inputs["sky_path"] - self.outputs['parmdbs_path'] = self.inputs["parmdbs_path"] + self.outputs['sourcedb_map_path'] = self.inputs["sourcedb_map_path"] + self.outputs['parmdbs_map_path'] = self.inputs["parmdbs_map_path"] return 0 diff --git a/CEP/Pipeline/recipes/sip/master/imager_finalize.py index 34917d0bc0e..bc03b4e4166 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_finalize.py +++ b/CEP/Pipeline/recipes/sip/master/imager_finalize.py @@ -65,16 +65,6 @@ class imager_finalize(BaseRecipe, RemoteCommandRecipeMixIn): def go(self): super(imager_finalize, self).go() - # debug - self.logger.info(self.inputs["awimager_output_map"]) - self.logger.info(self.inputs["raw_ms_per_image_map"]) - self.logger.info(self.inputs["sourcelist_map"]) - self.logger.info(self.inputs["target_mapfile"]) - self.logger.info(self.inputs["output_image_mapfile"]) - - self.logger.info(self.inputs["minbaseline"]) - self.logger.info(self.inputs["maxbaseline"]) - # debug awimager_output_map = load_data_map(self.inputs["awimager_output_map"]) raw_ms_per_image_map = load_data_map(self.inputs["raw_ms_per_image_map"]) @@ -83,37 +73,45 @@ output_image_mapfile = load_data_map(self.inputs["output_image_mapfile"]) processed_ms_dir = self.inputs["processed_ms_dir"] fillRootImageGroup_exec = self.inputs["fillrootimagegroup_exec"] - #chech validity of the maps: all on same node with the same length - validate_data_maps(awimager_output_map, raw_ms_per_image_map, - sourcelist_map, target_mapfile, output_image_mapfile) - # debug - self.logger.info(awimager_output_map) - self.logger.info(raw_ms_per_image_map) - self.logger.info(sourcelist_map) - self.logger.info(target_mapfile) - self.logger.info(output_image_mapfile) - # debug + # check validity of the maps: all on same node with the same length + if not validate_data_maps(awimager_output_map, raw_ms_per_image_map, + sourcelist_map, target_mapfile, output_image_mapfile): + self.logger.error("The supplied datamaps for the imager_finalize " + "are incorrect.") + self.logger.error("awimager_output_map: {0}".format( + awimager_output_map)) + self.logger.error("raw_ms_per_image_map: {0}".format( + raw_ms_per_image_map)) + self.logger.error("sourcelist_map: {0}".format( + sourcelist_map)) + self.logger.error("target_mapfile: {0}".format( + target_mapfile)) + self.logger.error("output_image_mapfile: {0}".format( + output_image_mapfile)) + return 1 + nodeCommand = " python %s" % (self.__file__.replace("master", "nodes")) jobs = [] for (awimager_output_pair, raw_ms_per_image_pair, sourcelist_pair, target_pair, output_image_pair) in zip( awimager_output_map, raw_ms_per_image_map, sourcelist_map, target_mapfile, output_image_mapfile): + # collect the data (host, awimager_output) = awimager_output_pair (host, raw_ms_per_image) = raw_ms_per_image_pair (host, sourcelist) = sourcelist_pair (host, target) = target_pair (host, 
output_image) = output_image_pair - arguments = [awimager_output, raw_ms_per_image, sourcelist, target, output_image, self.inputs["minbaseline"], self.inputs["maxbaseline"], processed_ms_dir, fillRootImageGroup_exec] self.logger.info(arguments) jobs.append(ComputeJob(host, nodeCommand, arguments)) - self._schedule_jobs(jobs) + + #TODO: validatable output data?? return 0 diff --git a/CEP/Pipeline/recipes/sip/master/imager_prepare.py b/CEP/Pipeline/recipes/sip/master/imager_prepare.py index 2d9a1df200d..213ec78fe0e 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_prepare.py +++ b/CEP/Pipeline/recipes/sip/master/imager_prepare.py @@ -163,9 +163,12 @@ class imager_prepare(BaseRecipe, RemoteCommandRecipeMixIn): n_subband_groups = len(output_map) for idx_sb_group, (host, output_measurement_set) in enumerate(output_map): #create the input files for this node + self.logger.debug("Creating input data subset for processing" + "on: {0}".format(host)) inputs_for_image_mapfile_path = self._create_input_map_for_subband_group( slices_per_image, n_subband_groups, subbands_per_image, idx_sb_group, input_map) + #save the (input) ms, as a list of inputs_for_image_mapfile_path_list.append((host, inputs_for_image_mapfile_path)) @@ -189,7 +192,8 @@ class imager_prepare(BaseRecipe, RemoteCommandRecipeMixIn): slices = [] if self.error.isSet(): #if one of the nodes failed self.logger.warn("Failed prepare_imager run detected: Generating " - "new output_ms_mapfile_path without failed runs!") + "new output_ms_mapfile_path without failed runs:" + " {0}".format(output_ms_mapfile_path)) concatenated_timeslices = [] #scan the return dict for completed key for ((host, output_measurement_set), job) in zip(output_map, jobs): @@ -202,16 +206,29 @@ class imager_prepare(BaseRecipe, RemoteCommandRecipeMixIn): else: self.logger.warn("Failed run on {0}. 
NOT Created: {1} ".format( host, output_measurement_set)) + if len(concatenated_timeslices) == 0: + self.logger.error("None of the started compute nodes finished: " + "the current recipe produced no output, aborting") + return 1 + store_data_map(output_ms_mapfile_path, concatenated_timeslices) + self.logger.debug( + "Wrote target mapfile: {0}".format(output_ms_mapfile_path)) else: #Copy output map from input output_ms_mapfile_path and return store_data_map(output_ms_mapfile_path, output_map) for ((host, output_measurement_set), job) in zip(output_map, jobs): if job.results.has_key("time_slices"): slices.append((host, job.results["time_slices"])) + store_data_map(output_slices_mapfile_path, slices) + self.logger.debug( + "Wrote time slice mapfile: {0}".format(output_slices_mapfile_path)) store_data_map(self.inputs["raw_ms_per_image_mapfile"], inputs_for_image_mapfile_path_list) + self.logger.debug( + "Wrote mapfile containing (raw) input ms: {0}".format( + self.inputs["raw_ms_per_image_mapfile"])) # Set the outputs self.outputs['mapfile'] = self.inputs["mapfile"] self.outputs['slices_mapfile'] = self.inputs["slices_mapfile"] @@ -247,6 +264,8 @@ class imager_prepare(BaseRecipe, RemoteCommandRecipeMixIn): inputs_for_image_mapfile_path = os.path.join( job_directory, "mapfiles", "ms_per_image_{0}".format(idx_sb_group)) + self.logger.debug("Storing input map at location: {0}".format( + inputs_for_image_mapfile_path)) store_data_map(inputs_for_image_mapfile_path, inputs_for_image) return inputs_for_image_mapfile_path diff --git a/CEP/Pipeline/recipes/sip/master/imager_source_finding.py index dc4e5d14bd3..b9f6c67af22 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_source_finding.py +++ b/CEP/Pipeline/recipes/sip/master/imager_source_finding.py @@ -52,6 +52,7 @@ class imager_source_finding(BaseRecipe, RemoteCommandRecipeMixIn): # TODO FIXME: This output path will be, in the testing phase a # subdirectory of the actual output image. 
+ # This might be a bug: dunno image_output_path = os.path.join( self.inputs["working_directory"], "bdsm_output.img" ) @@ -78,6 +79,8 @@ class imager_source_finding(BaseRecipe, RemoteCommandRecipeMixIn): self.logger.info(created_sourcelists) store_data_map(self.inputs['mapfile'], created_sourcelists) + self.logger.debug("Wrote datamap with created sourcelists: {0}".format( + created_sourcelists)) self.outputs["mapfile"] = self.inputs['mapfile'] diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py b/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py index 4ee3b54abbc..5956bf8b272 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py @@ -14,9 +14,7 @@ # klijn@astron.nl # ----------------------------------------------------------------------------- from __future__ import with_statement -import os import sys -import tempfile import shutil import os.path import math @@ -29,7 +27,6 @@ from lofarpipe.support.utilities import get_parset from lofarpipe.support.utilities import read_initscript from lofarpipe.support.utilities import catch_segfaults from lofarpipe.support.lofarexceptions import PipelineException -from lofar.parameterset import parameterset #@UnresolvedImport import pyrap.tables as pt #@UnresolvedImport from subprocess import CalledProcessError from lofarpipe.support.utilities import create_directory @@ -44,7 +41,6 @@ class imager_awimager(LOFARnodeTCP): self.logger.info("Start imager_awimager run: client") log4CPlusName = "imager_awimager" with log_time(self.logger): - size_converter = 4.0 #TODO debugging tool scale the image and cellsize to allow quicker running of the awimager # Calculate awimager parameters that depend on measurement set cell_size, npix, w_max, w_proj_planes = \ @@ -57,6 +53,8 @@ class imager_awimager(LOFARnodeTCP): # if it not exists image_path_head = os.path.dirname(output_image) create_directory(image_path_head) + self.logger.debug("Created directory to place awimager output" + " files: {0}".format(image_path_head)) mask_file_path = self._create_mask(npix, cell_size, output_image, concatenated_measurement_set, init_script, executable, @@ -72,7 +70,7 @@ class imager_awimager(LOFARnodeTCP): 'wprojplanes':str(w_proj_planes), 'image':str(output_image), 'maxsupport':str(npix), - #'mask':str(mask_file_path), #TODO REINTRODUCE MASK + #'mask':str(mask_file_path), #TODO REINTRODUCE MASK, excluded to speed up in this debug stage } # save the parset at the target dir for the image @@ -80,6 +78,8 @@ class imager_awimager(LOFARnodeTCP): calculated_parset_path = os.path.join(image_path_head, "parset.par") shutil.copy(temp_parset_filename, calculated_parset_path) os.unlink(temp_parset_filename) + self.logger.debug("Wrote parset for awimager run: {0}".format( + calculated_parset_path)) # The command and parameters to be run cmd = [executable, calculated_parset_path] @@ -107,7 +107,8 @@ class imager_awimager(LOFARnodeTCP): def _create_mask(self, npix, cell_size, output_image, concatenated_measurement_set, init_script, executable, - working_directory, log4CPlusName, sourcedb_path, mask_patch_size, image_path_image_cycle): + working_directory, log4CPlusName, sourcedb_path, + mask_patch_size, image_path_image_cycle): """ _create_mask creates a casa image containing an mask blocking out the sources in the provided sourcedb. 
@@ -131,6 +132,8 @@ class imager_awimager(LOFARnodeTCP): mask_parset = Parset.fromDict(mask_patch_dictionary) mask_parset_path = os.path.join(image_path_image_cycle, "mask.par") mask_parset.writeFile(mask_parset_path) + self.logger.debug("Wrote parset for awimager mask creation: {0}".format( + mask_parset_path)) # The command and parameters to be run cmd = [executable, mask_parset_path] @@ -151,9 +154,11 @@ class imager_awimager(LOFARnodeTCP): self.logger.error(str(e)) return 1 - self.logger.info("mask_patch_size: {0}".format(mask_patch_size)) + self.logger.debug("Started mask creation using mask_patch_size:" + " {0}".format(mask_patch_size)) # create the actual mask self._msss_mask(mask_file_path, sourcedb_path, mask_patch_size) + self.logger.debug("Finished mask creation") return mask_file_path def _msss_mask(self, mask_file_path, sourcedb_path, mask_patch_size = 1.0): @@ -365,7 +370,8 @@ class imager_awimager(LOFARnodeTCP): '{0} where sumsqr(UVW[:2]) <{1} giving as memory]))'.format(\ measurement_set, baseline_limit * baseline_limit))[0] #ask ger van diepen for details if ness. - + self.logger.debug("Calculated maximum baseline: {0}".format( + max_baseline)) t = pt.table(measurement_set) t1 = pt.table(t.getkeyword("SPECTRAL_WINDOW")) freq = t1.getcell("REF_FREQUENCY", 0) @@ -373,11 +379,14 @@ class imager_awimager(LOFARnodeTCP): t1.close() cell_size = (1.0 / 3) * (waveLength / float(max_baseline)) * arc_sec_in_rad + self.logger.debug("Calculated cellsize: {0}".format( + cell_size)) # Calculate the number of pixels in x and y dim # fov and diameter depending on the antenna name fov, station_diameter = self._field_of_view_and_station_diameter(measurement_set) - + self.logger.debug("Calculated fov and station diameter:" + " {0} , {1}".format(fov, station_diameter)) npix = (arc_sec_in_degree * fov) / cell_size npix = self._nearest_ceiled_power2(npix) @@ -390,6 +399,9 @@ class imager_awimager(LOFARnodeTCP): # Calculate number of projection planes w_proj_planes = (max_baseline * waveLength) / (station_diameter ** 2) w_proj_planes = int(round(w_proj_planes)) + self.logger.debug("Calculated w_max and the number of projection planes:" + " {0} , {1}".format(w_max, w_proj_planes)) + if w_proj_planes > 511: raise Exception("The number of projections planes for the current" + "measurement set is to large.") #FIXME: Ask george diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_bbs.py index d3328f53ff7..126eb2e694f 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_bbs.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_bbs.py @@ -1,29 +1,32 @@ # LOFAR AUTOMATIC IMAGING PIPELINE -# create dbs +# imager_bbs # Wouter Klijn 2012 # klijn@astron.nl # ----------------------------------------------------------------------------- from __future__ import with_statement - import sys import time import subprocess + from lofarpipe.support.lofarnode import LOFARnodeTCP from lofarpipe.support.pipelinelogging import log_process_output from lofarpipe.support.group_data import load_data_map class imager_bbs(LOFARnodeTCP): + """ + imager_bbs node performs a bbs run based on the supplied parset; it is a + shallow wrapper around bbs. + It starts bbs in a new subprocess, logs the output and aborts on failure + """ def run(self, bbs_executable, parset, ms_list_path, parmdb_list_path, sky_list_path): - """ - """ - - #read in the mapfiles to data maps + # read in the mapfiles to data maps: The master recipe added the single + # path to a mapfile, which allows usage of 
default data methods (load_data_map) node, ms_list = load_data_map(ms_list_path)[0] node, parmdb_list = load_data_map(parmdb_list_path)[0] node, sky_list = load_data_map(sky_list_path)[0] - self.logger.info("Starting imager_bbs Node") + self.logger.debug("Starting imager_bbs Node") try: process_list = [] # ***************************************************************** diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_create_dbs.py index 043cf5f3808..9b60aecd87c 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_create_dbs.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_create_dbs.py @@ -1,19 +1,5 @@ # LOFAR AUTOMATIC IMAGING PIPELINE -# create dbs -# The create dbs recipe is responcible for settings up database -# for subsequenty imaging steps. It creates two databases in three steps -# 1. sourcedb. -# On the first major imaging cycle filled by the gsm. sourcefinding in the -# in the later steps sources found are append to the current list -# Current patch of the sky. It is filled with an initial started set of -# sources created by the Global Sky Model (GSM). -# The GSM does not create a sourceDB. It creates a text file which is con- -# sumed by makesourcedb resulting in a sourceDB (casa table) -# There is a single sourcedb for a measurement set -# 2. parmdb -# Each individual timeslice needs a place to collect parameters: This is -# done in the paramdb. -# +# imager_create_dbs (node) # Wouter Klijn 2012 # klijn@astron.nl # ----------------------------------------------------------------------------- from __future__ import with_statement import sys import subprocess import math -import tempfile import shutil import pyrap.tables as pt #@UnresolvedImport import os @@ -33,7 +18,7 @@ from lofarpipe.support.pipelinelogging import CatchLog4CPlus from lofarpipe.support.utilities import read_initscript from lofarpipe.support.utilities import catch_segfaults import monetdb.sql as db -import lofar.gsm.gsmutils as gsm #@UnresolvedImport +import lofar.gsm.gsmutils as gsm #@UnresolvedImport #TODO: A better place for this template template_parmdb = """ ... """ class imager_create_dbs(LOFARnodeTCP): + """ + create dbs + The create dbs recipe is responsible for setting up the databases + for subsequent imaging steps. It creates two databases in three steps + 1. sourcedb. + On the first major imaging cycle it describes the current patch of the + sky, filled with an initial set of sources created by the Global Sky + Model (GSM). In the later steps the sources found by sourcefinding are + appended to the current list. + The GSM does not create a sourceDB. It creates a text file which is con- + sumed by makesourcedb resulting in a sourceDB (casa table) + There is a single sourcedb for a measurement set + 2. parmdb + Each individual timeslice needs a place to collect parameters: This is + done in the parmdb. 
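+ + Illustrative products for a single concatenated measurement set (example + paths only; the actual names follow the sourcedb/parmdb suffix inputs): + <concat.ms>.sourcedb - one sourcedb per measurement set + <time_slice.ms>.parmdb - one parmdb per timeslice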
+ """ def run(self, concatenated_measurement_set, sourcedb_target_path, monet_db_hostname, monet_db_port, monet_db_name, monet_db_user, monet_db_password, assoc_theta, parmdb_executable, slice_paths, parmdb_suffix, init_script, working_directory, makesourcedb_path, source_list_path_extern): - """ - - """ + self.logger.info("Starting imager_create_dbs Node") # If a (local) sourcelist is received use it else @@ -66,9 +65,10 @@ class imager_create_dbs(LOFARnodeTCP): if source_list_path_extern == "": #create a temporary file to contain the skymap source_list = sourcedb_target_path + ".temp" - if self._get_sky_model(concatenated_measurement_set, - source_list, monet_db_hostname, monet_db_port, - monet_db_name, monet_db_user, monet_db_password, assoc_theta): + if self._fill_soucelist_based_on_gsm_sky_model( + concatenated_measurement_set, + source_list, monet_db_hostname, monet_db_port, + monet_db_name, monet_db_user, monet_db_password, assoc_theta): self.logger.error("failed creating skymodel") return 1 append = False @@ -82,7 +82,8 @@ class imager_create_dbs(LOFARnodeTCP): makesourcedb_path, append): self.logger.error("failed creating sourcedb") return 1 - self.outputs["sky"] = sourcedb_target_path + + self.outputs["sourcedb"] = sourcedb_target_path if self._create_parmdb_for_timeslices(parmdb_executable, slice_paths, parmdb_suffix): @@ -100,17 +101,15 @@ class imager_create_dbs(LOFARnodeTCP): """ #remove existing sourcedb if not appending if (append == False) and os.path.isdir(sourcedb_target_path): - self.logger.info("Removing existing sky model: {0}".format( - sourcedb_target_path)) shutil.rmtree(sourcedb_target_path) - + self.logger.debug("Removed existing sky model: {0}".format( + sourcedb_target_path)) # The command and parameters to be run cmd = [executable, "in={0}".format(temp_sky_path), "out={0}".format(sourcedb_target_path), - "format=<", # format according to Ger - # Always set append flag: no effect on non exist db - "append=true"] + "format=<", # format according to Ger van Diepen + "append=true"] # Always set append flag: no effect on non exist db try: environment = read_initscript(self.logger, init_script) @@ -121,14 +120,6 @@ class imager_create_dbs(LOFARnodeTCP): catch_segfaults(cmd, working_directory, environment, logger, cleanup = None) - # Thrown by catch_segfault - except CalledProcessError, e: - self.logger.error("Execution of external failed:") - self.logger.error(" ".join(cmd)) - self.logger.error("exception details:") - self.logger.error(str(e)) - return 1 - except Exception, e: self.logger.error("Execution of external failed:") self.logger.error(" ".join(cmd)) @@ -289,7 +280,6 @@ class imager_create_dbs(LOFARnodeTCP): ra_and_decl = t1.getcell("PHASE_DIR", 0)[0] except Exception, e: - #catch all exceptions and log self.logger.error("Error loading FIELD/PHASE_DIR from " "measurementset {0} : {1}".format(measurement_set, @@ -305,34 +295,39 @@ class imager_create_dbs(LOFARnodeTCP): self.logger.error("returned PHASE_DIR data did not contain two values") return None - return (ra_and_decl[0], ra_and_decl[1]) - def _get_sky_model(self, measurement_set, path_output_skymap, + def _fill_soucelist_based_on_gsm_sky_model(self, measurement_set, sourcelist, monet_db_host, monet_db_port, monet_db_name, monet_db_user, monet_db_password, assoc_theta = None): """ Create a bbs sky model. 
Based on the measurement (set) suplied - The skymap is created at the path_output_skymap + The skymap is created at the sourcelist """ - # Create monetdb connection conn = self._create_monet_db_connection(monet_db_host, monet_db_name, monet_db_user, monet_db_password, monet_db_port) + self.logger.debug("Connected to monet db at: {0}:{1} {2}".format( + monet_db_host, monet_db_port, monet_db_name)) # get position of the target in the sky (ra_c, decl_c) = self._get_ra_and_decl_from_ms(measurement_set) + self.logger.debug("ra and dec from measurement set: {0}, {1}".format( + ra_c, decl_c)) # Get the Fov: sources in this fov should be included in the skumodel fov_radius = self._field_of_view(measurement_set) + self.logger.debug("Using the folowing calculated field of view: {0}".format( + fov_radius)) # !!magic constant!! This value is calculated based on # communications with Bart Sheers if assoc_theta == None: assoc_theta = 90.0 / 3600 try: + # Transform the ra and decl to rad ra_c = float(ra_c) * (180 / math.pi) if ra_c < 0: #gsm utils break when using negative ra_c ergo add 360 ra_c += 360.0 @@ -340,7 +335,7 @@ class imager_create_dbs(LOFARnodeTCP): gsm.expected_fluxes_in_fov(conn, ra_c , decl_c, float(fov_radius), - float(assoc_theta), path_output_skymap, + float(assoc_theta), sourcelist, storespectraplots = False) except Exception, e: self.logger.error("expected_fluxes_in_fov raise exception: " + diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_finalize.py b/CEP/Pipeline/recipes/sip/nodes/imager_finalize.py index 1b3b762ddd5..4d55efe5e70 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_finalize.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_finalize.py @@ -45,15 +45,14 @@ class imager_finalize(LOFARnodeTCP): # save the path processed_ms_paths.append(os.path.join(processed_ms_dir, raw_ms_file_name)) - error = None #@UnusedVariable #add the information the image try: addimg.addImagingInfo (awimager_output, processed_ms_paths, sourcelist, minbaseline, maxbaseline) - except Exception as error: - self.logger.info("addImagingInfo Threw Exception:") - self.logger.info(error) + except Exception, error: + self.logger.error("addImagingInfo Threw Exception:") + self.logger.error(error) # Catch raising of already done error: allows for rerunning # of the recipe if "addImagingInfo already done" in str(error): @@ -71,7 +70,7 @@ class imager_finalize(LOFARnodeTCP): #im.saveas(output_image, hdf5 = True) # TODO: HDF5 version of PIM is different to the system version # dunno the solution: the script breaks. - except Exception as error: + except Exception, error: self.logger.error( "Exception raised inside pyrap.images: {0}".format(str(error))) raise Exception(str(error)) diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py index 8fde480f1d8..53d1407e48a 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py @@ -23,6 +23,7 @@ import pyrap.tables as pt from lofarpipe.support.utilities import create_directory from lofarpipe.support.group_data import load_data_map from argparse import ArgumentError +from lofarpipe.support.lofarexceptions import PipelineException # Some constant settings for the recipe time_slice_dir_name = "time_slices" @@ -38,7 +39,7 @@ class SubProcessGroup(object): self.logger = logger - def run(self, cmd_in): + def run(self, cmd_in, unsave = False): """ Add the cmd as a subprocess to the current group: The process is started! 
@@ -62,6 +63,16 @@ class SubProcessGroup(object): stderr = subprocess.PIPE) # save the process self.process_group.append((cmd, process)) + + # TODO: SubProcessGroup could saturate a system with too many + # concurrent calls: artificial limit of 20 subprocesses + if not unsafe and (len(self.process_group) > 20): + self.logger.error("SubProcessGroup could hang with more " + "than 20 concurrent calls; call with unsafe = True to run " + "with more than 20 subprocesses") + raise PipelineException("SubProcessGroup could hang with more " + "than 20 concurrent calls. Aborting") + if self.logger == None: print "Subprocess started: {0}".format(cmd) else: @@ -137,6 +148,8 @@ class imager_prepare(LOFARnodeTCP): os.unlink(os.path.join(root, f)) for d in dirs: shutil.rmtree(os.path.join(root, d)) + self.logger.debug("Created empty directory: {0}".format( + time_slice_dir)) #****************************************************************** #Copy the input files (caching included for testing purpose) @@ -154,7 +167,7 @@ class imager_prepare(LOFARnodeTCP): input_map, subbands_per_group, processed_ms_dir, parset, ndppp_executable, init_script) - self.logger.info(time_slices) + self.logger.debug("Produced time slices: {0}".format(time_slices)) #*********************************************************** # run rfi_concole: flag datapoints which are corrupted self._run_rficonsole(rficonsole_executable, time_slice_dir, @@ -166,6 +179,7 @@ class imager_prepare(LOFARnodeTCP): # ndppp_executable fails if not present for ms in time_slices: pt.addImagingColumns(ms) #@UndefinedVariable + self.logger.debug("Added imaging columns to ms: {0}".format(ms)) group_measurement_filtered = self._filter_bad_stations( time_slices, asciistat_executable, @@ -201,6 +215,8 @@ class imager_prepare(LOFARnodeTCP): fp = open(temp_missing, 'w') fp.write(repr(missing_files)) + self.logger.debug( + "Wrote file with missing measurement sets: {0}".format(temp_missing)) fp.close() else: fp = open(temp_missing) @@ -225,7 +241,7 @@ class imager_prepare(LOFARnodeTCP): command = ["rsync", "-r", "{0}:{1}".format(node, path) , "{0}".format(processed_ms_dir)] - self.logger.info(" ".join(command)) + self.logger.debug("executing: " + " ".join(command)) #Spawn a subprocess and connect the pipes copy_process = subprocess.Popen( command, @@ -240,9 +256,9 @@ class imager_prepare(LOFARnodeTCP): #if copy failed log the missing file if exit_status != 0: missing_files.append(path) - self.logger.info("Failed loading file: {0}".format(path)) - self.logger.info(stderrdata) - self.logger.info(stdoutdata) + self.logger.warning("Failed loading file: {0}".format(path)) + self.logger.warning(stderrdata) + self.logger.debug(stdoutdata) # return the missing files (for 'logging') return set(missing_files) @@ -257,10 +273,7 @@ class imager_prepare(LOFARnodeTCP): Call with log for cplus and catch segfaults. 
Actual parameters are located in temp_parset_filename """ - # TODO: function is to long: refactor into smaller bits - time_slice_path_collected = [] - for idx_time_slice in range(slices_per_image): # Get the subset of ms that are part of the current timeslice input_map_subgroup = \ @@ -294,6 +307,8 @@ class imager_prepare(LOFARnodeTCP): nddd_parset_path = time_slice_path + ".ndppp.par" temp_parset_filename = patch_parset(parset, patchDictionary) shutil.copy(temp_parset_filename, nddd_parset_path) + self.logger.debug("Wrote a ndppp parset with runtime variables:" + " {0}".format(nddd_parset_path)) os.unlink(temp_parset_filename) except Exception, e: @@ -304,7 +319,6 @@ class imager_prepare(LOFARnodeTCP): #run ndppp cmd = [ndppp, nddd_parset_path] - try: environment = read_initscript(self.logger, init_script) with CatchLog4CPlus(working_dir, self.logger.name + @@ -320,7 +334,6 @@ class imager_prepare(LOFARnodeTCP): self.logger.error(str(e)) return 1 - return time_slice_path_collected @@ -332,7 +345,9 @@ class imager_prepare(LOFARnodeTCP): """ pt.msconcat(group_measurements_collected, #@UndefinedVariable output_file_path, concatTime = True) - + self.logger.debug("Concatenated the files: {0} into the single measure" + "mentset: {1}".format( + ", ".join(group_measurements_collected), output_file_path)) def _run_rficonsole(self, rficonsole_executable, time_slice_dir, group_measurements_collected): @@ -351,7 +366,8 @@ class imager_prepare(LOFARnodeTCP): self.logger.info(group_set) command = [rficonsole_executable, "-indirect-read", group_set] - self.logger.info(command) + self.logger.info("executing rficonsole command: {0}".format( + " ".join(command))) #Spawn a subprocess and connect the pipes copy_process = subprocess.Popen( command, @@ -367,8 +383,8 @@ class imager_prepare(LOFARnodeTCP): (stdoutdata, stderrdata) = proc.communicate() #if copy failed log the missing file if proc.returncode != 0: - self.logger.info(stdoutdata) - self.logger.info(stderrdata) + self.logger.error(stdoutdata) + self.logger.error(stderrdata) raise Exception("Error running rficonsole:") else: @@ -391,6 +407,8 @@ class imager_prepare(LOFARnodeTCP): http://www.lofar.org/wiki/lib/exe/fetch.php?media=msss:pandeymartinez-week9-v1p2.pdf """ # run asciistat to collect statistics about the ms + self.logger.info("Filtering bad stations") + self.logger.debug("Collecting statistical properties of input data") asciistat_output = [] asciistat_proc_group = SubProcessGroup(self.logger) for ms in group_measurements_collected: @@ -406,6 +424,7 @@ class imager_prepare(LOFARnodeTCP): raise Exception("an ASCIIStats run failed!") # Determine the station to remove + self.logger.debug("Select bad stations depending on collected stats") asciiplot_output = [] asciiplot_proc_group = SubProcessGroup(self.logger) for (ms, output_dir) in asciistat_output: @@ -420,6 +439,7 @@ class imager_prepare(LOFARnodeTCP): raise Exception("an ASCIIplot run failed!") #remove the bad stations + self.logger.debug("Use ms select to remove bad stations") msselect_output = {} msselect_proc_group = SubProcessGroup(self.logger) for ms, ms_stats in asciiplot_output: diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py b/CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py index fdc31a4c06e..cd097937d30 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py @@ -16,14 +16,7 @@ class imager_source_finding(LOFARnodeTCP): """ def run(self, input_image, bdsm_parameter_run1_path, 
bdsm_parameter_run2x_path, catalog_output_path, image_output_path): - self.logger.info("Starting imager_source_finding") -# self.logger.debug("input_image = %s" % input_image) -# self.logger.debug("bdsm_parameter_run1_path = %s" % bdsm_parameter_run1_path) -# self.logger.debug("bdsm_parameter_run2x_path = %s" % bdsm_parameter_run2x_path) -# self.logger.debug("catalog_output_path = %s" % catalog_output_path) -# self.logger.debug("image_output_path = %s" % image_output_path) - # default frequency is None (read from image), save for later cycles frequency = None number_of_sourcefind_itterations = None @@ -53,9 +46,10 @@ class imager_source_finding(LOFARnodeTCP): pass #do nothing bdsm_parameters[key] = parameter_value - self.logger.info( - "Starting sourcefinder bdsm on {0} using parameters:".format(input_image_local)) - self.logger.info(repr(bdsm_parameters)) + self.logger.debug( + "Starting sourcefinder bdsm on {0} using parameters:".format( + input_image_local)) + self.logger.debug(repr(bdsm_parameters)) img = bdsm.process_image(bdsm_parameters, filename = input_image_local, frequency = frequency) @@ -63,21 +57,27 @@ class imager_source_finding(LOFARnodeTCP): # If no more matching of sources with gausians is possible (nsrc==0) # break the loop if img.nsrc == 0: - self.logger.info("No sources found: exiting") + self.logger.debug("No sources found: exiting") number_of_sourcefind_itterations = idx break else: # We have at least found a single source! - self.logger.info("Number of source found: {0}".format(img.nsrc)) + self.logger.debug("Number of sources found: {0}".format(img.nsrc)) sources_found = True #export the catalog and the image with gausians substracted img.write_catalog(outfile = catalog_output_path + "_{0}".format(str(idx)), catalog_type = 'gaul', clobber = True, format = "bbs") + + self.logger.debug("Wrote list of sources to file at: {0}".format( + catalog_output_path)) img.export_image(outfile = image_output_path_local, img_type = 'gaus_resid', clobber = True, img_format = "fits") + self.logger.debug("Wrote fits image with subtracted sources" + " at: {0}".format(image_output_path_local)) + #img does not have close() # Save the frequency from image header of the original input file, # This information is not written by pybdsm to the exported image @@ -91,7 +91,7 @@ class imager_source_finding(LOFARnodeTCP): # Call with the number of loops and the path to the files, only combine # if we found sources if sources_found: - self.logger.info( + self.logger.debug( "Writing source list to file: %s" % catalog_output_path ) self._combine_source_lists(number_of_sourcefind_itterations, @@ -151,6 +151,8 @@ class imager_source_finding(LOFARnodeTCP): fp.write("\n") fp.close() + self.logger.debug("Wrote concatenated sourcelist to: {0}".format( + catalog_output_path)) if __name__ == "__main__": -- GitLab