Skip to content
Snippets Groups Projects
Select Git revision
  • 68490db7c02fa2edd6eef880a2b58295f53b1570
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

vdsmaker.py

Blame
  • Marcel Loose's avatar
    Marcel Loose authored
    Task #2699: Fixed a recently introduced bug in vdsmaker. You cannot delete items from the vdsnames list by job index, because deleting items changes the posiition of the remaining items and the length of the list (causing a IndexError).
    68490db7
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    vdsmaker.py 5.69 KiB
    #                                                         LOFAR IMAGING PIPELINE
    #
    #                                     New vdsmaker recipe: fixed node allocation
    #                                                            John Swinbank, 2010
    #                                                      swinbank@transientskp.org
    # ------------------------------------------------------------------------------
    
    from __future__ import with_statement
    import sys
    import os
    import subprocess
    
    import lofarpipe.support.lofaringredient as ingredient
    from lofarpipe.support.utilities import create_directory
    from lofarpipe.support.baserecipe import BaseRecipe
    from lofarpipe.support.remotecommand import RemoteCommandRecipeMixIn
    from lofarpipe.support.remotecommand import ComputeJob
    from lofarpipe.support.data_map import DataMap
    from lofarpipe.support.pipelinelogging import log_process_output
    
    class vdsmaker(BaseRecipe, RemoteCommandRecipeMixIn):
        """
        Generate a GVDS file (and, optionally, individual VDS files per subband;
        see the ``unlink`` input parameter) describing a collection of
        MeasurementSets.
    
        1. Load data from disk, create the output vds paths
        2. Call the vdsmaker node script to generate the vds files
        3. Combine the vds files in a gvds file (master side operation)
        
        **Command line arguments**
    
        A mapfile describing the measurementsets to be processed.
        """
        inputs = {
            'gvds': ingredient.StringField(
                '-g', '--gvds',
                help="File name for output GVDS file"
            ),
            'directory': ingredient.DirectoryField(
                '--directory',
                help="Directory for output GVDS file"
            ),
            'makevds': ingredient.ExecField(
                '--makevds',
                help="Full path to makevds executable"
            ),
            'combinevds': ingredient.ExecField(
                '--combinevds',
                help="Full path to combinevds executable"
            ),
            'unlink': ingredient.BoolField(
                '--unlink',
                help="Unlink VDS files after combining",
                default=True
            ),
            'nproc': ingredient.IntField(
                '--nproc',
                help="Maximum number of simultaneous processes per compute node",
                default=8
            )
        }
    
        outputs = {
            'gvds': ingredient.FileField()
        }
    
        def go(self):
            """
            Contains functionality of the vdsmaker
            """
            super(vdsmaker, self).go()
            # **********************************************************************
            # 1. Load data from disk create output files
            args = self.inputs['args']
            self.logger.debug("Loading input-data mapfile: %s" % args[0])
            data = DataMap.load(args[0])
    
            # Skip items in `data` that have 'skip' set to True
            data.iterator = DataMap.SkipIterator
    
            # Create output vds names
            vdsnames = [
                os.path.join(
                    self.inputs['directory'], os.path.basename(item.file) + '.vds'
                ) for item in data
            ]
    
            # *********************************************************************
            # 2. Call vdsmaker 
            command = "python %s" % (self.__file__.replace('master', 'nodes'))
            jobs = []
            for inp, vdsfile in zip(data, vdsnames):
                jobs.append(
                    ComputeJob(
                        inp.host, command,
                        arguments=[
                            inp.file,
                            self.config.get('cluster', 'clusterdesc'),
                            vdsfile,
                            self.inputs['makevds']
                        ]
                    )
                )
            self._schedule_jobs(jobs, max_per_node=self.inputs['nproc'])
            vdsnames = [
                vds for vds, job in zip(vdsnames, jobs) 
                if job.results['returncode'] == 0
            ]
    
            # *********************************************************************
            # 3. Combine VDS files to produce GDS
            failure = False
            self.logger.info("Combining VDS files")
            executable = self.inputs['combinevds']
            gvds_out = self.inputs['gvds']
            # Create the gvds directory for output files, needed for combine
            create_directory(os.path.dirname(gvds_out))
    
            try:
                command = [executable, gvds_out] + vdsnames
                combineproc = subprocess.Popen(
                    command,
                    close_fds=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE
                )
                sout, serr = combineproc.communicate()
                log_process_output(executable, sout, serr, self.logger)
                if combineproc.returncode != 0:
                    raise subprocess.CalledProcessError(
                        combineproc.returncode, command
                    )
                self.outputs['gvds'] = gvds_out
                self.logger.info("Wrote combined VDS file: %s" % gvds_out)
            except subprocess.CalledProcessError, cpe:
                self.logger.exception(
                    "combinevds failed with status %d: %s" % (cpe.returncode, serr)
                )
                failure = True
            except OSError, err:
                self.logger.error("Failed to spawn combinevds (%s)" % str(err))
                failure = True
            finally:
                if self.inputs["unlink"]:
                    self.logger.debug("Unlinking temporary files")
                    for name in vdsnames:
                        os.unlink(name)
                self.logger.info("vdsmaker done")
    
            if failure:
                self.logger.info("Error was set, exit vds maker with error state")
                return 1
            elif not self.outputs.complete():
                self.logger.info("Outputs incomplete")
            else:
                return 0
    
    
    if __name__ == '__main__':
        sys.exit(vdsmaker().main())