    # longbaseline_pipeline.py
    #
    # Copyright (C) 2016, 2017
    # ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it
    # and/or modify it under the terms of the GNU General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be
    # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    #
    # $Id$
    
    import logging
    from math import ceil
    from base_pipeline_estimator import BasePipelineResourceEstimator
    
    logger = logging.getLogger(__name__)
    
    DATAPRODUCTS = "Observation.DataProducts."
    PIPELINE = "Observation.ObservationControl.PythonControl."
    
    # Observation.DataProducts.Output_Correlated.storageClusterName=
    
    class LongBaselinePipelineResourceEstimator(BasePipelineResourceEstimator):
        """ ResourceEstimator for Long Baseline Pipelines
        """
        def __init__(self):
            logger.info("init LongBaselinePipelineResourceEstimator")
            BasePipelineResourceEstimator.__init__(self, name='pipeline') #FIXME name='longbaseline_pipeline'
            self.required_keys = ('Observation.startTime',
                                  'Observation.stopTime',
                                  DATAPRODUCTS + 'Input_Correlated.enabled',
                                  DATAPRODUCTS + 'Input_Correlated.identifications',
                                  #DATAPRODUCTS + 'Input_Correlated.storageClusterName',  # enable if input bandwidth is also estimated
                                  DATAPRODUCTS + 'Output_Correlated.enabled',
                                  DATAPRODUCTS + 'Output_Correlated.identifications',
                                  DATAPRODUCTS + 'Output_Correlated.storageClusterName',
                                  PIPELINE + 'LongBaseline.subbandgroups_per_ms',
                                  PIPELINE + 'LongBaseline.subbands_per_subbandgroup')
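            # Illustrative parset fragment for the keys above (values are hypothetical,
            # not taken from a real observation):
            #   Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms=10
            #   Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup=8
            #   Observation.DataProducts.Output_Correlated.storageClusterName=CEP4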
    
        def _calculate(self, parset, predecessor_estimates):
            """ Estimate for Long Baseline Pipeline
            calculates: data size (number of files, file size) and bandwidth
    
            For a predecessor_estimates example, see the calibration/averaging
            (and possibly the observation) estimator code.
    
            For a return value example, see the calibration/averaging estimator code,
            except that there is no 'im' input or output, and instead of a 'start_sb_nr',
            we have a 'start_sbg_nr' property.
            """
            logger.debug("start estimate '{}'".format(self.name))
            logger.info('parset: %s' % parset)
    
            result = {'errors': [], 'estimates': []}
    
            subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0)  # TODO: should these have defaults?
            subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0)
            if not subbandgroups_per_ms or not subbands_per_subbandgroup:
                logger.error('subbandgroups_per_ms or subbands_per_subbandgroup is not set to a positive value')
                result['errors'].append('subbandgroups_per_ms or subbands_per_subbandgroup is not set to a positive value')
            if not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'):
                logger.error('Output_Correlated is not enabled')
                result['errors'].append('Output_Correlated is not enabled')
            if result['errors']:
                return result
    
            duration = self._getDuration(parset.getString('Observation.startTime'),
                                         parset.getString('Observation.stopTime'))
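            # duration is the scheduled run time in seconds (derived by the base class
            # from startTime/stopTime); it is used below to convert the total output
            # size into an average bandwidth in bits/second.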
    
            # NOTE: input bandwidth is not included in the resulting estimate at the moment.
            # A proper input bandwidth estimate is of limited use and tricky to produce, because of
            # the pipeline duration estimate, tmp files, multiple passes, the number of nodes, caching, ...
            #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName')
    
            input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications')
            output_ident_uv = self._getOutputIdentification(parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications'))
            output_cluster_uv = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName')
    
            for pred_est in predecessor_estimates:
                pred_output_files = pred_est.get('output_files')
                if pred_output_files is None:
                    continue
    
                input_files = self._filterInputs(pred_output_files, input_idents_uv)
                if not input_files:
                    continue
    
                if 'uv' not in input_files:
                    logger.error('Missing uv dataproducts in predecessor output_files')
                    result['errors'].append('Missing uv dataproducts in predecessor output_files')
                    continue
    
                estimate = {'input_files': input_files}
    
                nr_input_files     = input_files['uv']['properties']['nr_of_uv_files']
                uv_input_file_size = input_files['uv']['properties']['uv_file_size']  # same for each uv data product across all SAPs
                start_sb_nr        = input_files['uv']['properties']['start_sb_nr']
    
                if nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0:
                    logger.error('the number of input files is not a multiple of subbands_per_subbandgroup * subbandgroups_per_ms')
                    result['errors'].append('the number of input files is not a multiple of subbands_per_subbandgroup * subbandgroups_per_ms')
                    continue
                total_nr_input_files = nr_input_files * pred_est['resource_count']
                nr_output_files = total_nr_input_files // (subbands_per_subbandgroup * subbandgroups_per_ms)  # exact: divisibility checked above
    
                logger.debug("calculate correlated data size")
                uv_output_file_size = 1000  # TODO: 1 kB was hardcoded in the Scheduler
                start_sbg_nr = start_sb_nr // (subbands_per_subbandgroup * subbandgroups_per_ms)  # integer (floor) division intended
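                # Worked example with hypothetical numbers: 8 subbands per subband group
                # and 10 subband groups per MS collapse 240 input files into
                # 240 // (8 * 10) = 3 output MeasurementSets; an input starting at
                # subband 160 maps to start subband group 160 // 80 = 2.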
    
                logger.info("correlated_uv: {} files {} bytes each".format(nr_output_files, uv_output_file_size))
                estimate['output_files'] = {'uv': {'identification': output_ident_uv,
                                                   'properties': {'nr_of_uv_files': 1,  # also see estimate['resource_count'] below
                                                                  'uv_file_size': uv_output_file_size,
                                                                  'start_sbg_nr': start_sbg_nr}}}
    
                # count total data size
                data_size = nr_output_files * uv_output_file_size  # bytes
                if data_size:
                    bandwidth = int(ceil(8 * data_size / duration))  # bits/second
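                    # e.g. (illustrative) 3 output files of 1000 bytes over a 3600.0 s
                    # duration: ceil(8 * 3000 / 3600.0) = 7 bits/second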
                    estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
                    estimate['resource_count'] = nr_output_files  # this way, each output file can be allocated on a different resource
                    estimate['root_resource_group'] = output_cluster_uv
                else:
                    logger.error('An estimate of zero was calculated!')
                    result['errors'].append('An estimate of zero was calculated!')
    
                result['estimates'].append(estimate)
    
            if not result['estimates'] and not result['errors']:
                logger.error('long baseline pipeline estimator produced no estimates')
                result['errors'].append('long baseline pipeline estimator produced no estimates')
    
            return result