diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py index 2c4a3bca8ae97677eb97c44209ac1d5ccd2067d6..c521b4215b393b81dd57d509277e1b29bbaca556 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py @@ -163,8 +163,10 @@ class RAtoOTDBPropagator(): u'type_name': u'nr_of_uv_files', u'value': 240, u'io_type_id': 1, u'type_id': 2, u'id': 810}, {u'io_type_name': u'input', u'type_name': u'uv_file_size', u'value': 43957416, u'io_type_id': 1, u'type_id': 10, u'id': 811}]} + output something like: - {'output_files': {u'nr_of_im_files': 488, u'nr_of_uv_files': 488, u'im_file_size': 1000, u'uv_file_size': 32500565}} + {'output_files': {'resource_name': u'CEP4_storage:/data', u'nr_of_im_files': 488, + u'nr_of_uv_files': 488, u'im_file_size': 1000, u'uv_file_size': 32500565}} """ input_files = {} output_files = {} @@ -216,7 +218,7 @@ class RAtoOTDBPropagator(): info["endtime"] = task["endtime"] info["status"] = task["status"] info["type"] = task["type"] - info["cluster"] = task["cluster"] # tie this to data product type when >1 cluster per obs is supported + info["cluster"] = task["cluster"] claims = self.radbrpc.getResourceClaims(task_ids=ra_id, resource_type='storage', extended=True, include_properties=True) diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py index 86b0c9e02c24d6204972dc84768b7837f2427d24..cc1c0302544c278997db7044e3cd3e4a1a7bdad8 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py @@ -110,7 +110,7 @@ class RAtoOTDBTranslator(): pipeline_storage_props.append(prop) continue - ## It's an observation + ## It's an observation (or pipeline input from an observation) for sap in prop['saps']: if 'nr_of_uv_files' not in sap['properties'] or sap['properties']['nr_of_uv_files'] < 1: continue @@ -149,7 +149,7 @@ class RAtoOTDBTranslator(): result[PREFIX + 'DataProducts.%s_Correlated.skip' % (io_type)] = '[' + to_csv_string(obs_skip) + ']' - # Pipeline (never in the same claims as Observation *and* io_type 'Output', as Obs always is) + # Pipeline (output, or input from another pipeline) locations = [] filenames = [] skip = [] diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index e398802dfb616ac3886225375dda03437c58bd62..d6cffeb57429e869142e53c30677e0646aac05a1 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -660,6 +660,7 @@ class ResourceAssigner(): def getProperties(self, db_resource_prop_types, files_dict, io_type): """ Return list of properties in claim format converted from files_dict. + E.g. files_dict: {'cs': {'sap_nr': 2, ..., 'properties': {'nr_of_uv_files': 123, ...}}, ...} """ if files_dict is None: return [] @@ -667,33 +668,19 @@ class ResourceAssigner(): logger.info('getProperties: processing %s_files: %s', io_type, files_dict) properties = [] - for group_name, needed_prop_group in files_dict.items(): # e.g. 
'cs', {'...': 123, ...} or 'saps', [{...}, {...}] - if group_name == 'saps': - for sap_dict in needed_prop_group: - props = self.makeProperties(db_resource_prop_types, sap_dict['properties'], - sap_dict['sap_nr'], io_type) - properties.extend(props) - else: - props = self.makeProperties(db_resource_prop_types, needed_prop_group, - None, io_type) - properties.extend(props) - - return properties + for dptype, dptype_dict in files_dict.items(): + sap_nr = dptype_dict.get('sap_nr') # only with obs output and obs successor input - def makeProperties(self, db_resource_prop_types, properties_dict, sap_nr, io_type): - """ helper for getProperties() """ - properties = [] - - for prop_type_name, prop_value in properties_dict.items(): - rc_property_type_id = db_resource_prop_types.get(prop_type_name) - if rc_property_type_id is None: - logger.warn('makeProperties: unknown prop_type: %s', prop_type_name) - continue - prop = {'type': rc_property_type_id, 'value': prop_value, 'io_type': io_type} - if sap_nr is not None: - prop['sap_nr'] = sap_nr + for prop_type_name, prop_value in dptype_dict['properties'].items(): + rc_property_type_id = db_resource_prop_types.get(prop_type_name) + if rc_property_type_id is None: + logger.error('getProperties: ignoring unknown prop type: %s', prop_type_name) + continue - properties.append(prop) + prop = {'type': rc_property_type_id, 'value': prop_value, 'io_type': io_type} + if sap_nr is not None: + prop['sap_nr'] = sap_nr + properties.append(prop) return properties diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py index 1d666829ccbf019353c3955a1d9e600bd7d92b26..5e0ddef38c578451f45b71dad2bc2ec61181f48e 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py @@ -18,11 +18,9 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -# $Id: base_resource_estimator.py 33534 2016-02-08 14:28:26Z schaap $ +# $Id$ import logging -import pprint -from math import ceil from base_resource_estimator import BaseResourceEstimator logger = logging.getLogger(__name__) @@ -49,3 +47,17 @@ class BasePipelineResourceEstimator(BaseResourceEstimator): logger.error(e) logger.info("Could not get duration from parset, returning default pipeline duration of 1 hour") return 3600 + + def _getOutputIdentification(self, identifications_key): + """ For pipeline output, there must be exactly 1 (non-duplicate) identification string per + data product type. (How can you otherwise refer to it unambiguously?) (Observation output can have 1 per sap.) + """ + idents = parset.getStringVector(identifications_key) + if len(set(idents)) != 1: # make set to filter any duplicates + if not idents: + return '' # allow, because irrelevant if no successor planned + else: + logger.error("Cannot have multiple pipeline output identifications. 
Dropping all but the first in: %s", idents) # see doc string + return idents[0] + + diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py index a1022934622a4a1f6bbfac53c3ad97acc3abc32a..33127049510646cd71284f1a7c756cf075aaeb9f 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py @@ -18,12 +18,11 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -# $Id: base_resource_estimator.py 33534 2016-02-08 14:28:26Z schaap $ +# $Id$ """ Base class for Resource Estimators """ import logging -import copy import pprint from datetime import datetime from lofar.common.datetimeutils import totalSeconds @@ -62,56 +61,56 @@ class BaseResourceEstimator(object): return totalSeconds(endTime - startTime) #TODO check if this makes duration = int(parset.get('duration', 0)) as a key reduntant? - def _calculate(self, parset, input_files={}): + def _calculate(self, parset, predecessor_estimates=[]): raise NotImplementedError('calculate() in base class is called. Please implement calculate() in your subclass') + def _extract_sap_nr(self, identification): + """ Return sap nr as int from identification or None if + no int xxx in '.SAPxxx' in identification. + """ + for s in identification.split('.'): # Find the SAP number, if present + if 'SAP' not in s: + continue + try: + return int(s[3:]) + except: + pass + + return None + def _filterInputs(self, input_files, identifications): - """'input_files': -TODO - {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104, - 'identifications':['mom.G355415.B2.1.C.SAP000.uv.dps','mom.G355415.B2.1.C.SAP001.uv.dps','mom.G355415.B2.1.C.SAP002.uv.dps']}, - 'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319, ...}}, - {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81, ...}}, - {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81, ...}} - ]}}} - 'identifications': 'mom.G355415.B2.1.C.SAP000.uv.dps','mom.G355415.B2.1.C.SAP001.uv.dps' + """ Return copy of input_files with only parts of input_files covered by identifications. + There may not be duplicates in the identifications iterable. + + Example argument values: + 'input_files': { + 'uv': {'identification': 'mom.G777955.B15.1.CPC.uv.dps', # or w/ obs e.g.: 'mom.G777955.B15.1.C.SAP000.uv.dps' + 'uv_file_size': 1073741824, 'nr_of_uv_files': 42, <uv specific key-values>}, + 'im': {'identification': 'mom.G777955.B15.1.CPC.inst.dps', + 'im_file_size': 1000, 'nr_of_im_files': 42, <im specific key-values>} + } + There may also be a key-value pair 'sap_nr': N, typically for observation. + 'identifications': ['mom.G777955.B2.1.C.SAP002.uv.dps', ...] (or without SAPxxx in it, as shown in the input_files example). 
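+        For instance (using the example values above), a hypothetical call with
+        identifications=['mom.G777955.B15.1.CPC.uv.dps'] would return a copy holding
+        only the 'uv' entry; the 'im' entry is filtered out.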
""" output_files = {} - if 'saps' in input_files: - output_files['saps'] = [] logger.info('parsing input for identifications: %s' % (identifications,)) + for identification in identifications: + sap_nr = self._extract_sap_nr(identification) for data_type, data_properties in input_files.items(): - if data_type == 'saps': - continue - if identification in data_properties['identifications']: #This data_type matches an input identification + if identification == data_properties['identification']: logger.info('Found input identification matching %s' % (identification,)) - output_files[data_type] = copy.deepcopy(data_properties) - if 'SAP' in identification: #Special case for identifications that contain a SAP number - # We would need to make this smarter if we can have the data from multiple SAPs as input. - # Or or different input sources of the same data_type - for s in identification.split('.'): # Find the SAP number - if 'SAP' in s: - try: - sap_nr = int(s[3:]) - except: - sap_nr = None - for sap in input_files['saps']: - if sap['sap_nr'] == sap_nr: - for sap_data_type, sap_data_value in sap['properties'].items(): - if sap_data_type in data_properties: # We found this SAP's nr_of_<data_type>_files - output_files[data_type][sap_data_type] = sap_data_value # We only count the nr_of_files from this SAP - output_files['saps'].append({'sap_nr': sap_nr, 'properties': sap['properties']}) - if sap_data_type == 'start_sb_nr': - output_files[data_type]['start_sb_nr'] = sap_data_value - logger.info('filtered input files down to: \n' + pprint.pformat(output_files)) - return output_files + output_files[data_type] = dict(data_properties) # shallow copy is enough to avoid unintended changes + logger.info('_filterInputs: filtered down to: \n' + pprint.pformat(output_files)) + return output_files - def verify_and_estimate(self, parset, input_files={}): - """ Create estimates for a single process based on its parset and input files""" + def verify_and_estimate(self, parset, predecessor_estimates=[]): + """ Create estimates for an observation or pipeline step based on its parset and, + in case of a pipeline step, all estimates of its direct predecessor(s). + """ if self._checkParsetForRequiredKeys(parset): - estimates = self._calculate(parameterset(parset), input_files) + estimates = self._calculate(parameterset(parset), predecessor_estimates) else: raise ValueError('The parset is incomplete') result = {} diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py index 2ad2ff2b8293d40ec776bb286fe7c0081e3fbfb1..f0a2e1f9ea77f3acafc5b5d55a67fc42861f7f95 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py @@ -1,4 +1,4 @@ -# base_resource_estimator.py +# calibration_pipeline.py # # Copyright (C) 2016, 2017 # ASTRON (Netherlands Institute for Radio Astronomy) @@ -18,10 +18,9 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. 
# -# $Id: base_resource_estimator.py 33534 2016-02-08 14:28:26Z schaap $ +# $Id$ import logging -import pprint from math import ceil from base_pipeline_estimator import BasePipelineResourceEstimator @@ -42,106 +41,160 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator): 'Observation.stopTime', DATAPRODUCTS + 'Input_Correlated.enabled', DATAPRODUCTS + 'Input_Correlated.identifications', - #DATAPRODUCTS + 'Input_Correlated.storageClusterName', # TODO: also add input estimates + #DATAPRODUCTS + 'Input_Correlated.storageClusterName', # enable if input bandwidth is also estimated DATAPRODUCTS + 'Output_InstrumentModel.enabled', DATAPRODUCTS + 'Output_InstrumentModel.identifications', - #DATAPRODUCTS + 'Output_InstrumentModel.storageClusterName', # not used atm, typically too small to care + DATAPRODUCTS + 'Output_InstrumentModel.storageClusterName', DATAPRODUCTS + 'Output_Correlated.enabled', DATAPRODUCTS + 'Output_Correlated.identifications', DATAPRODUCTS + 'Output_Correlated.storageClusterName', PIPELINE + 'DPPP.demixer.freqstep', PIPELINE + 'DPPP.demixer.timestep') - def _calculate(self, parset, input_files): - """ Estimate for CalibrationPipeline. Also gets used for AveragingPipeline + def _calculate(self, parset, predecessor_estimates): + """ Estimate for Calibration Pipeline. Also gets used for Averaging Pipeline calculates: datasize (number of files, file size), bandwidth - input_files should look something like: -TODO - 'input_files': - {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, ...} - - reply is something along the lines of: - {'bandwidth': {'total_size': 19021319494}, - 'storage': {'total_size': 713299481024, - 'output_files': - {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, - 'im': {'nr_of_im_files': 481, 'im_file_size': 148295} - }} + + predecessor_estimates looks something like (see also output format in observation.py and (other) pipelines): + [{ + 'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # per sb + 'resource_count': 20, 'root_resource_group': 'CEP4', + 'output_files': { + 'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps', + 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}] + } + }, + <optionally more estimates> + ] + + The reply is something along the lines of: (assumes task duration of 30 s) + [{ + 'resource_types': {'bandwidth': 2236995, 'storage': 67109864}, # for each uv+im output data prod (thus the total is times the resource_count val) + 'resource_count': 20, 'root_resource_group': 'CEP4', + 'input_files': { + 'uv': {'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps', # w/ sap only if predecessor is an observation + 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}, + 'im': {'identification': ..., # no 'im' as input if predecessor is an observation + 'properties': {'im_file_size': 1000, 'nr_of_im_files': 1, 'start_sb_nr': 0}} + }, + 'output_files': { + 'uv': {'identification': 'mom.G777956.B2.1.CPC.uv.dps', + 'properties': {'uv_file_size': 67108864, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}, + 'im': {'identification': 'mom.G777956.B2.1.CPC.inst.dps', + 'properties': {'im_file_size': 1000, 'nr_of_im_files': 1, 'start_sb_nr': 0}} + } + }, + <optionally more estimates> + ] """ logger.debug("start estimate '{}'".format(self.name)) - logger.info('parset:\n%s' % (parset,)) - logger.info('input_files: \n' + pprint.pformat(input_files)) - result = {'errors': [], 'estimates': [{}]} # can all be 
described in 1 estimate here -#TODO: really? What if input_files from output_files which has a list of len > 1? -#parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName') - identifications = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications') + \ - parset.getStringVector(DATAPRODUCTS + 'Input_InstrumentModel.identifications') - input_files = self._filterInputs(input_files, identifications) - result['estimates'][0]['input_files'] = input_files + logger.info('parset: %s ' % parset) + + result = {'errors': [], 'estimates': []} - duration = self._getDuration(parset.getString('Observation.startTime'), - parset.getString('Observation.stopTime')) freq_step = parset.getInt(PIPELINE + 'DPPP.demixer.freqstep', 1) #TODO, should these have defaults? time_step = parset.getInt(PIPELINE + 'DPPP.demixer.timestep', 1) reduction_factor = freq_step * time_step - - if not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'): - logger.error('Output_Correlated is not enabled') - result['errors'].append('Output_Correlated is not enabled') - if not 'uv' in input_files: - logger.error('Missing UV Dataproducts in input_files') - result['errors'].append('Missing UV Dataproducts in input_files') if reduction_factor < 1: logger.error('freqstep * timestep is not positive: %d' % reduction_factor) result['errors'].append('freqstep * timestep is not positive: %d' % reduction_factor) + if not parset.getBool(DATAPRODUCTS + 'Input_Correlated.enabled') or \ + not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'): + logger.error('Input_Correlated or Output_Correlated is not enabled') + result['errors'].append('Input_Correlated or Output_Correlated is not enabled') if result['errors']: return result - logger.debug("calculate correlated data size") - result['estimates'][0]['output_files'] = {} -#TODO: problem? 
['uv']['uv_file_size'] not in observation.py result - input_file_size = input_files['uv']['uv_file_size'] - output_file_size = 0.0 - new_size = input_file_size / float(reduction_factor) - output_file_size = new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0 - result['estimates'][0]['output_files']['uv'] = {'nr_of_uv_files': input_files['uv']['nr_of_uv_files'], - 'uv_file_size': int(output_file_size), - 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications'), - 'start_sb_nr': input_files['uv']['start_sb_nr']} - logger.info("correlated_uv: {} files {} bytes each".format(result['estimates'][0]['output_files']['uv']['nr_of_uv_files'], - result['estimates'][0]['output_files']['uv']['uv_file_size'])) - - if parset.getBool(DATAPRODUCTS + 'Output_InstrumentModel.enabled'): - logger.info("calculate instrument-model data size") - result['estimates'][0]['output_files']['im'] = {'nr_of_im_files': input_files['uv']['nr_of_uv_files'], - 'im_file_size': 1000, # TODO: fix this; 1 kB was hardcoded in the Scheduler - 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_InstrumentModel.identifications'), - 'start_sb_nr': input_files['uv']['start_sb_nr']} - logger.info("correlated_im: {} files {} bytes each".format(result['estimates'][0]['output_files']['im']['nr_of_im_files'], - result['estimates'][0]['output_files']['im']['im_file_size'])) - - # count total data size - total_data_size = 0 - for data_type in result['estimates'][0]['output_files']: - total_data_size += result['estimates'][0]['output_files'][data_type]['nr_of_' + data_type + '_files'] * \ - result['estimates'][0]['output_files'][data_type][data_type + '_file_size'] # bytes - # FIXME I don't think this is technically correct, as the IM files are sometimes created and used, just not a required export? - # Need to split averaging pipeline and calibration pipeline - #total_data_size = result['estimates'][0]['output_files']['uv']['nr_of_uv_files'] * \ - # result['estimates'][0]['output_files']['uv']['uv_file_size'] + \ - # result['estimates'][0]['output_files']['im']['nr_of_im_files'] * \ - # result['estimates'][0]['output_files']['im']['im_file_size'] # bytes - total_bandwidth = int(ceil(total_data_size * 8 / duration)) # bits/second - if total_data_size: - needed_resource_types = {'bandwidth': total_bandwidth, 'storage': total_data_size} - result['resource_types'] = needed_resource_types - result['resource_count'] = 1 - - # Apart from Output_Correlated, there is also Output_InstrumentModel.storageClusterName, - # but instrument models are relatively small, and in practice pipeline cluster is always CEPn. - result['root_resource_group'] = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName') - else: - result['errors'].append('Total data size is zero!') - logger.error('ERROR: A datasize of zero was calculated!') + duration = self._getDuration(parset.getString('Observation.startTime'), + parset.getString('Observation.stopTime')) + + # NOTE: input bandwidth is not included in the resulting estimate atm. + # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ... 
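+        # A minimal sketch of what such an input bandwidth estimate could look like (assuming the
+        # predecessor sizes and this duration estimate are good enough), for reference only:
+        #   input_bandwidth = int(ceil(8 * nr_input_files * uv_input_file_size / duration))  # bits/second
+        # (nr_input_files and uv_input_file_size are taken per predecessor estimate in the loop below.)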
+ #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName') + + input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications') + have_im_input = parset.getBool(DATAPRODUCTS + 'Input_InstrumentModel.enabled') + if have_im_input: + input_idents_im = parset.getStringVector(DATAPRODUCTS + 'Input_InstrumentModel.identifications') + output_ident_uv = self._getOutputIdentification(DATAPRODUCTS + 'Output_Correlated.identifications') + output_cluster_uv = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName') + have_im_output = parset.getBool(DATAPRODUCTS + 'Output_InstrumentModel.enabled') + if have_im_output: + output_ident_im = self._getOutputIdentification(DATAPRODUCTS + 'Output_InstrumentModel.identifications') + + output_cluster_im = parset.getString(DATAPRODUCTS + 'Output_InstrumentModel.storageClusterName') + if output_cluster_uv != output_cluster_im: + logger.warn('storageClusterName differs between uv: \'%s\' and im: \'%s\': to be packed in 1 estimate, so ignoring \'im\' storageClusterName', + output_cluster_uv, output_cluster_im) + + for pred_est in predecessor_estimates: + pred_output_files = pred_est.get('output_files') + if pred_output_files is None: + continue + + idents = input_idents_uv + if have_im_input: + if 'im' in pred_output_files: + idents.extend(input_idents_im) + else: + logger.warn('Input_InstrumentModel enabled, but missing \'im\' dataproducts in predecessor output_files') + input_files = self._filterInputs(pred_output_files, idents) + if not input_files: + continue + + if 'uv' not in input_files: + logger.error('Missing uv dataproducts in predecessor output_files') + result['errors'].append('Missing uv dataproducts in predecessor output_files') + continue + + estimate = {'input_files': input_files} + + nr_input_files = input_files['uv']['properties']['nr_of_uv_files'] + uv_input_file_size = input_files['uv']['properties']['uv_file_size'] + start_sb_nr = input_files['uv']['properties']['start_sb_nr'] + + # TODO: This output file size calculation comes from the (old) Scheduler without explaining comments. + # The reason why it isn't a simple div, is that parts of the metadata are not reduced in size (and casacore storage mgrs). + # With reduction_factor 1, computed output size increases by 53%. Casacore storage mgrs may change size, but that much?!? + # If you can figure out what/how, please fix this calculation. Avoid unnamed magic values and document why! + logger.debug("calculate correlated data size") + new_size = uv_input_file_size / float(reduction_factor) + uv_output_file_size = int(new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0) + + nr_output_files = nr_input_files # pure 'map' (bijective) operation, no split or reduce + logger.info("correlated_uv: {} files {} bytes each".format(nr_output_files, uv_output_file_size)) + estimate['output_files'] = {'uv': {'identification': output_ident_uv, + 'properties': {'nr_of_uv_files': nr_output_files, 'uv_file_size': uv_output_file_size, 'start_sb_nr': start_sb_nr}}} + data_size = uv_output_file_size + + # If instrument model output is needed, add it to the same estimate, + # since it must be written to the same storage as the uv output (same nr_output_files). 
+ if have_im_output: + logger.info("calculate instrument-model data size") + im_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler + + logger.info("correlated_im: {} files {} bytes each".format(nr_output_files, im_file_size)) + estimate['output_files']['im'] = {'identification': output_ident_im, + 'properties': {'nr_of_im_files': nr_output_files, 'im_file_size': im_file_size, 'start_sb_nr': start_sb_nr}} + # FIXME I don't think this is technically correct, as the IM files are sometimes created and used, just not a required export? + # Need to split averaging pipeline and calibration pipeline + data_size += im_file_size + + data_size *= nr_output_files # bytes + if data_size: + bandwidth = int(ceil(8 * data_size / duration)) # bits/second + estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} + estimate['resource_count'] = pred_est['resource_count'] + estimate['root_resource_group'] = output_cluster_uv + else: + logger.error('An estimate of zero was calculated!') + result['errors'].append('An estimate of zero was calculated!') + + result['estimates'].append(estimate) + + if not result['estimates'] and not result['errors']: + logger.error('calibration / averaging pipeline estimator produced no estimates') + result['errors'].append('calibration / averaging pipeline estimator produced no estimates') + return result diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py index 8ab525183d5ef65e5a8c46d049de44d7a1e65526..41e3e359688b7650a76649df9071361226a9abe2 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py @@ -1,4 +1,4 @@ -# base_resource_estimator.py +# image_pipeline.py # # Copyright (C) 2016, 2017 # ASTRON (Netherlands Institute for Radio Astronomy) @@ -18,7 +18,7 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -# $Id: base_resource_estimator.py 33534 2016-02-08 14:28:26Z schaap $ +# $Id$ import logging from math import ceil @@ -43,79 +43,101 @@ class ImagePipelineResourceEstimator(BasePipelineResourceEstimator): 'Observation.stopTime', DATAPRODUCTS + 'Input_Correlated.enabled', DATAPRODUCTS + 'Input_Correlated.identifications', - #DATAPRODUCTS + 'Input_Correlated.storageClusterName', # TODO: also add input estimates + #DATAPRODUCTS + 'Input_Correlated.storageClusterName', # enable if input bandwidth is also estimated DATAPRODUCTS + 'Output_SkyImage.enabled', DATAPRODUCTS + 'Output_SkyImage.identifications', DATAPRODUCTS + 'Output_SkyImage.storageClusterName', PIPELINE + 'Imaging.slices_per_image', PIPELINE + 'Imaging.subbands_per_image') - def _calculate(self, parset, input_files): + def _calculate(self, parset, predecessor_estimates): """ Estimate for Imaging Pipeline. 
Also gets used for MSSS Imaging Pipeline calculates: datasize (number of files, file size), bandwidth - input_files should look something like: -TODO - 'input_files': - {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, ...} - - reply is something along the lines of: - {'bandwidth': {'total_size': 19021319494}, - 'storage': {'total_size': 713299481024, - 'output_files': - {'img': {'nr_of_img_files': 481, 'img_file_size': 148295} - }} + + For a predecessor_estimates example, see the calibration/averaging + (and possibly the observation) estimator code. + + For a return value example, see the calibration/averaging estimator code, + except that here we have instead of 'uv' and optionally 'im', e.g.: + 'img': {'identification': ..., + 'properties': {'nr_of_img_files': 481, 'img_file_size': 148295}} """ logger.debug("start estimate '{}'".format(self.name)) logger.info('parset: %s ' % parset) - result = {'errors': [], 'estimates': [{}]} # can all be described in 1 estimate here -#TODO: really? What if input_files from output_files which has a list of len > 1? -#parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName') - identifications = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications') - input_files = self._filterInputs(input_files, identifications) - result['estimates'][0]['input_files'] = input_files - duration = self._getDuration(parset.getString('Observation.startTime'), - parset.getString('Observation.stopTime')) + result = {'errors': [], 'estimates': []} + slices_per_image = parset.getInt(PIPELINE + 'Imaging.slices_per_image', 0) #TODO, should these have defaults? subbands_per_image = parset.getInt(PIPELINE + 'Imaging.subbands_per_image', 0) - + if slices_per_image < 1 or subbands_per_image < 1: + logger.error('slices_per_image or subbands_per_image are not valid') + result['errors'].append('slices_per_image or subbands_per_image are not valid') if not parset.getBool(DATAPRODUCTS + 'Output_SkyImage.enabled'): logger.error('Output_SkyImage is not enabled') result['errors'].append('Output_SkyImage is not enabled') - if not 'uv' in input_files: - logger.error('Missing UV Dataproducts in input_files') - result['errors'].append('Missing UV Dataproducts in input_files') - else: - nr_input_subbands = input_files['uv']['nr_of_uv_files'] - if not slices_per_image or not subbands_per_image: - logger.error('slices_per_image or subbands_per_image are not valid') - result['errors'].append('Missing UV Dataproducts in input_files') - if nr_input_subbands % (subbands_per_image * slices_per_image) > 0: - logger.error('slices_per_image and subbands_per_image not a multiple of number of inputs') - result['errors'].append('slices_per_image and subbands_per_image not a multiple of number of inputs') if result['errors']: return result - logger.debug("calculate sky image data size") - result['estimates'][0]['output_files'] = {} - nr_images = nr_input_subbands / (subbands_per_image * slices_per_image) - result['estimates'][0]['output_files']['img'] = {'nr_of_img_files': nr_images, - 'img_file_size': 1000, # 1 kB was hardcoded in the Scheduler - 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_SkyImage.identifications')} - logger.info("sky_images: {} files {} bytes each".format(result['estimates'][0]['output_files']['img']['nr_of_img_files'], - result['estimates'][0]['output_files']['img']['img_file_size'])) - - # count total data size - total_data_size = result['estimates'][0]['output_files']['img']['nr_of_img_files'] * \ - 
result['estimates'][0]['output_files']['img']['img_file_size'] # bytes - total_bandwidth = int(ceil(total_data_size * 8 / duration)) # bits/second - if total_data_size: - needed_resource_types = {'bandwidth': total_bandwidth, 'storage': total_data_size} - result['resource_types'] = needed_resource_types - result['resource_count'] = 1 - result['root_resource_group'] = parset.getString(DATAPRODUCTS + 'Output_SkyImage.storageClusterName') - else: - result['errors'].append('Total data size is zero!') - logger.error('ERROR: A datasize of zero was calculated!') + duration = self._getDuration(parset.getString('Observation.startTime'), + parset.getString('Observation.stopTime')) + + # NOTE: input bandwidth is not included in the resulting estimate atm. + # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ... + #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName') + + input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications') + output_ident_img = self._getOutputIdentification(DATAPRODUCTS + 'Output_SkyImage.identifications') + output_cluster_img = parset.getString(DATAPRODUCTS + 'Output_SkyImage.storageClusterName') + + for pred_est in predecessor_estimates: + pred_output_files = pred_est.get('output_files') + if pred_output_files is None: + continue + + input_files = self._filterInputs(pred_output_files, input_idents_uv) + if not input_files: + continue + + if 'uv' not in pred_output_files: + logger.error('Missing uv dataproducts in predecessor output_files') + result['errors'].append('Missing uv dataproducts in predecessor output_files') + continue + + estimate = {'input_files': input_files} + + total_nr_input_subbands = nr_uv_files * pred_est['resource_count'] + if total_nr_input_subbands % (subbands_per_image * slices_per_image) > 0: + logger.error('slices_per_image and subbands_per_image not a multiple of number of inputs') + result['errors'].append('slices_per_image and subbands_per_image not a multiple of number of inputs') + continue + nr_images = total_nr_input_subbands / (subbands_per_image * slices_per_image) + + logger.debug("calculate sky image data size") + nr_uv_files = input_files['uv']['properties']['nr_of_uv_files'] + uv_file_size = input_files['uv']['properties']['uv_file_size'] # same for each uv data product across all SAPs + img_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler + + logger.info("sky_images: {} files {} bytes each".format(nr_images, img_file_size)) + estimate['output_files'] = {'img': {'identification': output_ident_img, + 'properties': {'nr_of_img_files': 1, # also see estimate['resource_count'] below + 'img_file_size': img_file_size}}} + + # count total data size + data_size = img_file_size # bytes + if data_size: + bandwidth = int(ceil(8 * data_size / duration)) # bits/second + estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} + estimate['resource_count'] = nr_images # as such, potentially different resources can be allocated for each output + estimate['root_resource_group'] = output_cluster_img + else: + logger.error('An estimate of zero was calculated!') + result['errors'].append('An estimate of zero was calculated!') + + result['estimates'].append(estimate) + + if not result['estimates'] and not result['errors']: + logger.error('imaging pipeline estimator produced no estimates') + result['errors'].append('imaging pipeline estimator produced no estimates') + 
return result diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py index 60426c069b5f8294a7606a70207c99ec4996afc6..92e182fa5f19420656ed328b261b26c5bd459cb1 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py @@ -1,4 +1,4 @@ -# base_resource_estimator.py +# longbaseline_pipeline.py # # Copyright (C) 2016, 2017 # ASTRON (Netherlands Institute for Radio Astronomy) @@ -18,7 +18,7 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -# $Id: base_resource_estimator.py 33534 2016-02-08 14:28:26Z schaap $ +# $Id$ import logging from math import ceil @@ -41,80 +41,105 @@ class LongBaselinePipelineResourceEstimator(BasePipelineResourceEstimator): 'Observation.stopTime', DATAPRODUCTS + 'Input_Correlated.enabled', DATAPRODUCTS + 'Input_Correlated.identifications', - #DATAPRODUCTS + 'Input_Correlated.storageClusterName', # TODO: also add input estimates + #DATAPRODUCTS + 'Input_Correlated.storageClusterName', # enable if input bandwidth is also estimated DATAPRODUCTS + 'Output_Correlated.enabled', DATAPRODUCTS + 'Output_Correlated.identifications', DATAPRODUCTS + 'Output_Correlated.storageClusterName', PIPELINE + 'LongBaseline.subbandgroups_per_ms', PIPELINE + 'LongBaseline.subbands_per_subbandgroup') - def _calculate(self, parset, input_files): + + + def _calculate(self, parset, predecessor_estimates): """ Estimate for Long Baseline Pipeline calculates: datasize (number of files, file size), bandwidth - input_files should look something like: -TODO - 'input_files': - {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, ...} - - reply is something along the lines of: - {'bandwidth': {'total_size': 19021319494}, - 'storage': {'total_size': 713299481024, - 'output_files': - {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104} - }} + + For a predecessor_estimates example, see the calibration/averaging + (and possibly the observation) estimator code. + + For a return value example, see the calibration/averaging estimator code, + except that there is no 'im' input or output, and instead of a 'start_sb_nr', + we have a 'start_sbg_nr' property. """ logger.debug("start estimate '{}'".format(self.name)) logger.info('parset: %s ' % parset) - result = {'errors': [], 'estimates': [{}]} # can all be described in 1 estimate here -#TODO: really? What if input_files from output_files which has a list of len > 1? -#parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName') - identifications = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications') - input_files = self._filterInputs(input_files, identifications) - result['estimates'][0]['input_files'] = input_files - duration = self._getDuration(parset.getString('Observation.startTime'), - parset.getString('Observation.stopTime')) + result = {'errors': [], 'estimates': []} + subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0) #TODO, should these have defaults? 
subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0) - nr_input_files = 0 - + if not subbandgroups_per_ms or not subbands_per_subbandgroup: + logger.error('subbandgroups_per_ms or subbands_per_subbandgroup are not valid') + result['errors'].append('subbandgroups_per_ms or subbands_per_subbandgroup are not valid') if not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'): logger.error('Output_Correlated is not enabled') result['errors'].append('Output_Correlated is not enabled') - if not 'uv' in input_files: - logger.error('Missing UV Dataproducts in input_files') - result['errors'].append('Missing UV Dataproducts in input_files') - else: - nr_input_files = input_files['uv']['nr_of_uv_files'] - if not subbandgroups_per_ms or not subbands_per_subbandgroup: - logger.error('subbandgroups_per_ms or subbands_per_subbandgroup are not valid') - result['errors'].append('Missing UV Dataproducts in input_files') - if nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0: - logger.error('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs') - result['errors'].append('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs') if result['errors']: return result - logger.debug("calculate correlated data size") - result['estimates'][0]['output_files'] = {} - nr_output_files = nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms) - result['estimates'][0]['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, - 'uv_file_size': 1000, # 1 kB was hardcoded in the Scheduler - 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications'), - 'start_sbg_nr': input_files['uv']['start_sb_nr'] / (subbands_per_subbandgroup * subbandgroups_per_ms)} - logger.info("correlated_uv: {} files {} bytes each".format(result['estimates'][0]['output_files']['uv']['nr_of_uv_files'], - result['estimates'][0]['output_files']['uv']['uv_file_size'])) - - # count total data size - total_data_size = result['estimates'][0]['output_files']['uv']['nr_of_uv_files'] * \ - result['estimates'][0]['output_files']['uv']['uv_file_size'] # bytes - total_bandwidth = int(ceil(total_data_size * 8 / duration)) # bits/second - if total_data_size: - needed_resource_types = {'bandwidth': total_bandwidth, 'storage': total_data_size} - result['resource_types'] = needed_resource_types - result['resource_count'] = 1 - result['root_resource_group'] = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName') - else: - result['errors'].append('Total data size is zero!') - logger.error('ERROR: A datasize of zero was calculated!') + duration = self._getDuration(parset.getString('Observation.startTime'), + parset.getString('Observation.stopTime')) + + # NOTE: input bandwidth is not included in the resulting estimate atm. + # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ... 
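+        # Worked example (assumed numbers) for the subband grouping done in the loop below:
+        # with nr_of_uv_files = 80 per predecessor estimate, resource_count = 3,
+        # subbands_per_subbandgroup = 8 and subbandgroups_per_ms = 5, we get
+        # total_nr_input_files = 240 and nr_output_files = 240 / (8 * 5) = 6.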
+ #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName') + + input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications') + output_ident_uv = self._getOutputIdentification(DATAPRODUCTS + 'Output_Correlated.identifications') + output_cluster_uv = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName') + + for pred_est in predecessor_estimates: + pred_output_files = pred_est.get('output_files') + if pred_output_files is None: + continue + + input_files = self._filterInputs(pred_output_files, input_idents_uv) + if not input_files: + continue + + if 'uv' not in pred_output_files: + logger.error('Missing uv dataproducts in predecessor output_files') + result['errors'].append('Missing uv dataproducts in predecessor output_files') + continue + + estimate = {'input_files': input_files} + + nr_input_files = input_files['uv']['properties']['nr_of_uv_files'] + uv_input_file_size = input_files['uv']['properties']['uv_file_size'] # same for each uv data product across all SAPs + start_sb_nr = input_files['uv']['properties']['start_sb_nr'] + + if nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0: + logger.error('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs') + result['errors'].append('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs') + continue + total_nr_input_files = nr_input_files * pred_est['resource_count'] + nr_output_files = total_nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms) + + logger.debug("calculate correlated data size") + uv_output_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler + start_sbg_nr = start_sb_nr / (subbands_per_subbandgroup * subbandgroups_per_ms) + + logger.info("correlated_uv: {} files {} bytes each".format(nr_output_files, uv_output_file_size)) + estimate['output_files'] = {'uv': {'identification': output_ident_uv, + 'properties': {'nr_of_uv_files': 1, # also see estimate['resource_count'] below + 'uv_file_size': uv_output_file_size, + 'start_sbg_nr': start_sbg_nr}}} + + # count total data size + data_size = nr_output_files * uv_output_file_size # bytes + if data_size: + bandwidth = int(ceil(8 * data_size / duration)) # bits/second + estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} + estimate['resource_count'] = nr_output_files # as such, potentially different resources can be allocated for each output + estimate['root_resource_group'] = output_cluster_uv + else: + logger.error('An estimate of zero was calculated!') + result['errors'].append('An estimate of zero was calculated!') + + result['estimates'].append(estimate) + + if not result['estimates'] and not result['errors']: + logger.error('long baseline estimator produced no estimates') + result['errors'].append('long baseline pipeline estimator produced no estimates') + return result diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py index 68dc895aa9ce05e033d1e3ff88a17a31788ab3a1..1f2a362c05f029c2f0bd191691da39241f238366 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py @@ -18,7 +18,7 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. 
If not, see <http://www.gnu.org/licenses/>. # -# $Id: observation.py 33534 2016-02-08 14:28:26Z schaap $ +# $Id$ import logging from math import ceil @@ -59,8 +59,9 @@ class ObservationResourceEstimator(BaseResourceEstimator): COBALT + 'BeamFormer.IncoherentStokes.which' ) - def _calculate(self, parset, input_files={}): + def _calculate(self, parset, predecessor_estimates=[]): """ Calculate the resources needed by the different data product types that can be in a single observation. + The predecessor_estimates arg is just to implement the same interface as pipelines. Observations have no predecessor. The following return value example is for an obs duration of 240.0 s and 3 data product types for 2 clusters. NOTE: 'nr_of_XX_files' is for that estimate, times the 'resource_count' (hence each estimates contains 1 SAP). @@ -69,49 +70,47 @@ class ObservationResourceEstimator(BaseResourceEstimator): { 'errors': [], 'estimates': [{ - 'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # per sb + 'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # for each uv output data prod (thus the total is times the resource_count val) 'resource_count': 20, 'root_resource_group': 'CEP4', 'output_files': { - 'uv': {'identifications': [...]}, - 'saps': [{'sap_nr': 0, 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}] + 'uv': {'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.uv.dps', + 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}} } - }, {'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # per sb + }, {'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # idem 'resource_count': 60, 'root_resource_group': 'CEP4', 'output_files': { - 'uv': {'identifications': [...]}, - 'saps': [{'sap_nr': 1, 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 20}}] + 'uv': {'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.uv.dps', + 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 20}} } - }, {'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # per sb + }, {'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # idem 'resource_count': 20, 'root_resource_group': 'CEP4', 'output_files': { - 'uv': {'identifications': [...]}, - 'saps': [{'sap_nr': 2, 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 80}}] + 'uv': {'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps', + 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 80}} } - }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # per tab part, incl all stokes + }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # for each quad (4 stokes) of cs output tab part (thus the total is times the resource_count val) 'resource_count': 34, 'root_resource_group': 'DRAGNET', 'output_files': { - 'cs': {'nr_of_cs_stokes': 4, 'identifications': [...]}, - 'saps': [{'sap_nr': 0, 'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, - 'nr_of_cs_parts': 2}}] # parts per tab for this sap + 'cs': {'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.cs.dps', + 'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4, + 'nr_of_cs_parts': 2}} # parts per tab for this sap } - }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # per tab part, incl all stokes + }, {'resource_types': {'bandwidth': 71582789, 'storage': 
2147483648}, # idem 'resource_count': 6, 'root_resource_group': 'DRAGNET', 'output_files': { - 'cs': {'nr_of_cs_stokes': 4, 'identifications': [...]}, - 'saps': [{'sap_nr': 1, 'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, - 'is_tab_nr': 0, 'nr_of_cs_parts': 1}}] # parts per tab for this sap + 'cs': {'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.cs.dps', + 'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4, + 'nr_of_cs_parts': 1, 'is_tab_nr': 0}} # parts per tab for this sap } - }, {'resource_types': {'bandwidth': 17895698, 'storage': 536870912}, # per tab part, incl all stokes + }, {'resource_types': {'bandwidth': 17895698, 'storage': 536870912}, # for each 'is' output tab part (thus the total is times the resource_count val) 'resource_count': 1, 'root_resource_group': 'DRAGNET', 'output_files': { - 'is': {'nr_of_is_stokes': 1, 'identifications': [...]}, - 'saps': [{'sap_nr': 1, 'properties': {'is_file_size': 536870912, 'nr_of_is_files': 1, - 'is_tab_nr': 0}}] # IS can have >1 parts, but atm max 1 IS TAB per SAP + 'is': {'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.is.dps', + 'properties': {'is_file_size': 536870912, 'nr_of_is_files': 1, 'nr_of_is_stokes': 1, + 'is_tab_nr': 0}} # IS can have >1 parts, but atm max 1 IS TAB per SAP } }] } - - The base_resource_estimator adds an {'observation': } around this. """ logger.info("start estimate '{}'".format(self.name)) logger.info('parset: %s ' % parset) @@ -178,14 +177,17 @@ class ObservationResourceEstimator(BaseResourceEstimator): bandwidth = int(ceil(8 * file_size / duration)) # bits/second root_resource_group = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName') - idents = parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications') nr_saps = parset.getInt('Observation.nrBeams') - if nr_saps == 0: - logger.error("Correlated data output enabled, but nrBeams == 0") + if nr_saps < 1: + logger.error("Correlated data output enabled, but nrBeams < 1") return None - estimates = [] + # Estimates may differ per SAP for CS/IS. Decided to always produce a separate estimate per SAP. + # Hence, need to annotate each SAP with the right identifications for pipeline predecessor input filtering. + identifications = parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications') + sap_idents = self._sap_identifications(identifications, nr_saps) + total_files = 0 # sum of all subbands in all digital beams for sap_nr in xrange(nr_saps): subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr) @@ -196,11 +198,11 @@ class ObservationResourceEstimator(BaseResourceEstimator): return None est = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size}, + 'resource_count': nr_subbands, 'root_resource_group': root_resource_group, - 'output_files': {'uv': {'identifications': idents}}} - est['resource_count'] = nr_subbands - est['output_files']['saps'] = [{'sap_nr': sap_nr, 'properties': {'uv_file_size': file_size, - 'nr_of_uv_files': 1, 'start_sb_nr': total_files}}] # i.e. total nr_of_uv_files is resource_count times 1 + 'output_files': {'uv': {'sap_nr': sap_nr, 'identification': sap_idents[sap_nr], + 'properties': {'uv_file_size': file_size, 'nr_of_uv_files': 1, # i.e. 
total nr_of_uv_files is resource_count times 1 + 'start_sb_nr': total_files}}}} total_files += nr_subbands estimates.append(est) @@ -230,13 +232,17 @@ class ObservationResourceEstimator(BaseResourceEstimator): doFlysEye = parset.getBool(COBALT + 'BeamFormer.flysEye') root_resource_group = parset.getString(DATAPRODUCTS + 'Output_CoherentStokes.storageClusterName') - idents = parset.getStringVector(DATAPRODUCTS + 'Output_CoherentStokes.identifications') nr_saps = parset.getInt('Observation.nrBeams') - if nr_saps == 0: - logger.error("Coherent Stokes data output enabled, but nrBeams == 0") + if nr_saps < 1: + logger.error("Coherent Stokes data output enabled, but nrBeams < 1") return None + # Estimates may differ per SAP for CS/IS. Decided to always produce a separate estimate per SAP. + # Hence, need to annotate each SAP with the right identifications for pipeline predecessor input filtering. + identifications = parset.getStringVector(DATAPRODUCTS + 'Output_CoherentStokes.identifications') + sap_idents = self._sap_identifications(identifications, nr_saps) + estimates = [] for sap_nr in xrange(nr_saps): logger.info("checking SAP {}".format(sap_nr)) @@ -288,15 +294,14 @@ class ObservationResourceEstimator(BaseResourceEstimator): bandwidth = int(ceil(8 * storage / duration)) # bits/second nr_parts_per_tab = int(ceil(nr_subbands / float(nr_subbands_per_file))) # thus per tab per stokes - est = {'resource_types': {}, 'root_resource_group': root_resource_group, - 'output_files': {'cs': {'nr_of_cs_stokes': nr_coherent, 'identifications': idents}}} - est['resource_types']['storage'] = storage - est['resource_types']['bandwidth'] = bandwidth - est['resource_count'] = nr_coherent_tabs * nr_parts_per_tab - est['output_files']['saps'] = [{'sap_nr': sap_nr, 'properties': {'cs_file_size': file_size, - 'nr_of_cs_files': nr_coherent, 'nr_of_cs_parts': nr_parts_per_tab}}] + est = {'resource_types': {'storage': storage, 'bandwidth': bandwidth}, + 'resource_count': nr_coherent_tabs * nr_parts_per_tab, + 'root_resource_group': root_resource_group, + 'output_files': {'cs': {'sap_nr': sap_nr, 'identification': sap_idents[sap_nr], + 'properties': {'cs_file_size': file_size, 'nr_of_cs_files': nr_coherent, + 'nr_of_cs_stokes': nr_coherent, 'nr_of_cs_parts': nr_parts_per_tab}}}} if is_tab_nr != -1: # translator to filenames needs to know: it may not have all CS+IS info in one claim - est['output_files']['saps'][0]['properties']['is_tab_nr'] = is_tab_nr + est['output_files']['cs']['properties']['is_tab_nr'] = is_tab_nr estimates.append(est) logger.debug("Coherent Stokes data estimates: {}".format(estimates)) @@ -323,13 +328,17 @@ class ObservationResourceEstimator(BaseResourceEstimator): nr_incoherent = len(incoherent_type) # 'I' or 'IQUV' ('XXYY' only possible for coherent stokes) root_resource_group = parset.getString(DATAPRODUCTS + 'Output_IncoherentStokes.storageClusterName') - idents = parset.getStringVector(DATAPRODUCTS + 'Output_IncoherentStokes.identifications') nr_saps = parset.getInt('Observation.nrBeams') - if nr_saps == 0: - logger.error("Incoherent Stokes data output enabled, but nrBeams == 0") + if nr_saps < 1: + logger.error("Incoherent Stokes data output enabled, but nrBeams < 1") return None + # Estimates may differ per SAP for CS/IS. Decided to always produce a separate estimate per SAP. + # Hence, need to annotate each SAP with the right identifications for pipeline predecessor input filtering. 
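+        # For example (hypothetical identifications), _sap_identifications(
+        #     ['mom.G777955.B2.1.C.SAP000.is.dps', 'mom.G777955.B2.1.C.SAP001.is.dps'], 3)
+        # would return ['mom.G777955.B2.1.C.SAP000.is.dps', 'mom.G777955.B2.1.C.SAP001.is.dps', ''].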
+ identifications = parset.getStringVector(DATAPRODUCTS + 'Output_IncoherentStokes.identifications') + sap_idents = self._sap_identifications(identifications, nr_saps) + estimates = [] for sap_nr in xrange(nr_saps): logger.info("checking SAP {}".format(sap_nr)) @@ -371,13 +380,12 @@ class ObservationResourceEstimator(BaseResourceEstimator): bandwidth = int(ceil(8 * storage / duration)) # bits/second nr_parts_per_tab = int(ceil(nr_subbands / float(nr_subbands_per_file))) # thus per tab per stokes - est = {'resource_types': {}, 'root_resource_group': root_resource_group, - 'output_files': {'is': {'nr_of_is_stokes': nr_incoherent, 'identifications': idents}}} - est['resource_types']['storage'] = storage - est['resource_types']['bandwidth'] = bandwidth - est['resource_count'] = nr_incoherent_tabs * nr_parts_per_tab - est['output_files']['saps'] = [{'sap_nr': sap_nr, 'properties': {'is_file_size': file_size, - 'nr_of_is_files': nr_incoherent, 'is_tab_nr': is_tab_nr}}] + est = {'resource_types': {'storage': storage, 'bandwidth': bandwidth}, + 'resource_count': nr_incoherent_tabs * nr_parts_per_tab, + 'root_resource_group': root_resource_group, + 'output_files': {'is': {'sap_nr': sap_nr, 'identification': sap_idents[sap_nr], + 'properties': {'is_file_size': file_size, 'nr_of_is_files': nr_incoherent, + 'nr_of_is_stokes': nr_incoherent, 'is_tab_nr': is_tab_nr}}}} estimates.append(est) logger.debug("Incoherent Stokes data estimates: {}".format(estimates)) @@ -409,3 +417,30 @@ class ObservationResourceEstimator(BaseResourceEstimator): logger.info("number of virtual stations = {}".format(nr_virtual_stations)) return nr_virtual_stations + def _sap_identifications(self, identifications, nr_saps): + """ Return list with identifications' identification for sap i at index i, + or '' at index i if no such identification for sap i. + NOTE: identifications should not contain entries for multiple data product types, + otherwise we cannot return a single identification per sap nr. + + For output, there must be exactly 1 (non-duplicate) identification string per + data product type (how can you otherwise refer to it unambiguously?), + and per sap (per sap for observations only, but always the case here). + """ + sap_idents = [''] * nr_saps + + for ident in identifications: + sap_nr = self._extract_sap_nr(ident) + try: + ident_seen = sap_idents[sap_nr] + except Exception as e: # e.g. sap_nr is None or out of bounds + logger.error("Ignoring observation identification string with no or invalid sap nr: %s", str(e)) + continue + + if not ident_seen: + sap_idents[sap_nr] = ident + elif ident_seen != ident: + logger.error("Cannot have multiple observation identifications per sap. Dropping %s", ident) # see doc string + + return sap_idents + diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py index 544a3ab5d57730fe25d3fb70020ab49ab6dd00d2..f6303a6d91e687670f77d52e2a82c3e8f7527a10 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py @@ -1,4 +1,4 @@ -# base_resource_estimator.py +# pulsar_pipeline.py # # Copyright (C) 2016, 2017 # ASTRON (Netherlands Institute for Radio Astronomy) @@ -18,7 +18,7 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. 
If not, see <http://www.gnu.org/licenses/>. # -# $Id: base_resource_estimator.py 33534 2016-02-08 14:28:26Z schaap $ +# $Id$ import logging from math import ceil @@ -41,42 +41,29 @@ class PulsarPipelineResourceEstimator(BasePipelineResourceEstimator): 'Observation.stopTime', DATAPRODUCTS + 'Input_CoherentStokes.enabled', DATAPRODUCTS + 'Input_CoherentStokes.identifications', - #DATAPRODUCTS + 'Input_CoherentStokes.storageClusterName', # TODO: also add input estimates + #DATAPRODUCTS + 'Input_CoherentStokes.storageClusterName', # enable if input bandwidth is also estimated DATAPRODUCTS + 'Input_IncoherentStokes.enabled', DATAPRODUCTS + 'Input_IncoherentStokes.identifications', - #DATAPRODUCTS + 'Input_IncoherentStokes.storageClusterName', # TODO: also add input estimates + #DATAPRODUCTS + 'Input_IncoherentStokes.storageClusterName', # enable if input bandwidth is also estimated DATAPRODUCTS + 'Output_Pulsar.enabled', DATAPRODUCTS + 'Output_Pulsar.identifications', DATAPRODUCTS + 'Output_Pulsar.storageClusterName') - def _calculate(self, parset, input_files): + def _calculate(self, parset, predecessor_estimates): """ Estimate for Pulsar Pipeline calculates: datasize (number of files, file size), bandwidth - input_files should look something like: -TODO - 'input_files': - {'cs': {'nr_of_cs_files': 48, 'nr_of_cs_stokes': 4, 'cs_file_size': 1482104}, ...} - - reply is something along the lines of: - {'bandwidth': {'total_size': 19021319494}, - 'storage': {'total_size': 713299481024, - 'output_files': - {'pulp': {'nr_of_pulp_files': 48, 'pulp_file_size': 185104} - }} + + For a predecessor_estimates example, see the observation estimator code. + + For a return value example, see the calibration/averaging estimator code, + except that here we have instead of 'cs' or 'is', e.g.: + 'pulp': {'identification': ..., + 'properties': {'nr_of_pulp_files': 48, 'pulp_file_size': 185104}} """ logger.debug("start estimate '{}'".format(self.name)) logger.info('parset: %s ' % parset) - result = {'errors': [], 'estimates': [{}]} # can all be described in 1 estimate here -#TODO: really? What if input_files from output_files which has a list of len > 1? 
-#parset.getString(DATAPRODUCTS + 'Input_CoherentStokes.storageClusterName') -#parset.getString(DATAPRODUCTS + 'Input_IncoherentStokes.storageClusterName') - identifications = parset.getStringVector(DATAPRODUCTS + 'Input_CoherentStokes.identifications') + \ - parset.getStringVector(DATAPRODUCTS + 'Input_IncoherentStokes.identifications') - input_files = self._filterInputs(input_files, identifications) - result['estimates'][0]['input_files'] = input_files - duration = self._getDuration(parset.getString('Observation.startTime'), - parset.getString('Observation.stopTime')) + result = {'errors': [], 'estimates': []} if not parset.getBool(DATAPRODUCTS + 'Output_Pulsar.enabled'): logger.error('Output_Pulsar is not enabled') @@ -87,31 +74,64 @@ TODO if result['errors']: return result - logger.debug("calculate pulp data size") - result['estimates'][0]['output_files'] = {} - nr_output_files = 0 - if 'cs' in input_files: -#TODO: ['cs']['nr_of_cs_files'] may no longer exist; idem for 'is' below - nr_output_files += input_files['cs']['nr_of_cs_files'] / input_files['cs']['nr_of_cs_stokes'] - if 'is' in input_files: - nr_output_files += input_files['is']['nr_of_is_files'] / input_files['is']['nr_of_is_stokes'] - - result['estimates'][0]['output_files']['pulp'] = {'nr_of_pulp_files': nr_output_files, - 'pulp_file_size': 1000, # 1 kB was hardcoded in the Scheduler - 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Pulsar.identifications')} - logger.info("pulsar_pipeline pulp: {} files {} bytes each".format(result['estimates'][0]['output_files']['pulp']['nr_of_pulp_files'], - result['estimates'][0]['output_files']['pulp']['pulp_file_size'])) - - # count total data size - total_data_size = result['estimates'][0]['output_files']['pulp']['nr_of_pulp_files'] * \ - result['estimates'][0]['output_files']['pulp']['pulp_file_size'] # bytes - total_bandwidth = int(ceil(total_data_size * 8 / duration)) # bits/second - if total_data_size: - needed_resource_types = {'bandwidth': total_bandwidth, 'storage': total_data_size} - result['resource_types'] = needed_resource_types - result['resource_count'] = 1 - result['root_resource_group'] = parset.getString(DATAPRODUCTS + 'Output_Pulsar.storageClusterName') - else: - result['errors'].append('Total data size is zero!') - logger.error('ERROR: A datasize of zero was calculated!') + duration = self._getDuration(parset.getString('Observation.startTime'), + parset.getString('Observation.stopTime')) + + # NOTE: input bandwidth is not included in the resulting estimate atm. + # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ... 
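+        # A naive sketch (not implemented) would mirror the output bandwidth formula used below:
+        #   input_bandwidth = int(ceil(8 * total_input_file_size / duration))  # bits/second; total_input_file_size is hypothetical
+        # but duration is only a rough wall-clock guess for the pipeline, so such a figure would say little about the actual I/O load.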
+        #input_cluster_cs = parset.getString(DATAPRODUCTS + 'Input_CoherentStokes.storageClusterName')
+        #input_cluster_is = parset.getString(DATAPRODUCTS + 'Input_IncoherentStokes.storageClusterName')
+
+        input_idents_cs = parset.getStringVector(DATAPRODUCTS + 'Input_CoherentStokes.identifications')
+        input_idents_is = parset.getStringVector(DATAPRODUCTS + 'Input_IncoherentStokes.identifications')
+        output_ident_pulp = self._getOutputIdentification(DATAPRODUCTS + 'Output_Pulsar.identifications')
+        output_cluster_pulp = parset.getString(DATAPRODUCTS + 'Output_Pulsar.storageClusterName')
+
+        for pred_est in predecessor_estimates:
+            pred_output_files = pred_est.get('output_files')
+            if pred_output_files is None:
+                continue
+
+            idents = []
+            if 'cs' in pred_output_files:
+                idents.extend(input_idents_cs)
+            if 'is' in pred_output_files:
+                idents.extend(input_idents_is)
+            input_files = self._filterInputs(pred_output_files, idents)
+            if not input_files:
+                continue
+
+            estimate = {'input_files': input_files}
+
+            nr_output_files = 0
+            if 'cs' in input_files:
+                nr_output_files += input_files['cs']['properties']['nr_of_cs_files']  # includes nr_of_cs_stokes factor
+            if 'is' in input_files:
+                nr_output_files += input_files['is']['properties']['nr_of_is_files']  # includes nr_of_is_stokes factor
+
+            logger.debug("calculate pulp data size")
+            pulp_file_size = 1000  # TODO: 1 kB was hardcoded in the Scheduler
+
+            logger.info("pulsar_pipeline pulp: {} files {} bytes each".format(nr_output_files, pulp_file_size))
+            estimate['output_files'] = {'pulp': {'identification': output_ident_pulp,
+                    'properties': {'nr_of_pulp_files': nr_output_files,  # times pred_est['resource_count'] in total
+                                   'pulp_file_size': pulp_file_size}}}
+
+            # count total data size
+            data_size = nr_output_files * pulp_file_size
+            if data_size > 0:
+                bandwidth = int(ceil(8 * data_size / duration))  # bits/second
+                estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+                estimate['resource_count'] = pred_est['resource_count']
+                estimate['root_resource_group'] = output_cluster_pulp
+            else:
+                logger.error('An estimate of zero was calculated!')
+                result['errors'].append('An estimate of zero was calculated!')
+
+            result['estimates'].append(estimate)
+
+        if not result['estimates'] and not result['errors']:
+            logger.error('pulsar pipeline estimator produced no estimates')
+            result['errors'].append('pulsar pipeline estimator produced no estimates')
+        return result
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
index 6d20158d0a663787c451851d2f4e43a76eab350c..da5360bd3fd008b5af5c5333d283ab59dbb48af9 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
@@ -63,7 +63,7 @@ class ResourceEstimatorHandler(MessageHandlerInterface):

         return task_estimate

-    #TODO use something else than .values()[0]['storage'] ??
+    #TODO use something else than .values()[0]['estimates'] ??
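+    # Note: branch_estimates entries roughly look like {otdb_id: {task_type: {'estimates': [...], 'errors': [...]}}},
+    # e.g. {'452648': {'observation': {'estimates': [...], 'errors': []}}}, hence the .values()[0] indexing below.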
     def get_subtree_estimate(self, specification_tree):
         otdb_id = specification_tree['otdb_id']
         parset = specification_tree['specification']
@@ -84,36 +84,31 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
             logger.info(("Branch estimates for %s\n" % otdb_id) + pprint.pformat(branch_estimates))

         if specification_tree['task_subtype'] in ['averaging pipeline', 'calibration pipeline']:
-            input_files = {}
-            for pred_otdb_id, estimate in branch_estimates.items():
-                logger.info('Looking at predecessor %s' % pred_otdb_id)
-                predecessor_output = estimate.values()[0]['storage']['output_files']
-                if not 'im' in predecessor_output and 'uv' in predecessor_output: # Not a calibrator pipeline
-                    logger.info('found %s as the target of pipeline %s' % (pred_otdb_id, otdb_id))
-                    input_files['uv'] = predecessor_output['uv']
-                    if 'saps' in predecessor_output:
-                        input_files['saps'] = predecessor_output['saps']
-                elif 'im' in predecessor_output:
-                    logger.info('found %s as the calibrator of pipeline %s' % (pred_otdb_id, otdb_id))
-                    input_files['im'] = predecessor_output['im']
-            return {str(otdb_id): self.add_id(self.calibration_pipeline.verify_and_estimate(parset, input_files), otdb_id)}
+            predecessor_estimates = []
+            for branch_otdb_id, branch_estimate in branch_estimates.items():
+                logger.info('Looking at predecessor %s' % branch_otdb_id)
+                estimates = branch_estimate.values()[0]['estimates']
+                if any(['uv' in est['output_files'] and 'im' not in est['output_files'] for est in estimates]): # Not a calibrator pipeline
+                    logger.info('found %s as the target of pipeline %s' % (branch_otdb_id, otdb_id))
+                    predecessor_estimates.extend(estimates)
+                elif any(['im' in est['output_files'] for est in estimates]):
+                    logger.info('found %s as the calibrator of pipeline %s' % (branch_otdb_id, otdb_id))
+                    predecessor_estimates.extend(estimates)
+            return {str(otdb_id): self.add_id(self.calibration_pipeline.verify_and_estimate(parset, predecessor_estimates), otdb_id)}

         if len(branch_estimates) > 1:
             logger.warning('Pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys()))
             return {str(otdb_id): {'pipeline': {'errors': ['Pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys())]}}}
-        predecessor_output = branch_estimates.values()[0].values()[0]['storage']['output_files']
+        predecessor_estimates = branch_estimates.values()[0].values()[0]['estimates']

         if specification_tree['task_subtype'] in ['imaging pipeline', 'imaging pipeline msss']:
-            input_files = predecessor_output
-            return {str(otdb_id): self.add_id(self.imaging_pipeline.verify_and_estimate(parset, input_files), otdb_id)}
+            return {str(otdb_id): self.add_id(self.imaging_pipeline.verify_and_estimate(parset, predecessor_estimates), otdb_id)}

         if specification_tree['task_subtype'] in ['long baseline pipeline']:
-            input_files = predecessor_output
-            return {str(otdb_id): self.add_id(self.longbaseline_pipeline.verify_and_estimate(parset, input_files), otdb_id)}
+            return {str(otdb_id): self.add_id(self.longbaseline_pipeline.verify_and_estimate(parset, predecessor_estimates), otdb_id)}

         if specification_tree['task_subtype'] in ['pulsar pipeline']:
-            input_files = predecessor_output
-            return {str(otdb_id): self.add_id(self.pulsar_pipeline.verify_and_estimate(parset, input_files), otdb_id)}
+            return {str(otdb_id): self.add_id(self.pulsar_pipeline.verify_and_estimate(parset, predecessor_estimates), otdb_id)}
         else: # reservation, maintenance, system tasks?
             logger.warning("ID %s is not a pipeline or observation."
% otdb_id) @@ -126,17 +121,13 @@ class ResourceEstimatorHandler(MessageHandlerInterface): 'predecessors': [...]} reply is something along the lines of: - {'452648': - {'observation': -TODO - {'bandwidth': {'total_size': 19021319494}, - 'storage': {'total_size': 713299481024, - 'output_files': - {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, - 'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}}, - {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}}, - {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}} - ]}}}}} + { + '452648': { # otdb_id + 'pipeline': { # or 'observation' or 'reservation' ... + <see observation and calibration pipeline .py files for example estimator outputs> + } + } + } """ logger.info('get_estimated_resources on: %s' % specification_tree) return self.get_subtree_estimate(specification_tree)