diff --git a/CEP/Pipeline/docs/sphinx/source/developer/lofarpipe/support/utility.rst b/CEP/Pipeline/docs/sphinx/source/developer/lofarpipe/support/utility.rst
index 145582f759f4d458744a23ac7c4956acf85787cc..9a5f958fd127c27d4249de55e8627df4d29e15d3 100644
--- a/CEP/Pipeline/docs/sphinx/source/developer/lofarpipe/support/utility.rst
+++ b/CEP/Pipeline/docs/sphinx/source/developer/lofarpipe/support/utility.rst
@@ -76,7 +76,7 @@ Iterators and generators

 .. autofunction:: lofarpipe.support.utilities.is_iterable

-.. autofunction:: lofarpipe.support.utilities.izip_longest
+ #.. autofunction:: lofarpipe.support.utilities.izip_longest #TODO: autodoc fails on this function; re-enable once the cause is found

 .. autofunction:: lofarpipe.support.utilities.group_iterable

diff --git a/CEP/Pipeline/docs/sphinx/source/developer/lofarpipe/tests/ingredients.rst b/CEP/Pipeline/docs/sphinx/source/developer/lofarpipe/tests/ingredients.rst
index d888845decd9425557c39938be95bba843a52113..c0ed7e9127759710548fc42aa67938e01f4f1dbd 100644
--- a/CEP/Pipeline/docs/sphinx/source/developer/lofarpipe/tests/ingredients.rst
+++ b/CEP/Pipeline/docs/sphinx/source/developer/lofarpipe/tests/ingredients.rst
@@ -2,5 +2,5 @@
 The :mod:`lofarpipe.tests.lofaringredient` module
 *************************************************

-.. dsautomodule :: lofarpipe.tests.lofaringredient
+.. automodule:: lofarpipe.tests.lofaringredient
     :members:
diff --git a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/dppp.rst b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/dppp.rst
index b6c47d8ec6d7198a996676034245a71edba4166b..5f0048f8353cbcdbfb4e0ba051e288120819f447 100644
--- a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/dppp.rst
+++ b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/dppp.rst
@@ -4,5 +4,11 @@
 DPPP
 ====

+**Master side of the recipe**
+
 .. autoclass:: lofarpipe.recipes.master.dppp.dppp
-   :show-inheritance:
+
+**Node side of the recipe**
+
+.. autoclass:: lofarpipe.recipes.nodes.dppp.dppp
+   :members: run
\ No newline at end of file
diff --git a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/gainoutliercorrection.rst b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/gainoutliercorrection.rst
index 1b215d0e2af27efb5c9ffc4871b14f4b5c0d1aa3..0aa46c478111d9575f41a3bc3a8dae42220c6c6d 100644
--- a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/gainoutliercorrection.rst
+++ b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/gainoutliercorrection.rst
@@ -8,7 +8,7 @@ gainoutliercorrection

 .. autoclass:: lofarpipe.recipes.master.gainoutliercorrection.gainoutliercorrection

-***node Side of the recipe ***
+**Node side of the recipe**

 .. autoclass:: lofarpipe.recipes.nodes.gainoutliercorrection.gainoutliercorrection
-   :members: run
\ No newline at end of file
+   :members: _filter_stations_parmdb, _read_polarisation_data_and_type_from_db, _convert_data_to_ComplexArray, _swap_outliers_with_median, _write_corrected_data
diff --git a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/get_metadata.rst b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/get_metadata.rst
index f4412fb0c370c7594c81134e5b315ecbf497d012..e76896ff942ce787c8bac2e6e20932f0d8a8dd4c 100644
--- a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/get_metadata.rst
+++ b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/get_metadata.rst
@@ -1,14 +1,14 @@
-.. _recipe-setupparmdb:
+.. _recipe-get_metadata:

 ============
-setupparmdb
+get_metadata
 ============

 ***Master Side of the recipe ***

-.. autoclass:: lofarpipe.recipes.master.setupparmdb.setupparmdb
+.. autoclass:: lofarpipe.recipes.master.get_metadata.get_metadata

 *** Node Side of the recipe ***

-.. autoclass:: lofarpipe.recipes.nodes.setupparmdb.setupparmdb
+.. autoclass:: lofarpipe.recipes.nodes.get_metadata.get_metadata
diff --git a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/index.rst b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/index.rst
index 174f1d54db07eec48b2cfb02b812ecb532c47e17..35f378ce57008bd928b64a7eda4ae3a3b03455f9 100644
--- a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/index.rst
+++ b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/index.rst
@@ -13,29 +13,39 @@ Each of these steps will get more details in each of the chapters
 Calibrator Pipeline
 ------------------------------------
+
+.. autoclass:: msss_calibrator_pipeline.msss_calibrator_pipeline
+
+**Recipes of the calibrator pipeline (the number denotes the pipeline step)**
+
+.. toctree::
+   :maxdepth: 1
+
+   vdsmaker (2) <vdsmaker>
+   vdsreader (2) <vdsreader>
+   setupparmdb (2,4) <setupparmdb>
+   setupsourcedb (2,4) <setupsourcedb>
+   ndppp (3) <dppp>
+   new_bbs (4) <new_bbs>
+   gainoutliercorrection (5) <gainoutliercorrection>
+   get_metadata (6) <get_metadata>
+
+
+Target Pipeline
+------------------------------------
 .. toctree::
    :maxdepth: 1

+   missing recipe <imager_bbs>
   sip
   datamapper
   storagemapper
   dppp
   rficonsole
-   bbs
-   sourcedb
-   parmdb
   cimager
   vdsmaker
   vdsreader

-Target Pipeline
-------------------------------------
-.. toctree::
-   :maxdepth: 1
-
-   missing recipe<imager_bbs>
-
-
 Imager Pipeline
 ------------------------------------
@@ -53,7 +63,7 @@ Imager Pipeline
    4. imager_awimager <imager_awimager>
    5. imager_source_finding <imager_source_finding>
    6. imager_finalize <imager_finalize>
-   7. get_metadata
+   7. get_metadata <get_metadata>
\ No newline at end of file
diff --git a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/new_bbs.rst b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/new_bbs.rst
index 2916be1493d8e608706065ea792efeed98dbad58..58ea4f8fed502f8bf381656dfcd6ca151997afe7 100644
--- a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/new_bbs.rst
+++ b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/new_bbs.rst
@@ -1,8 +1,16 @@
 .. _recipe-new_bbs:

 =========
-NEW_BBS
+new_bbs
 =========

+**Master side of the recipe**
+
 .. autoclass:: lofarpipe.recipes.master.new_bbs.new_bbs
-   :show-inheritance:
+
+**Node side of the recipe**
+
+.. autoclass:: lofarpipe.recipes.nodes.new_bbs.new_bbs
+   :members: run
+
diff --git a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/setupparmdb.rst b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/setupparmdb.rst
index 5014ab16271bab7572c904c7a7cb2060db754e78..f4412fb0c370c7594c81134e5b315ecbf497d012 100644
--- a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/setupparmdb.rst
+++ b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/setupparmdb.rst
@@ -1,8 +1,14 @@
-.. _recipe-parmdb:
+.. _recipe-setupparmdb:

-======
-parmdb
-======
+============
+setupparmdb
+============
+
+**Master side of the recipe**

 .. autoclass:: lofarpipe.recipes.master.setupparmdb.setupparmdb
-   :show-inheritance:
+
+**Node side of the recipe**
+
+.. autoclass:: lofarpipe.recipes.nodes.setupparmdb.setupparmdb
+
diff --git a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/setupsourcedb.rst b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/setupsourcedb.rst
index 5385d753ba2d793b0fa20c31b62d042ae29d9776..4b4257291701cfa2fb74f1783d4973fffaea7450 100644
--- a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/setupsourcedb.rst
+++ b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/setupsourcedb.rst
@@ -4,5 +4,11 @@
 sourcedb
 ========

+**Master side of the recipe**
+
 .. autoclass:: lofarpipe.recipes.master.setupsourcedb.setupsourcedb
-   :show-inheritance:
+
+**Node side of the recipe**
+
+.. autoclass:: lofarpipe.recipes.nodes.setupsourcedb.setupsourcedb
+   :members: run
\ No newline at end of file
diff --git a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsmaker.rst b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsmaker.rst
index a88c4f8cf7d71a9e3036b5ba360d2a912c02dc22..4c94eec51b1535ae8bc3d42e1553ee3b30208db5 100644
--- a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsmaker.rst
+++ b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsmaker.rst
@@ -1,6 +1,14 @@
+.. _vdsmaker:
+
 ========
 vdsmaker
 ========

-.. autoclass:: lofarpipe.recipes.master.vdsmaker.vdsmaker
-   :show-inheritance:
+**Master side of the recipe**
+
+.. autoclass:: lofarpipe.recipes.master.vdsmaker.vdsmaker
+   :members: go
+
+**Node side of the recipe**
+
+.. autoclass:: lofarpipe.recipes.nodes.vdsmaker.vdsmaker
diff --git a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsreader.rst b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsreader.rst
index b42a513b21849f6cee7f4c87887a7e360cf97a74..5d281ae9a3d13b7c5157ca420d01609738e32d82 100644
--- a/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsreader.rst
+++ b/CEP/Pipeline/docs/sphinx/source/pipelines/sip/recipes/vdsreader.rst
@@ -4,5 +4,6 @@
 vdsreader
 =========

+**Master side of the recipe**
+
 .. autoclass:: lofarpipe.recipes.master.vdsreader.vdsreader
-   :show-inheritance:
diff --git a/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py b/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py
index 0af1a4ad42c2a78809c5c146eaa3f498b8aa2e40..646b4b5aefd3053d396695b3402139308fc66599 100755
--- a/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py
+++ b/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py
@@ -11,7 +11,7 @@ import sys

 from lofarpipe.support.control import control
 from lofarpipe.support.lofarexceptions import PipelineException
-from lofarpipe.support.group_data import store_data_map, validate_data_maps
+from lofarpipe.support.group_data import validate_data_maps
 from lofarpipe.support.group_data import tally_data_map
 from lofarpipe.support.utilities import create_directory
 from lofar.parameterset import parameterset
@@ -20,17 +20,29 @@ from lofar.parameterset import parameterset
 class msss_calibrator_pipeline(control):
     """
     The calibrator pipeline can be used to determine the instrument database
-    (parmdb) from the observation of a known "calibrator" source.
-
-    This pipeline will perform the following operations:
-    - Create a empty parmdb for BBS
-    - Run makesourcedb on skymodel files for calibrator source(s) and the
-      Ateam, which are to be stored in a standard place ($LOFARROOT/share)
-    - DPPP: flagging, using standard parset
-    - Demix the relevant A-team sources (for now using python script, later
-      to use DPPP), using the A-team sourcedb.
-    - Run BBS to calibrate the calibrator source(s), again using standard
-      parset, and the sourcedb made earlier
+    (parmdb) from the observation of a known "calibrator" source. It creates
+    an instrument model of the current LOFAR instrument (the sum of the
+    instrumental properties and the ionospheric disturbances). The output of
+    this top-level pipeline recipe is this instrument model, which can be
+    used in a later target pipeline to calibrate the target data.
+
+    **This pipeline will perform the following operations:**
+
+    1. Preparations: parse and validate input and set local variables
+    2. Create databases (files): a sourcedb with A-team sources, a vds file
+       describing the data on the nodes, and a parmdb for the calibration
+       solutions
+    3. DPPP: flagging, using a standard parset, and demixing of the relevant
+       A-team sources, using the A-team sourcedb
+    4. Run BBS to calibrate the calibrator source(s), again using a standard
+       parset, and the sourcedb made earlier
+    5. Perform gain outlier correction on the created instrument table
+    6. Create output for consumption by the LOFAR framework
+
+    **Per subband-group, the following output products will be delivered:**
+
+    1. A parmdb with the instrument calibration solutions, to be applied to
+       a target measurement set in the target pipeline
+
     """

     def __init__(self):
@@ -43,6 +55,9 @@

     def usage(self):
+        """
+        Display usage information
+        """
         print >> sys.stderr, "Usage: %s [options] <parset-file>" % sys.argv[0]
         return 1

@@ -52,15 +67,15 @@
         Get input- and output-data product specifications from the parset-file,
         and do some sanity checks.
         """
-        odp = self.parset.makeSubset(
+        dataproducts = self.parset.makeSubset(
             self.parset.fullModuleName('DataProducts') + '.'
         )
         self.input_data = [
             tuple(os.path.join(location, filename).split(':'))
             for location, filename, skip in zip(
-                odp.getStringVector('Input_Correlated.locations'),
-                odp.getStringVector('Input_Correlated.filenames'),
-                odp.getBoolVector('Input_Correlated.skip'))
+                dataproducts.getStringVector('Input_Correlated.locations'),
+                dataproducts.getStringVector('Input_Correlated.filenames'),
+                dataproducts.getBoolVector('Input_Correlated.skip'))
             if not skip
         ]
         self.logger.debug("%d Input_Correlated data products specified" %
@@ -68,9 +83,11 @@
         self.output_data = [
             tuple(os.path.join(location, filename).split(':'))
             for location, filename, skip in zip(
-                odp.getStringVector('Output_InstrumentModel.locations'),
-                odp.getStringVector('Output_InstrumentModel.filenames'),
-                odp.getBoolVector('Output_InstrumentModel.skip'))
+                dataproducts.getStringVector(
+                    'Output_InstrumentModel.locations'),
+                dataproducts.getStringVector(
+                    'Output_InstrumentModel.filenames'),
+                dataproducts.getBoolVector('Output_InstrumentModel.skip'))
             if not skip
         ]
         self.logger.debug("%d Output_InstrumentModel data products specified" %
@@ -126,12 +143,13 @@
             return self.usage()
         self.parset.adoptFile(parset_file)
         self.parset_feedback_file = parset_file + "_feedback"
+
         # Set job-name to basename of parset-file w/o extension, if it's not
         # set on the command-line with '-j' or '--job-name'
         if not self.inputs.has_key('job_name'):
             self.inputs['job_name'] = (
-                os.path.splitext(os.path.basename(parset_file))[0]
-            )
+                os.path.splitext(os.path.basename(parset_file))[0])
+
         # Call the base-class's `go()` method.
         return super(msss_calibrator_pipeline, self).go()
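Note: the `host:path` tuples built in `_get_io_product_specs` above rely on each parset location carrying a `host:directory` prefix. A worked example of that join-then-split idiom, with hypothetical values:

    import os

    # Hypothetical parset values; 'location' has the form "host:directory".
    location = 'locus001:/data/scratch/pipeline'
    filename = 'L12345_SAP000_SB000_uv.MS'

    # The recipe joins them and splits on ':' to get a (host, path) tuple.
    entry = tuple(os.path.join(location, filename).split(':'))
    assert entry == ('locus001',
                     '/data/scratch/pipeline/L12345_SAP000_SB000_uv.MS')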
@@ -141,11 +159,13 @@
         Define the individual tasks that comprise the current pipeline.
         This method will be invoked by the base-class's `go()` method.
         """
-
+        # *********************************************************************
+        # 1. Get input from the parset, validate and cast to pipeline 'data
+        #    types'; only perform work on existing files; create the needed
+        #    directories
         # Create a parameter-subset containing only python-control stuff.
         py_parset = self.parset.makeSubset(
-            self.parset.fullModuleName('PythonControl') + '.'
-        )
+            self.parset.fullModuleName('PythonControl') + '.')

         # Get input/output-data products specifications.
         self._get_io_product_specs()
@@ -160,20 +180,21 @@
         # Write input- and output data map-files
         data_mapfile = os.path.join(mapfile_dir, "data.mapfile")
-        store_data_map(data_mapfile, self.input_data)
-        self.logger.debug("Wrote input mapfile: %s" % data_mapfile)
+        self._store_data_map(data_mapfile, self.input_data, "inputs")
         instrument_mapfile = os.path.join(mapfile_dir, "instrument.mapfile")
-        store_data_map(instrument_mapfile, self.output_data)
-        self.logger.debug("Wrote output mapfile: %s" % instrument_mapfile)
+        self._store_data_map(instrument_mapfile, self.output_data, "output")

         if len(self.input_data) == 0:
             self.logger.warn("No input data files to process. Bailing out!")
             return 0

         self.logger.debug("Processing: %s" %
-            ', '.join(':'.join(f) for f in self.input_data)
-        )
-
+            ', '.join(':'.join(f) for f in self.input_data))
+        # *********************************************************************
+        # 2. Create the databases needed for performing the work:
+        #    vds, describing the data on the nodes
+        #    sourcedb, for the skymodel (A-team)
+        #    parmdb, for outputting the solutions
         # Produce a GVDS file describing the data on the compute nodes.
         gvds_file = self.run_task("vdsmaker", data_mapfile)['gvds']

@@ -184,22 +205,25 @@
         parmdb_mapfile = self.run_task(
             "setupparmdb", data_mapfile,
             mapfile=os.path.join(mapfile_dir, 'dppp.parmdb.mapfile'),
-            suffix='.dppp.parmdb'
-        )['mapfile']
+            suffix='.dppp.parmdb')['mapfile']

         # Create a sourcedb to be used by the demixing phase of DPPP
         # The path to the A-team sky model is currently hard-coded.
+        # Run makesourcedb on skymodel files for calibrator source(s) and the
+        # Ateam, which are to be stored in a standard place ($LOFARROOT/share)
         sourcedb_mapfile = self.run_task(
             "setupsourcedb", data_mapfile,
             skymodel=os.path.join(
                 self.config.get('DEFAULT', 'lofarroot'),
-                'share', 'pipeline', 'skymodels', 'Ateam_LBA_CC.skymodel'
-            ),
+                'share', 'pipeline', 'skymodels', 'Ateam_LBA_CC.skymodel'),
+            # TODO: LBA skymodel!
             mapfile=os.path.join(mapfile_dir, 'dppp.sourcedb.mapfile'),
             suffix='.dppp.sourcedb',
-            type='blob'
-        )['mapfile']
+            type='blob')['mapfile']

+        # *********************************************************************
+        # 3. Run NDPPP to demix the A-team sources
+        # TODO: do flagging here as well?
         # Create a parameter-subset for DPPP and write it to file.
         ndppp_parset = os.path.join(parset_dir, "NDPPP.parset")
         py_parset.makeSubset('DPPP.').writeFile(ndppp_parset)
@@ -211,23 +235,28 @@
             data_end_time=vdsinfo['end_time'],
             parset=ndppp_parset,
             parmdb_mapfile=parmdb_mapfile,
-            sourcedb_mapfile=sourcedb_mapfile
-        )['mapfile']
+            sourcedb_mapfile=sourcedb_mapfile)['mapfile']

         demix_mapfile = dppp_mapfile
-
+
+#        # Old demixing method, now performed by ndppp:
 #        # Demix the relevant A-team sources
 #        demix_mapfile = self.run_task("demixing", dppp_mapfile)['mapfile']

 #        # Do a second run of flagging, this time using rficonsole
 #        self.run_task("rficonsole", demix_mapfile, indirect_read=True)

+        # *********************************************************************
+        # 4. Run BBS with a model of the calibrator:
+        #    create a parmdb for the calibration solutions,
+        #    create a sourcedb with the known calibrator model,
+        #    then run bbs with both
         # Create an empty parmdb for BBS
         parmdb_mapfile = self.run_task(
             "setupparmdb", data_mapfile,
             mapfile=os.path.join(mapfile_dir, 'bbs.parmdb.mapfile'),
-            suffix='.bbs.parmdb'
-        )['mapfile']
+            suffix='.bbs.parmdb')['mapfile']
+
         # Create a sourcedb based on sourcedb's input argument "skymodel"
         sourcedb_mapfile = self.run_task(
@@ -236,11 +265,9 @@
                 self.config.get('DEFAULT', 'lofarroot'),
                 'share', 'pipeline', 'skymodels',
                 py_parset.getString('Calibration.CalibratorSource') +
-                '.skymodel'
-            ),
+                '.skymodel'),
             mapfile=os.path.join(mapfile_dir, 'bbs.sourcedb.mapfile'),
-            suffix='.bbs.sourcedb'
-        )['mapfile']
+            suffix='.bbs.sourcedb')['mapfile']

         # Create a parameter-subset for BBS and write it to file.
         bbs_parset = os.path.join(parset_dir, "BBS.parset")
@@ -253,19 +280,24 @@
             instrument_mapfile=parmdb_mapfile,
             sky_mapfile=sourcedb_mapfile)

+        # *********************************************************************
+        # 5. Perform gain outlier correction on the found calibration
+        #    solutions, swapping outliers in the gains with the median
         # Export the calibration solutions using gainoutliercorrection and store
         # the results in the files specified in the instrument mapfile.
         self.run_task("gainoutliercorrection",
                       (parmdb_mapfile, instrument_mapfile),
-                      sigma=1.0)
+                      sigma=1.0)  # TODO: make sigma a parset parameter

+        # *********************************************************************
+        # 6. Create a feedback file for further processing by the LOFAR
+        #    framework (MAC)
         # Create a parset-file containing the metadata for MAC/SAS
         self.run_task("get_metadata", instrument_mapfile,
             parset_file=self.parset_feedback_file,
             parset_prefix=(
                 self.parset.getString('prefix') +
-                self.parset.fullModuleName('DataProducts')
-            ),
+                self.parset.fullModuleName('DataProducts')),
             product_type="InstrumentModel")

diff --git a/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py b/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py
index 8e2b4afbef05f43f62ca689806deb272979056ed..6e1fd0f0c639a7938edfb13525ba8249298ea3d3 100755
--- a/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py
+++ b/CEP/Pipeline/recipes/sip/bin/msss_imager_pipeline.py
@@ -19,67 +19,66 @@ from lofar.parameterset import parameterset

 class msss_imager_pipeline(control):
-    """
+    """
     The Automatic MSSS imager pipeline is used to generate MSSS images and
     find sources in the generated images. Generated images and lists of
     found sources are complemented with metadata and are thus ready for
     consumption by the Long Term Storage (LTA).
-
+
     *Subband groups*
     The imager_pipeline is able to generate images on the frequency range of
     LOFAR in parallel, combining the frequency subbands in so-called subband
     groups. Each subband group will result in an image and a source list
     (typically 8 groups, each combining ten subbands).
-
+
     *Time slices*
     MSSS images are compiled from a number of so-called (time) slices. Each
     slice comprises a short (approx. 10 min) observation of a field (an area
     on the sky) containing typically 80 subbands. The number of slices will be
     different for LBA observations (typically 9) and HBA observations
     (typically 2), due to differences in sensitivity.
-
+
     Each image will be compiled on a different cluster node to balance the
     processing load. The input- and output- files and locations are determined
     by the scheduler and specified in the parset-file.
-
-    **steps:**
-
-    This pipeline performs the following operations:
-
-    1. Prepare Phase: Copy the preprocessed MS's from the different compute
+    **This pipeline performs the following operations:**
+
+    1. Prepare phase. Copy the preprocessed MS's from the different compute
        nodes to the nodes where the images will be compiled (the prepare
       phase). Combine the subbands in subband groups, concatenate the time
       slices into a single large measurement set and perform flagging, RFI
       and bad station exclusion.
-    2. Create db: Generate a local sky model (LSM) from the global sky model
+    2. Create db. Generate a local sky model (LSM) from the global sky model
       (GSM) for the sources that are in the field-of-view (FoV). The LSM is
       stored as a sourcedb. In step 3, calibration of the measurement sets is
       performed on these sources; in step 4 the LSM is used to create a mask
       for the awimager. The calibration solutions will be placed in an
       instrument table/db, also created in this step.
-    3. BBS: Calibrate the measurement set with the sourcedb from the gsm.
+    3. BBS. Calibrate the measurement set with the sourcedb from the gsm.
       In later iterations, sources found in the created images will be added
       to this list, resulting in a self-calibration cycle.
-    4. Awimager: The combined measurement sets are now imaged. The imaging
+    4. Awimager. The combined measurement sets are now imaged. The imaging
       is performed using a mask: the sources in the sourcedb are used to
       create a casa image masking known sources. Together with the
       measurement set an image is created.
-    5. Sourcefinding: The images created in step 4 are fed to pyBDSM to find and
+    5. Sourcefinding. The images created in step 4 are fed to pyBDSM to find
       and describe sources. In multiple iterations, subtracting the found
       sources, all sources are collected in a source list.
-    I. The sources found in step 5 are fed back into step 2. This allows the
+    Step I. The sources found in step 5 are fed back into step 2. This allows
       the measurement sets to be calibrated with the sources currently found
       in the image. This loop will continue until convergence (3 times for
-      the time being).
-    6. Finalize: Meta data with regards to the input, computations performed and
+      the time being).
+    6. Finalize. Metadata with regard to the input, the computations
       performed and the results are collected and added to the casa image.
       The images created are converted from casa to HDF5 and copied to the
       correct output location.
     7. Export metadata: an output file with metadata is generated, ready for
-       consumption by the LTA and/or the LOFAR framework
-       |
-    Per subband-group, the following output products will be delivered:
+       consumption by the LTA and/or the LOFAR framework.
+
+    **Per subband-group, the following output products will be delivered:**
+
     a. An image
     b. A source list
     c. (Calibration solutions and corrected visibilities)
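Note: steps 2 through 5 of the docstring above, together with feedback step I, form a self-calibration cycle. A schematic sketch of that loop; every function here is an illustrative stub standing in for the imager_bbs, imager_awimager and imager_source_finding recipes listed in the index, not an actual recipe API:

    # Illustrative stubs only; the real work is done by the pipeline recipes.
    def run_bbs(ms, sources):           # step 3: calibrate against current LSM
        return ms + '.calibrated'

    def run_awimager(ms, mask):         # step 4: image, masking known sources
        return ms + '.img'

    def run_source_finding(image):      # step 5: pyBDSM source extraction
        return ['sources_from_' + image]

    concat_ms = 'SBgroup000.MS'
    source_list = ['initial_lsm_from_gsm']           # step 2
    for cycle in range(3):              # step I: "3 times for the time being"
        calibrated = run_bbs(concat_ms, source_list)
        image = run_awimager(calibrated, mask=source_list)
        source_list = run_source_finding(image)      # fed back into next cycle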
diff --git a/CEP/Pipeline/recipes/sip/master/dppp.py b/CEP/Pipeline/recipes/sip/master/dppp.py
index 4066980ba5d6612602db5d59228b78bbe0f3d607..eed0d24f72c02a84ecd3f6860bb9ca52ae23640d 100644
--- a/CEP/Pipeline/recipes/sip/master/dppp.py
+++ b/CEP/Pipeline/recipes/sip/master/dppp.py
@@ -24,9 +24,18 @@ class dppp(BaseRecipe, RemoteCommandRecipeMixIn):
     ``IDPPP``) on a number of MeasurementSets. This is used for compressing
     and/or flagging data

-    **Arguments**
+    1. Load input data files
+    2. Load parmdb and sourcedb
+    3. Call the node side of the recipe
+    4. Parse the logfile for fully flagged baselines
+    5. Create a mapfile of the successful node recipe runs
+
+    **Command line arguments**
+
+    1. A mapfile describing the data to be processed.
+    2. A mapfile with the target output locations (if provided, the input
+       and output mapfiles are validated)

-    A mapfile describing the data to be processed.
     """
     inputs = {
         'parset': ingredient.FileField(
@@ -149,8 +158,9 @@
         # ----------------------------------------------------------------------
         self.logger.searchpatterns["fullyflagged"] = "Fully flagged baselines"

-        # Load file <-> output node mapping from disk
-        # ----------------------------------------------------------------------
+        # *********************************************************************
+        # 1. Load the input data file; validate the output against the input
+        #    location if output locations are provided
         args = self.inputs['args']
         self.logger.debug("Loading input-data mapfile: %s" % args[0])
         indata = load_data_map(args[0])
@@ -172,6 +182,8 @@
             ) for host, infile in indata
         ]

+        # ********************************************************************
+        # 2. Load parmdb and sourcedb
         # Load parmdb-mapfile, if one was given.
         if self.inputs.has_key('parmdb_mapfile'):
             self.logger.debug(
@@ -180,7 +192,7 @@
             parmdbdata = load_data_map(self.inputs['parmdb_mapfile'])
         else:
             parmdbdata = [(None, None)] * len(indata)
-
+
         # Load sourcedb-mapfile, if one was given.
         if self.inputs.has_key('sourcedb_mapfile'):
             self.logger.debug(
@@ -190,10 +202,12 @@
         else:
             sourcedbdata = [(None, None)] * len(indata)

+        # ********************************************************************
+        # 3. Call the node side of the recipe
         # Create and schedule the compute jobs
         command = "python %s" % (self.__file__.replace('master', 'nodes'))
         jobs = []
-        for host, infile, outfile, parmdb, sourcedb in (w + (x[1], y[1], z[1])
+        for host, infile, outfile, parmdb, sourcedb in (w + (x[1], y[1], z[1])
             for w, x, y, z in zip(indata, outdata, parmdbdata, sourcedbdata)):
             jobs.append(
                 ComputeJob(
@@ -217,8 +231,8 @@
         )
         self._schedule_jobs(jobs, max_per_node=self.inputs['nproc'])

-        # Log number of fully flagged baselines
-        # ----------------------------------------------------------------------
+        # *********************************************************************
+        # 4. Parse the logfile for fully flagged baselines
         matches = self.logger.searchpatterns["fullyflagged"].results
         self.logger.searchpatterns.clear() # finished searching
         stripchars = "".join(set("Fully flagged baselines: "))
@@ -230,6 +244,9 @@
                 baselinecounter[pair] += 1
         self.outputs['fullyflagged'] = baselinecounter.keys()

+        # *********************************************************************
+        # 5. Create a mapfile of the successful node recipe runs
+        #    (fail if no runs succeeded)
         if self.error.isSet():
             # dppp needs to continue on partial success.
             # Get the status of the jobs
diff --git a/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py b/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py
index 652fb95f68dd03928579fc6dbaca1df0647be131..708ba258f4b1d4744196c6376b9f5b74ea074091 100644
--- a/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py
+++ b/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py
@@ -20,14 +20,23 @@ from lofarpipe.support.group_data import validate_data_maps
 class gainoutliercorrection(BaseRecipe, RemoteCommandRecipeMixIn):
     """
     Recipe to correct outliers in the gain solutions of a parmdb,
-    using the program `parmexportcal` or an minimal implementation of the edit_parmdb
-    program.
+    using the program `parmexportcal`, whose main purpose is to strip off
+    the time axis information from an instrument model (a.k.a. ParmDB),
+    -or-
+    a minimal implementation of the edit_parmdb program, which searches all
+    gains for outliers and swaps them for the median.

-    **Arguments**
+    1. Validate input
+    2. Load mapfiles; validate if a target output location is provided
+    3. Call the node side of the recipe
+    4. Validate performance; return the corrected files

-    A mapfile describing the data to be processed.
+    **Command line arguments**
+
+    1. A mapfile describing the data to be processed.
+    2. A mapfile with the target locations (mapfiles are validated if both
+       are present)
+
     """
     inputs = {
         'executable': ingredient.StringField(
@@ -59,13 +68,16 @@
     }

     outputs = {
-        'mapfile': ingredient.FileField()
+        'mapfile': ingredient.FileField(help="mapfile with the corrected parmdbs")
     }

     def go(self):
+        super(gainoutliercorrection, self).go()
         self.logger.info("Starting gainoutliercorrection run")

-        #if sigma is none use default behaviour and use executable: test if
+        # ********************************************************************
+        # 1. Validate input
+        # If sigma is None, use the default behaviour (the executable); test if
         # it exists
         executable = self.inputs['executable']
         if executable == "":
@@ -76,10 +88,8 @@
                 "path: {0}".format(self.inputs['executable']))
             self.logger.warn("Defaulting to edit_parmdb behaviour")

-        super(gainoutliercorrection, self).go()
-
-        # Load file <-> output node mapping from disk
-        # ----------------------------------------------------------------------
+        # ********************************************************************
+        # 2. Load mapfiles; validate if a target output location is provided
         args = self.inputs['args']
         self.logger.debug("Loading input-data mapfile: %s" % args[0])
         indata = load_data_map(args[0])
@@ -102,6 +112,8 @@
             ) for host, infile in indata
         ]

+        # ********************************************************************
+        # 3. 
Call node side of the recipe command = "python %s" % (self.__file__.replace('master', 'nodes')) jobs = [] for host, infile, outfile in (x + (y[1],) @@ -121,6 +133,8 @@ class gainoutliercorrection(BaseRecipe, RemoteCommandRecipeMixIn): ) self._schedule_jobs(jobs) + # ******************************************************************** + # 4. validate performance, return corrected files if self.error.isSet(): self.logger.warn("Detected failed gainoutliercorrection job") return 1 diff --git a/CEP/Pipeline/recipes/sip/master/get_metadata.py b/CEP/Pipeline/recipes/sip/master/get_metadata.py index 1332b35e5e713427a4c22ab17503d01f4da59979..43c174be838773b2f6a7eb72a9e3be6b736ec14e 100644 --- a/CEP/Pipeline/recipes/sip/master/get_metadata.py +++ b/CEP/Pipeline/recipes/sip/master/get_metadata.py @@ -21,25 +21,31 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn): Get the metadata from the given data products and return them as a LOFAR parameterset. - **Arguments** + 1. Parse and validate inputs + 2. Load mapfiles + 3. call node side of the recipe + 4. validate performance + 5. Create the parset-file and write it to disk. + + **Command line arguments** A mapfile describing the data to be processed. """ inputs = { 'product_type': ingredient.StringField( '--product-type', - help = "Data product type", + help="Data product type", # optional=True, # default=None ), 'parset_file': ingredient.StringField( '--parset-file', - help = "Path to the output parset file" + help="Path to the output parset file" ), 'parset_prefix': ingredient.StringField( '--parset-prefix', - help = "Prefix for each key in the output parset file", - default = '' + help="Prefix for each key in the output parset file", + default='' ) } @@ -49,7 +55,8 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn): def go(self): super(get_metadata, self).go() - + # ******************************************************************** + # 1. Parse and validate inputs args = self.inputs['args'] product_type = self.inputs['product_type'] global_prefix = self.inputs['parset_prefix'] @@ -63,18 +70,20 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn): (product_type, ', '.join(self.valid_product_types)) ) - # Load file <-> compute node mapping from disk - # ---------------------------------------------------------------------- + # ******************************************************************** + # 2. Load mapfiles self.logger.debug("Loading input-data mapfile: %s" % args[0]) data = load_data_map(args[0]) + # ******************************************************************** + # 3. call node side of the recipe command = "python %s" % (self.__file__.replace('master', 'nodes')) jobs = [] for host, infile in data: jobs.append( ComputeJob( host, command, - arguments = [ + arguments=[ infile, self.inputs['product_type'] ] @@ -82,11 +91,14 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn): ) self._schedule_jobs(jobs) + # ******************************************************************** + # 4. validate performance if self.error.isSet(): self.logger.warn("Failed get_metadata process detected") return 1 - # Create the parset-file and write it to disk. + # ******************************************************************** + # 5. Create the parset-file and write it to disk. 
parset = parameterset()
         prefix = "Output_%s_" % product_type
         parset.replace('%snrOf%s' % (global_prefix, prefix), str(len(jobs)))
diff --git a/CEP/Pipeline/recipes/sip/master/imager_prepare.py b/CEP/Pipeline/recipes/sip/master/imager_prepare.py
index db3384e60a6fe3ddeb9f65472df9f8642cb5c659..be479b4d62f5dda1d40dbeca6dc452e64e1807f7 100644
--- a/CEP/Pipeline/recipes/sip/master/imager_prepare.py
+++ b/CEP/Pipeline/recipes/sip/master/imager_prepare.py
@@ -25,9 +25,9 @@ class imager_prepare(BaseRecipe, RemoteCommandRecipeMixIn):
     1. Validate input
     2. Create mapfiles with input for the work to be performed on the individual nodes,
-       based on the structured input mapfile: The input mapfile contains a list
+       based on the structured input mapfile. The input mapfile contains a list
        of measurement sets.
-       Each node computes a single subband group but needs this for all
+       Each node computes a single subband group but needs this for all
        timeslices.
     3. Call the node scripts with correct input
     4. Validate performance
@@ -37,7 +37,7 @@
     The only command line argument is the path to a mapfile containing "all"
     the measurement sets needed for creating the sky images. First ordered on
-    timeslice then on subband group and finaly on index in the frequency
+    timeslice, then on subband group, and finally on index in the frequency
     range.

     **Arguments:**
@@ -269,9 +269,9 @@
         """
         Return 1 if the inputs supplied are incorrect: the number of inputs
         and outputs does not match. Return 0 if correct.
-        The number of inputs is correct iff:
-            len(input_map) ==
-                len(output_map) * slices_per_image * subbands_per_image
+        The number of inputs is correct iff:
+        len(input_map) ==
+        len(output_map) * slices_per_image * subbands_per_image
         """
         # The output_map contains a number of path/node pairs. The final data
         # dataproduct of the prepare phase: The 'input' for each of these pairs
diff --git a/CEP/Pipeline/recipes/sip/master/new_bbs.py b/CEP/Pipeline/recipes/sip/master/new_bbs.py
index 3eb5407f07ba793d34c5868796cbc7a7ff2c2bb7..4997c9a2ba8376a2a67df5bcee348ee4b32e2e5e 100644
--- a/CEP/Pipeline/recipes/sip/master/new_bbs.py
+++ b/CEP/Pipeline/recipes/sip/master/new_bbs.py
@@ -32,6 +32,9 @@ import lofarpipe.support.lofaringredient as ingredient

 class new_bbs(BaseRecipe):
     """
+    **This bbs recipe still uses the old-style bbs with global control.**
+    **New versions will have stand-alone capability.**
+
     The bbs recipe coordinates running BBS on a group of MeasurementSets.
     It runs both GlobalControl and KernelControl; as yet, SolverControl has
     not been integrated.
@@ -93,7 +96,7 @@ class new_bbs(BaseRecipe): ), 'gvds': ingredient.StringField( '-g', '--gvds', - help = "Path for output GVDS file" + help="Path for output GVDS file" ) } outputs = { @@ -160,7 +163,7 @@ class new_bbs(BaseRecipe): (dat[0], (dat[1], ins[1], sky[1])) for dat, ins, sky in zip(data_map, instrument_map, sky_map) ] - + return True @@ -208,7 +211,7 @@ class new_bbs(BaseRecipe): gvds_file = self.run_task( "vdsmaker", self.inputs['data_mapfile'], - gvds = self.inputs['gvds'] + gvds=self.inputs['gvds'] )['gvds'] # Construct a parset for BBS GlobalControl by patching the GVDS diff --git a/CEP/Pipeline/recipes/sip/master/setupparmdb.py b/CEP/Pipeline/recipes/sip/master/setupparmdb.py index c066b715559ee9e2fb9dd706de11907ddb0ed71f..98de085b74c28a152f4ba74faf0cc42e416e183b 100644 --- a/CEP/Pipeline/recipes/sip/master/setupparmdb.py +++ b/CEP/Pipeline/recipes/sip/master/setupparmdb.py @@ -38,10 +38,16 @@ class setupparmdb(BaseRecipe, RemoteCommandRecipeMixIn): """ Create a distributed parameter database (ParmDB) for a distributed Measurement set (MS). + + 1. Create a parmdb template at the master side of the recipe + 2. Call node side of recipe with template and possible targets + 3. Validate performance, cleanup of temp files, construct output - **Arguments** + **Command line arguments** - A mapfile describing the data to be processed. + 1. A mapfile describing the data to be processed. + 2. A mapfile with output location (If provide input and output are validated) + """ inputs = { 'executable': ingredient.ExecField( @@ -78,12 +84,17 @@ class setupparmdb(BaseRecipe, RemoteCommandRecipeMixIn): self.logger.info("Starting setupparmdb run") super(setupparmdb, self).go() + # ********************************************************************* + # 1. Create a temporary template parmdb at the master side of the recipe self.logger.info("Generating template parmdb") + + # generate a temp dir pdbdir = tempfile.mkdtemp( dir=self.config.get("layout", "job_directory") ) pdbfile = os.path.join(pdbdir, self.inputs['suffix']) + # Create a template use tempdir for location try: parmdbm_process = subprocess.Popen( [self.inputs['executable']], @@ -97,8 +108,9 @@ class setupparmdb(BaseRecipe, RemoteCommandRecipeMixIn): self.logger.error("Failed to spawn parmdbm: %s" % str(err)) return 1 - # try-finally block to always remove temporary files - # ---------------------------------------------------------------------- + # ********************************************************************* + # 2. Call node side of recipe with template and possible targets + # If output location are provided as input these are validated. 
try: # Load file <-> compute node mapping from disk # ------------------------------------------------------------------ @@ -106,6 +118,7 @@ class setupparmdb(BaseRecipe, RemoteCommandRecipeMixIn): self.logger.debug("Loading input-data mapfile: %s" % args[0]) indata = load_data_map(args[0]) if len(args) > 1: + # If output location provide validate the input and outputmap self.logger.debug("Loading output-data mapfile: %s" % args[1]) outdata = load_data_map(args[1]) if not validate_data_maps(indata, outdata): @@ -113,6 +126,7 @@ class setupparmdb(BaseRecipe, RemoteCommandRecipeMixIn): "Validation of input/output data mapfiles failed" ) return 1 + # else output location is inputlocation+suffix else: outdata = [ (host, @@ -122,7 +136,7 @@ class setupparmdb(BaseRecipe, RemoteCommandRecipeMixIn): os.path.basename(infile) + self.inputs['suffix']) ) for host, infile in indata ] - + # Call the node side command = "python %s" % (self.__file__.replace('master', 'nodes')) jobs = [] for host, outfile in outdata: @@ -138,6 +152,8 @@ class setupparmdb(BaseRecipe, RemoteCommandRecipeMixIn): ) self._schedule_jobs(jobs, max_per_node=self.inputs['nproc']) + # ********************************************************************* + # 3. validate performance, cleanup of temp files, construct output finally: self.logger.debug("Removing template parmdb") shutil.rmtree(pdbdir, ignore_errors=True) diff --git a/CEP/Pipeline/recipes/sip/master/setupsourcedb.py b/CEP/Pipeline/recipes/sip/master/setupsourcedb.py index 59fd11f33f6c66ea9fd0504f5a9840ce9b264527..33b09559d977665951194b5881380c8d06312022 100644 --- a/CEP/Pipeline/recipes/sip/master/setupsourcedb.py +++ b/CEP/Pipeline/recipes/sip/master/setupsourcedb.py @@ -22,10 +22,17 @@ class setupsourcedb(BaseRecipe, RemoteCommandRecipeMixIn): """ Create a distributed Sky Model database (SourceDB) for a distributed Measurement Set (MS). - - **Arguments** - A mapfile describing the data to be processed. + 1. Load input and output mapfiles. Validate + 2. Check if input skymodel file exists. If not, make filename empty. + 3. Call node side of recipe + 4. Validate performance and create output + + **Command line arguments** + + 1. A mapfile describing the input data to be processed. + 2. A mapfile with target location <if provided it will be validated against + The input data> """ inputs = { 'executable': ingredient.ExecField( @@ -65,7 +72,8 @@ class setupsourcedb(BaseRecipe, RemoteCommandRecipeMixIn): } outputs = { - 'mapfile': ingredient.FileField() + 'mapfile': ingredient.FileField(help="mapfile with created sourcedb" + "paths") } @@ -73,8 +81,9 @@ class setupsourcedb(BaseRecipe, RemoteCommandRecipeMixIn): self.logger.info("Starting setupsourcedb run") super(setupsourcedb, self).go() - # Load file <-> compute node mapping from disk - # ---------------------------------------------------------------------- + # ********************************************************************* + # 1. Load input and output mapfiles. Validate + args = self.inputs['args'] self.logger.debug("Loading input-data mapfile: %s" % args[0]) indata = load_data_map(args[0]) @@ -96,7 +105,8 @@ class setupsourcedb(BaseRecipe, RemoteCommandRecipeMixIn): ) for host, infile in indata ] - # Check if input skymodel file exists. If not, make filename empty. + # ********************************************************************* + # 2. Check if input skymodel file exists. If not, make filename empty. if not os.path.isfile(self.inputs['skymodel']): self.logger.warn( "Source catalog %s does not exist. 
Using an empty one." % @@ -104,6 +114,8 @@ class setupsourcedb(BaseRecipe, RemoteCommandRecipeMixIn): ) self.inputs['skymodel'] = "" + # ******************************************************************** + # 3. Call node side of script command = "python %s" % (self.__file__.replace('master', 'nodes')) jobs = [] for host, outfile in outdata: @@ -121,6 +133,8 @@ class setupsourcedb(BaseRecipe, RemoteCommandRecipeMixIn): ) self._schedule_jobs(jobs, max_per_node=self.inputs['nproc']) + # ********************************************************************* + # 4. check performance and create output data if self.error.isSet(): return 1 else: diff --git a/CEP/Pipeline/recipes/sip/master/vdsmaker.py b/CEP/Pipeline/recipes/sip/master/vdsmaker.py index b225b0c8093382a55e2283fa1542e19d1bc6950c..355f76d347f4d9e1fea113e806fd8f106eb35429 100644 --- a/CEP/Pipeline/recipes/sip/master/vdsmaker.py +++ b/CEP/Pipeline/recipes/sip/master/vdsmaker.py @@ -24,9 +24,13 @@ class vdsmaker(BaseRecipe, RemoteCommandRecipeMixIn): see the ``unlink`` input parameter) describing a collection of MeasurementSets. - **Arguments** + 1. Load data from disk, create the output vds paths + 2. Call the vdsmaker node script to generate the vds files + 3. Combine the vds files in a gvds file (master side operation) + + **Command line arguments** - A mapfile describing the data to be processed. + A mapfile describing the measurementsets to be processed. """ inputs = { 'gvds': ingredient.StringField( @@ -62,23 +66,28 @@ class vdsmaker(BaseRecipe, RemoteCommandRecipeMixIn): } def go(self): + """ + Contains functionality of the vdsmaker + """ super(vdsmaker, self).go() - - # Load file <-> compute node mapping from disk - # ---------------------------------------------------------------------- + # ********************************************************************** + # 1. Load data from disk create output files args = self.inputs['args'] self.logger.debug("Loading input-data mapfile: %s" % args[0]) data = load_data_map(args[0]) + # Create output vds names vdsnames = [ os.path.join( self.inputs['directory'], os.path.basename(x[1]) + '.vds' ) for x in data ] + # ********************************************************************* + # 2. Call vdsmaker command = "python %s" % (self.__file__.replace('master', 'nodes')) jobs = [] - for host, infile, outfile in (x+(y,) for x, y in zip(data, vdsnames)): + for host, infile, outfile in (x + (y,) for x, y in zip(data, vdsnames)): jobs.append( ComputeJob( host, command, @@ -96,14 +105,15 @@ class vdsmaker(BaseRecipe, RemoteCommandRecipeMixIn): self.logger.warn("Failed vdsmaker process detected") return 1 - # Combine VDS files to produce GDS + # ********************************************************************* + # 3. 
Combine VDS files to produce GDS
         failure = False
         self.logger.info("Combining VDS files")
         executable = self.inputs['combinevds']
         gvds_out = self.inputs['gvds']
         # Create the gvds directory for output files, needed for combine
         create_directory(os.path.dirname(gvds_out))
-
+
         try:
             command = [executable, gvds_out] + vdsnames
             combineproc = subprocess.Popen(
@@ -134,8 +144,9 @@
         for name in vdsnames:
             os.unlink(name)
         self.logger.info("vdsmaker done")
+
         if failure:
-            self.logger.info("Failure was set")
+            self.logger.info("Error flag was set; exiting vdsmaker with an error state")
             return 1
         elif not self.outputs.complete():
             self.logger.info("Outputs incomplete")
diff --git a/CEP/Pipeline/recipes/sip/master/vdsreader.py b/CEP/Pipeline/recipes/sip/master/vdsreader.py
index 8ba0bb9fcd482ededd883462a38081be8a3e1bc3..ecde6efb9919e4f2fc4a1e4f816fbdd290249a4f 100644
--- a/CEP/Pipeline/recipes/sip/master/vdsreader.py
+++ b/CEP/Pipeline/recipes/sip/master/vdsreader.py
@@ -16,10 +16,15 @@ class vdsreader(BaseRecipe):
     """
     Read a GVDS file and return a list of the MS filenames referenced therein
     together with selected metadata.
+
+    This recipe performs its functionality at the master side of the recipe:
+
+    1. Open the gvds file as a parameterset
+    2. Convert all Part FileNames to MS names
+    3. Parse the start and end time and the pointing information

-    **Arguments**
+    **No command line arguments.**

-    None.
     """
     inputs = {
         'gvds': ingredient.FileField(
@@ -39,6 +44,8 @@
         self.logger.info("Starting vdsreader run")
         super(vdsreader, self).go()

+        # *********************************************************************
+        # 1. Open the gvds file as a parameterset
         try:
             gvds = parameterset(self.inputs['gvds'])
         except:
@@ -46,6 +53,9 @@
             raise

         self.logger.info("Building list of measurementsets")
+
+        # *********************************************************************
+        # 2. Convert all Part*.FileName values to MS names
         ms_names = [
             gvds.getString("Part%d.FileName" % (part_no,))
             for part_no in xrange(gvds.getInt("NParts"))
@@ -53,6 +63,9 @@
         self.logger.debug(ms_names)
         self.outputs['data'] = ms_names
+
+        # *********************************************************************
+        # 3. Parse the start and end time and the pointing information
         try:
             self.outputs['start_time'] = gvds.getString('StartTime')
             self.outputs['end_time'] = gvds.getString('EndTime')
diff --git a/CEP/Pipeline/recipes/sip/nodes/dppp.py b/CEP/Pipeline/recipes/sip/nodes/dppp.py
index 1a90979288ef14a1138f7a459e968d77185d6444..de369323e889d343b29f7e794720b3d590ef3079 100644
--- a/CEP/Pipeline/recipes/sip/nodes/dppp.py
+++ b/CEP/Pipeline/recipes/sip/nodes/dppp.py
@@ -22,12 +22,24 @@ from lofarpipe.support.lofarnode import LOFARnodeTCP
 from lofar.parameterset import parameterset

 class dppp(LOFARnodeTCP):
+    """
+    Call ndppp with a parset augmented with locally calculated parameters:
+
+    1. Preparations: set nthreads, validate input, clean the workspace
+    2. Perform housekeeping, test if the work is already done
+    3. Update the parset with locally calculated information
+    4. Add ms names to the parset, start/end times if available, etc.
+    5. Add demixing parameters to the parset
+    6. 
Run ndppp + + """ - def run( - self, infile, outfile, parmdb, sourcedb, - parsetfile, executable, environment, demix_always, demix_if_needed, - start_time, end_time, nthreads, clobber - ): + def run(self, infile, outfile, parmdb, sourcedb, + parsetfile, executable, environment, demix_always, demix_if_needed, + start_time, end_time, nthreads, clobber): + """ + This function contains all the needed functionality + """ # Debugging info self.logger.debug("infile = %s" % infile) self.logger.debug("outfile = %s" % outfile) @@ -44,7 +56,10 @@ class dppp(LOFARnodeTCP): self.logger.debug("clobber = %s" % clobber) self.environment.update(environment) - + + # ******************************************************************** + # 1. preparations. set nthreads, Validate input, clean workspace + # if not nthreads: nthreads = 1 if not outfile: @@ -72,10 +87,12 @@ class dppp(LOFARnodeTCP): "Input and output are identical, not clobbering %s" % outfile ) - else: + else: self.logger.info("Removing previous output %s" % outfile) shutil.rmtree(outfile, ignore_errors=True) + # ***************************************************************** + # 2. Perform house keeping, test if work is already done # If input and output files are different, and if output file # already exists, then we're done. if outfile != infile and os.path.exists(outfile): @@ -97,6 +114,9 @@ class dppp(LOFARnodeTCP): self.environment['OMP_NUM_THREADS'] = str(nthreads) self.logger.debug("Using %s threads for NDPPP" % nthreads) + # ***************************************************************** + # 3. Update the parset with locally calculate information + # Put arguments we need to pass to some private methods in a dict kwargs = { 'infile' : infile, @@ -112,14 +132,19 @@ class dppp(LOFARnodeTCP): # Prepare for the actual DPPP run. with patched_parset( + # ***************************************************************** + # 4. Add ms names to the parset, start/end times if availabe, etc. + # 5. Add demixing parameters to the parset parsetfile, self._prepare_steps(**kwargs) #, unlink=False ) as temp_parset_filename: - self.logger.debug("Created temporary parset file: %s" % + self.logger.debug("Created temporary parset file: %s" % temp_parset_filename ) try: working_dir = tempfile.mkdtemp() + # **************************************************************** + # 6. Run ndppp cmd = [executable, temp_parset_filename, '1'] with CatchLog4CPlus( @@ -128,9 +153,10 @@ class dppp(LOFARnodeTCP): os.path.basename(executable), ) as logger: # Catch NDPPP segfaults (a regular occurance), and retry + catch_segfaults( - cmd, working_dir, self.environment, logger, - cleanup = lambda : shutil.rmtree(tmpfile, ignore_errors=True) + cmd, working_dir, self.environment, logger, + cleanup=lambda : shutil.rmtree(tmpfile, ignore_errors=True) ) # Replace outfile with the updated working copy shutil.rmtree(outfile, ignore_errors=True) @@ -174,7 +200,7 @@ class dppp(LOFARnodeTCP): patch_dictionary['msin.starttime'] = kwargs['start_time'] if kwargs['end_time']: patch_dictionary['msin.endtime'] = kwargs['end_time'] - + # If we need to do a demixing step, we have to do some extra work. # We have to read the parsetfile to check this. parset = parameterset(kwargs['parsetfile']) @@ -183,11 +209,11 @@ class dppp(LOFARnodeTCP): patch_dictionary.update( self._prepare_demix_step(step, **kwargs) ) - + # Return the patch dictionary that must be applied to the parset. 
return patch_dictionary
-
-
+
+
     def _prepare_demix_step(self, stepname, **kwargs):
         """
         Prepare for a demixing step. This requires the setting of some
         # Add demix directory to sys.path before importing find_a_team module.
         sys.path.insert(0, os.path.join(os.path.dirname(sys.argv[0]), "demix"))
         from find_a_team import getAteamList
-
+
         patch_dictionary = {}
         if kwargs['parmdb']:
             patch_dictionary[stepname + '.instrumentmodel'] = kwargs['parmdb']
         if kwargs['sourcedb']:
             patch_dictionary[stepname + '.skymodel'] = kwargs['sourcedb']
-
+
         demix_always = set(kwargs['demix_always'])
         demix_if_needed = set(kwargs['demix_if_needed'])
-
+
         # If the user specified a list of candidate A-team sources to remove,
         # then determine the intersection of that list and the list of sources
         # that need demixing according to the heuristics of getAteamList().
@@ -231,7 +257,7 @@
             )
         )
         patch_dictionary[stepname + '.subtractsources'] = demix_sources
-
+
         # Return the patch dictionary.
         return patch_dictionary

diff --git a/CEP/Pipeline/recipes/sip/nodes/gainoutliercorrection.py b/CEP/Pipeline/recipes/sip/nodes/gainoutliercorrection.py
index 127862b3d538c7ff469a0086b6302a39fe035e0f..9d87b0a042ee3aa0711bf1e507c47d9f5c296d94 100644
--- a/CEP/Pipeline/recipes/sip/nodes/gainoutliercorrection.py
+++ b/CEP/Pipeline/recipes/sip/nodes/gainoutliercorrection.py
@@ -23,11 +23,22 @@ from lofarpipe.support.lofarexceptions import PipelineRecipeFailed
 from lofarpipe.recipes.helpers.WritableParmDB import WritableParmDB, list_stations
 from lofarpipe.recipes.helpers.ComplexArray import ComplexArray, RealImagArray, AmplPhaseArray

-class GainOutlierCorrection(LOFARnodeTCP):
-    def run(self, infile, outfile, executable, environment, sigma):
+class gainoutliercorrection(LOFARnodeTCP):
+    """
+    Perform a gain outlier correction on the provided parmdb.
+    The functionality is based on the edit_parmdb script of John Swinbank.
+
+    Outliers in the gain are swapped with the median; the resulting gains
+    are written back to the supplied parmdb:
+
+    1. Select the correction method
+    2. Call parmexportcal for the gain correction
+    3. Use the gain outlier correction based on edit_parmdb
+
+    The steps are implemented in the functions of this recipe.
+    """
+    def run(self, infile, outfile, executable, environment, sigma):
         self.environment.update(environment)
-        # Time execution of this job
         with log_time(self.logger):
             if os.path.exists(infile):
@@ -39,18 +50,25 @@
                 return 1
             # Create output directory (if it doesn't already exist)
             create_directory(os.path.dirname(outfile))
-
+            # ********************************************************************
+            # 1. Select the correction method
             if not os.access(executable, os.X_OK) and sigma != None:
                 # If the executable is not accessible and we have a sigma:
                 # use the 'local' functionality (edit parmdb)
+
+                # *********************************************************************
+                # 3. Use the gain outlier correction based on edit_parmdb
                 self._filter_stations_parmdb(infile, outfile, sigma)
                 return 0

+            # else we need an executable
             # Check if it exists and is executable.
             if not os.access(executable, os.X_OK):
                 self.logger.error("Executable %s not found" % executable)
                 return 1

+            # ********************************************************************
+            # 2. Call parmexportcal for the gain correction
            try:
                 temp_dir = tempfile.mkdtemp()
                 with CatchLog4CPlus(
@@ -78,8 +96,8 @@
         the corrected parmdb written to outfile.
Outliers in the gain with a distance of median of sigma times std are replaced with the mean. The last value of the complex array - is skipped (John Swinbank: "I found it was bad when I hacked" - " together some code to do this") + is skipped (John Swinbank: "I found it [the last value] was bad when + I hacked together some code to do this") """ sigma = float(sigma) # Create copy of the input file @@ -122,10 +140,11 @@ class GainOutlierCorrection(LOFARnodeTCP): return parmdb, corected_data def _read_polarisation_data_and_type_from_db(self, parmdb, station): - all_matching_names = parmdb.getNames("Gain:*:*:*:{0}".format(station)) """ Read the polarisation data and type from the db. """ + all_matching_names = parmdb.getNames("Gain:*:*:*:{0}".format(station)) + # get the polarisation_data eg: 1:1 # This is based on the 1 trough 3th entry in the parmdb name entry pols = set(":".join(x[1:3]) for x in (x.split(":") for x in all_matching_names)) @@ -237,4 +256,4 @@ if __name__ == "__main__": # and pass the rest to the run() method defined above # -------------------------------------------------------------------------- jobid, jobhost, jobport = sys.argv[1:4] - sys.exit(GainOutlierCorrection(jobid, jobhost, jobport).run_with_stored_arguments()) + sys.exit(gainoutliercorrection(jobid, jobhost, jobport).run_with_stored_arguments()) diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py index 85a6da5c56b085deb128b6121daaefefd8ec5f93..0d7bf436523ca0e980780c75e62d2f25ed13ca2d 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py @@ -35,7 +35,7 @@ class imager_prepare(LOFARnodeTCP): 3. Flag rfi. 4. Add addImagingColumns to the casa ms. 5. Concatenate the time slice measurment sets, to a single virtual ms. - 6. Filter bad stations: Find station with repeated bad measurement and + 6. Filter bad stations. Find station with repeated bad measurement and remove these completely from the dataset. **Members:** @@ -80,8 +80,8 @@ class imager_prepare(LOFARnodeTCP): #****************************************************************** # 2. run dppp: collect frequencies into larger group time_slices = \ - self._run_dppp(working_dir, time_slice_dir, - time_slices_per_image, input_map, subbands_per_group, + self._run_dppp(working_dir, time_slice_dir, + time_slices_per_image, input_map, subbands_per_group, processed_ms_dir, parset, ndppp_executable) self.logger.debug("Produced time slices: {0}".format(time_slices)) @@ -324,6 +324,7 @@ class imager_prepare(LOFARnodeTCP): which produces a set of bad stations. 3. 
In the final step the bad stations are removed from the dataset using ms select
+           REF: http://www.lofar.org/wiki/lib/exe/fetch.php?media=msss:pandeymartinez-week9-v1p2.pdf
         """
         # run asciistat to collect statistics about the ms
diff --git a/CEP/Pipeline/recipes/sip/nodes/new_bbs.py b/CEP/Pipeline/recipes/sip/nodes/new_bbs.py
index 7cc95be6efc33f56a8ebc2e1a646e758ef1db2b4..a0ee543182e7e52ddf4abd582fa7b5c0df8b5755 100644
--- a/CEP/Pipeline/recipes/sip/nodes/new_bbs.py
+++ b/CEP/Pipeline/recipes/sip/nodes/new_bbs.py
@@ -25,6 +25,9 @@ class new_bbs(LOFARnodeTCP):
     #   Handles running a single BBS kernel on a compute node
     # --------------------------------------------------------------------------
     def run(self, executable, infiles, db_key, db_name, db_user, db_host):
+        """
+        Deprecated functionality
+        """
         # executable : path to KernelControl executable
         # infiles    : tuple of MS, instrument- and sky-model files
         # db_*       : database connection parameters
@@ -83,7 +86,7 @@
                     os.path.basename(executable),
                 ):
                     bbs_kernel_process = Popen(
-                        cmd, stdout = PIPE, stderr = PIPE, cwd = working_dir
+                        cmd, stdout=PIPE, stderr=PIPE, cwd=working_dir
                     )
                     sout, serr = bbs_kernel_process.communicate()
                 log_process_output("BBS kernel", sout, serr, self.logger)
diff --git a/CEP/Pipeline/recipes/sip/nodes/setupparmdb.py b/CEP/Pipeline/recipes/sip/nodes/setupparmdb.py
index 57a0563433651306263570d60cc6cc331653802a..d2932ea44f958f6b5e695ae8d92372d3bcfd4aa6 100644
--- a/CEP/Pipeline/recipes/sip/nodes/setupparmdb.py
+++ b/CEP/Pipeline/recipes/sip/nodes/setupparmdb.py
@@ -11,6 +11,12 @@ import shutil
 import sys

 class setupparmdb(LOFARnodeTCP):
+    """
+    Put the provided template parmdb at the target location:
+
+    1. Remove a possible old parmdb at the target location.
+    2. Copy the template to the target location.
+    """
     def run(self, pdb_in, pdb_out):
         with log_time(self.logger):
             self.logger.debug("Copying parmdb: %s --> %s" % (pdb_in, pdb_out))
diff --git a/CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py b/CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py
index b2abea746f47f49866e35c1aef55c026cc46db9f..ee35999e2d938e85f86acfab0991718ec62c5e32 100644
--- a/CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py
+++ b/CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py
@@ -19,9 +19,21 @@ from lofarpipe.support.utilities import catch_segfaults

 class setupsourcedb(LOFARnodeTCP):
+    """
+    Create the sourcedb at the supplied location:
+
+    1. Create the output directory if it does not yet exist.
+    2. Create the sourcedb.
+    3. Validate performance, clean up.
+
+    """
     def run(self, executable, catalogue, skydb, dbtype):
+        """
+        Contains all the functionality of this node recipe.
+        """
         with log_time(self.logger):
-            # Create output directory if it does not yet exist.
+            # ****************************************************************
+            # 1. Create the output directory if it does not yet exist.
             skydb_dir = os.path.dirname(skydb)
             try:
                 os.makedirs(skydb_dir)
@@ -31,7 +43,9 @@
                 if err[0] != errno.EEXIST:
                     raise

-            # Remove any old sky database
+            # ****************************************************************
+            # 2. Remove any old sky database, then create the sourcedb
             shutil.rmtree(skydb, ignore_errors=True)
             self.logger.info("Creating skymodel: %s" % (skydb))
@@ -50,6 +64,9 @@
                     os.path.basename(executable)
                 ) as logger:
                     catch_segfaults(cmd, scratch_dir, None, logger)
+
+            # *****************************************************************
+            # 3. Validate performance and clean up temp files
             except CalledProcessError, err:
                 # CalledProcessError isn't properly propagated by IPython
                 # Temporary workaround...
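Note: every node script in this changeset shares the same shape, a `LOFARnodeTCP` subclass whose `run()` does the work, plus a dispatch stanza wiring it to the master. A minimal skeleton following that pattern; `example_node` is illustrative and not part of the pipeline, and the `log_time` import path is an assumption inferred from its usage in the scripts above:

    import sys
    from lofarpipe.support.lofarnode import LOFARnodeTCP
    from lofarpipe.support.pipelinelogging import log_time  # assumed import path

    class example_node(LOFARnodeTCP):
        """
        Minimal node-side skeleton modeled on the node scripts above
        (illustrative only).
        """
        def run(self, infile):
            with log_time(self.logger):   # time the job, as the other nodes do
                self.logger.info("Processing %s" % infile)
            return 0                      # 0 signals success to the master

    if __name__ == "__main__":
        # The standard dispatch stanza used by every node script in this diff
        jobid, jobhost, jobport = sys.argv[1:4]
        sys.exit(example_node(jobid, jobhost, jobport).run_with_stored_arguments())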
diff --git a/CEP/Pipeline/recipes/sip/nodes/vdsmaker.py b/CEP/Pipeline/recipes/sip/nodes/vdsmaker.py
index a9c31ad4c18225a694257204ae81ae80b0e08020..6a5e3e7c0b87a0cfe83e69f13795f416be28132e 100644
--- a/CEP/Pipeline/recipes/sip/nodes/vdsmaker.py
+++ b/CEP/Pipeline/recipes/sip/nodes/vdsmaker.py
@@ -18,6 +18,9 @@ from lofarpipe.support.lofarnode import LOFARnodeTCP
 class vdsmaker(LOFARnodeTCP):
     """
     Make a VDS file for the input MS in a specified location.
+
+    1. Call the makevds executable with the supplied arguments
+    2. Perform some error checking and validation
     """
     def run(self, infile, clusterdesc, outfile, executable):
         with log_time(self.logger):
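Note: for symmetry with the node-side skeleton above, this is the master-side dispatch pattern that recurs in every master recipe in this changeset (condensed and illustrative; the `ComputeJob` import path is an assumption, since the imports are not shown in this diff):

    # Condensed sketch of the master-side dispatch shared by the recipes
    # above; not a standalone recipe.
    from lofarpipe.support.remotecommand import ComputeJob  # assumed path

    def build_jobs(node_script, indata):
        """indata: list of (host, path) tuples from load_data_map()."""
        jobs = []
        for host, infile in indata:
            # one compute job per input MS; the node script gets the MS path
            jobs.append(ComputeJob(host, "python %s" % node_script,
                                   arguments=[infile]))
        return jobs

    # A recipe would then call self._schedule_jobs(jobs), check
    # self.error.isSet(), and build its output mapfile from the survivors.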