Select Git revision
longbaseline_pipeline.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
longbaseline_pipeline.py 7.61 KiB
# longbaseline_pipeline.py
#
# Copyright (C) 2016, 2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
import logging
from math import ceil
from base_pipeline_estimator import BasePipelineResourceEstimator
logger = logging.getLogger(__name__)

# Prefixes shared by the parset keys that the estimator class below reads.
DATAPRODUCTS, PIPELINE = (
    "Observation.DataProducts.",
    "Observation.ObservationControl.PythonControl.",
)
# The output storage cluster is read from
# DATAPRODUCTS + 'Output_Correlated.storageClusterName'.
class LongBaselinePipelineResourceEstimator(BasePipelineResourceEstimator):
    """ ResourceEstimator for Long Baseline Pipelines.

    Derives the number/size of the output uv dataproducts and the resulting
    storage and bandwidth claims from the parset and the output_files of the
    predecessor estimates.
    """
    def __init__(self):
        logger.info("init LongBaselinePipelineResourceEstimator")
        BasePipelineResourceEstimator.__init__(self, name='pipeline') #FIXME name='longbaseline_pipeline'
        # Parset keys this estimator depends on; presumably validated by the
        # base class before _calculate() is invoked -- TODO confirm.
        self.required_keys = ('Observation.startTime',
                              'Observation.stopTime',
                              DATAPRODUCTS + 'Input_Correlated.enabled',
                              DATAPRODUCTS + 'Input_Correlated.identifications',
                              #DATAPRODUCTS + 'Input_Correlated.storageClusterName',  # enable if input bandwidth is also estimated
                              DATAPRODUCTS + 'Output_Correlated.enabled',
                              DATAPRODUCTS + 'Output_Correlated.identifications',
                              DATAPRODUCTS + 'Output_Correlated.storageClusterName',
                              PIPELINE + 'LongBaseline.subbandgroups_per_ms',
                              PIPELINE + 'LongBaseline.subbands_per_subbandgroup')

    def _calculate(self, parset, predecessor_estimates):
        """ Estimate for Long Baseline Pipeline
        calculates: datasize (number of files, file size), bandwidth

        For a predecessor_estimates example, see the calibration/averaging
        (and possibly the observation) estimator code.

        For a return value example, see the calibration/averaging estimator code,
        except that there is no 'im' input or output, and instead of a 'start_sb_nr',
        we have a 'start_sbg_nr' property.

        :param parset: parameter set queried via getInt/getBool/getString/
                       getStringVector for the keys in self.required_keys.
        :param predecessor_estimates: iterable of predecessor estimate dicts;
                       entries without 'output_files', or whose filtered inputs
                       are empty, are skipped.
        :returns: dict {'errors': [str, ...], 'estimates': [dict, ...]}.
                  Each estimate holds 'input_files' and 'output_files' and,
                  when a non-zero data size was computed, also 'resource_types'
                  ({'bandwidth': bits/s, 'storage': bytes}), 'resource_count'
                  and 'root_resource_group'.
        """
        logger.debug("start estimate '{}'".format(self.name))
        logger.info('parset: %s ', parset)
        result = {'errors': [], 'estimates': []}

        subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0) #TODO, should these have defaults?
        subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0)
        # Both factors must be strictly positive: their product is used as a
        # divisor below, and a negative value would silently produce negative
        # file counts and subband group numbers.
        if subbandgroups_per_ms <= 0 or subbands_per_subbandgroup <= 0:
            logger.error('subbandgroups_per_ms or subbands_per_subbandgroup are not valid')
            result['errors'].append('subbandgroups_per_ms or subbands_per_subbandgroup are not valid')
        if not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'):
            logger.error('Output_Correlated is not enabled')
            result['errors'].append('Output_Correlated is not enabled')
        if result['errors']:
            return result

        # Number of input subbands (input files) combined into one output MS.
        subbands_per_ms = subbands_per_subbandgroup * subbandgroups_per_ms

        duration = self._getDuration(parset.getString('Observation.startTime'),
                                     parset.getString('Observation.stopTime'))

        # NOTE: input bandwidth is not included in the resulting estimate atm.
        # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ...
        #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName')

        input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications')
        output_ident_uv = self._getOutputIdentification(parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications'))
        output_cluster_uv = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName')

        for pred_est in predecessor_estimates:
            pred_output_files = pred_est.get('output_files')
            if pred_output_files is None:
                continue

            input_files = self._filterInputs(pred_output_files, input_idents_uv)
            if not input_files:
                continue

            if 'uv' not in input_files:
                logger.error('Missing uv dataproducts in predecessor output_files')
                result['errors'].append('Missing uv dataproducts in predecessor output_files')
                continue

            estimate = {'input_files': input_files}

            uv_input_properties = input_files['uv']['properties']
            nr_input_files = uv_input_properties['nr_of_uv_files']
            start_sb_nr = uv_input_properties['start_sb_nr']

            # Every output MS must be built from a whole number of input subbands.
            if nr_input_files % subbands_per_ms != 0:
                logger.error('number of inputs is not a multiple of subbandgroups_per_ms * subbands_per_subbandgroup')
                result['errors'].append('number of inputs is not a multiple of subbandgroups_per_ms * subbands_per_subbandgroup')
                continue

            total_nr_input_files = nr_input_files * pred_est['resource_count']
            # Floor division keeps file counts and subband group indices
            # integral ('/' would yield floats on Python 3).
            nr_output_files = total_nr_input_files // subbands_per_ms
            start_sbg_nr = start_sb_nr // subbands_per_ms

            logger.debug("calculate correlated data size")
            uv_output_file_size = 1000  # TODO: 1 kB was hardcoded in the Scheduler

            logger.info("correlated_uv: {} files {} bytes each".format(nr_output_files, uv_output_file_size))
            estimate['output_files'] = {'uv': {'identification': output_ident_uv,
                                               'properties': {'nr_of_uv_files': 1,  # also see estimate['resource_count'] below
                                                              'uv_file_size': uv_output_file_size,
                                                              'start_sbg_nr': start_sbg_nr}}}

            # count total data size
            data_size = nr_output_files * uv_output_file_size  # bytes
            if data_size:
                # 8.0 forces true division so ceil() actually rounds up even
                # when data_size and duration are both ints (Python 2).
                bandwidth = int(ceil(8.0 * data_size / duration))  # bits/second
                estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
                estimate['resource_count'] = nr_output_files  # as such, potentially different resources can be allocated for each output
                estimate['root_resource_group'] = output_cluster_uv
            else:
                logger.error('An estimate of zero was calculated!')
                result['errors'].append('An estimate of zero was calculated!')

            result['estimates'].append(estimate)

        if not result['estimates'] and not result['errors']:
            logger.error('long baseline estimator produced no estimates')
            result['errors'].append('long baseline pipeline estimator produced no estimates')

        return result