diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
index d1a9701a01818aee3a3c40f7c0e206b1f3da387a..45b18dc87c7d762be6509a695284bba48e4dc4ce 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
@@ -55,14 +55,15 @@ class BaseResourceEstimator(object):
         startTime = self._getDateTime(start)
         endTime = self._getDateTime(end)
         if startTime >= endTime:
+            logger.warning("startTime is not before endTime")
             return 1 ##TODO To prevent divide by zero later
         return totalSeconds(endTime - startTime)
         #TODO check if this makes duration = int(parset.get('duration', 0)) as a key redundant?

     def _calculate(self, parset, input_files={}):
-        raise NotImplementedError('estimate() in base class is called. Please implement estimate() in your subclass')
+        raise NotImplementedError('_calculate() in base class is called. Please implement _calculate() in your subclass')

-    def estimate(self, parset, input_files={}):
+    def verify_and_estimate(self, parset, input_files={}):
        """ Create estimates for a single process based on its parset and input files"""
         if self._checkParsetForRequiredKeys(parset):
             estimates = self._calculate(parameterset(parset), input_files)
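
For context, a minimal sketch (not part of the patch) of how a subclass is expected to hook into the renamed entry point; the `MyEstimator` class and its parset keys are hypothetical:

# Hypothetical subclass, for illustration only.
from base_resource_estimator import BaseResourceEstimator

class MyEstimator(BaseResourceEstimator):
    def __init__(self):
        BaseResourceEstimator.__init__(self, name='my_estimator')
        self.required_keys = ('Observation.startTime', 'Observation.stopTime')

    def _calculate(self, parset, input_files={}):
        # _getDuration() is the base class helper whose divide-by-zero guard is patched above
        duration = self._getDuration(parset.getString('Observation.startTime'),
                                     parset.getString('Observation.stopTime'))
        return {'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}

# Callers go through verify_and_estimate(), which checks required_keys
# before delegating to _calculate():
#   estimates = MyEstimator().verify_and_estimate(parset_dict)
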
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
index d3b6b98dd1e998657e08f0794e76a4b74e91cb12..96f4b4044ee5d428c106f6cfcfce12d7c21d7ae9 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
@@ -23,50 +23,83 @@ import logging
 from math import ceil
 from base_resource_estimator import BaseResourceEstimator
-from lofar.parameterset import parameterset

 logger = logging.getLogger(__name__)

-class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
-    """ CalibrationPipelineResourceEstimator
+DATAPRODUCTS = "Observation.DataProducts."
+PIPELINE = "Observation.ObservationControl.PythonControl."
+
+#Observation.DataProducts.Output_Correlated.storageClusterName=
+class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
+    """ ResourceEstimator for Calibration Pipelines
     """
     def __init__(self, kwargs, input_files):
+        logger.info("init CalibrationPipelineResourceEstimator")
         BaseResourceEstimator.__init__(self, name='calibration_pipeline')
-        self.required_keys = ('correlated.enabled', 'correlated.demixing_settings.freq_step',
-                              'correlated.demixing_settings.time_step', 'instrument_model.enabled')
+        self.required_keys = ('Observation.startTime',
+                              'Observation.stopTime',
+                              DATAPRODUCTS + 'Input_Correlated.enabled',
+                              DATAPRODUCTS + 'Output_InstrumentModel.enabled',
+                              DATAPRODUCTS + 'Output_Correlated.enabled',
+                              PIPELINE + 'DPPP.demixer.freqstep',
+                              PIPELINE + 'DPPP.demixer.timestep')

     def _calculate(self, parset, input_files):
-        """ Estimate for calibration pipeline
+        """ Estimate for CalibrationPipeline. Also gets used for AveragingPipeline
         calculates: datasize (number of files, file size), bandwidth
+        input_files should look something like:
+        'input_files':
+        {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, ...}
+
+        reply is something along the lines of:
+        {'bandwidth': {'total_size': 19021319494},
+         'storage': {'total_size': 713299481024,
+          'output_files':
+           {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104},
+            'im': {'nr_of_im_files': 481, 'im_file_size': 148295}
+           }}}
         """
         logger.debug("start estimate '{}'".format(self.name))
-        parset = parameterset(parset).makeSubset('dp.output')
-        output_files = {}
-        duration = int(kwargs.get('observation.duration', 1))
-        if 'dp_correlated_uv' in input_files:
-            if parset['correlated']['enabled'] == 'true':
-                logger.debug("calculate correlated data size")
-                freq_step = int(parset['correlated']['demixing_settings']['freq_step'])
-                time_step = int(parset['correlated']['demixing_settings']['time_step'])
-                reduction_factor = freq_step * time_step
-                input_file_size = int(input_files['dp_correlated_uv']['file_size'])
-                output_file_size = 0.0
-                if reduction_factor > 0:
-                    new_size = input_file_size / reduction_factor
-                    output_file_size = new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0
-                output_files['dp_correlated_uv'] = {'nr_files': int(input_files['dp_correlated_uv']['nr_files']), 'file_size': int(output_file_size)}
-                logger.debug("dp_correlated_uv: {} files {} bytes each".format(int(input_files['dp_correlated_uv']['nr_files']), int(output_file_size)))
+        logger.info('parset: %s ' % parset)
+        result = {'errors': []}
+        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
+        freq_step = parset.getInt(PIPELINE + 'DPPP.demixer.freqstep', 1) #TODO, should these have defaults?
+        time_step = parset.getInt(PIPELINE + 'DPPP.demixer.timestep', 1)
+        reduction_factor = freq_step * time_step
+
+        if not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'):
+            logger.warning('Output_Correlated is not enabled')
+            result['errors'].append('Output_Correlated is not enabled')
+        if not 'uv' in input_files:
+            logger.warning('Missing UV Dataproducts in input_files')
+            result['errors'].append('Missing UV Dataproducts in input_files')
+        if reduction_factor < 1:
+            logger.warning('freqstep * timestep is not positive')
+            result['errors'].append('freqstep * timestep is not positive')
+        if result['errors']:
+            return result
+
+        logger.debug("calculate correlated data size")
+        result['output_files'] = {}
+        input_file_size = input_files['uv']['uv_file_size']
+        output_file_size = 0.0
+        new_size = input_file_size / float(reduction_factor)
+        output_file_size = new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0
+        result['output_files']['uv'] = {'nr_of_uv_files': input_files['uv']['nr_of_uv_files'], 'uv_file_size': int(output_file_size)}
+        logger.debug("correlated_uv: {} files {} bytes each".format(result['output_files']['uv']['nr_of_uv_files'], result['output_files']['uv']['uv_file_size']))

-        if parset['instrument_model']['enabled'] == 'true':
-            logger.debug("calculate instrument-model data size")
-            output_files['dp_instrument_model'] = {'nr_files': int(input_files['dp_correlated_uv']['nr_files']), 'file_size': 1000}
-            logger.debug("dp_instrument_model: {} files {} bytes each".format(int(input_files['dp_correlated_uv']['nr_files']), 1000))
+        if parset.getBool(DATAPRODUCTS + 'Output_InstrumentModel.enabled'):
+            logger.debug("calculate instrument-model data size")
+            result['output_files']['im'] = {'nr_of_im_files': input_files['uv']['nr_of_uv_files'], 'im_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+            logger.debug("instrument_model: {} files {} bytes each".format(result['output_files']['im']['nr_of_im_files'], result['output_files']['im']['im_file_size']))

-        # count total data size
-        total_data_size = 0
-        for values in output_files.itervalues():
-            total_data_size += values['nr_files'] * values['file_size']
-        total_bandwidth = ceil((self.total_data_size * 8) / duration) # bits/second
-        return {"total_data_size":total_data_size, "total_bandwidth":total_bandwidth, "output_files":output_files}
+        # count total data size
+        total_data_size = result['output_files']['uv']['nr_of_uv_files'] * result['output_files']['uv']['uv_file_size'] # bytes
+        if 'im' in result['output_files']:
+            total_data_size += result['output_files']['im']['nr_of_im_files'] * result['output_files']['im']['im_file_size']
+        total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
+        result['storage'] = {'total_size': total_data_size}
+        result['bandwidth'] = {'total_size': total_bandwidth}
+        return result
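
The demixing/averaging size formula above adds overhead terms to the averaged visibility size. A worked example with invented numbers (one ~1.5 GB input MS, freqstep 4, timestep 2):

# Illustration only; the sizes and steps are made up.
freq_step, time_step = 4, 2
reduction_factor = freq_step * time_step                # 8
input_file_size = 1482951104                            # bytes per input MS
new_size = input_file_size / float(reduction_factor)    # 185368888.0 bytes of averaged data
output_file_size = new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0
# = 185368888.0 + 26067499.875 + 92684444.0 ~= 304120832 bytes per output MS
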
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
index e3d239100b7edcf45e9989e5f05924080545ad65..1157dc3601884831ee552b9abd8d262f91975306 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
@@ -27,40 +27,73 @@ from lofar.parameterset import parameterset

 logger = logging.getLogger(__name__)

-class ImagePipelineResourceEstimator(BaseResourceEstimator):
-    """ ImagePipelineResourceEstimator
+DATAPRODUCTS = "Observation.DataProducts."
+PIPELINE = "Observation.ObservationControl.PythonControl."
+
+#Observation.DataProducts.Output_Correlated.storageClusterName=
+#Observation.ObservationControl.PythonControl.AWimager
+class ImagePipelineResourceEstimator(BaseResourceEstimator):
+    """ ResourceEstimator for Imaging Pipelines
     """
     def __init__(self, kwargs, input_files):
-        BaseResourceEstimator.__init__(self, name='image_pipeline')
-        self.parset = ParameterSet(kwargs).make_subset('dp.output')
-        self.duration = int(kwargs.get('observation.duration', 1))
-        self.input_files = input_files
-        self.required_keys = ('skyimage.enabled', 'skyimage.slices_per_image', 'skyimage.subbands_per_image')
-        if self.checkParsetForRequiredKeys():
-            self.estimate()
-        return
+        logger.info("init ImagePipelineResourceEstimator")
+        BaseResourceEstimator.__init__(self, name='imaging_pipeline')
+        self.required_keys = ('Observation.startTime',
+                              'Observation.stopTime',
+                              DATAPRODUCTS + 'Input_Correlated.enabled',
+                              DATAPRODUCTS + 'Output_SkyImage.enabled',
+                              PIPELINE + 'Imaging.slices_per_image',
+                              PIPELINE + 'Imaging.subbands_per_image')

-    def estimate(self):
-        """ Estimate for image pipeline
+    def _calculate(self, parset, input_files):
+        """ Estimate for Imaging Pipeline. Also gets used for MSSS Imaging Pipeline
         calculates: datasize (number of files, file size), bandwidth
+        input_files should look something like:
+        'input_files':
+        {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, ...}
+
+        reply is something along the lines of:
+        {'bandwidth': {'total_size': 19021319494},
+         'storage': {'total_size': 713299481024,
+          'output_files':
+           {'img': {'nr_of_img_files': 481, 'img_file_size': 148295}
+           }}}
         """
         logger.debug("start estimate '{}'".format(self.name))
-        if 'dp_correlated_uv' in self.input_files:
-            if self.parset['skyimage']['enabled'] == 'true':
-                logger.debug("calculate skyimage data size")
-                slices_per_image = int(self.parset['skyimage']['slices_per_image'])
-                subbands_per_image = int(self.parset['skyimage']['subbands_per_image'])
-                if slices_per_image and subbands_per_image:
-                    nr_input_subbands = int(self.input_files['dp_correlated_uv']['nr_files'])
-                    if (nr_input_subbands % (subbands_per_image * slices_per_image)) == 0:
-                        nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)
-                        self.output_files['dp_sky_image'] = {'nr_files': nr_images, 'file_size': 1000}
-                        logger.debug("dp_sky_image: {} files {} bytes each".format(nr_images, 1000))
+        logger.info('parset: %s ' % parset)
+        result = {'errors': []}
+        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
+        slices_per_image = parset.getInt(PIPELINE + 'Imaging.slices_per_image', 0) #TODO, should these have defaults?
+        subbands_per_image = parset.getInt(PIPELINE + 'Imaging.subbands_per_image', 0)
+
+        if not parset.getBool(DATAPRODUCTS + 'Output_SkyImage.enabled'):
+            logger.warning('Output_SkyImage is not enabled')
+            result['errors'].append('Output_SkyImage is not enabled')
+        if not 'uv' in input_files:
+            logger.warning('Missing UV Dataproducts in input_files')
+            result['errors'].append('Missing UV Dataproducts in input_files')
+        else:
+            nr_input_subbands = input_files['uv']['nr_of_uv_files']
+            if not slices_per_image or not subbands_per_image:
+                logger.warning('slices_per_image or subbands_per_image is not valid')
+                result['errors'].append('slices_per_image or subbands_per_image is not valid')
+            elif nr_input_subbands % (subbands_per_image * slices_per_image) > 0:
+                logger.warning('nr of inputs is not a multiple of slices_per_image * subbands_per_image')
+                result['errors'].append('nr of inputs is not a multiple of slices_per_image * subbands_per_image')
+        if result['errors']:
+            return result
+
+        logger.debug("calculate sky image data size")
+        result['output_files'] = {}
+        nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)
+        result['output_files']['img'] = {'nr_of_img_files': nr_images, 'img_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        logger.debug("sky_images: {} files {} bytes each".format(result['output_files']['img']['nr_of_img_files'], result['output_files']['img']['img_file_size']))

-        # count total data size
-        for values in self.output_files.itervalues():
-            self.total_data_size += values['nr_files'] * values['file_size']
-        self.total_bandwidth = ceil((self.total_data_size * 8) / self.duration) # bits/second
-        return
+        # count total data size
+        total_data_size = result['output_files']['img']['nr_of_img_files'] * result['output_files']['img']['img_file_size'] # bytes
+        total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
+        result['storage'] = {'total_size': total_data_size}
+        result['bandwidth'] = {'total_size': total_bandwidth}
+        return result
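
A quick worked example of the sky image count above, with invented numbers:

# Illustration only; counts are made up.
nr_input_subbands = 480     # 'nr_of_uv_files' from the predecessor observation
slices_per_image = 2
subbands_per_image = 10
# the divisibility check above guarantees this division is exact
nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)   # 480 / 20 = 24
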
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
index eb52034868de6d720f577a0728ae8c94047f426d..9adffa1a555851b3533ab7216aa8c51307cd151a 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
@@ -23,48 +23,74 @@ import logging
 from math import ceil
 from base_resource_estimator import BaseResourceEstimator
-from lofar.parameterset import parameterset

 logger = logging.getLogger(__name__)

+DATAPRODUCTS = "Observation.DataProducts."
+PIPELINE = "Observation.ObservationControl.PythonControl."
+
+#Observation.DataProducts.Output_Correlated.storageClusterName=
 class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
-    """ LongBaselinePipelineResourceEstimator
+    """ ResourceEstimator for Long Baseline Pipelines
     """
     def __init__(self, kwargs, input_files):
+        logger.info("init LongBaselinePipelineResourceEstimator")
         BaseResourceEstimator.__init__(self, name='longbaseline_pipeline')
-        logger.debug("init LongBaselinePipelineResourceEstimator")
-        self.parset = ParameterSet(kwargs).make_subset('dp.output')
-        self.duration = int(kwargs.get('observation.duration', 1))
-        self.input_files = input_files
-        self.required_keys = ('correlated.enabled', 'longbaseline.subband_groups_per_ms',
-                              'longbaseline.subbands_per_subband_group')
-        if self.checkParsetForRequiredKeys():
-            self.estimate()
-        return
+        self.required_keys = ('Observation.startTime',
+                              'Observation.stopTime',
+                              DATAPRODUCTS + 'Input_Correlated.enabled',
+                              DATAPRODUCTS + 'Output_Correlated.enabled',
+                              PIPELINE + 'LongBaseline.subbandgroups_per_ms',
+                              PIPELINE + 'LongBaseline.subbands_per_subbandgroup')

-    def estimate(self):
-        """ Estimate for calibration pipeline
+    def _calculate(self, parset, input_files):
+        """ Estimate for Long Baseline Pipeline
         calculates: datasize (number of files, file size), bandwidth
+        input_files should look something like:
+        'input_files':
+        {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, ...}
+
+        reply is something along the lines of:
+        {'bandwidth': {'total_size': 19021319494},
+         'storage': {'total_size': 713299481024,
+          'output_files':
+           {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}
+           }}}
         """
         logger.debug("start estimate '{}'".format(self.name))
-        if 'dp_correlated_uv' in self.input_files:
-            if self.parset['correlated']['enabled'] == 'true':
-                logger.debug("calculate long baseline data size")
+        logger.info('parset: %s ' % parset)
+        result = {'errors': []}
+        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
+        subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0) #TODO, should these have defaults?
+        subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0)

-            nr_output_files = 0
-            subband_groups_per_ms = int(self.parset['longbaseline']['subband_groups_per_ms'])
-            subbands_per_subband_group = int(self.parset['longbaseline']['subbands_per_subband_group'])
-            if subband_groups_per_ms and subbands_per_subband_group:
-                nr_input_files = int(self.input_files['dp_correlated_uv']['nr_files'])
-                if (nr_input_files % (subbands_per_subband_group * subband_groups_per_ms)) == 0:
-                    nr_output_files = nr_input_files / (subbands_per_subband_group * subband_groups_per_ms)
-                    self.output_files['dp_correlated_uv'] = {'nr_files': nr_output_files, 'file_size': 1000}
-                    logger.debug("dp_correlated_uv: {} files {} bytes each".format(nr_output_files, 1000))
+        if not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'):
+            logger.warning('Output_Correlated is not enabled')
+            result['errors'].append('Output_Correlated is not enabled')
+        if not 'uv' in input_files:
+            logger.warning('Missing UV Dataproducts in input_files')
+            result['errors'].append('Missing UV Dataproducts in input_files')
+        else:
+            nr_input_files = input_files['uv']['nr_of_uv_files']
+            if not subbandgroups_per_ms or not subbands_per_subbandgroup:
+                logger.warning('subbandgroups_per_ms or subbands_per_subbandgroup is not valid')
+                result['errors'].append('subbandgroups_per_ms or subbands_per_subbandgroup is not valid')
+            elif nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0:
+                logger.warning('nr of inputs is not a multiple of subbands_per_subbandgroup * subbandgroups_per_ms')
+                result['errors'].append('nr of inputs is not a multiple of subbands_per_subbandgroup * subbandgroups_per_ms')
+        if result['errors']:
+            return result

-        # count total data size
-        for values in self.output_files.itervalues():
-            self.total_data_size += values['nr_files'] * values['file_size']
-        self.total_bandwidth = ceil((self.total_data_size * 8) / self.duration) # bits/second
-        return
+        logger.debug("calculate correlated data size")
+        result['output_files'] = {}
+        nr_output_files = nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms)
+        result['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, 'uv_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        logger.debug("correlated_uv: {} files {} bytes each".format(result['output_files']['uv']['nr_of_uv_files'], result['output_files']['uv']['uv_file_size']))

+        # count total data size
+        total_data_size = result['output_files']['uv']['nr_of_uv_files'] * result['output_files']['uv']['uv_file_size'] # bytes
+        total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
+        result['storage'] = {'total_size': total_data_size}
+        result['bandwidth'] = {'total_size': total_bandwidth}
+        return result
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
index 50b01555b0ede30943c986b29fddbe86b33c7ea6..e3e8e1622c7f1e28172caad8d2ceb804b77af5ff 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
@@ -53,6 +53,16 @@ class ObservationResourceEstimator(BaseResourceEstimator):
     def _calculate(self, parset, input_files={}):
         """ Calculate the combined resources needed by the different observation types
         that can be in a single observation.
+ reply is something along the lines of: + {'bandwidth': {'total_size': 19021319494}, + 'storage': {'total_size': 713299481024, + 'output_files': + {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, + 'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}}, + {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}}, + {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}} + ]}}} + The base_resource_estimator adds an {'observation': } around this. """ logger.info("start estimate '{}'".format(self.name)) logger.info('parset: %s ' % parset) @@ -88,7 +98,8 @@ class ObservationResourceEstimator(BaseResourceEstimator): return result def correlated(self, parset, duration): - """ Estimate number of files, file size and bandwidth needed for correlated data""" + """ Estimate number of files, file size and bandwidth needed for correlated data + """ logger.info("calculating correlated datasize") size_of_header = 512 #TODO More magic numbers (probably from Alwin). ScS needs to check these. They look ok though. size_of_overhead = 600000 diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py index d907c2f2f2e51e56b158e996ee88c21e26410f0f..f3a8e12949f0d69f6e5f889384871aec9fbd6c65 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py @@ -23,49 +23,74 @@ import logging from math import ceil from base_resource_estimator import BaseResourceEstimator -from lofar.parameterset import parameterset logger = logging.getLogger(__name__) -class PulsarPipelineResourceEstimator(BaseResourceEstimator): - """ PulsarPipelineResourceEstimator +DATAPRODUCTS = "Observation.DataProducts." +PIPELINE = "Observation.ObservationControl.PythonControl." 
+
+#Observation.DataProducts.Output_Correlated.storageClusterName=
+class PulsarPipelineResourceEstimator(BaseResourceEstimator):
+    """ ResourceEstimator for Pulsar Pipelines
     """
     def __init__(self, kwargs, input_files):
+        logger.info("init PulsarPipelineResourceEstimator")
         BaseResourceEstimator.__init__(self, name='pulsar_pipeline')
-        self.parset = ParameterSet(kwargs).make_subset('dp.output')
-        self.duration = int(kwargs.get('observation.duration', 1))
-        self.coherent_stokes_type = kwargs.get('observation.coherent_stokes.type')
-        self.input_files = input_files
-        self.required_keys = ('pulsar.enabled',)
-        if self.checkParsetForRequiredKeys():
-            self.estimate()
-        return
+        self.required_keys = ('Observation.startTime',
+                              'Observation.stopTime',
+                              DATAPRODUCTS + 'Input_CoherentStokes.enabled',
+                              DATAPRODUCTS + 'Input_IncoherentStokes.enabled',
+                              DATAPRODUCTS + 'Output_Pulsar.enabled')

-    def estimate(self):
-        """ Estimate for pulsar pipeline
+    def _calculate(self, parset, input_files):
+        """ Estimate for Pulsar Pipeline
         calculates: datasize (number of files, file size), bandwidth
+        input_files should look something like:
+        'input_files':
+        {'cs': {'nr_of_cs_files': 48, 'nr_of_cs_stokes': 4, 'cs_file_size': 1482104}, ...}
+
+        reply is something along the lines of:
+        {'bandwidth': {'total_size': 19021319494},
+         'storage': {'total_size': 713299481024,
+          'output_files':
+           {'pulp': {'nr_of_pulp_files': 48, 'pulp_file_size': 185104}
+           }}}
         """
         logger.debug("start estimate '{}'".format(self.name))
-        if self.parset['pulsar']['enabled'] == 'true':
-            logger.debug("calculate pulsar data size")
-            nr_output_files = 0
-            if 'dp_coherent_stokes' in self.input_files:
-                nr_input_files = int(self.input_files['dp_coherent_stokes']['nr_files'])
-                if self.coherent_stokes_type == 'DATA_TYPE_XXYY':
-                    nr_output_files += nr_input_files / 4
-                else:
-                    nr_output_files += nr_input_files
+        logger.info('parset: %s ' % parset)
+        result = {'errors': []}
+        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))

-            if 'dp_incoherent_stokes' in self.input_files:
-                nr_input_files = int(self.input_files['dp_incoherent_stokes']['nr_files'])
+        if not parset.getBool(DATAPRODUCTS + 'Output_Pulsar.enabled'):
+            logger.warning('Output_Pulsar is not enabled')
+            result['errors'].append('Output_Pulsar is not enabled')
+        if not 'cs' in input_files and not 'is' in input_files:
+            logger.warning('Missing Both CS and IS Dataproducts in input_files')
+            result['errors'].append('Missing Both CS and IS Dataproducts in input_files')
+        if result['errors']:
+            return result
+
+        logger.debug("calculate pulp data size")
+        result['output_files'] = {}
+        nr_output_files = 0
+        if 'cs' in input_files:
+            nr_input_files = input_files['cs']['nr_of_cs_files']
+            if input_files['cs']['nr_of_cs_stokes'] == 4: ##TODO Check if this is the same as coherent_stokes_type == 'XXYY'
+                nr_output_files += nr_input_files / 4 ## Then nr_output_files = nr_input_files / input_files['cs']['nr_of_cs_stokes']
+            else:
                 nr_output_files += nr_input_files
-            self.output_files['dp_pulsar'] = {'nr_files': nr_output_files, 'file_size': 1000}
-            logger.debug("dp_pulsar: {} files {} bytes each".format(nr_output_files, 1000))
+        if 'is' in input_files:
+            nr_input_files = input_files['is']['nr_of_is_files']
+            nr_output_files += nr_input_files
+
+        result['output_files']['pulp'] = {'nr_of_pulp_files': nr_output_files, 'pulp_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        logger.debug("pulp: {} files {} bytes each".format(result['output_files']['pulp']['nr_of_pulp_files'], result['output_files']['pulp']['pulp_file_size']))
each".format(result['output_files']['pulp']['nr_of_pulp_files'], result['output_files']['pulp']['pulp_file_size'])) - # count total data size - for values in self.output_files.itervalues(): - self.total_data_size += values['nr_files'] * values['file_size'] - self.total_bandwidth = ceil((self.total_data_size * 8) / self.duration) # bits/second - return + # count total data size + total_data_size = result['output_files']['pulp']['nr_of_pulp_files'] * result['output_files']['pulp']['pulp_file_size'] # bytes + total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second + result['storage'] = {'total_size': total_data_size} + result['bandwidth'] = {'total_size': total_bandwidth} + return result diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py index 342e180c4aabcd444aa6bb17f98dbca39c556e65..66b4b27b9f3d97e1f766a37ae44f1ff2a6250836 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py @@ -18,44 +18,74 @@ class ResourceEstimatorHandler(MessageHandlerInterface): def __init__(self, **kwargs): super(ResourceEstimatorHandler, self).__init__(**kwargs) self.observation = ObservationResourceEstimator() - #self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator() - #self.calibration_pipeline = CalibrationPipelineResourceEstimator() - #self.pulsar_pipeline = PulsarPipelineResourceEstimator() - #self.imaging_pipeline = ImagePipelineResourceEstimator() + self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator() + self.calibration_pipeline = CalibrationPipelineResourceEstimator() + self.pulsar_pipeline = PulsarPipelineResourceEstimator() + self.imaging_pipeline = ImagePipelineResourceEstimator() def handle_message(self, content): specification_tree = content["specification_tree"] return self._get_estimated_resources(specification_tree) ##TODO also handle MoM tasks in RA 1.2 - #def _getPredecessors(self, parset): - - - def _get_estimated_resources(self, specification_tree): - logger.info('get_estimated_resources on: %s' % specification_tree) - result = {} - + def get_subtree_estimate(self, specification_tree): otdb_id = specification_tree['otdb_id'] - main_parset = specification_tree['specification'] + parset = specification_tree['specification'] if specification_tree['task_type'] == 'observation': - result[str(otdb_id)] = self.observation.estimate(main_parset) - - #TODO: implement properly - #pipeline_input_files = result['observation']['output_files'] - - #longbaseline = LongBaselinePipelineResourceEstimator(parsetDict, input_files=pipeline_input_files) - #result.update(longbaseline.result_as_dict()) - - #calibration = CalibrationPipelineResourceEstimator(parsetDict, input_files=pipeline_input_files) - #result.update(calibration.result_as_dict()) - - #pulsar = PulsarPipelineResourceEstimator(parsetDict, input_files=pipeline_input_files) - #result.update(pulsar.result_as_dict()) - - #image = ImagePipelineResourceEstimator(parsetDict, input_files=pipeline_input_files) - #result.update(image.result_as_dict()) - - return result + return {str(otdb_id): self.observation.verify_and_estimate(parset)} + elif specification_tree['task_type'] == 'pipeline': + branch_estimates = {} + for branch in specification_tree['predecessors']: + branch_estimates.update(get_subtree_estimate(branch)) + + if specification_tree['task_subtype'] in ['averaging pipeline', 'calibration pipeline']: + for id, estimate in 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
index 342e180c4aabcd444aa6bb17f98dbca39c556e65..66b4b27b9f3d97e1f766a37ae44f1ff2a6250836 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
@@ -18,44 +18,74 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
     def __init__(self, **kwargs):
         super(ResourceEstimatorHandler, self).__init__(**kwargs)
         self.observation = ObservationResourceEstimator()
-        #self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator()
-        #self.calibration_pipeline = CalibrationPipelineResourceEstimator()
-        #self.pulsar_pipeline = PulsarPipelineResourceEstimator()
-        #self.imaging_pipeline = ImagePipelineResourceEstimator()
+        self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator()
+        self.calibration_pipeline = CalibrationPipelineResourceEstimator()
+        self.pulsar_pipeline = PulsarPipelineResourceEstimator()
+        self.imaging_pipeline = ImagePipelineResourceEstimator()

     def handle_message(self, content):
         specification_tree = content["specification_tree"]
         return self._get_estimated_resources(specification_tree) ##TODO also handle MoM tasks in RA 1.2

-    #def _getPredecessors(self, parset):
-
-
-    def _get_estimated_resources(self, specification_tree):
-        logger.info('get_estimated_resources on: %s' % specification_tree)
-        result = {}
-
+    def get_subtree_estimate(self, specification_tree):
         otdb_id = specification_tree['otdb_id']
-        main_parset = specification_tree['specification']
+        parset = specification_tree['specification']
         if specification_tree['task_type'] == 'observation':
-            result[str(otdb_id)] = self.observation.estimate(main_parset)
-
-            #TODO: implement properly
-            #pipeline_input_files = result['observation']['output_files']
-
-            #longbaseline = LongBaselinePipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
-            #result.update(longbaseline.result_as_dict())
-
-            #calibration = CalibrationPipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
-            #result.update(calibration.result_as_dict())
-
-            #pulsar = PulsarPipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
-            #result.update(pulsar.result_as_dict())
-
-            #image = ImagePipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
-            #result.update(image.result_as_dict())
-
-        return result
+            return {str(otdb_id): self.observation.verify_and_estimate(parset)}
+        elif specification_tree['task_type'] == 'pipeline':
+            branch_estimates = {}
+            for branch in specification_tree['predecessors']:
+                branch_estimates.update(self.get_subtree_estimate(branch))
+
+            if specification_tree['task_subtype'] in ['averaging pipeline', 'calibration pipeline']:
+                for id, estimate in branch_estimates.items():
+                    if not 'im' in estimate['output_files'] and 'uv' in estimate['output_files']: # Not a calibrator pipeline
+                        logger.info('found %s as the target of pipeline %s' % (id, otdb_id))
+                        input_files = estimate['output_files'] # Need sap here as well
+                        return {str(otdb_id): self.calibration_pipeline.verify_and_estimate(parset, input_files)}
+
+            if specification_tree['task_subtype'] in ['imaging pipeline', 'imaging pipeline msss']:
+                if len(branch_estimates) > 1:
+                    logger.error('Imaging pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys()))
+                input_files = branch_estimates.items()[0][1]['output_files']
+                return {str(otdb_id): self.imaging_pipeline.verify_and_estimate(parset, input_files)}
+
+            if specification_tree['task_subtype'] in ['long baseline pipeline']:
+                if len(branch_estimates) > 1:
+                    logger.error('Long baseline pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys()))
+                input_files = branch_estimates.items()[0][1]['output_files']
+                return {str(otdb_id): self.longbaseline_pipeline.verify_and_estimate(parset, input_files)}
+
+            if specification_tree['task_subtype'] in ['pulsar pipeline']:
+                if len(branch_estimates) > 1:
+                    logger.error('Pulsar pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys()))
+                input_files = branch_estimates.items()[0][1]['output_files']
+                return {str(otdb_id): self.pulsar_pipeline.verify_and_estimate(parset, input_files)}
+
+        else: # reservation, maintenance, system tasks?
+            logger.info("It's not a pipeline or observation: %s" % otdb_id)
+            return {str(otdb_id): {}}

+    def _get_estimated_resources(self, specification_tree):
+        """ Input is like:
+        {"otdb_id": otdb_id, "state": 'prescheduled', 'specification': ...,
+         'task_type': "pipeline", 'task_subtype': "long baseline pipeline",
+         'predecessors': [...]}
+
+        reply is something along the lines of:
+        {'452648':
+         {'observation':
+          {'bandwidth': {'total_size': 19021319494},
+           'storage': {'total_size': 713299481024,
+            'output_files':
+             {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104},
+              'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
+                       {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
+                       {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}
+                      ]}}}}}
+        """
+        logger.info('get_estimated_resources on: %s' % specification_tree)
+        return self.get_subtree_estimate(specification_tree)

 def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None):
     return Service(servicename=servicename,
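
To illustrate the recursion in get_subtree_estimate(), a hypothetical specification_tree of the shape the handler walks; ids and subtype are invented and '...' elides the parsets:

# Illustration only.
spec_tree = {'otdb_id': 452649,
             'task_type': 'pipeline',
             'task_subtype': 'averaging pipeline',
             'specification': ...,
             'predecessors': [{'otdb_id': 452648,
                               'task_type': 'observation',
                               'task_subtype': '',
                               'specification': ...,
                               'predecessors': []}]}
# The predecessors are estimated first, so the pipeline's estimate is derived
# from the observation's 'output_files':
#   ResourceEstimatorHandler().handle_message({'specification_tree': spec_tree})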