From 70329c2064d1f7154c6d21b5f00fae9acf8cdcf1 Mon Sep 17 00:00:00 2001 From: Alexander van Amesfoort <amesfoort@astron.nl> Date: Sat, 6 May 2017 17:37:54 +0000 Subject: [PATCH] Task #9939: RA: rework estimator format to fix predecessor issues with calibration pipeline. Also adjust and much better document other estimators. Small adjustment to assigner wrt properties. This prepares for more testing on the test sys, so this commit likely contains brown paper bugs. --- .../lib/translator.py | 2 +- .../ResourceAssigner/lib/assignment.py | 25 +- .../base_pipeline_estimator.py | 2 +- .../base_resource_estimator.py | 81 +++--- .../calibration_pipeline.py | 233 ++++++++++-------- .../resource_estimators/image_pipeline.py | 98 ++++---- .../longbaseline_pipeline.py | 108 ++++---- .../resource_estimators/observation.py | 71 +++--- .../resource_estimators/pulsar_pipeline.py | 111 ++++----- 9 files changed, 385 insertions(+), 346 deletions(-) diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py index 10f9c3b0d04..18a0804aa37 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py @@ -230,7 +230,7 @@ class RAtoOTDBTranslator(): nr_cs_stokes = sap['properties']['nr_of_cs_stokes'] # the 'cs_stokes' term here can also mean cv XXYY nr_parts = sap['properties']['nr_of_cs_files'] / nr_cs_stokes # in this prop's claim! nparts_tab = nr_parts_per_tab_per_sap[sap_nr] # alias for readability; this is also per stokes - while nr_parts > 0: # loops over tab nrs + while nr_parts > 0: tab_nr = next_tab_part_nrs_per_sap[sap_nr] / nparts_tab tab_part_nr = next_tab_part_nrs_per_sap[sap_nr] % nparts_tab nparts_remain = min(nr_parts, nparts_tab - tab_part_nr) # nr parts left before we go to the next tab diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index d6cffeb5742..6572a4984b7 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -660,7 +660,7 @@ class ResourceAssigner(): def getProperties(self, db_resource_prop_types, files_dict, io_type): """ Return list of properties in claim format converted from files_dict. - E.g. files_dict: {'cs': {'sap_nr': 2, ..., 'properties': {'nr_of_uv_files': 123, ...}}, ...} + E.g. 
files_dict: {'cs': [ {'sap_nr': 2, ..., 'properties': {'nr_of_uv_files': 123, ...}}, {...} ], 'is': ...} """ if files_dict is None: return [] @@ -668,19 +668,20 @@ class ResourceAssigner(): logger.info('getProperties: processing %s_files: %s', io_type, files_dict) properties = [] - for dptype, dptype_dict in files_dict.items(): - sap_nr = dptype_dict.get('sap_nr') # only with obs output and obs successor input + for dptype in files_dict: + for dptype_dict in files_dict[dptype]: + sap_nr = dptype_dict.get('sap_nr') # only with obs output and obs successor input - for prop_type_name, prop_value in dptype_dict['properties'].items(): - rc_property_type_id = db_resource_prop_types.get(prop_type_name) - if rc_property_type_id is None: - logger.error('getProperties: ignoring unknown prop type: %s', prop_type_name) - continue + for prop_type_name, prop_value in dptype_dict['properties'].items(): + rc_property_type_id = db_resource_prop_types.get(prop_type_name) + if rc_property_type_id is None: + logger.error('getProperties: ignoring unknown prop type: %s', prop_type_name) + continue - prop = {'type': rc_property_type_id, 'value': prop_value, 'io_type': io_type} - if sap_nr is not None: - prop['sap_nr'] = sap_nr - properties.append(prop) + prop = {'type': rc_property_type_id, 'value': prop_value, 'io_type': io_type} + if sap_nr is not None: + prop['sap_nr'] = sap_nr + properties.append(prop) return properties diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py index 6be9a9823e6..acc981ab46c 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py @@ -50,7 +50,7 @@ class BasePipelineResourceEstimator(BaseResourceEstimator): def _getOutputIdentification(self, identifications): """ For pipeline output, there must be exactly 1 (non-duplicate) identification string per - data product type. (How can you otherwise refer to it unambiguously?) (Observation output can have 1 per sap.) + data product type. (How can you otherwise refer to it unambiguously?) (Observation output can have 1 per SAP.) """ if len(set(identifications)) != 1: # make set to filter any duplicates if not identifications: diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py index 33127049510..3b45a0eb47b 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py @@ -23,6 +23,7 @@ """ Base class for Resource Estimators """ import logging +import copy import pprint from datetime import datetime from lofar.common.datetimeutils import totalSeconds @@ -64,46 +65,56 @@ class BaseResourceEstimator(object): def _calculate(self, parset, predecessor_estimates=[]): raise NotImplementedError('calculate() in base class is called. Please implement calculate() in your subclass') - def _extract_sap_nr(self, identification): - """ Return sap nr as int from identification or None if - no int xxx in '.SAPxxx' in identification. 
+    def _add_predecessor_output(self, input_files, predecessor_estimate, identification, dptype):
+        """ Add a copy of an element under the dptype key in predecessor_estimate
+            to input_files if it matches identification, or else return False.
+            But see the comment below on collapsing resource_count, which converts resource_count > 1 estimates for pipelines.
         """
-        for s in identification.split('.'): # Find the SAP number, if present
-            if 'SAP' not in s:
+        if 'output_files' not in predecessor_estimate or \
+           dptype not in predecessor_estimate['output_files']:
+            return False
+
+        for dt_values in predecessor_estimate['output_files'][dptype]:
+            if dt_values['identification'] != identification:
                 continue
-            try:
-                return int(s[3:])
-            except:
-                pass
-
-        return None
-
-    def _filterInputs(self, input_files, identifications):
-        """ Return copy of input_files with only parts of input_files covered by identifications.
-            There may not be duplicates in the identifications iterable.
-
-            Example argument values:
-            'input_files': {
-                'uv': {'identification': 'mom.G777955.B15.1.CPC.uv.dps',  # or w/ obs e.g.: 'mom.G777955.B15.1.C.SAP000.uv.dps'
-                       'uv_file_size': 1073741824, 'nr_of_uv_files': 42, <uv specific key-values>},
-                'im': {'identification': 'mom.G777955.B15.1.CPC.inst.dps',
-                       'im_file_size': 1000, 'nr_of_im_files': 42, <im specific key-values>}
-            }
-            There may also be a key-value pair 'sap_nr': N, typically for observation.
-            'identifications': ['mom.G777955.B2.1.C.SAP002.uv.dps', ...]  (or without SAPxxx in it, as shown in the input_files example).
+
+            logger.info('Found predecessor output identification matching %s', identification)
+            if dptype not in input_files:
+                input_files[dptype] = []
+            input_files[dptype].append(copy.deepcopy(dt_values))
+
+            # Observation estimates have resource_count > 1 to be able to assign each output to another resource,
+            # but that is not supported atm for pipelines. We only use input params to produce parset filenames etc,
+            # but not to reserve resources (not covered by resource count). Collapse to implied resource_count of 1.
+            input_files[dptype][-1]['properties']['nr_of_' + dptype + '_files'] *= predecessor_estimate['resource_count']
+            return True
+
+        return False
+
+    def get_inputs_from_predecessors(self, predecessor_estimates, identifications, dptype):
+        """ Return a copy of the parts with dptype in predecessor_estimates matching identifications.
+            If any of the identifications could not be found, the empty dict is returned.
+            dptype is one of the observation/pipeline data product types, e.g. 'uv', 'cs', 'pulp', ...
+            No duplicates in the identifications iterable!
+
+            See the calibration and observation pipeline estimators for parameter value examples.
""" - output_files = {} - logger.info('parsing input for identifications: %s' % (identifications,)) + input_files = {} + logger.info('get_inputs_from_predecessors: parsing predecessor output for identifications: %s', identifications) for identification in identifications: - sap_nr = self._extract_sap_nr(identification) - for data_type, data_properties in input_files.items(): - if identification == data_properties['identification']: - logger.info('Found input identification matching %s' % (identification,)) - output_files[data_type] = dict(data_properties) # shallow copy is enough to avoid unintended changes - - logger.info('_filterInputs: filtered down to: \n' + pprint.pformat(output_files)) - return output_files + found = False + for estimate in predecessor_estimates: + if self._add_predecessor_output(input_files, estimate, identification, dptype): + found = True + break + if not found: + logger.warn('get_inputs_from_predecessors: failed to find predecessor output matching %s', identification) + return {} + + logger.info('get_inputs_from_predecessors: filtered predecessor output for dptype=' + dptype + + ' down to: \n' + pprint.pformat(input_files)) + return input_files def verify_and_estimate(self, parset, predecessor_estimates=[]): """ Create estimates for an observation or pipeline step based on its parset and, diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py index ca2d4c243e5..cc5f1ce5874 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py @@ -52,40 +52,78 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator): PIPELINE + 'DPPP.demixer.timestep') def _calculate(self, parset, predecessor_estimates): - """ Estimate for Calibration Pipeline. Also gets used for Averaging Pipeline + """ Estimator for calibration pipeline step. Also used for averaging pipeline step. 
            calculates: datasize (number of files, file size), bandwidth

            predecessor_estimates looks something like (see also output format in observation.py and (other) pipelines):
            [{
-             'resource_types': {'bandwidth': 35791395, 'storage': 1073741824},  # per sb
+             'resource_types': {'bandwidth': 286331153, 'storage': 1073741824},  # per 'uv' dict
              'resource_count': 20, 'root_resource_group': 'CEP4',
              'output_files': {
                'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',
-                       'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}]
+                       'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}},
+                      {'sap_nr': 3, 'identification': 'mom.G777955.B2.1.C.SAP003.uv.dps',
+                       'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 20}}
+                     ]
              }
            },
-           <optionally more estimates>
+           <optionally more estimates>
            ]

-           The reply is something along the lines of: (assumes task duration of 30 s)
-           [{
-             'resource_types': {'bandwidth': 2236995, 'storage': 67109864},  # for each uv+im output data prod (thus the total is times the resource_count val)
-             'resource_count': 20, 'root_resource_group': 'CEP4',
-             'input_files': {
-               'uv': {'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',  # w/ sap only if predecessor is an observation
-                      'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}},
-               'im': {'identification': ...,  # no 'im' as input if predecessor is an observation
-                      'properties': {'im_file_size': 1000, 'nr_of_im_files': 1, 'start_sb_nr': 0}}
-             },
-             'output_files': {
-               'uv': {'identification': 'mom.G777956.B2.1.CPC.uv.dps',
-                      'properties': {'uv_file_size': 67108864, 'nr_of_uv_files': 1, 'start_sb_nr': 0}},
-               'im': {'identification': 'mom.G777956.B2.1.CPC.inst.dps',
-                      'properties': {'im_file_size': 1000, 'nr_of_im_files': 1, 'start_sb_nr': 0}}
-             }
-           },
-           <optionally more estimates>
+           The reply is something along the lines of the example below
+           (assumes task duration of 30 s, and typically 'im' is not in both input_files and output_files)
+           {
+             'errors': [],
+             'estimates': [
+               {
+                 'resource_types': {'bandwidth': 2236995 * 20, 'storage': 67109864 * 20},
+                 'resource_count': 1, 'root_resource_group': 'CEP4',
+
+                 # input resources not (yet) allocated: bandwidth only, but coupled to specific storage resource
+                 'input_files': {
+                   'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',  # w/ sap only if predecessor is an observation
+                           'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 20, 'start_sb_nr': 0}},
+                          {'sap_nr': 3, 'identification': 'mom.G777955.B2.1.C.SAP003.uv.dps',  # idem, >1 input SAP possible for e.g. the pulsar pipeline
+                           'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 20, 'start_sb_nr': 20}}
+                         ],
+                   'im': [{'identification': ...,  # 'im' example; no 'im' input if predecessor is an observation as above
+                           'properties': {'im_file_size': 1000, 'nr_of_im_files': 20, 'start_sb_nr': 0}}]
+                 },
+                 'output_files': {
+                   'uv': [{'identification': 'mom.G777956.B2.1.CPC.uv.dps',
+                           'properties': {'uv_file_size': 67108864, 'nr_of_uv_files': 40, 'start_sb_nr': 0}}],
+                   'im': [{'identification': 'mom.G777956.B2.1.CPC.inst.dps',
+                           'properties': {'im_file_size': 1000, 'nr_of_im_files': 20, 'start_sb_nr': 0}}]
+                 }
+               },
+               <optionally more estimates>
            ]
+           }
+
+           The estimates key has a list to support observations with >1 data product type that can have
+           different properties (typically CS/IS (per SAP), but always used for observations w/ >1 SAP).
            This is the reason that observations have an estimate per SAP and per data product type:
+           their resource_types values may be different. This is needed to employ resource_count > 1.
+           See the observation estimator for an extensive observation data product type example.
+
+           For each estimate, the total output_files resources to be claimed is resource_count * resource_types.
+           Thus resource_types is a total across all output_files content. The idea is to keep this
+           singular per data product type (inner list len 1), but for pipelines this is not possible atm.
+
+           Note that atm input_files resources are not included or claimed.
+           However, input_files properties must be added to resource claims to later generate the parset.
+           This caveat must be fixed at some point, but until then, we cannot have input_files-only estimates.
+           (After it is fixed, we should not have that either; it makes no sense.)
+
+           For pipelines we don't support output to multiple storage areas atm, so resource_count is 1.
+           We still have to deal with input_files from an observation with >1 SAP (used for the pulsar pipeline).
+           For this case, we generate 1 estimate, but use a list per data product type (e.g. 'uv': [...]).
+           Also, we may need multiple data product types in one pipeline estimate, but there the reason
+           is that e.g. 'uv' and 'im' file(s) belong together, so we must produce one estimate per pair
+           (but again, it's a pipeline, so atm it is collapsed to a single estimate, i.e. resource_count 1).
+           The inner data product type list can be removed once pipelines also use resource_count > 1.
+
+           Some RA_Services design aspects work well. Others fail to capture the underlying concepts closely enough, hence the inelegance.
        """
        logger.debug("start estimate '{}'".format(self.name))
        logger.info('parset: %s ' % parset)
@@ -102,20 +140,35 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
           not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'):
            logger.error('Input_Correlated or Output_Correlated is not enabled')
            result['errors'].append('Input_Correlated or Output_Correlated is not enabled')
-        if result['errors']:
-            return result

        duration = self._getDuration(parset.getString('Observation.startTime'),
                                     parset.getString('Observation.stopTime'))

-        # NOTE: input bandwidth is not included in the resulting estimate atm.
-        # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ...
- #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName') - input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications') + input_files = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_uv, 'uv') + if not input_files: + logger.error('Missing uv dataproducts in predecessor output_files') + result['errors'].append('Missing uv dataproducts in predecessor output_files') + have_im_input = parset.getBool(DATAPRODUCTS + 'Input_InstrumentModel.enabled') if have_im_input: input_idents_im = parset.getStringVector(DATAPRODUCTS + 'Input_InstrumentModel.identifications') + input_files_im = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_im, 'im') + if not input_files_im: + logger.error('Input_InstrumentModel enabled, but missing \'im\' dataproducts in predecessor output_files') + result['errors'].append('Input_InstrumentModel enabled, but missing \'im\' dataproducts in predecessor output_files') + input_files['im'] = input_files_im['im'] + + if result['errors']: + return result + + estimate = {'input_files': input_files} + + # NOTE: input bandwidth is not included in the resulting estimate atm. + # Proper input bandwidth estimation has limited use atm and is tricky, because of pipeline duration est, tmp files, + # multiple passes, nr nodes and caching, but for sure also because bandwidth must be tied to *predecessor* storage! + #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName') + output_ident_uv = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications') ) output_cluster_uv = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName') have_im_output = parset.getBool(DATAPRODUCTS + 'Output_InstrumentModel.enabled') @@ -127,74 +180,62 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator): logger.warn('storageClusterName differs between uv: \'%s\' and im: \'%s\': to be packed in 1 estimate, so ignoring \'im\' storageClusterName', output_cluster_uv, output_cluster_im) - for pred_est in predecessor_estimates: - pred_output_files = pred_est.get('output_files') - if pred_output_files is None: - continue - - idents = input_idents_uv - if have_im_input: - if 'im' in pred_output_files: - idents.extend(input_idents_im) - else: - logger.warn('Input_InstrumentModel enabled, but missing \'im\' dataproducts in predecessor output_files') - input_files = self._filterInputs(pred_output_files, idents) - if not input_files: - continue - - if 'uv' not in input_files: - logger.error('Missing uv dataproducts in predecessor output_files') - result['errors'].append('Missing uv dataproducts in predecessor output_files') - continue - - estimate = {'input_files': input_files} - - nr_input_files = input_files['uv']['properties']['nr_of_uv_files'] - uv_input_file_size = input_files['uv']['properties']['uv_file_size'] - start_sb_nr = input_files['uv']['properties']['start_sb_nr'] - - # TODO: This output file size calculation comes from the (old) Scheduler without explaining comments. - # The reason why it isn't a simple div, is that parts of the metadata are not reduced in size (and casacore storage mgrs). - # With reduction_factor 1, computed output size increases by 53%. Casacore storage mgrs may change size, but that much?!? - # If you can figure out what/how, please fix this calculation. Avoid unnamed magic values and document why! 
- logger.debug("calculate correlated data size") - new_size = uv_input_file_size / float(reduction_factor) - uv_output_file_size = int(new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0) - - nr_output_files = nr_input_files # pure 'map' (bijective) operation, no split or reduce - logger.info("correlated_uv: {}x {} files of {} bytes each".format(pred_est['resource_count'], nr_output_files, uv_output_file_size)) - estimate['output_files'] = {'uv': {'identification': output_ident_uv, - 'properties': {'nr_of_uv_files': nr_output_files, 'uv_file_size': uv_output_file_size, 'start_sb_nr': start_sb_nr}}} - data_size = uv_output_file_size - - # If instrument model output is needed, add it to the same estimate, - # since it must be written to the same storage as the uv output (same nr_output_files). - if have_im_output: - logger.info("calculate instrument-model data size") - im_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler - - logger.info("correlated_im: {}x {} files {} bytes each".format(pred_est['resource_count'], nr_output_files, im_file_size)) - estimate['output_files']['im'] = {'identification': output_ident_im, - 'properties': {'nr_of_im_files': nr_output_files, 'im_file_size': im_file_size, 'start_sb_nr': start_sb_nr}} - # FIXME I don't think this is technically correct, as the IM files are sometimes created and used, just not a required export? - # Need to split averaging pipeline and calibration pipeline - data_size += im_file_size - - data_size *= nr_output_files # bytes - if data_size: - bandwidth = int(ceil(8 * data_size / duration)) # bits/second - estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} - estimate['resource_count'] = pred_est['resource_count'] - estimate['root_resource_group'] = output_cluster_uv - else: - logger.error('An estimate of zero was calculated!') - result['errors'].append('An estimate of zero was calculated!') - - result['estimates'].append(estimate) - - if not result['estimates'] and not result['errors']: - logger.error('calibration / averaging pipeline estimator produced no estimates') - result['errors'].append('calibration / averaging pipeline estimator produced no estimates') + # Observations can have multiple output estimates, but atm pipelines do not. + # (Reason: incomplete info avail and effective assigner claim merging is harder) + # As long as this is the case, try to do a best effort to map any predecessor (obs or pipeline) to single estimate output. + nr_input_files = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']]) + + # Assume all uv file sizes are the same size as in dict 0. For uv data, we never had pipelines with >1 dict, + # but this could be meaningful when averaging multiple SAPs in 1 go (and no further processing steps). + # (Never done, since subsequent pipeline steps must then also work on all SAPs. But averaging could be the last step.) + # The potential other case is >1 dict from different obs with different file sizes. + # In general, this requires >1 output est dict, which the estimate fmt allows, but atm is only used for observations. + uv_input_file_size = input_files['uv'][0]['properties']['uv_file_size'] + + # For start_sb_nr, take the minimum of all start_sb_nr values. + # This fails when the input identifications has a sparse SAP list, but that was never supported either. + # A likely setup where this could happen is LOTAAS+pulp, but pulp has no equivalent to start_sb[g]_nr, + # so solve here and in the pulsar pipeline pragmatically each. 
+ start_sb_nr = min([uv_dict['properties']['start_sb_nr'] for uv_dict in input_files['uv']]) + + # TODO: This output file size calculation comes from the (old) Scheduler without explaining comments. + # The reason why it isn't a simple div, is that parts of the metadata are not reduced in size (and casacore storage mgrs). + # With reduction_factor 1, computed output size increases by 53%. Casacore storage mgrs may change size, but that much?!? + # If you can figure out what/how, please fix this calculation. Avoid unnamed magic values and document why! + logger.debug("calculate correlated data size") + new_size = uv_input_file_size / float(reduction_factor) + uv_output_file_size = int(new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0) + + nr_output_files = nr_input_files # pure 'map' (bijective) operation, no split or reduce + logger.info("correlated_uv: {} files of {} bytes each".format(nr_output_files, uv_output_file_size)) + estimate['output_files'] = {'uv': [{'identification': output_ident_uv, + 'properties': {'nr_of_uv_files': nr_output_files, 'uv_file_size': uv_output_file_size, 'start_sb_nr': start_sb_nr}}]} + data_size = uv_output_file_size + + # If instrument model output is needed, add it to the same estimate, + # since it must be written to the same storage as the uv output (same nr_output_files). + if have_im_output: + logger.info("calculate instrument-model data size") + im_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler + + logger.info("correlated_im: {} files {} bytes each".format(nr_output_files, im_file_size)) + estimate['output_files']['im'] = [{'identification': output_ident_im, + 'properties': {'nr_of_im_files': nr_output_files, 'im_file_size': im_file_size, 'start_sb_nr': start_sb_nr}}] + # FIXME I don't think this is technically correct, as the IM files are sometimes created and used, just not a required export? + # Need to split averaging pipeline and calibration pipeline + data_size += im_file_size + + data_size *= nr_output_files # bytes + if data_size: + bandwidth = int(ceil(8 * data_size / duration)) # bits/second + estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} + estimate['resource_count'] = 1 + estimate['root_resource_group'] = output_cluster_uv + else: + logger.error('An estimate of zero was calculated!') + result['errors'].append('An estimate of zero was calculated!') + + result['estimates'].append(estimate) return result diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py index 810be6a8839..38e5e9d4b0f 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py @@ -23,7 +23,6 @@ import logging from math import ceil from base_pipeline_estimator import BasePipelineResourceEstimator -from lofar.parameterset import parameterset logger = logging.getLogger(__name__) @@ -51,7 +50,7 @@ class ImagePipelineResourceEstimator(BasePipelineResourceEstimator): PIPELINE + 'Imaging.subbands_per_image') def _calculate(self, parset, predecessor_estimates): - """ Estimate for Imaging Pipeline. Also gets used for MSSS Imaging Pipeline + """ Estimate for imaging pipeline step. Also used for MSSS imaging pipeline. 
calculates: datasize (number of files, file size), bandwidth For a predecessor_estimates example, see the calibration/averaging @@ -70,74 +69,61 @@ class ImagePipelineResourceEstimator(BasePipelineResourceEstimator): slices_per_image = parset.getInt(PIPELINE + 'Imaging.slices_per_image', 0) #TODO, should these have defaults? subbands_per_image = parset.getInt(PIPELINE + 'Imaging.subbands_per_image', 0) if slices_per_image < 1 or subbands_per_image < 1: - logger.error('slices_per_image or subbands_per_image are not valid') - result['errors'].append('slices_per_image or subbands_per_image are not valid') + logger.error('slices_per_image or subbands_per_image is not valid') + result['errors'].append('slices_per_image or subbands_per_image is not valid') if not parset.getBool(DATAPRODUCTS + 'Output_SkyImage.enabled'): logger.error('Output_SkyImage is not enabled') result['errors'].append('Output_SkyImage is not enabled') - if result['errors']: - return result duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime')) + input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications') + input_files = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_uv, 'uv') + if not input_files: + logger.error('Missing uv dataproducts in predecessor output_files') + result['errors'].append('Missing uv dataproducts in predecessor output_files') + + if result['errors']: + return result + + estimate = {'input_files': input_files} + # NOTE: input bandwidth is not included in the resulting estimate atm. # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ... #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName') - input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications') output_ident_img = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_SkyImage.identifications') ) output_cluster_img = parset.getString(DATAPRODUCTS + 'Output_SkyImage.storageClusterName') - for pred_est in predecessor_estimates: - pred_output_files = pred_est.get('output_files') - if pred_output_files is None: - continue - - input_files = self._filterInputs(pred_output_files, input_idents_uv) - if not input_files: - continue - - if 'uv' not in input_files: - logger.error('Missing uv dataproducts in predecessor output_files') - result['errors'].append('Missing uv dataproducts in predecessor output_files') - continue - - estimate = {'input_files': input_files} - - total_nr_input_subbands = nr_uv_files * pred_est['resource_count'] - if total_nr_input_subbands % (subbands_per_image * slices_per_image) > 0: - logger.error('slices_per_image and subbands_per_image not a multiple of number of inputs') - result['errors'].append('slices_per_image and subbands_per_image not a multiple of number of inputs') - continue - nr_images = total_nr_input_subbands / (subbands_per_image * slices_per_image) - - logger.debug("calculate sky image data size") - nr_uv_files = input_files['uv']['properties']['nr_of_uv_files'] - uv_file_size = input_files['uv']['properties']['uv_file_size'] # same for each uv data product across all SAPs - img_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler - - logger.info("sky_images: {} files {} bytes each".format(nr_images, img_file_size)) - estimate['output_files'] = {'img': {'identification': output_ident_img, - 'properties': 
{'nr_of_img_files': 1, # also see estimate['resource_count'] below - 'img_file_size': img_file_size}}} - - # count total data size - data_size = img_file_size # bytes - if data_size: - bandwidth = int(ceil(8 * data_size / duration)) # bits/second - estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} - estimate['resource_count'] = nr_images # as such, potentially different resources can be allocated for each output - estimate['root_resource_group'] = output_cluster_img - else: - logger.error('An estimate of zero was calculated!') - result['errors'].append('An estimate of zero was calculated!') - - result['estimates'].append(estimate) - - if not result['estimates'] and not result['errors']: - logger.error('imaging pipeline estimator produced no estimates') - result['errors'].append('imaging pipeline estimator produced no estimates') + # See the calibration pipeline estimator for why this is done in this way atm. + nr_input_subbands = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']]) + uv_file_size = input_files['uv'][0]['properties']['uv_file_size'] + if nr_input_subbands % (subbands_per_image * slices_per_image) > 0: + logger.error('slices_per_image and subbands_per_image not a multiple of number of inputs') + result['errors'].append('slices_per_image and subbands_per_image not a multiple of number of inputs') + nr_images = nr_input_subbands / (subbands_per_image * slices_per_image) + + logger.debug("calculate sky image data size") + img_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler + + logger.info("sky_images: {} files {} bytes each".format(nr_images, img_file_size)) + estimate['output_files'] = {'img': {'identification': output_ident_img, + 'properties': {'nr_of_img_files': nr_images, + 'img_file_size': img_file_size}}} + + # count total data size + data_size = nr_images * img_file_size # bytes + if data_size: + bandwidth = int(ceil(8 * data_size / duration)) # bits/second + estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} + estimate['resource_count'] = 1 + estimate['root_resource_group'] = output_cluster_img + else: + logger.error('An estimate of zero was calculated!') + result['errors'].append('An estimate of zero was calculated!') + + result['estimates'].append(estimate) return result diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py index b898f42c437..c7f65b47a6d 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py @@ -51,7 +51,7 @@ class LongBaselinePipelineResourceEstimator(BasePipelineResourceEstimator): def _calculate(self, parset, predecessor_estimates): - """ Estimate for Long Baseline Pipeline + """ Estimator for long baseline pipeline step. calculates: datasize (number of files, file size), bandwidth For a predecessor_estimates example, see the calibration/averaging @@ -68,78 +68,66 @@ class LongBaselinePipelineResourceEstimator(BasePipelineResourceEstimator): subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0) #TODO, should these have defaults? 
subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0) - if not subbandgroups_per_ms or not subbands_per_subbandgroup: - logger.error('subbandgroups_per_ms or subbands_per_subbandgroup are not valid') - result['errors'].append('subbandgroups_per_ms or subbands_per_subbandgroup are not valid') + if subbandgroups_per_ms < 1 or subbands_per_subbandgroup < 1: + logger.error('subbandgroups_per_ms or subbands_per_subbandgroup is not valid') + result['errors'].append('subbandgroups_per_ms or subbands_per_subbandgroup is not valid') if not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'): logger.error('Output_Correlated is not enabled') result['errors'].append('Output_Correlated is not enabled') - if result['errors']: - return result duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime')) + input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications') + input_files = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_uv, 'uv') + if not input_files: + logger.error('Missing uv dataproducts in predecessor output_files') + result['errors'].append('Missing uv dataproducts in predecessor output_files') + + if result['errors']: + return result + + estimate = {'input_files': input_files} + # NOTE: input bandwidth is not included in the resulting estimate atm. # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ... #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName') - input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications') output_ident_uv = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications') ) output_cluster_uv = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName') - for pred_est in predecessor_estimates: - pred_output_files = pred_est.get('output_files') - if pred_output_files is None: - continue - - input_files = self._filterInputs(pred_output_files, input_idents_uv) - if not input_files: - continue - - if 'uv' not in input_files: - logger.error('Missing uv dataproducts in predecessor output_files') - result['errors'].append('Missing uv dataproducts in predecessor output_files') - continue - - estimate = {'input_files': input_files} - - nr_input_files = input_files['uv']['properties']['nr_of_uv_files'] - uv_input_file_size = input_files['uv']['properties']['uv_file_size'] # same for each uv data product across all SAPs - start_sb_nr = input_files['uv']['properties']['start_sb_nr'] - - if nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0: - logger.error('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs') - result['errors'].append('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs') - continue - total_nr_input_files = nr_input_files * pred_est['resource_count'] - nr_output_files = total_nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms) - - logger.debug("calculate correlated data size") - uv_output_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler - start_sbg_nr = start_sb_nr / (subbands_per_subbandgroup * subbandgroups_per_ms) - - logger.info("correlated_uv: {} files {} bytes each".format(nr_output_files, uv_output_file_size)) - estimate['output_files'] = {'uv': {'identification': 
output_ident_uv, - 'properties': {'nr_of_uv_files': 1, # also see estimate['resource_count'] below - 'uv_file_size': uv_output_file_size, - 'start_sbg_nr': start_sbg_nr}}} - - # count total data size - data_size = nr_output_files * uv_output_file_size # bytes - if data_size: - bandwidth = int(ceil(8 * data_size / duration)) # bits/second - estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} - estimate['resource_count'] = nr_output_files # as such, potentially different resources can be allocated for each output - estimate['root_resource_group'] = output_cluster_uv - else: - logger.error('An estimate of zero was calculated!') - result['errors'].append('An estimate of zero was calculated!') - - result['estimates'].append(estimate) - - if not result['estimates'] and not result['errors']: - logger.error('long baseline estimator produced no estimates') - result['errors'].append('long baseline pipeline estimator produced no estimates') + # See the calibration pipeline estimator for why this is done in this way atm. + nr_input_files = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']]) + uv_input_file_size = input_files['uv'][0]['properties']['uv_file_size'] + start_sb_nr = min([uv_dict['properties']['start_sb_nr'] for uv_dict in input_files['uv']]) + + if nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0: + logger.error('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs') + result['errors'].append('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs') + nr_output_files = nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms) + + logger.debug("calculate correlated data size") + uv_output_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler + start_sbg_nr = start_sb_nr / (subbands_per_subbandgroup * subbandgroups_per_ms) + + logger.info("correlated_uv: {} files {} bytes each".format(nr_output_files, uv_output_file_size)) + estimate['output_files'] = {'uv': {'identification': output_ident_uv, + 'properties': {'nr_of_uv_files': nr_output_files, + 'uv_file_size': uv_output_file_size, + 'start_sbg_nr': start_sbg_nr}}} + + # count total data size + data_size = nr_output_files * uv_output_file_size # bytes + if data_size: + bandwidth = int(ceil(8 * data_size / duration)) # bits/second + estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} + estimate['resource_count'] = 1 + estimate['root_resource_group'] = output_cluster_uv + else: + logger.error('An estimate of zero was calculated!') + result['errors'].append('An estimate of zero was calculated!') + + result['estimates'].append(estimate) return result + diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py index b03168803cc..fed49cb70c2 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py @@ -64,50 +64,51 @@ class ObservationResourceEstimator(BaseResourceEstimator): The predecessor_estimates arg is just to implement the same interface as pipelines. Observations have no predecessor. The following return value example is for an obs duration of 240.0 s and 3 data product types for 2 clusters. - NOTE: 'nr_of_XX_files' is for that estimate, times the 'resource_count' (hence each estimates contains 1 SAP). 
- 'nr_of_cs_parts' is for a CS TAB (per stokes component) in that SAP, not per estimate. - More examples at scu001:/opt/lofar/var/log/raestimatorservice.log + NOTE: 'nr_of_XX_files' is for that SAP estimate. The total is thus times the 'resource_count'. + 'nr_of_cs_parts' is for a full CS TAB (per stokes component) in that SAP; not per estimate, which may still describe one part. + + See the calibration pipeline estimator for some explanation on why parts of this format are needed atm. It also has input_files. { 'errors': [], 'estimates': [{ 'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # for each uv output data prod (thus the total is times the resource_count val) 'resource_count': 20, 'root_resource_group': 'CEP4', 'output_files': { - 'uv': {'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.uv.dps', - 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}} + 'uv': [{'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.uv.dps', + 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}] } }, {'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # idem 'resource_count': 60, 'root_resource_group': 'CEP4', 'output_files': { - 'uv': {'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.uv.dps', - 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 20}} + 'uv': [{'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.uv.dps', + 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 20}}] } }, {'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # idem 'resource_count': 20, 'root_resource_group': 'CEP4', 'output_files': { - 'uv': {'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps', - 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 80}} + 'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps', + 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 80}}] } }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # for each quad (4 stokes) of cs output tab part (thus the total is times the resource_count val) 'resource_count': 34, 'root_resource_group': 'DRAGNET', 'output_files': { - 'cs': {'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.cs.dps', - 'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4, - 'nr_of_cs_parts': 2}} # parts per tab for this sap + 'cs': [{'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.cs.dps', + 'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4, + 'nr_of_cs_parts': 2}}] # parts per tab for this sap } }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # idem 'resource_count': 6, 'root_resource_group': 'DRAGNET', 'output_files': { - 'cs': {'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.cs.dps', - 'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4, - 'nr_of_cs_parts': 1, 'is_tab_nr': 0}} # parts per tab for this sap + 'cs': [{'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.cs.dps', + 'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4, + 'nr_of_cs_parts': 1, 'is_tab_nr': 0}}] # parts per tab for this sap } }, {'resource_types': {'bandwidth': 17895698, 'storage': 536870912}, # for each 'is' output tab part (thus the total is times the resource_count val) 'resource_count': 1, 'root_resource_group': 'DRAGNET', 
'output_files': { - 'is': {'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.is.dps', - 'properties': {'is_file_size': 536870912, 'nr_of_is_files': 1, 'nr_of_is_stokes': 1, - 'is_tab_nr': 0}} # IS can have >1 parts, but atm max 1 IS TAB per SAP + 'is': [{'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.is.dps', + 'properties': {'is_file_size': 536870912, 'nr_of_is_files': 1, 'nr_of_is_stokes': 1, + 'is_tab_nr': 0}}] # IS can have >1 parts, but atm max 1 IS TAB per SAP } }] } @@ -201,9 +202,9 @@ class ObservationResourceEstimator(BaseResourceEstimator): est = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size}, 'resource_count': nr_subbands, 'root_resource_group': root_resource_group, - 'output_files': {'uv': {'sap_nr': sap_nr, 'identification': sap_idents[sap_nr], - 'properties': {'uv_file_size': file_size, 'nr_of_uv_files': 1, # i.e. total nr_of_uv_files is resource_count times 1 - 'start_sb_nr': total_files}}}} + 'output_files': {'uv': [{'sap_nr': sap_nr, 'identification': sap_idents[sap_nr], + 'properties': {'uv_file_size': file_size, 'nr_of_uv_files': 1, # i.e. total nr_of_uv_files is resource_count times 1 + 'start_sb_nr': total_files}}]}} total_files += nr_subbands estimates.append(est) @@ -298,11 +299,11 @@ class ObservationResourceEstimator(BaseResourceEstimator): est = {'resource_types': {'storage': storage, 'bandwidth': bandwidth}, 'resource_count': nr_coherent_tabs * nr_parts_per_tab, 'root_resource_group': root_resource_group, - 'output_files': {'cs': {'sap_nr': sap_nr, 'identification': sap_idents[sap_nr], - 'properties': {'cs_file_size': file_size, 'nr_of_cs_files': nr_coherent, - 'nr_of_cs_stokes': nr_coherent, 'nr_of_cs_parts': nr_parts_per_tab}}}} + 'output_files': {'cs': [{'sap_nr': sap_nr, 'identification': sap_idents[sap_nr], + 'properties': {'cs_file_size': file_size, 'nr_of_cs_files': nr_coherent, + 'nr_of_cs_stokes': nr_coherent, 'nr_of_cs_parts': nr_parts_per_tab}}]}} if is_tab_nr != -1: # translator to filenames needs to know: it may not have all CS+IS info in one claim - est['output_files']['cs']['properties']['is_tab_nr'] = is_tab_nr + est['output_files']['cs'][0]['properties']['is_tab_nr'] = is_tab_nr estimates.append(est) logger.debug("Coherent Stokes data estimates: {}".format(estimates)) @@ -384,9 +385,9 @@ class ObservationResourceEstimator(BaseResourceEstimator): est = {'resource_types': {'storage': storage, 'bandwidth': bandwidth}, 'resource_count': nr_incoherent_tabs * nr_parts_per_tab, 'root_resource_group': root_resource_group, - 'output_files': {'is': {'sap_nr': sap_nr, 'identification': sap_idents[sap_nr], - 'properties': {'is_file_size': file_size, 'nr_of_is_files': nr_incoherent, - 'nr_of_is_stokes': nr_incoherent, 'is_tab_nr': is_tab_nr}}}} + 'output_files': {'is': [{'sap_nr': sap_nr, 'identification': sap_idents[sap_nr], + 'properties': {'is_file_size': file_size, 'nr_of_is_files': nr_incoherent, + 'nr_of_is_stokes': nr_incoherent, 'is_tab_nr': is_tab_nr}}]}} estimates.append(est) logger.debug("Incoherent Stokes data estimates: {}".format(estimates)) @@ -418,6 +419,20 @@ class ObservationResourceEstimator(BaseResourceEstimator): logger.info("number of virtual stations = {}".format(nr_virtual_stations)) return nr_virtual_stations + def _extract_sap_nr(self, identification): + """ Return sap nr as int from identification or None if + no int xxx in '.SAPxxx.' in identification. 
+        """
+        for s in identification.split('.'):  # Find the SAP number, if present
+            if 'SAP' not in s:
+                continue
+            try:
+                return int(s[3:])
+            except:
+                pass
+
+        return None
+
     def _sap_identifications(self, identifications, nr_saps):
         """ Return list with identifications' identification for sap i at index i,
             or '' at index i if no such identification for sap i.
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
index 4a8b6b6a903..bf0cf0a89c0 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
@@ -68,74 +68,71 @@ class PulsarPipelineResourceEstimator(BasePipelineResourceEstimator):
         if not parset.getBool(DATAPRODUCTS + 'Output_Pulsar.enabled'):
             logger.error('Output_Pulsar is not enabled')
             result['errors'].append('Output_Pulsar is not enabled')
-        if result['errors']:
-            return result

         duration = self._getDuration(parset.getString('Observation.startTime'),
                                      parset.getString('Observation.stopTime'))

+        # The current XML generator can produce a pulsar pipeline step that operates on 1 SAP,
+        # however, pulsar astronomers produce an XML that works on all SAPs (CS+IS) of an obs.
+        # This is regular and going on for years, so we need to support multi-SAP pulp input.
+        # Note that when selecting obs SAP nr > 0 or a sparse SAP nr range, TAB nrs in pulp filenames do not
+        # match TAB nrs in obs filenames, because there is no pulp equiv of start_sb[g]_nr. Nobody cares.
+        # (Then there is the possibility of multi-obs pulp input. As-is that may turn out to work as well.)
+        input_files = {}
+        input_idents_cs = parset.getStringVector(DATAPRODUCTS + 'Input_CoherentStokes.identifications')
+        input_files_cs = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_cs, 'cs')
+        if input_files_cs:
+            input_files['cs'] = input_files_cs['cs']
+        input_idents_is = parset.getStringVector(DATAPRODUCTS + 'Input_IncoherentStokes.identifications')
+        input_files_is = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_is, 'is')
+        if input_files_is:
+            input_files['is'] = input_files_is['is']
+        if not input_files:
+            logger.error('Missing \'cs\' or \'is\' dataproducts in predecessor output_files')
+            result['errors'].append('Missing \'cs\' or \'is\' dataproducts in predecessor output_files')
+
+        if result['errors']:
+            return result
+
+        estimate = {'input_files': input_files}
+
         # NOTE: input bandwidth is not included in the resulting estimate atm.
         # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ...
#input_cluster_cs = parset.getString(DATAPRODUCTS + 'Input_CoherentStokes.storageClusterName') #input_cluster_is = parset.getString(DATAPRODUCTS + 'Input_IncoherentStokes.storageClusterName') - input_idents_cs = parset.getStringVector(DATAPRODUCTS + 'Input_CoherentStokes.identifications') - input_idents_is = parset.getStringVector(DATAPRODUCTS + 'Input_IncoherentStokes.identifications') output_ident_pulp = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_Pulsar.identifications') ) output_cluster_pulp = parset.getString(DATAPRODUCTS + 'Output_Pulsar.storageClusterName') - for pred_est in predecessor_estimates: - pred_output_files = pred_est.get('output_files') - if pred_output_files is None: - continue - - idents = [] - if 'cs' in pred_output_files: - idents.extend(input_idents_cs) - if 'is' in pred_output_files: - idents.extend(input_idents_is) - input_files = self._filterInputs(pred_output_files, idents) - if not input_files: - continue - - if not 'cs' in input_files and not 'is' in input_files: - logger.error('Missing both CS and IS dataproducts in input_files') - result['errors'].append('Missing both CS and IS dataproducts in input_files') - - estimate = {'input_files': input_files} - - # It seems 1 pulp data product is produced per beam, also with stokes IQUV or with complex voltages (XXYY). - nr_output_files = 0 - if 'cs' in input_files: - nr_output_files += input_files['cs']['properties']['nr_of_cs_files'] / \ - input_files['cs']['properties']['nr_of_cs_stokes'] - if 'is' in input_files: - nr_output_files += input_files['is']['properties']['nr_of_is_files'] / \ - input_files['is']['properties']['nr_of_is_stokes'] - - logger.debug("calculate pulp data size") - pulp_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler - - logger.info("pulsar_pipeline pulp: {}x {} files {} bytes each".format(pred_est['resource_count'], nr_output_files, pulp_file_size)) - estimate['output_files'] = {'pulp': {'identification': output_ident_pulp, - 'properties': {'nr_of_pulp_files': nr_output_files, # times pred_est['resource_count'] in total - 'pulp_file_size': pulp_file_size}}} - - # count total data size - data_size = nr_output_files * pulp_file_size - if data_size > 0: - bandwidth = int(ceil(8 * data_size / duration)) # bits/second - estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} - estimate['resource_count'] = pred_est['resource_count'] - estimate['root_resource_group'] = output_cluster_pulp - else: - logger.error('An estimate of zero was calculated!') - result['errors'].append('An estimate of zero was calculated!') - - result['estimates'].append(estimate) - - if not result['estimates'] and not result['errors']: - logger.error('pulsar pipeline estimator produced no estimates') - result['errors'].append('pulsar pipeline estimator produced no estimates') + # It seems 1 pulp data product is produced per beam, also with stokes IQUV or with complex voltages (XXYY). 
+ nr_input_files = 0 + if 'cs' in input_files: + nr_input_files += sum([cs_dict['properties']['nr_of_cs_files'] / \ + cs_dict['properties']['nr_of_cs_stokes'] for cs_dict in input_files['cs']]) + if 'is' in input_files: + nr_input_files += sum([is_dict['properties']['nr_of_is_files'] / \ + is_dict['properties']['nr_of_is_stokes'] for is_dict in input_files['is']]) + + logger.debug("calculate pulp data size") + pulp_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler + + logger.info("pulsar_pipeline pulp: {} files {} bytes each".format(nr_input_files, pulp_file_size)) + estimate['output_files'] = {'pulp': {'identification': output_ident_pulp, + 'properties': {'nr_of_pulp_files': nr_input_files, + 'pulp_file_size': pulp_file_size}}} + + # count total data size + data_size = nr_input_files * pulp_file_size + if data_size > 0: + bandwidth = int(ceil(8 * data_size / duration)) # bits/second + estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} + estimate['resource_count'] = 1 + estimate['root_resource_group'] = output_cluster_pulp + else: + logger.error('An estimate of zero was calculated!') + result['errors'].append('An estimate of zero was calculated!') + + result['estimates'].append(estimate) return result + -- GitLab
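
For reference, the reworked estimate format introduced by this patch (a list of dicts per data product type) can be walked as in the minimal Python sketch below. It mirrors the ResourceAssigner.getProperties() change above; the property type ids and the example estimate values are made up for illustration only and are not part of the patch.

    # Illustrative sketch, not part of this patch.
    example_estimate = {
        'resource_types': {'bandwidth': 35791395, 'storage': 1073741824},
        'resource_count': 20, 'root_resource_group': 'CEP4',
        'output_files': {
            'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',
                    'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}]
        }
    }

    db_resource_prop_types = {'uv_file_size': 1, 'nr_of_uv_files': 2, 'start_sb_nr': 3}  # hypothetical ids

    def to_claim_properties(files_dict, io_type):
        """ Flatten {'uv': [{...}, ...], ...} into a list of claim property dicts. """
        properties = []
        for dptype in files_dict:
            for dptype_dict in files_dict[dptype]:    # each dptype now maps to a list of dicts
                sap_nr = dptype_dict.get('sap_nr')    # only present for observation output
                for prop_type_name, prop_value in dptype_dict['properties'].items():
                    prop_type_id = db_resource_prop_types.get(prop_type_name)
                    if prop_type_id is None:
                        continue                      # unknown property type: skip it
                    prop = {'type': prop_type_id, 'value': prop_value, 'io_type': io_type}
                    if sap_nr is not None:
                        prop['sap_nr'] = sap_nr
                    properties.append(prop)
        return properties

    print(to_claim_properties(example_estimate['output_files'], 'output'))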
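The "53%" figure quoted in the calibration/averaging TODO comment can be checked directly from the formula as committed. A small illustrative check, assuming an example input MS size:

    # Illustrative only: verify the output size growth for reduction_factor 1.
    uv_input_file_size = 1073741824       # assumed example input MS size in bytes
    reduction_factor = 1.0                # no averaging
    new_size = uv_input_file_size / float(reduction_factor)
    uv_output_file_size = int(new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0)
    print(uv_output_file_size / float(uv_input_file_size))  # -> 1.53125, i.e. ~53% larger than the input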