From 17f1159cc3d58bca48b20eb8f25f0702c88f95a3 Mon Sep 17 00:00:00 2001
From: Wouter Klijn <klijn@astron.nl>
Date: Thu, 18 Oct 2012 15:00:45 +0000
Subject: [PATCH] Task #3569: Merge from task branch.

Delete temp files from /tmp
World readable parsets
Minor comment and code style fixes
---
 .../recipes/sip/bin/msss_imager_pipeline.py  | 131 +++++++++---------
 CEP/Pipeline/recipes/sip/master/new_bbs.py   | 129 ++++++++++-------
 .../recipes/sip/nodes/imager_awimager.py     | 102 ++++++++------
 .../recipes/sip/nodes/imager_prepare.py      |  84 ++++++-----
 4 files changed, 249 insertions(+), 197 deletions(-)

diff --git a/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py b/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py
index 6e1fd0f0c63..534c0ca7928 100755
--- a/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py
+++ b/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py
@@ -1,10 +1,12 @@
 #!/usr/bin/env python
-# LOFAR IMAGING PIPELINE
+# LOFAR IMAGING PIPELINE
 #
-# Imager Pipeline recipe
-# Marcel Loose, 2012
-# loose@astron.nl
-# ------------------------------------------------------------------------------
+# Imager Pipeline recipe
+# Marcel Loose, 2012
+# loose@astron.nl
+# Wouter Klijn, 2012
+# klijn@astron.nl
+# -----------------------------------------------------------------------------
 
 import os
 import sys
@@ -21,20 +23,20 @@ from lofar.parameterset import parameterset
 class msss_imager_pipeline(control):
     """
     The Automatic MSSS imager pipeline is used to generate MSSS images and find
-    sources in the generated images. Generated images and lists of found sources
-    are complemented with meta data and thus ready for consumption by the
-    Long Term Storage (LTA)
+    sources in the generated images. Generated images and lists of found
+    sources are complemented with meta data and are thus ready for consumption
+    by the Long Term Storage (LTA)
 
     *subband groups*
     The imager_pipeline is able to generate images on the frequency range of
-    LOFAR in parallel. Combining the frequency subbands together in so called
-    subbandgroups. Each subband group will result in an image and sourcelist,
-    (typically 8, because ten subband groups are combined).
+    LOFAR in parallel, combining the frequency subbands in so-called
+    subband groups. Each subband group will result in an image and sourcelist
+    (typically 8, because ten subband groups are combined).
 
     *Time Slices*
-    MSSS images are compiled from a number of so-called (time) slices. Each slice
-    comprises a short (approx. 10 min) observation of a field (an area on the
-    sky) containing typically 80 subbands. The number of slices will be
+    MSSS images are compiled from a number of so-called (time) slices. Each
+    slice comprises a short (approx. 10 min) observation of a field (an area on
+    the sky) containing typically 80 subbands. The number of slices will be
     different for LBA observations (typically 9) and HBA observations
     (typically 2), due to differences in sensitivity.
 
@@ -45,38 +47,39 @@ class msss_imager_pipeline(control):
     **This pipeline performs the following operations:**
 
     1. Prepare Phase. Copy the preprocessed MS's from the different compute
-       nodes to the nodes where the images will be compiled (the prepare phase).
+       nodes to the nodes where the images will be compiled (the prepare
+       phase).
        Combine the subbands in subband groups, concattenate the timeslice in
       a single large measurement set and perform flagging, RFI and bad station
       exclusion.
    2. Create db.
       Generate a local sky model (LSM) from the global sky model (GSM) for
       the sources that are in the field-of-view (FoV). The LSM is stored
       as sourcedb.
-       In step 3 calibration of the measurement sets is performed on these
-       sources and in step 4 to create a mask for the awimager. The calibration
+       In step 3 calibration of the measurement sets is performed on these
+       sources; in step 4 they are used to create a mask for the awimager.
+       The calibration
       solution will be placed in an instrument table/db also created in this
       step.
    3. BBS. Calibrate the measurement set with the sourcedb from the gsm.
       In later iterations sourced found in the created images will be added
-       to this list. Resulting in a selfcalibration cycle.
-    4. Awimager. The combined measurement sets are now imaged. The imaging
-       is performed using a mask: The sources in the sourcedb are used to create
-       an casa image masking known sources. Together with the measurement set
-       an image is created.
-    5. Sourcefinding. The images created in step 4 are fed to pyBDSM to find and
-       describe sources. In multiple itterations substracting the found sources,
-       all sources are collectedin a sourcelist.
-    Step I. The sources found in step 5 are fed back into step 2. This allows the
-       Measurement sets to be calibrated with sources currently found in the
-       image. This loop will continue until convergence (3 times for the time
-       being).
-    6. Finalize. Meta data with regards to the input, computations performed and
-       results are collected an added to the casa image. The images created are
-       converted from casa to HDF5 and copied to the correct output location.
+       to this list, resulting in a self-calibration cycle.
+    4. Awimager. The combined measurement sets are now imaged. The imaging
+       is performed using a mask: The sources in the sourcedb are used to
+       create a casa image masking known sources. Together with the
+       measurement set an image is created.
+    5. Sourcefinding. The images created in step 4 are fed to pyBDSM to find
+       and describe sources. In multiple iterations, subtracting the found
+       sources, all sources are collected in a sourcelist.
+    Step I. The sources found in step 5 are fed back into step 2.
+       This allows the measurement sets to be calibrated with sources
+       currently found in the image. This loop will continue until
+       convergence (3 times for the time being).
+    6. Finalize. Meta data with regards to the input, computations performed
+       and results are collected and added to the casa image. The images
+       created are converted from casa to HDF5 and copied to the correct
+       output location.
    7. Export meta data: An outputfile with meta data is generated ready for
       consumption by the LTA and/or the LOFAR framework.
-    
+
    **Per subband-group, the following output products will be delivered:**

    a. An image
@@ -98,7 +101,6 @@ class msss_imager_pipeline(control):
         self.parset_dir = None
         self.mapfile_dir = None
 
-
     def usage(self):
         """
         Display usage information
@@ -119,13 +121,12 @@ class msss_imager_pipeline(control):
         self.parset_feedback_file = parset_file + "_feedback"
         # Set job-name to basename of parset-file w/o extension, if it's not
         # set on the command-line with '-j' or '--job-name'
-        if not self.inputs.has_key('job_name'):
+        if 'job_name' not in self.inputs:
             self.inputs['job_name'] = (
                 os.path.splitext(os.path.basename(parset_file))[0]
             )
         return super(msss_imager_pipeline, self).go()
 
-
     def pipeline_logic(self):
         """
         Define the individual tasks that comprise the current pipeline.
@@ -155,7 +156,7 @@ class msss_imager_pipeline(control):
 
         # *********************************************************************
         # (INPUT) Get the input from external sources and create pipeline types
-        # Input measure ment sets
+        # Input measurement sets
         input_mapfile = os.path.join(self.mapfile_dir, "uvdata.mapfile")
         store_data_map(input_mapfile, self.input_data)
         self.logger.debug(
@@ -174,7 +175,7 @@ class msss_imager_pipeline(control):
             "Wrote output sky-image mapfile: {0}".format(output_image_mapfile))
 
         # ******************************************************************
-        # (1) prepare phase: copy and collect the ms
+        # (1) prepare phase: copy and collect the ms
         concat_ms_map_path, timeslice_map_path, raw_ms_per_image_map_path, \
             processed_ms_dir = self._prepare_phase(input_mapfile,
                     target_mapfile, skip=False)
@@ -196,15 +197,14 @@ class msss_imager_pipeline(control):
             bbs_output = self._bbs(timeslice_map_path, parmdbs_path,
                         sourcedb_map_path, skip=False)
 
-
-            # ******************************************************************
+            # *****************************************************************
             # (4) Get parameters awimager from the prepare_parset and inputs
             aw_image_mapfile, maxbaseline = self._aw_imager(concat_ms_map_path,
                         idx_loop, sourcedb_map_path, skip=False)
 
             # *****************************************************************
-            # (5) Source finding
+            # (5) Source finding
             sourcelist_map, found_sourcedb_path = self._source_finding(
                     aw_image_mapfile, idx_loop, skip=False)
             #should the output be a sourcedb? instead of a sourcelist
@@ -271,7 +271,7 @@ class msss_imager_pipeline(control):
         """
         Perform the final step of the imager: Convert the output image to
         hdf5 and copy to output location
-        Collect meta data and add to the image
+        Collect meta data and add to the image
         """
 
         placed_image_mapfile = self._write_datamap_to_file(None,
@@ -317,7 +317,8 @@ class msss_imager_pipeline(control):
         sourcedb_map_path = self._write_datamap_to_file(None,
              "source_dbs_outputs", "Map to sourcedbs based in found sources")
 
-        # construct the location to save the output products of the sourcefinder
+        # construct the location to save the output products of the
+        # sourcefinder
         cycle_path = os.path.join(self.scratch_directory,
                 "awimage_cycle_{0}".format(major_cycle))
         catalog_path = os.path.join(cycle_path, "bdsm_catalog")
@@ -340,31 +341,30 @@ class msss_imager_pipeline(control):
 
         return source_list_map, sourcedb_map_path
 
-
     def _bbs(self, timeslice_map_path, parmdbs_map_path, sourcedb_map_path,
              skip=False):
         """
         Perform a calibration step. First with a set of sources from the
         gsm and in later iterations also on the found sources
         """
-        #create parset for bbs run
+        # create parset for bbs run
         parset = self.parset.makeSubset("BBS.")
         parset_path = self._write_parset_to_file(parset, "bbs",
-                                "Parset for calibration on local sky model")
+                        "Parset for calibration on local sky model")
 
         # create the output file path
         output_mapfile = self._write_datamap_to_file(None, "bbs_output",
                         "Mapfile with calibrated measurement sets.")
 
         converted_sourcedb_map_path = self._write_datamap_to_file(None,
-                "source_db", "correctly shaped mapfile for input sourcedbs")
+                    "source_db", "correctly shaped mapfile for input sourcedbs")
 
         if skip:
             return output_mapfile
 
         # The create db step produces a mapfile with a single sourcelist for
         # the different timeslices. Generate a mapfile with copies of the
-        # sourcelist location: This allows validation of maps in combination
+        # sourcelist location: This allows validation of maps in combination
 
         # get the original map data
         sourcedb_map = load_data_map(sourcedb_map_path)
@@ -408,12 +408,17 @@ class msss_imager_pipeline(control):
         parset = self.parset.makeSubset("AWimager.")
         # Get maxbaseline from 'full' parset
         max_baseline = self.parset.getInt("Imaging.maxbaseline")
-        patch_dictionary = {"maxbaseline":str(
+        patch_dictionary = {"maxbaseline": str(
             max_baseline)}
-        temp_parset_filename = patch_parset(parset, patch_dictionary)
-        aw_image_parset = get_parset(temp_parset_filename)
-        aw_image_parset_path = self._write_parset_to_file(aw_image_parset,
-                "awimager_cycle_{0}".format(major_cycle), "Awimager recipe parset")
+        temp_parset_filename = patch_parset(parset, patch_dictionary)
+        try:
+            aw_image_parset = get_parset(temp_parset_filename)
+            aw_image_parset_path = self._write_parset_to_file(aw_image_parset,
+                    "awimager_cycle_{0}".format(major_cycle),
+                    "Awimager recipe parset")
+        finally:
+            # remove tempfile; patch_parset is called before the try so the
+            # name is always bound when the finally clause runs
+            os.remove(temp_parset_filename)
 
         # Create path to write the awimage files
         intermediate_image_path = os.path.join(self.scratch_directory,
@@ -438,16 +443,15 @@ class msss_imager_pipeline(control):
 
         return output_mapfile, max_baseline
 
-
     def _prepare_phase(self, input_ms_map_path, target_mapfile,
             skip=False):
         """
-        Copy ms to correct location, combine the ms in slices and combine
+        Copy ms to correct location, combine the ms in slices and combine
         the time slices into a large virtual measurement set
         """
         # Create the dir where found and processed ms are placed
         # raw_ms_per_image_map_path contains all the original ms locations:
-        # this list contains possible missing files
+        # this list contains possible missing files
         processed_ms_dir = os.path.join(self.scratch_directory, "subbands")
 
         # get the parameters, create a subset for ndppp, save
@@ -459,7 +463,7 @@ class msss_imager_pipeline(control):
         #[1] output -> prepare_output
         output_mapfile = self._write_datamap_to_file(None, "prepare_output")
         time_slices_mapfile = self._write_datamap_to_file(None,
-                                                    "prepare_time_slices")
+                                                "prepare_time_slices")
         raw_ms_per_image_mapfile = self._write_datamap_to_file(None,
                                                     "raw_ms_per_image")
 
@@ -505,11 +509,10 @@ class msss_imager_pipeline(control):
         return output_mapfile, time_slices_mapfile, raw_ms_per_image_mapfile, \
             processed_ms_dir
 
-
     def _create_dbs(self, input_map_path, timeslice_map_path, source_list="",
                     skip_create_dbs=False):
         """
-        Create for each of the concatenated input measurement sets
+        Create for each of the concatenated input measurement sets
         an instrument model and parmdb
         """
         # Create the parameters set
@@ -545,10 +548,9 @@ class msss_imager_pipeline(control):
     # TODO: Move these helpers to the parent class
     def _write_parset_to_file(self, parset, parset_name, message):
         """
-        Write the suplied the suplied parameterset to the parameter set
+        Write the supplied parameterset to the parameter set
         directory in the jobs dir with the filename suplied in parset_name.
         Return the full path to the created file.
-
         """
         parset_dir = os.path.join(
             self.config.get("layout", "job_directory"), "parsets")
@@ -566,12 +568,11 @@ class msss_imager_pipeline(control):
 
         return parset_path
 
-
     def _write_datamap_to_file(self, datamap, mapfile_name, message=""):
         """
-        Write the suplied the suplied map to the mapfile
+        Write the supplied map to the mapfile
         directory in the jobs dir with the filename suplied in mapfile_name.
-        Return the full path to the created file.
+        Return the full path to the created file. If the supplied data is
+        None then the file is touched if it does not yet exist; existing
+        files are kept as is
         """
@@ -596,10 +597,8 @@ class msss_imager_pipeline(control):
             self.logger.debug(
                 "Touched mapfile <{0}>: {1}".format(mapfile_path, message))
 
-
         return mapfile_path
 
 
 if __name__ == '__main__':
     sys.exit(msss_imager_pipeline().main())
-
diff --git a/CEP/Pipeline/recipes/sip/master/new_bbs.py b/CEP/Pipeline/recipes/sip/master/new_bbs.py
index 4997c9a2ba8..02b99cb8076 100644
--- a/CEP/Pipeline/recipes/sip/master/new_bbs.py
+++ b/CEP/Pipeline/recipes/sip/master/new_bbs.py
@@ -1,9 +1,11 @@
-# LOFAR IMAGING PIPELINE
+# LOFAR IMAGING PIPELINE
 #
-# BBS (BlackBoard Selfcal) recipe
-# John Swinbank, 2009-10
-# swinbank@transientskp.org
-# ------------------------------------------------------------------------------
+# BBS (BlackBoard Selfcal) recipe
+# John Swinbank, 2009-10
+# swinbank@transientskp.org
+# Wouter Klijn, 2012
+# klijn@astron.nl
+# -----------------------------------------------------------------------------
 
 from __future__ import with_statement
 import subprocess
@@ -20,7 +22,6 @@ from lofar.parameterset import parameterset
 from lofarpipe.support.baserecipe import BaseRecipe
 from lofarpipe.support.group_data import load_data_map, store_data_map
 from lofarpipe.support.group_data import validate_data_maps
-from lofarpipe.support.lofarexceptions import PipelineException
 from lofarpipe.support.pipelinelogging import CatchLog4CPlus
 from lofarpipe.support.pipelinelogging import log_process_output
 from lofarpipe.support.remotecommand import run_remote_command
@@ -28,13 +29,14 @@ from lofarpipe.support.remotecommand import ComputeJob
 from lofarpipe.support.jobserver import job_server
 import lofarpipe.support.utilities as utilities
 import lofarpipe.support.lofaringredient as ingredient
+from lofarpipe.support.utilities import create_directory
 
 
 class new_bbs(BaseRecipe):
     """
     **This bbs recipe still uses the oldstyle bbs with global control**
     **New versions will have stand alone capability**
-
+
     The bbs recipe coordinates running BBS on a group of MeasurementSets.
     It runs both GlobalControl and KernelControl; as yet, SolverControl
     has not been integrated.
@@ -111,7 +113,6 @@ class new_bbs(BaseRecipe):
         self.parset = parameterset()
         self.killswitch = threading.Event()
 
-
     def _set_input(self, in_key, ps_key):
         """
         Set the input-key `in_key` to the value of `ps_key` in the parset, if
@@ -119,9 +120,8 @@ class new_bbs(BaseRecipe):
         """
         try:
             self.inputs[in_key] = self.parset.getString(ps_key)
-        except RuntimeError, e:
-            self.logger.warn(str(e))
-
+        except RuntimeError, exceptionobject:
+            self.logger.warn(str(exceptionobject))
 
     def _make_bbs_map(self):
         """
@@ -140,7 +140,7 @@ class new_bbs(BaseRecipe):
                 '/data/scratch/loose/L29697/L29697_SAP000_SB000_uv.MS.instrument',
                 '/data/scratch/loose/L29697/L29697_SAP000_SB000_uv.MS.sky')
             )
-
+
         Returns `False` if validation of the three map-files fails, otherwise
         returns `True`.
""" @@ -166,13 +166,12 @@ class new_bbs(BaseRecipe): return True - def go(self): self.logger.info("Starting BBS run") super(new_bbs, self).go() - # Check for relevant input parameters in the parset-file - # ---------------------------------------------------------------------- + # Check for relevant input parameters in the parset-file + # --------------------------------------------------------------------- self.logger.debug("Reading parset from %s" % self.inputs['parset']) self.parset = parameterset(self.inputs['parset']) @@ -183,8 +182,8 @@ class new_bbs(BaseRecipe): #self.logger.debug("self.inputs = %s" % self.inputs) - # Clean the blackboard database - # ---------------------------------------------------------------------- + # Clean the blackboard database + # --------------------------------------------------------------------- self.logger.info( "Cleaning BBS database for key '%s'" % (self.inputs['db_key']) ) @@ -202,8 +201,8 @@ class new_bbs(BaseRecipe): self.inputs['db_key'] ) - # Create a bbs_map describing the file mapping on disk - # ---------------------------------------------------------------------- + # Create a bbs_map describing the file mapping on disk + # --------------------------------------------------------------------- if not self._make_bbs_map(): return 1 @@ -218,18 +217,33 @@ class new_bbs(BaseRecipe): # file and database information into the supplied template # ------------------------------------------------------------------ self.logger.debug("Building parset for BBS control") - bbs_parset = utilities.patch_parset( - self.parset, - { - 'Observation': gvds_file, - 'BBDB.Key': self.inputs['db_key'], - 'BBDB.Name': self.inputs['db_name'], - 'BBDB.User': self.inputs['db_user'], - 'BBDB.Host': self.inputs['db_host'], - #'BBDB.Port': self.inputs['db_name'], - } - ) - self.logger.debug("BBS control parset is %s" % (bbs_parset,)) + # Create a location for parsets + job_directory = self.config.get( + "layout", "job_directory") + parset_directory = os.path.join(job_directory, "parsets") + create_directory(parset_directory) + + # patch the parset and copy result to target location remove tempfile + try: + bbs_parset = utilities.patch_parset( + self.parset, + { + 'Observation': gvds_file, + 'BBDB.Key': self.inputs['db_key'], + 'BBDB.Name': self.inputs['db_name'], + 'BBDB.User': self.inputs['db_user'], + 'BBDB.Host': self.inputs['db_host'], + #'BBDB.Port': self.inputs['db_name'], + } + ) + bbs_parset_path = os.path.join(parset_directory, + "bbs_control.parset") + shutil.copyfile(bbs_parset, bbs_parset_path) + self.logger.debug("BBS control parset is %s" % (bbs_parset_path,)) + + finally: + # Always remove the file in the tempdir + os.remove(bbs_parset) try: # When one of our processes fails, we set the killswitch. 
@@ -262,7 +276,8 @@ class new_bbs(BaseRecipe):
             command = "python %s" % (self.__file__.replace('master', 'nodes'))
             jobpool = {}
             bbs_kernels = []
-            with job_server(self.logger, jobpool, self.error) as (jobhost, jobport):
+            with job_server(self.logger, jobpool, self.error) as (jobhost,
+                                                                  jobport):
                 self.logger.debug("Job server at %s:%d" % (jobhost, jobport))
                 for job_id, details in enumerate(self.bbs_map):
                     host, files = details
@@ -284,10 +299,11 @@ class new_bbs(BaseRecipe):
                         )
                     )
                 self.logger.info("Starting %d threads" % len(bbs_kernels))
-                [thread.start() for thread in bbs_kernels]
+                for thread in bbs_kernels:
+                    thread.start()
                 self.logger.debug("Waiting for all kernels to complete")
-                [thread.join() for thread in bbs_kernels]
-
+                for thread in bbs_kernels:
+                    thread.join()
 
             # When GlobalControl finishes, our work here is done
             # ----------------------------------------------------------
@@ -295,11 +311,12 @@ class new_bbs(BaseRecipe):
             bbs_control.join()
         finally:
             os.unlink(bbs_parset)
-            if self.killswitch.isSet():
-                # If killswitch is set, then one of our processes failed so
-                # the whole run is invalid
-                # ----------------------------------------------------------
-                return 1
+
+        if self.killswitch.isSet():
+            # If killswitch is set, then one of our processes failed so
+            # the whole run is invalid
+            # ----------------------------------------------------------
+            return 1
 
         self.outputs['mapfile'] = self.inputs['data_mapfile']
         return 0
@@ -321,11 +338,12 @@ class new_bbs(BaseRecipe):
                 self.environment,
                 arguments=arguments
             )
-        except Exception, e:
+        except OSError:
             self.logger.exception("BBS Kernel failed to start")
             self.killswitch.set()
             return 1
-        result = self._monitor_process(bbs_kernel_process, "BBS Kernel on %s" % host)
+        result = self._monitor_process(bbs_kernel_process,
+                                       "BBS Kernel on %s" % host)
         sout, serr = bbs_kernel_process.communicate()
         serr = serr.replace("Connection to %s closed.\r\n" % host, "")
         log_process_output("SSH session (BBS kernel)", sout, serr, self.logger)
@@ -356,9 +374,11 @@ class new_bbs(BaseRecipe):
                 env=self.environment
             )
             # _monitor_process() needs a convenient kill() method.
-            bbs_control_process.kill = lambda : os.kill(bbs_control_process.pid, signal.SIGKILL)
+            bbs_control_process.kill = lambda: os.kill(
+                bbs_control_process.pid, signal.SIGKILL)
         except OSError, e:
-            self.logger.error("Failed to spawn BBS Control (%s)" % str(e))
+            self.logger.error(
+                "Failed to spawn BBS Control (%s)" % str(e))
             self.killswitch.set()
             return 1
         finally:
@@ -385,22 +405,33 @@ class new_bbs(BaseRecipe):
         while True:
             try:
                 returncode = process.poll()
-                if returncode == None: # Process still running
+                # Process still running
+                if returncode is None:
                     time.sleep(1)
-                elif returncode != 0: # Process broke!
+
+                # Process broke!
+                elif returncode != 0:
                     self.logger.warn(
-                        "%s returned code %d; aborting run" % (name, returncode)
+                        "%s returned code %d; aborting run" % (name,
+                                                               returncode)
                     )
                     self.killswitch.set()
                     break
-                else: # Process exited cleanly
+
+                # Process exited cleanly
+                else:
                     self.logger.info("%s clean shutdown" % (name))
                     break
-                if self.killswitch.isSet(): # Other process failed; abort
+
+                # Other process failed; abort
+                if self.killswitch.isSet():
                     self.logger.warn("Killing %s" % (name))
                     process.kill()
                     returncode = process.wait()
                     break
+
+            # Catch all exceptions: we need to take down all processes
+            # whatever is thrown
             except:
                 # An exception here is likely a ctrl-c or similar. Whatever it
                 # is, we bail out.
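The same patch-copy-remove sequence recurs in all four files this commit touches: patch_parset() writes the patched parset to a tempfile in /tmp, the result is copied to a public location, and the tempfile must always be removed afterwards. A helper along the following lines could factor that pattern out; patched_parset_copy is a hypothetical name, not an existing lofarpipe utility. Note that shutil.copyfile, unlike shutil.copy, does not carry over the restrictive file mode of the tempfile, so the copy is created subject to the process umask and normally ends up world readable, which matches the commit's stated intent.

    import os
    import shutil
    from contextlib import contextmanager

    from lofarpipe.support.utilities import patch_parset

    @contextmanager
    def patched_parset_copy(parset, patch_dictionary, target_path):
        # Patch the parset into a tempfile, publish a copy at target_path
        # and always clean up the tempfile, even when copying fails.
        temp_path = patch_parset(parset, patch_dictionary)
        try:
            shutil.copyfile(temp_path, target_path)
            yield target_path
        finally:
            os.remove(temp_path)

A recipe could then run, for example, "with patched_parset_copy(parset, patch_dictionary, parset_path) as path:" and start the executable with path inside the block, instead of repeating the try/finally in every call site.
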
diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py b/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py
index 75c68fe76ca..a2af35124a3 100644
--- a/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py
+++ b/CEP/Pipeline/recipes/sip/nodes/imager_awimager.py
@@ -34,14 +34,15 @@ from lofarpipe.support.parset import Parset
 import lofar.parmdb #@UnresolvedImport
 import numpy as np
 
+
 class imager_awimager(LOFARnodeTCP):
     def run(self, executable, environment, parset, working_directory,
             output_image, concatenated_measurement_set, sourcedb_path,
             mask_patch_size):
-        """
+        """
         :param executable: Path to awimager executable
         :param environment: environment for catch_segfaults (executable runner)
-        :param parset: parameters for the awimager,
+        :param parset: parameters for the awimager
         :param working_directory: directory the place temporary files
         :param output_image: location and filesname to story the output images
           the multiple images are appended with type extentions
@@ -49,14 +50,14 @@ class imager_awimager(LOFARnodeTCP):
         :param sourcedb_path: Path the the sourcedb used to create the image
           mask
         :param mask_patch_size: Scaling of the patch around the source in the
-          mask
+          mask
         :rtype: self.outputs["image"] The path to the output image
         """
         self.logger.info("Start imager_awimager node run:")
         log4_cplus_name = "imager_awimager"
         self.environment.update(environment)
-
+
         with log_time(self.logger):
             # ****************************************************************
             # 1. Calculate awimager parameters that depend on measurement set
@@ -79,28 +80,33 @@ class imager_awimager(LOFARnodeTCP):
                         working_directory, log4_cplus_name, sourcedb_path,
                         mask_patch_size, image_path_head)
 
-            # ******************************************************************
+            # *****************************************************************
             # 4. Update the parset with calculated parameters, and output image
             patch_dictionary = {'uselogger': 'True', # enables log4cpluscd log
                                 'ms': str(concatenated_measurement_set),
                                 'cellsize': str(cell_size),
                                 'npix': str(npix),
                                 'wmax': str(w_max),
-                                'wprojplanes':str(w_proj_planes),
-                                'image':str(output_image),
-                                'maxsupport':str(npix),
-                                #'mask':str(mask_file_path), #TODO REINTRODUCE
+                                'wprojplanes': str(w_proj_planes),
+                                'image': str(output_image),
+                                'maxsupport': str(npix),
+                                #'mask':str(mask_file_path), #TODO REINTRODUCE
                                 # MASK, excluded to speed up in this debug stage
                                 }
 
-            # save the parset at the target dir for the image
-            temp_parset_filename = patch_parset(parset, patch_dictionary)
+            # save the parset at the target dir for the image
             calculated_parset_path = os.path.join(image_path_head,
-                    "parset.par")
-            # Copy tmp file to the final location
-            shutil.copy(temp_parset_filename, calculated_parset_path)
-            self.logger.debug("Wrote parset for awimager run: {0}".format(
+                    "parset.par")
+
+            # patch_parset is called before the try so the name is always
+            # bound when the finally clause runs
+            temp_parset_filename = patch_parset(parset, patch_dictionary)
+            try:
+                # Copy tmp file to the final location
+                shutil.copyfile(temp_parset_filename, calculated_parset_path)
+                self.logger.debug("Wrote parset for awimager run: {0}".format(
                     calculated_parset_path))
+            finally:
+                # remove temp file
+                os.remove(temp_parset_filename)
 
             # *****************************************************************
             # 5. Run the awimager with the updated parameterset
@@ -126,24 +132,26 @@ class imager_awimager(LOFARnodeTCP):
             # *********************************************************************
             # 6. Return output
             # Append static .restored: This might change but prob. not
not - # The actual output image has this extention always, default of awimager + # The actual output image has this extention always, default of + # awimager self.outputs["image"] = output_image + ".restored" return 0 def _calc_par_from_measurement(self, measurement_set, parset): """ - (1) calculate and format some parameters that are determined runtime. + (1) calculate and format some parameters that are determined runtime. Based on values in the measurementset and input parameter (set): - a. <string> The cellsize + a. <string> The cellsize b. <int> The npixels in a each of the two dimension of the image c. <string> The largest baseline in the ms smaller then the maxbaseline d. <string> The number of projection planes - + The calculation of these parameters is done in three steps: 1. Calculate intermediate results based on the ms. - 2. The calculation of the actual target values using intermediate result + 2. The calculation of the actual target values using intermediate + result 3. Scaling of cellsize and npix to allow for user input of the npix """ @@ -155,16 +163,17 @@ class imager_awimager(LOFARnodeTCP): # npix round up to nearest pow 2 parset_npix = self._nearest_ceiled_power2(parset_object.getInt('npix')) - # Get the longest baseline + # Get the longest baseline sqrt_max_baseline = pt.taql( 'CALC sqrt(max([select sumsqr(UVW[:2]) from ' + \ '{0} where sumsqr(UVW[:2]) <{1} giving as memory]))'.format(\ measurement_set, baseline_limit * - baseline_limit))[0] #ask ger van diepen for details if ness. + baseline_limit))[0] # ask ger van diepen for details if ness. #Calculate the wave_length table_ms = pt.table(measurement_set) - table_spectral_window = pt.table(table_ms.getkeyword("SPECTRAL_WINDOW")) + table_spectral_window = pt.table( + table_ms.getkeyword("SPECTRAL_WINDOW")) freq = table_spectral_window.getcell("REF_FREQUENCY", 0) table_spectral_window.close() wave_length = pt.taql('CALC C()') / freq @@ -200,12 +209,12 @@ class imager_awimager(LOFARnodeTCP): "Calculated w_max and the number pf projection plances:" " {0} , {1}".format(w_max, w_proj_planes)) - # MAximum number of proj planes set to 1024: George Heald, Ger van + # MAximum number of proj planes set to 1024: George Heald, Ger van # Diepen if this exception occurs maxsupport = max(1024, npix) if w_proj_planes > maxsupport: - raise Exception("The number of projections planes for the current" + - "measurement set is to large.") + raise Exception("The number of projections planes for the current" + + "measurement set is to large.") # ********************************************************************* # 3. 
        # 3. if the npix from the parset is different to the ms calculations,
@@ -231,7 +240,7 @@ class imager_awimager(LOFARnodeTCP):
         """
         _field_of_view calculates the fov, which is dependend on the
         station type, location and mode:
-        For details see:
+        For details see:
         (1) http://www.astron.nl/radio-observatory/astronomers/lofar-imaging-capabilities-sensitivity/lofar-imaging-capabilities/lofar
         """
@@ -247,7 +256,7 @@ class imager_awimager(LOFARnodeTCP):
         antenna_set = observation.getcell('LOFAR_ANTENNA_SET', 0)
         observation.close()
 
-        #static parameters for the station diameters ref (1)
+        #static parameters for the station diameters ref (1)
         hba_core_diameter = 30.8
         hba_remote_diameter = 41.1
         lba_inner = 32.3
@@ -275,13 +284,14 @@ class imager_awimager(LOFARnodeTCP):
                 "Unknown antenna type encountered in Measurement set")
 
         #Get the wavelength
-        spectral_window_table = pt.table(table_ms.getkeyword("SPECTRAL_WINDOW"))
+        spectral_window_table = pt.table(table_ms.getkeyword(
+                                                    "SPECTRAL_WINDOW"))
         freq = float(spectral_window_table.getcell("REF_FREQUENCY", 0))
         wave_length = pt.taql('CALC C()') / freq
         spectral_window_table.close()
 
         # Now calculate the FOV see ref (1)
-        # alpha_one is a magic parameter: The value 1.3 is representative for a
+        # alpha_one is a magic parameter: The value 1.3 is representative for a
         # WSRT dish, where it depends on the dish illumination
         alpha_one = 1.3
 
@@ -322,17 +332,18 @@ class imager_awimager(LOFARnodeTCP):
         # 1. Create the parset used to make a mask
         mask_file_path = output_image + ".mask"
 
-        mask_patch_dictionary = {"npix":str(npix),
-                                 "cellsize":str(cell_size),
-                                 "image":str(mask_file_path),
-                                 "ms":str(concatenated_measurement_set),
-                                 "operation":"empty",
-                                 "stokes":"'I'"
+        mask_patch_dictionary = {"npix": str(npix),
+                                 "cellsize": str(cell_size),
+                                 "image": str(mask_file_path),
+                                 "ms": str(concatenated_measurement_set),
+                                 "operation": "empty",
+                                 "stokes": "'I'"
                                  }
         mask_parset = Parset.fromDict(mask_patch_dictionary)
         mask_parset_path = os.path.join(image_path_directory, "mask.par")
         mask_parset.writeFile(mask_parset_path)
-        self.logger.debug("Write parset for awimager mask creation: {0}".format(
+        self.logger.debug(
+            "Wrote parset for awimager mask creation: {0}".format(
                 mask_parset_path))
 
         # *********************************************************************
@@ -393,21 +404,21 @@ class imager_awimager(LOFARnodeTCP):
         Version 0.3 (Wouter Klijn, klijn@astron.nl)
         - Usage of sourcedb instead of txt document as 'source' of sources
           This allows input from different source sources
-        Version 0.31 (Wouter Klijn, klijn@astron.nl)
+        Version 0.31 (Wouter Klijn, klijn@astron.nl)
         - Adaptable patch size (patch size needs specification)
         - Patch size and geometry is broken: needs some astronomer magic to
           fix it, problem with afine transformation prol.
         Version 0.32 (Wouter Klijn, klijn@astron.nl)
-        - Renaming of variable names to python convention
+        - Renaming of variable names to python convention
         """
-        pad = 500. # increment in maj/minor axes [arcsec]
+        # increment in maj/minor axes [arcsec]
+        pad = 500.
 
         # open mask
         mask = pim.image(mask_file_path, overwrite=True)
         mask_data = mask.getdata()
         xlen, ylen = mask.shape()[2:]
-        freq, stokes, null, null = mask.toworld([0, 0, 0, 0]) #@UnusedVariable
-
+        freq, stokes, null, null = mask.toworld([0, 0, 0, 0])
 
         #Open the sourcedb:
         table = pt.table(sourcedb_path + "::SOURCES")
@@ -442,12 +453,13 @@ class imager_awimager(LOFARnodeTCP):
             # minor radius (+pad) in rad
             minor = (((min_raw + pad)) / 3600.) * np.pi / 180.
             pix_asc = pa_raw * np.pi / 180.
-            # wenss writes always 'GAUSSIAN' even for point sources
+            # wenss always writes 'GAUSSIAN', even for point sources
             #-> set to wenss beam+pad
             if maj == 0 or minor == 0:
                 maj = ((54. + pad) / 3600.) * np.pi / 180.
                 minor = ((54. + pad) / 3600.) * np.pi / 180.
-            elif source_type == 0: # set to wenss beam+pad
+            # set to wenss beam+pad
+            elif source_type == 0:
                 maj = (((54. + pad) / 2.) / 3600.) * np.pi / 180.
                 minor = (((54. + pad) / 2.) / 3600.) * np.pi / 180.
                 pix_asc = 0.
@@ -505,7 +517,7 @@ class imager_awimager(LOFARnodeTCP):
     # some helper functions
     def _nearest_ceiled_power2(self, value):
         """
-        Return int value of the nearest Ceiled power of 2 for the
+        Return the int value of the nearest ceiled power of 2 for the
         suplied argument
         """
 
diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py
index 691ddc2935e..59994ae6b51 100644
--- a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py
+++ b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py
@@ -1,9 +1,9 @@
 # LOFAR IMAGING PIPELINE
-# Prepare phase node
-# Wouter Klijn
+# Prepare phase node
+# Wouter Klijn
 # 2012
 # klijn@astron.nl
-# ------------------------------------------------------------------------------
+# -----------------------------------------------------------------------------
 from __future__ import with_statement
 import sys
 import shutil
@@ -19,7 +19,7 @@ from lofarpipe.support.utilities import create_directory
 from lofarpipe.support.group_data import load_data_map
 from lofarpipe.support.subprocessgroup import SubProcessGroup
 
-import pyrap.tables as pt #@UnresolvedImport
+import pyrap.tables as pt
 
 # Some constant settings for the recipe
 _time_slice_dir_name = "time_slices"
@@ -37,8 +37,8 @@ class imager_prepare(LOFARnodeTCP):
     5. Concatenate the time slice measurment sets, to a single virtual ms.
     6. Filter bad stations. Find station with repeated bad measurement and
        remove these completely from the dataset.
-
-    **Members:**
+
+    **Members:**
     """
     def run(self, environment, parset, working_dir, processed_ms_dir,
             ndppp_executable, output_measurement_set,
@@ -53,10 +53,10 @@ class imager_prepare(LOFARnodeTCP):
         input_map = load_data_map(raw_ms_mapfile)
 
         #******************************************************************
-        # I. Create the directories used in this recipe
+        # I. Create the directories used in this recipe
         create_directory(processed_ms_dir)
 
-        # time slice dir_to_remove: assure empty directory: Stale data
+        # time slice dir_to_remove: assure empty directory: Stale data
         # is problematic for dppp
         time_slice_dir = os.path.join(working_dir, _time_slice_dir_name)
         create_directory(time_slice_dir)
@@ -95,7 +95,8 @@ class imager_prepare(LOFARnodeTCP):
             # ndppp_executable fails if not present
             for ms in time_slices:
                 pt.addImagingColumns(ms)
-                self.logger.debug("Added imaging columns to ms: {0}".format(ms))
+                self.logger.debug(
+                    "Added imaging columns to ms: {0}".format(ms))
 
         #*****************************************************************
         # 5. Filter bad stations
@@ -109,7 +110,7 @@ class imager_prepare(LOFARnodeTCP):
                 output_measurement_set)
 
         #******************************************************************
-        # return
+        # return
         self.outputs["time_slices"] = group_measurement_filtered
 
         self.outputs["completed"] = "true"
@@ -120,7 +121,7 @@ class imager_prepare(LOFARnodeTCP):
         """
         Perform a optionalskip_copy copy of the input ms: For testing
         purpose the output, the missing_files can be saved
-        allowing the skip of this step
+        allowing the skip of this step
         """
         missing_files = []
         temp_missing = os.path.join(processed_ms_dir, "temp_missing")
@@ -147,7 +148,7 @@ class imager_prepare(LOFARnodeTCP):
         """
         Collect all the measurement sets in a single directory:
         The measurement sets are located on different nodes on the cluster.
-        This function collects all the file in the input map in the
+        This function collects all the file in the input map in the
         processed_ms_dir Return value is a set of missing files
         """
         missing_files = []
 
         #loop all measurement sets
         for node, path in input_map:
             # construct copy command
-            command = ["rsync", "-r", "{0}:{1}".format(node, path) ,
+            command = ["rsync", "-r", "{0}:{1}".format(node, path),
                        "{0}".format(processed_ms_dir)]
 
             self.logger.debug("executing: " + " ".join(command))
 
             # Spawn a subprocess and connect the pipes
-            # DO NOT USE SUBPROCESSGROUP
-            # The copy step is performed 720 at once in that case which might
-            # saturate the cluster.
+            # DO NOT USE SUBPROCESSGROUP
+            # The copy step is performed 720 at once in that case which might
+            # saturate the cluster.
             copy_process = subprocess.Popen(
                         command,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
 
-            # Wait for finish of copy inside the loop: enforce single tread copy
+            # Wait for finish of copy inside the loop: enforce single thread
+            # copy
             (stdoutdata, stderrdata) = copy_process.communicate()
 
             exit_status = copy_process.returncode
 
@@ -200,9 +202,9 @@ class imager_prepare(LOFARnodeTCP):
             input_map, subbands_per_image, collected_ms_dir_name, parset,
             ndppp):
         """
-        Run NDPPP:
+        Run NDPPP:
         Create dir for grouped measurements, assure clean workspace
-        Call with log for cplus and catch segfaults. Pparameters are
+        Call with log for cplus and catch segfaults. Parameters are
        supplied in parset
         """
         time_slice_path_collected = []
@@ -213,7 +215,7 @@ class imager_prepare(LOFARnodeTCP):
                                 ((idx_time_slice + 1) * subbands_per_image)]
 
             # get the filenames
-            input_subgroups = map(lambda x: x.split("/")[-1] ,
+            input_subgroups = map(lambda x: x.split("/")[-1],
                                   list(zip(*input_map_subgroup)[1]))
 
             # join with the group_measurement_directory to get the locations
@@ -232,23 +234,30 @@ class imager_prepare(LOFARnodeTCP):
             # Update the parset with computed parameters
             patch_dictionary = {'uselogger': 'True', # enables log4cplus
                                 'msin': msin,
-                                'msout':time_slice_path}
+                                'msout': time_slice_path}
 
             nddd_parset_path = time_slice_path + ".ndppp.par"
-            temp_parset_filename = patch_parset(parset, patch_dictionary)
-            shutil.copy(temp_parset_filename, nddd_parset_path)
-
-            try:
-                nddd_parset_path = time_slice_path + ".ndppp.par"
-                temp_parset_filename = patch_parset(parset, patch_dictionary)
-                shutil.copy(temp_parset_filename, nddd_parset_path)
-                self.logger.debug("Wrote a ndppp parset with runtime variables:"
-                    " {0}".format(nddd_parset_path))
-                os.unlink(temp_parset_filename)
-            except Exception, exception:
-                self.logger.error("failed loading and updating the " +
-                    "parset: {0}".format(parset))
-                raise exception
+            # The parset was patched and copied twice here (a merge
+            # left-over); do the work once, with the tempfile always removed
+            temp_parset_filename = patch_parset(parset, patch_dictionary)
+            try:
+                shutil.copyfile(temp_parset_filename, nddd_parset_path)
+                self.logger.debug(
+                    "Wrote a ndppp parset with runtime variables:"
+                    " {0}".format(nddd_parset_path))
+            except Exception, exception:
+                self.logger.error("failed loading and updating the " +
+                    "parset: {0}".format(parset))
+                raise exception
+            finally:
+                # remove the temp file
+                os.unlink(temp_parset_filename)
 
             #run ndppp
             cmd = [ndppp, nddd_parset_path]
@@ -270,9 +279,9 @@ class imager_prepare(LOFARnodeTCP):
             output_file_path):
         """
         Msconcat to combine the time slices in a single ms:
-        It is a virtual ms, a ms with symbolic links to actual data is created!
+        It is a virtual ms: a ms with symbolic links to the actual data is
+        created
         """
-        pt.msconcat(group_measurements_collected, #@UndefinedVariable
+        pt.msconcat(group_measurements_collected,
                     output_file_path, concatTime=True)
         self.logger.debug("Concatenated the files: {0} into the single measure"
                           "mentset: {1}".format(
@@ -281,9 +290,9 @@ class imager_prepare(LOFARnodeTCP):
     def _run_rficonsole(self, rficonsole_executable, time_slice_dir,
                         time_slices):
         """
-        _run_rficonsole runs the rficonsole application on the supplied timeslices
-        in time_slices.
-
+        _run_rficonsole runs the rficonsole application on the supplied
+        timeslices in time_slices.
+
         """
 
         #loop all measurement sets
@@ -293,7 +302,7 @@ class imager_prepare(LOFARnodeTCP):
         try:
             rfi_console_proc_group = SubProcessGroup(self.logger)
             for time_slice in time_slices:
-                # Each rfi console needs own working space for temp files
+                # Each rfi console needs its own working space for temp files
                 temp_slice_path = os.path.join(rfi_temp_dir,
                     os.path.basename(time_slice))
                 create_directory(temp_slice_path)
@@ -320,13 +329,13 @@ class imager_prepare(LOFARnodeTCP):
         """
         A Collection of scripts for finding and filtering of bad stations:
 
-        1. First a number of statistics with regards to the spread of the data
+        1. First a number of statistics with regards to the spread of the data
           is collected using the asciistat_executable.
        2. Secondly these statistics are consumed by the statplot_executable
           which produces a set of bad stations.
-        3. In the final step the bad stations are removed from the dataset using
-           ms select
-
+        3. In the final step the bad stations are removed from the dataset
+           using msselect
+
         REF: http://www.lofar.org/wiki/lib/exe/fetch.php?media=msss:pandeymartinez-week9-v1p2.pdf
         """
         # run asciistat to collect statistics about the ms
@@ -351,7 +360,8 @@ class imager_prepare(LOFARnodeTCP):
         asciiplot_output = []
         asciiplot_proc_group = SubProcessGroup(self.logger)
         for (ms, output_dir) in asciistat_output:
-            ms_stats = os.path.join(output_dir, os.path.split(ms)[1] + ".stats")
+            ms_stats = os.path.join(
+                            output_dir, os.path.split(ms)[1] + ".stats")
             cmd_string = "{0} -i {1} -o {2}".format(statplot_executable,
                                                     ms_stats,
                                                     ms_stats)
@@ -381,7 +391,7 @@ class imager_prepare(LOFARnodeTCP):
                     #add the name of station
                     station_to_filter.append(entries[1])
 
-        # if this measurement does not contain baselines to skip do not
+        # if this measurement does not contain baselines to skip, do not
         # filter and provide the original ms as output
         if len(station_to_filter) == 0:
             msselect_output[ms] = ms
-- 
GitLab
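The body of _nearest_ceiled_power2 lies outside the hunks of this patch; only its docstring is visible, and the imager recipe relies on it to round npix up ("npix round up to nearest pow 2"). A minimal sketch of a helper matching that docstring, hypothetical rather than the actual LOFAR implementation, could look like this:

    import math

    def nearest_ceiled_power2(value):
        # Round up to the nearest power of two, e.g. 1500 -> 2048.
        # The recipe applies this to npix before imaging.
        return int(2 ** math.ceil(math.log(int(value), 2)))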