diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
index 10f9c3b0d044b51e9a954da6f249741826f7d87c..18a0804aa3765d07eec47b33cdf24e7a7346a8a7 100755
--- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
+++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
@@ -230,7 +230,7 @@ class RAtoOTDBTranslator():
             nr_cs_stokes = sap['properties']['nr_of_cs_stokes'] # the 'cs_stokes' term here can also mean cv XXYY
             nr_parts = sap['properties']['nr_of_cs_files'] / nr_cs_stokes # in this prop's claim!
             nparts_tab = nr_parts_per_tab_per_sap[sap_nr] # alias for readability; this is also per stokes
-            while nr_parts > 0: # loops over tab nrs
+            while nr_parts > 0:
                 tab_nr = next_tab_part_nrs_per_sap[sap_nr] / nparts_tab
                 tab_part_nr = next_tab_part_nrs_per_sap[sap_nr] % nparts_tab
                 nparts_remain = min(nr_parts, nparts_tab - tab_part_nr) # nr parts left before we go to the next tab
diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py
index d6cffeb57429e869142e53c30677e0646aac05a1..6572a4984b766930d49571a055e82083f800d2a6 100755
--- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py
+++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py
@@ -660,7 +660,7 @@ class ResourceAssigner():
     def getProperties(self, db_resource_prop_types, files_dict, io_type):
         """ Return list of properties in claim format converted from files_dict.
-        E.g. files_dict: {'cs': {'sap_nr': 2, ..., 'properties': {'nr_of_uv_files': 123, ...}}, ...}
+        E.g. files_dict: {'cs': [ {'sap_nr': 2, ..., 'properties': {'nr_of_uv_files': 123, ...}}, {...} ], 'is': ...}
         """
         if files_dict is None:
             return []
@@ -668,19 +668,20 @@ class ResourceAssigner():
         logger.info('getProperties: processing %s_files: %s', io_type, files_dict)

         properties = []
-        for dptype, dptype_dict in files_dict.items():
-            sap_nr = dptype_dict.get('sap_nr') # only with obs output and obs successor input
+        for dptype in files_dict:
+            for dptype_dict in files_dict[dptype]:
+                sap_nr = dptype_dict.get('sap_nr') # only with obs output and obs successor input

-            for prop_type_name, prop_value in dptype_dict['properties'].items():
-                rc_property_type_id = db_resource_prop_types.get(prop_type_name)
-                if rc_property_type_id is None:
-                    logger.error('getProperties: ignoring unknown prop type: %s', prop_type_name)
-                    continue
+                for prop_type_name, prop_value in dptype_dict['properties'].items():
+                    rc_property_type_id = db_resource_prop_types.get(prop_type_name)
+                    if rc_property_type_id is None:
+                        logger.error('getProperties: ignoring unknown prop type: %s', prop_type_name)
+                        continue

-                prop = {'type': rc_property_type_id, 'value': prop_value, 'io_type': io_type}
-                if sap_nr is not None:
-                    prop['sap_nr'] = sap_nr
-                properties.append(prop)
+                    prop = {'type': rc_property_type_id, 'value': prop_value, 'io_type': io_type}
+                    if sap_nr is not None:
+                        prop['sap_nr'] = sap_nr
+                    properties.append(prop)

         return properties
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py
index 6be9a9823e6bd1aa4a2ecf96c9f018561d70e3ad..acc981ab46c5554c6d5c6e72dc3c26d9643ef424 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py
@@ -50,7 +50,7 @@ class BasePipelineResourceEstimator(BaseResourceEstimator):
     def _getOutputIdentification(self, identifications):
         """ For pipeline output, there must be exactly 1 (non-duplicate) identification string per
-            data product type. (How can you otherwise refer to it unambiguously?) (Observation output can have 1 per sap.)
+            data product type. (How can you otherwise refer to it unambiguously?) (Observation output can have 1 per SAP.)
         """
         if len(set(identifications)) != 1: # make set to filter any duplicates
             if not identifications:
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
index 33127049510646cd71284f1a7c756cf075aaeb9f..3b45a0eb47b070e41580635e5a1ce2c533957a09 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
@@ -23,6 +23,7 @@
 """ Base class for Resource Estimators
 """
 import logging
+import copy
 import pprint
 from datetime import datetime
 from lofar.common.datetimeutils import totalSeconds
@@ -64,46 +65,56 @@ class BaseResourceEstimator(object):
     def _calculate(self, parset, predecessor_estimates=[]):
         raise NotImplementedError('calculate() in base class is called. Please implement calculate() in your subclass')

-    def _extract_sap_nr(self, identification):
-        """ Return sap nr as int from identification or None if
-            no int xxx in '.SAPxxx' in identification.
+    def _add_predecessor_output(self, input_files, predecessor_estimate, identification, dptype):
+        """ Add a copy of the element under the dptype key in predecessor_estimate
+            to input_files if it matches identification, or else return False.
+            But see the comment below on collapsing resource_count > 1 for pipelines.
         """
-        for s in identification.split('.'): # Find the SAP number, if present
-            if 'SAP' not in s:
+        if 'output_files' not in predecessor_estimate or \
+           dptype not in predecessor_estimate['output_files']:
+            return False
+
+        for dt_values in predecessor_estimate['output_files'][dptype]:
+            if dt_values['identification'] != identification:
                 continue
-            try:
-                return int(s[3:])
-            except:
-                pass
-
-        return None
-
-    def _filterInputs(self, input_files, identifications):
-        """ Return copy of input_files with only parts of input_files covered by identifications.
-            There may not be duplicates in the identifications iterable.
-
-            Example argument values:
-            'input_files': {
-              'uv': {'identification': 'mom.G777955.B15.1.CPC.uv.dps', # or w/ obs e.g.: 'mom.G777955.B15.1.C.SAP000.uv.dps'
-                     'uv_file_size': 1073741824, 'nr_of_uv_files': 42, <uv specific key-values>},
-              'im': {'identification': 'mom.G777955.B15.1.CPC.inst.dps',
-                     'im_file_size': 1000, 'nr_of_im_files': 42, <im specific key-values>}
-            }
-            There may also be a key-value pair 'sap_nr': N, typically for observation.
-            'identifications': ['mom.G777955.B2.1.C.SAP002.uv.dps', ...] (or without SAPxxx in it, as shown in the input_files example).
+
+            logger.info('Found predecessor output identification matching %s', identification)
+            if dptype not in input_files:
+                input_files[dptype] = []
+            input_files[dptype].append(copy.deepcopy(dt_values))
+
+            # Observation estimates have resource_count > 1 to be able to assign each output to another resource,
+            # but that is not supported atm for pipelines. We only use input params to produce parset filenames etc,
+            # but not to reserve resources (not covered by resource count). Collapse to implied resource_count of 1.
+            input_files[dptype][-1]['properties']['nr_of_' + dptype + '_files'] *= predecessor_estimate['resource_count']
+            return True
+
+        return False
+
+    def get_inputs_from_predecessors(self, predecessor_estimates, identifications, dptype):
+        """ Return a copy of the parts with dptype in predecessor_estimates matching identifications.
+            If any of the identifications could not be found, the empty dict is returned.
+            dptype is one of the observation/pipeline data product types, e.g. 'uv', 'cs', 'pulp', ...
+            No duplicates in the identifications iterable!
+
+            See the calibration and observation pipeline estimators for parameter value examples.
         """
-        output_files = {}
-        logger.info('parsing input for identifications: %s' % (identifications,))
+        input_files = {}
+        logger.info('get_inputs_from_predecessors: parsing predecessor output for identifications: %s', identifications)
         for identification in identifications:
-            sap_nr = self._extract_sap_nr(identification)
-            for data_type, data_properties in input_files.items():
-                if identification == data_properties['identification']:
-                    logger.info('Found input identification matching %s' % (identification,))
-                    output_files[data_type] = dict(data_properties) # shallow copy is enough to avoid unintended changes
-
-        logger.info('_filterInputs: filtered down to: \n' + pprint.pformat(output_files))
-        return output_files
+            found = False
+            for estimate in predecessor_estimates:
+                if self._add_predecessor_output(input_files, estimate, identification, dptype):
+                    found = True
+                    break
+            if not found:
+                logger.warn('get_inputs_from_predecessors: failed to find predecessor output matching %s', identification)
+                return {}
+
+        logger.info('get_inputs_from_predecessors: filtered predecessor output for dptype=' + dptype +
+                    ' down to: \n' + pprint.pformat(input_files))
+        return input_files

     def verify_and_estimate(self, parset, predecessor_estimates=[]):
         """ Create estimates for an observation or pipeline step based on its parset and,
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
index ca2d4c243e560195d3c5871015d5813c80e6c1ec..cc5f1ce58742021cb2ad9cec4001feef76ba58f9 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
@@ -52,40 +52,78 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
                               PIPELINE + 'DPPP.demixer.timestep')

     def _calculate(self, parset, predecessor_estimates):
-        """ Estimate for Calibration Pipeline. Also gets used for Averaging Pipeline
+        """ Estimator for calibration pipeline step. Also used for averaging pipeline step.
            calculates: datasize (number of files, file size), bandwidth

            predecessor_estimates looks something like (see also output format in observation.py and (other) pipelines):
            [{
-              'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # per sb
+              'resource_types': {'bandwidth': 286331153, 'storage': 1073741824}, # per 'uv' dict
               'resource_count': 20, 'root_resource_group': 'CEP4',
               'output_files': {
                 'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',
-                        'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}]
+                        'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}},
+                       {'sap_nr': 3, 'identification': 'mom.G777955.B2.1.C.SAP003.uv.dps',
+                        'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 20}}
+                      ]
               }
            },
-            <optionally more estimates>
+             <optionally more estimates>
            ]

-            The reply is something along the lines of: (assumes task duration of 30 s)
-            [{
-              'resource_types': {'bandwidth': 2236995, 'storage': 67109864}, # for each uv+im output data prod (thus the total is times the resource_count val)
-              'resource_count': 20, 'root_resource_group': 'CEP4',
-              'input_files': {
-                'uv': {'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps', # w/ sap only if predecessor is an observation
-                       'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}},
-                'im': {'identification': ..., # no 'im' as input if predecessor is an observation
-                       'properties': {'im_file_size': 1000, 'nr_of_im_files': 1, 'start_sb_nr': 0}}
-              },
-              'output_files': {
-                'uv': {'identification': 'mom.G777956.B2.1.CPC.uv.dps',
-                       'properties': {'uv_file_size': 67108864, 'nr_of_uv_files': 1, 'start_sb_nr': 0}},
-                'im': {'identification': 'mom.G777956.B2.1.CPC.inst.dps',
-                       'properties': {'im_file_size': 1000, 'nr_of_im_files': 1, 'start_sb_nr': 0}}
-              }
-            },
-            <optionally more estimates>
+            The reply is something along the lines of the example below
+            (assumes task duration of 30 s, and typically 'im' is not in both input_files and output_files)
+            {
+              'errors': [],
+              'estimates': [
+                {
+                  'resource_types': {'bandwidth': 2236995 * 20, 'storage': 67109864 * 20},
+                  'resource_count': 1, 'root_resource_group': 'CEP4',
+
+                  # input resources not (yet) allocated: bandwidth only, but coupled to specific storage resource
+                  'input_files': {
+                    'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps', # w/ sap only if predecessor is an observation
+                            'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 20, 'start_sb_nr': 0}},
+                           {'sap_nr': 3, 'identification': 'mom.G777955.B2.1.C.SAP003.uv.dps', # idem, >1 input SAP possible for e.g. the pulsar pipeline
+                            'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 20, 'start_sb_nr': 20}}
+                          ],
+                    'im': [{'identification': ..., # 'im' example; no 'im' input if predecessor is an observation as above
+                            'properties': {'im_file_size': 1000, 'nr_of_im_files': 20, 'start_sb_nr': 0}}]
+                  },
+                  'output_files': {
+                    'uv': [{'identification': 'mom.G777956.B2.1.CPC.uv.dps',
+                            'properties': {'uv_file_size': 67108864, 'nr_of_uv_files': 40, 'start_sb_nr': 0}}],
+                    'im': [{'identification': 'mom.G777956.B2.1.CPC.inst.dps',
+                            'properties': {'im_file_size': 1000, 'nr_of_im_files': 20, 'start_sb_nr': 0}}]
+                  }
+                },
+                <optionally more estimates> ]
+            }
+
+            The estimates key has a list to support observations with >1 data product type that can have
+            different properties (typically CS/IS (per SAP), but always used for observations w/ >1 SAP).
+            This is the reason that observations have an estimate per SAP and per data product type:
+            their resource_types values may be different. This is needed to employ resource_count > 1.
+            See the observation estimator for an extensive observation data product type example.
+
+            For each estimate, the total output_files resources to be claimed is resource_count * resource_types.
+            Thus resource_types is a total across all output_files content. The idea is to keep this
+            singular per data product type (inner list len 1), but for pipelines this is not possible atm.
+
+            Note that atm input_files resources are not included or claimed.
+            However, input_files properties must be added to resource claims to later generate the parset.
+            This caveat must be fixed at some point, but until then, we cannot have input_files-only estimates.
+            (After it is fixed, we should not have that either; it makes no sense.)
+
+            For pipelines we don't support output to multiple storage areas atm, so resource_count is 1.
+            We still have to deal with input_files from an observation with >1 SAP (used for the pulsar pipeline).
+            For this case, we generate 1 estimate, but use a list per data product type (e.g. 'uv': [...]).
+            Also, we may need multiple data product types in one pipeline estimate, but there the reason
+            is that e.g. 'uv' and 'im' file(s) belong together, so we must produce one estimate per pair
+            (but again, it's a pipeline, so atm it is collapsed to a single estimate, i.e. resource_count 1).
+            The inner data product type list can be removed once pipelines also use resource_count > 1.
+
+            Some RA_Services design aspects work well. Others fail to capture the underlying concepts closely enough, hence the inelegance.
        """
        logger.debug("start estimate '{}'".format(self.name))
        logger.info('parset: %s ' % parset)
@@ -102,20 +140,35 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
           not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'):
            logger.error('Input_Correlated or Output_Correlated is not enabled')
            result['errors'].append('Input_Correlated or Output_Correlated is not enabled')
-        if result['errors']:
-            return result

        duration = self._getDuration(parset.getString('Observation.startTime'),
                                     parset.getString('Observation.stopTime'))

-        # NOTE: input bandwidth is not included in the resulting estimate atm.
-        # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ...
-        #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName')

        input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications')
+        input_files = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_uv, 'uv')
+        if not input_files:
+            logger.error('Missing uv dataproducts in predecessor output_files')
+            result['errors'].append('Missing uv dataproducts in predecessor output_files')
+
        have_im_input = parset.getBool(DATAPRODUCTS + 'Input_InstrumentModel.enabled')
        if have_im_input:
            input_idents_im = parset.getStringVector(DATAPRODUCTS + 'Input_InstrumentModel.identifications')
+            input_files_im = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_im, 'im')
+            if not input_files_im:
+                logger.error('Input_InstrumentModel enabled, but missing \'im\' dataproducts in predecessor output_files')
+                result['errors'].append('Input_InstrumentModel enabled, but missing \'im\' dataproducts in predecessor output_files')
+            input_files['im'] = input_files_im['im']
+
+        if result['errors']:
+            return result
+
+        estimate = {'input_files': input_files}
+
+        # NOTE: input bandwidth is not included in the resulting estimate atm.
+        # Proper input bandwidth estimation has limited use atm and is tricky, because of pipeline duration est, tmp files,
+        # multiple passes, nr nodes and caching, but for sure also because bandwidth must be tied to *predecessor* storage!
+        #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName')
+
        output_ident_uv = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications') )
        output_cluster_uv = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName')
        have_im_output = parset.getBool(DATAPRODUCTS + 'Output_InstrumentModel.enabled')
@@ -127,74 +180,62 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
            logger.warn('storageClusterName differs between uv: \'%s\' and im: \'%s\': to be packed in 1 estimate, so ignoring \'im\' storageClusterName',
                        output_cluster_uv, output_cluster_im)

-        for pred_est in predecessor_estimates:
-            pred_output_files = pred_est.get('output_files')
-            if pred_output_files is None:
-                continue
-
-            idents = input_idents_uv
-            if have_im_input:
-                if 'im' in pred_output_files:
-                    idents.extend(input_idents_im)
-                else:
-                    logger.warn('Input_InstrumentModel enabled, but missing \'im\' dataproducts in predecessor output_files')
-            input_files = self._filterInputs(pred_output_files, idents)
-            if not input_files:
-                continue
-
-            if 'uv' not in input_files:
-                logger.error('Missing uv dataproducts in predecessor output_files')
-                result['errors'].append('Missing uv dataproducts in predecessor output_files')
-                continue
-
-            estimate = {'input_files': input_files}
-
-            nr_input_files = input_files['uv']['properties']['nr_of_uv_files']
-            uv_input_file_size = input_files['uv']['properties']['uv_file_size']
-            start_sb_nr = input_files['uv']['properties']['start_sb_nr']
-
-            # TODO: This output file size calculation comes from the (old) Scheduler without explaining comments.
-            # The reason why it isn't a simple div, is that parts of the metadata are not reduced in size (and casacore storage mgrs).
-            # With reduction_factor 1, computed output size increases by 53%. Casacore storage mgrs may change size, but that much?!?
-            # If you can figure out what/how, please fix this calculation. Avoid unnamed magic values and document why!
-            logger.debug("calculate correlated data size")
-            new_size = uv_input_file_size / float(reduction_factor)
-            uv_output_file_size = int(new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0)
-
-            nr_output_files = nr_input_files # pure 'map' (bijective) operation, no split or reduce
-            logger.info("correlated_uv: {}x {} files of {} bytes each".format(pred_est['resource_count'], nr_output_files, uv_output_file_size))
-            estimate['output_files'] = {'uv': {'identification': output_ident_uv,
-                                               'properties': {'nr_of_uv_files': nr_output_files, 'uv_file_size': uv_output_file_size, 'start_sb_nr': start_sb_nr}}}
-            data_size = uv_output_file_size
-
-            # If instrument model output is needed, add it to the same estimate,
-            # since it must be written to the same storage as the uv output (same nr_output_files).
-            if have_im_output:
-                logger.info("calculate instrument-model data size")
-                im_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler
-
-                logger.info("correlated_im: {}x {} files {} bytes each".format(pred_est['resource_count'], nr_output_files, im_file_size))
-                estimate['output_files']['im'] = {'identification': output_ident_im,
-                                                  'properties': {'nr_of_im_files': nr_output_files, 'im_file_size': im_file_size, 'start_sb_nr': start_sb_nr}}
-                # FIXME I don't think this is technically correct, as the IM files are sometimes created and used, just not a required export?
-                # Need to split averaging pipeline and calibration pipeline
-                data_size += im_file_size
-
-            data_size *= nr_output_files # bytes
-            if data_size:
-                bandwidth = int(ceil(8 * data_size / duration)) # bits/second
-                estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
-                estimate['resource_count'] = pred_est['resource_count']
-                estimate['root_resource_group'] = output_cluster_uv
-            else:
-                logger.error('An estimate of zero was calculated!')
-                result['errors'].append('An estimate of zero was calculated!')
-
-            result['estimates'].append(estimate)
-
-        if not result['estimates'] and not result['errors']:
-            logger.error('calibration / averaging pipeline estimator produced no estimates')
-            result['errors'].append('calibration / averaging pipeline estimator produced no estimates')
+        # Observations can have multiple output estimates, but atm pipelines do not.
+        # (Reason: incomplete info avail and effective assigner claim merging is harder.)
+        # As long as this is the case, make a best effort to map any predecessor (obs or pipeline) to a single estimate output.
+        nr_input_files = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']])
+
+        # Assume all uv file sizes are the same size as in dict 0. For uv data, we never had pipelines with >1 dict,
+        # but this could be meaningful when averaging multiple SAPs in 1 go (and no further processing steps).
+        # (Never done, since subsequent pipeline steps must then also work on all SAPs. But averaging could be the last step.)
+        # The potential other case is >1 dict from different obs with different file sizes.
+        # In general, this requires >1 output est dict, which the estimate fmt allows, but atm is only used for observations.
+        uv_input_file_size = input_files['uv'][0]['properties']['uv_file_size']
+
+        # For start_sb_nr, take the minimum of all start_sb_nr values.
+        # This fails when the input identifications have a sparse SAP list, but that was never supported either.
+        # A likely setup where this could happen is LOTAAS+pulp, but pulp has no equivalent to start_sb[g]_nr,
+        # so solve this pragmatically here and in the pulsar pipeline.
+        start_sb_nr = min([uv_dict['properties']['start_sb_nr'] for uv_dict in input_files['uv']])
+
+        # TODO: This output file size calculation comes from the (old) Scheduler without explaining comments.
+        # The reason why it isn't a simple div, is that parts of the metadata are not reduced in size (and casacore storage mgrs).
+        # With reduction_factor 1, computed output size increases by 53%. Casacore storage mgrs may change size, but that much?!?
+        # If you can figure out what/how, please fix this calculation. Avoid unnamed magic values and document why!
+        logger.debug("calculate correlated data size")
+        new_size = uv_input_file_size / float(reduction_factor)
+        uv_output_file_size = int(new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0)
+
+        nr_output_files = nr_input_files # pure 'map' (bijective) operation, no split or reduce
+        logger.info("correlated_uv: {} files of {} bytes each".format(nr_output_files, uv_output_file_size))
+        estimate['output_files'] = {'uv': [{'identification': output_ident_uv,
+                                            'properties': {'nr_of_uv_files': nr_output_files, 'uv_file_size': uv_output_file_size, 'start_sb_nr': start_sb_nr}}]}
+        data_size = uv_output_file_size
+
+        # If instrument model output is needed, add it to the same estimate,
+        # since it must be written to the same storage as the uv output (same nr_output_files).
+        if have_im_output:
+            logger.info("calculate instrument-model data size")
+            im_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler
+
+            logger.info("correlated_im: {} files {} bytes each".format(nr_output_files, im_file_size))
+            estimate['output_files']['im'] = [{'identification': output_ident_im,
+                                               'properties': {'nr_of_im_files': nr_output_files, 'im_file_size': im_file_size, 'start_sb_nr': start_sb_nr}}]
+            # FIXME I don't think this is technically correct, as the IM files are sometimes created and used, just not a required export?
+            # Need to split averaging pipeline and calibration pipeline
+            data_size += im_file_size
+
+        data_size *= nr_output_files # bytes
+        if data_size:
+            bandwidth = int(ceil(8 * data_size / duration)) # bits/second
+            estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+            estimate['resource_count'] = 1
+            estimate['root_resource_group'] = output_cluster_uv
+        else:
+            logger.error('An estimate of zero was calculated!')
+            result['errors'].append('An estimate of zero was calculated!')
+
+        result['estimates'].append(estimate)
        return result
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
index 810be6a8839e869688392a6c4f31d0bbcb645f24..38e5e9d4b0f1624dd022d5b726e0151cd1b29ed5 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
@@ -23,7 +23,6 @@
 import logging
 from math import ceil
 from base_pipeline_estimator import BasePipelineResourceEstimator
-from lofar.parameterset import parameterset

 logger = logging.getLogger(__name__)

@@ -51,7 +50,7 @@ class ImagePipelineResourceEstimator(BasePipelineResourceEstimator):
                               PIPELINE + 'Imaging.subbands_per_image')

     def _calculate(self, parset, predecessor_estimates):
-        """ Estimate for Imaging Pipeline. Also gets used for MSSS Imaging Pipeline
+        """ Estimate for imaging pipeline step. Also used for MSSS imaging pipeline.
            calculates: datasize (number of files, file size), bandwidth

            For a predecessor_estimates example, see the calibration/averaging
@@ -70,74 +69,61 @@ class ImagePipelineResourceEstimator(BasePipelineResourceEstimator):
        slices_per_image = parset.getInt(PIPELINE + 'Imaging.slices_per_image', 0) #TODO, should these have defaults?
        subbands_per_image = parset.getInt(PIPELINE + 'Imaging.subbands_per_image', 0)
        if slices_per_image < 1 or subbands_per_image < 1:
-            logger.error('slices_per_image or subbands_per_image are not valid')
-            result['errors'].append('slices_per_image or subbands_per_image are not valid')
+            logger.error('slices_per_image or subbands_per_image is not valid')
+            result['errors'].append('slices_per_image or subbands_per_image is not valid')
        if not parset.getBool(DATAPRODUCTS + 'Output_SkyImage.enabled'):
            logger.error('Output_SkyImage is not enabled')
            result['errors'].append('Output_SkyImage is not enabled')
-        if result['errors']:
-            return result

        duration = self._getDuration(parset.getString('Observation.startTime'),
                                     parset.getString('Observation.stopTime'))

+        input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications')
+        input_files = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_uv, 'uv')
+        if not input_files:
+            logger.error('Missing uv dataproducts in predecessor output_files')
+            result['errors'].append('Missing uv dataproducts in predecessor output_files')
+
+        if result['errors']:
+            return result
+
+        estimate = {'input_files': input_files}
+
        # NOTE: input bandwidth is not included in the resulting estimate atm.
        # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ...
        #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName')

-        input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications')
        output_ident_img = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_SkyImage.identifications') )
        output_cluster_img = parset.getString(DATAPRODUCTS + 'Output_SkyImage.storageClusterName')

-        for pred_est in predecessor_estimates:
-            pred_output_files = pred_est.get('output_files')
-            if pred_output_files is None:
-                continue
-
-            input_files = self._filterInputs(pred_output_files, input_idents_uv)
-            if not input_files:
-                continue
-
-            if 'uv' not in input_files:
-                logger.error('Missing uv dataproducts in predecessor output_files')
-                result['errors'].append('Missing uv dataproducts in predecessor output_files')
-                continue
-
-            estimate = {'input_files': input_files}
-
-            total_nr_input_subbands = nr_uv_files * pred_est['resource_count']
-            if total_nr_input_subbands % (subbands_per_image * slices_per_image) > 0:
-                logger.error('slices_per_image and subbands_per_image not a multiple of number of inputs')
-                result['errors'].append('slices_per_image and subbands_per_image not a multiple of number of inputs')
-                continue
-            nr_images = total_nr_input_subbands / (subbands_per_image * slices_per_image)
-
-            logger.debug("calculate sky image data size")
-            nr_uv_files = input_files['uv']['properties']['nr_of_uv_files']
-            uv_file_size = input_files['uv']['properties']['uv_file_size'] # same for each uv data product across all SAPs
-            img_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler
-
-            logger.info("sky_images: {} files {} bytes each".format(nr_images, img_file_size))
-            estimate['output_files'] = {'img': {'identification': output_ident_img,
-                                                'properties': {'nr_of_img_files': 1, # also see estimate['resource_count'] below
-                                                               'img_file_size': img_file_size}}}
-
-            # count total data size
-            data_size = img_file_size # bytes
-            if data_size:
-                bandwidth = int(ceil(8 * data_size / duration)) # bits/second
-                estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
-                estimate['resource_count'] = nr_images # as such, potentially different resources can be allocated for each output
-                estimate['root_resource_group'] = output_cluster_img
-            else:
-                logger.error('An estimate of zero was calculated!')
-                result['errors'].append('An estimate of zero was calculated!')
-
-            result['estimates'].append(estimate)
-
-        if not result['estimates'] and not result['errors']:
-            logger.error('imaging pipeline estimator produced no estimates')
-            result['errors'].append('imaging pipeline estimator produced no estimates')
+        # See the calibration pipeline estimator for why this is done in this way atm.
+        nr_input_subbands = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']])
+        uv_file_size = input_files['uv'][0]['properties']['uv_file_size']
+        if nr_input_subbands % (subbands_per_image * slices_per_image) > 0:
+            logger.error('slices_per_image and subbands_per_image not a multiple of number of inputs')
+            result['errors'].append('slices_per_image and subbands_per_image not a multiple of number of inputs')
+        nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)
+
+        logger.debug("calculate sky image data size")
+        img_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler
+
+        logger.info("sky_images: {} files {} bytes each".format(nr_images, img_file_size))
+        estimate['output_files'] = {'img': {'identification': output_ident_img,
+                                            'properties': {'nr_of_img_files': nr_images,
+                                                           'img_file_size': img_file_size}}}
+
+        # count total data size
+        data_size = nr_images * img_file_size # bytes
+        if data_size:
+            bandwidth = int(ceil(8 * data_size / duration)) # bits/second
+            estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+            estimate['resource_count'] = 1
+            estimate['root_resource_group'] = output_cluster_img
+        else:
+            logger.error('An estimate of zero was calculated!')
+            result['errors'].append('An estimate of zero was calculated!')
+
+        result['estimates'].append(estimate)
        return result
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
index b898f42c43714597320efc656d03ae3fc240b479..c7f65b47a6d7c030ea281e566a447538f590c832 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
@@ -51,7 +51,7 @@ class LongBaselinePipelineResourceEstimator(BasePipelineResourceEstimator):

     def _calculate(self, parset, predecessor_estimates):
-        """ Estimate for Long Baseline Pipeline
+        """ Estimator for long baseline pipeline step.
            calculates: datasize (number of files, file size), bandwidth

            For a predecessor_estimates example, see the calibration/averaging
@@ -68,78 +68,66 @@ class LongBaselinePipelineResourceEstimator(BasePipelineResourceEstimator):
        subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0) #TODO, should these have defaults?
        subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0)
-        if not subbandgroups_per_ms or not subbands_per_subbandgroup:
-            logger.error('subbandgroups_per_ms or subbands_per_subbandgroup are not valid')
-            result['errors'].append('subbandgroups_per_ms or subbands_per_subbandgroup are not valid')
+        if subbandgroups_per_ms < 1 or subbands_per_subbandgroup < 1:
+            logger.error('subbandgroups_per_ms or subbands_per_subbandgroup is not valid')
+            result['errors'].append('subbandgroups_per_ms or subbands_per_subbandgroup is not valid')
        if not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'):
            logger.error('Output_Correlated is not enabled')
            result['errors'].append('Output_Correlated is not enabled')
-        if result['errors']:
-            return result

        duration = self._getDuration(parset.getString('Observation.startTime'),
                                     parset.getString('Observation.stopTime'))

+        input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications')
+        input_files = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_uv, 'uv')
+        if not input_files:
+            logger.error('Missing uv dataproducts in predecessor output_files')
+            result['errors'].append('Missing uv dataproducts in predecessor output_files')
+
+        if result['errors']:
+            return result
+
+        estimate = {'input_files': input_files}
+
        # NOTE: input bandwidth is not included in the resulting estimate atm.
        # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ...
        #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName')

-        input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications')
        output_ident_uv = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications') )
        output_cluster_uv = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName')

-        for pred_est in predecessor_estimates:
-            pred_output_files = pred_est.get('output_files')
-            if pred_output_files is None:
-                continue
-
-            input_files = self._filterInputs(pred_output_files, input_idents_uv)
-            if not input_files:
-                continue
-
-            if 'uv' not in input_files:
-                logger.error('Missing uv dataproducts in predecessor output_files')
-                result['errors'].append('Missing uv dataproducts in predecessor output_files')
-                continue
-
-            estimate = {'input_files': input_files}
-
-            nr_input_files = input_files['uv']['properties']['nr_of_uv_files']
-            uv_input_file_size = input_files['uv']['properties']['uv_file_size'] # same for each uv data product across all SAPs
-            start_sb_nr = input_files['uv']['properties']['start_sb_nr']
-
-            if nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0:
-                logger.error('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
-                result['errors'].append('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
-                continue
-            total_nr_input_files = nr_input_files * pred_est['resource_count']
-            nr_output_files = total_nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms)
-
-            logger.debug("calculate correlated data size")
-            uv_output_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler
-            start_sbg_nr = start_sb_nr / (subbands_per_subbandgroup * subbandgroups_per_ms)
-
-            logger.info("correlated_uv: {} files {} bytes each".format(nr_output_files, uv_output_file_size))
-            estimate['output_files'] = {'uv': {'identification': output_ident_uv,
-                                               'properties': {'nr_of_uv_files': 1, # also see estimate['resource_count'] below
-                                                              'uv_file_size': uv_output_file_size,
-                                                              'start_sbg_nr': start_sbg_nr}}}
-
-            # count total data size
-            data_size = nr_output_files * uv_output_file_size # bytes
-            if data_size:
-                bandwidth = int(ceil(8 * data_size / duration)) # bits/second
-                estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
-                estimate['resource_count'] = nr_output_files # as such, potentially different resources can be allocated for each output
-                estimate['root_resource_group'] = output_cluster_uv
-            else:
-                logger.error('An estimate of zero was calculated!')
-                result['errors'].append('An estimate of zero was calculated!')
-
-            result['estimates'].append(estimate)
-
-        if not result['estimates'] and not result['errors']:
-            logger.error('long baseline estimator produced no estimates')
-            result['errors'].append('long baseline pipeline estimator produced no estimates')
+        # See the calibration pipeline estimator for why this is done in this way atm.
+        nr_input_files = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']])
+        uv_input_file_size = input_files['uv'][0]['properties']['uv_file_size']
+        start_sb_nr = min([uv_dict['properties']['start_sb_nr'] for uv_dict in input_files['uv']])
+
+        if nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0:
+            logger.error('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
+            result['errors'].append('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
+        nr_output_files = nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms)
+
+        logger.debug("calculate correlated data size")
+        uv_output_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler
+        start_sbg_nr = start_sb_nr / (subbands_per_subbandgroup * subbandgroups_per_ms)
+
+        logger.info("correlated_uv: {} files {} bytes each".format(nr_output_files, uv_output_file_size))
+        estimate['output_files'] = {'uv': {'identification': output_ident_uv,
+                                           'properties': {'nr_of_uv_files': nr_output_files,
+                                                          'uv_file_size': uv_output_file_size,
+                                                          'start_sbg_nr': start_sbg_nr}}}
+
+        # count total data size
+        data_size = nr_output_files * uv_output_file_size # bytes
+        if data_size:
+            bandwidth = int(ceil(8 * data_size / duration)) # bits/second
+            estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+            estimate['resource_count'] = 1
+            estimate['root_resource_group'] = output_cluster_uv
+        else:
+            logger.error('An estimate of zero was calculated!')
+            result['errors'].append('An estimate of zero was calculated!')
+
+        result['estimates'].append(estimate)
        return result
+
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
index b03168803ccc95064f365b52157409593d5ecd17..fed49cb70c20545cda21a786c9c92019282654ea 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
@@ -64,50 +64,51 @@ class ObservationResourceEstimator(BaseResourceEstimator):
            The predecessor_estimates arg is just to implement the same interface as pipelines. Observations have no predecessor.

            The following return value example is for an obs duration of 240.0 s and 3 data product types for 2 clusters.
-           NOTE: 'nr_of_XX_files' is for that estimate, times the 'resource_count' (hence each estimates contains 1 SAP).
-                 'nr_of_cs_parts' is for a CS TAB (per stokes component) in that SAP, not per estimate.
-           More examples at scu001:/opt/lofar/var/log/raestimatorservice.log
+           NOTE: 'nr_of_XX_files' is for that SAP estimate. The total is thus times the 'resource_count'.
+                 'nr_of_cs_parts' is for a full CS TAB (per stokes component) in that SAP; not per estimate, which may still describe one part.
+
+           See the calibration pipeline estimator for some explanation on why parts of this format are needed atm. It also has input_files.

            {
              'errors': [],
              'estimates': [{
                'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # for each uv output data prod (thus the total is times the resource_count val)
                'resource_count': 20, 'root_resource_group': 'CEP4',
                'output_files': {
-                 'uv': {'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.uv.dps',
-                        'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}
+                 'uv': [{'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.uv.dps',
+                         'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}]
                }
              },
              {'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # idem
               'resource_count': 60, 'root_resource_group': 'CEP4',
               'output_files': {
-                 'uv': {'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.uv.dps',
-                        'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 20}}
+                 'uv': [{'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.uv.dps',
+                         'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 20}}]
               }
              },
              {'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # idem
               'resource_count': 20, 'root_resource_group': 'CEP4',
               'output_files': {
-                 'uv': {'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',
-                        'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 80}}
+                 'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',
+                         'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 80}}]
               }
              },
              {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # for each quad (4 stokes) of cs output tab part (thus the total is times the resource_count val)
               'resource_count': 34, 'root_resource_group': 'DRAGNET',
               'output_files': {
-                 'cs': {'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.cs.dps',
-                        'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4,
-                                       'nr_of_cs_parts': 2}} # parts per tab for this sap
+                 'cs': [{'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.cs.dps',
+                         'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4,
+                                        'nr_of_cs_parts': 2}}] # parts per tab for this sap
               }
              },
              {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # idem
               'resource_count': 6, 'root_resource_group': 'DRAGNET',
               'output_files': {
-                 'cs': {'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.cs.dps',
-                        'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4,
-                                       'nr_of_cs_parts': 1, 'is_tab_nr': 0}} # parts per tab for this sap
+                 'cs': [{'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.cs.dps',
+                         'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4,
+                                        'nr_of_cs_parts': 1, 'is_tab_nr': 0}}] # parts per tab for this sap
               }
              },
             {'resource_types': {'bandwidth': 17895698, 'storage': 536870912}, # for each 'is' output tab part (thus the total is times the resource_count val)
              'resource_count': 1, 'root_resource_group': 'DRAGNET',
              'output_files': {
-                'is': {'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.is.dps',
-                       'properties': {'is_file_size': 536870912, 'nr_of_is_files': 1, 'nr_of_is_stokes': 1,
-                                      'is_tab_nr': 0}} # IS can have >1 parts, but atm max 1 IS TAB per SAP
+                'is': [{'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.is.dps',
+                        'properties': {'is_file_size': 536870912, 'nr_of_is_files': 1, 'nr_of_is_stokes': 1,
+                                       'is_tab_nr': 0}}] # IS can have >1 parts, but atm max 1 IS TAB per SAP
              }
             }]
            }
@@ -201,9 +202,9 @@ class ObservationResourceEstimator(BaseResourceEstimator):
            est = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
                   'resource_count': nr_subbands,
                   'root_resource_group': root_resource_group,
-                   'output_files': {'uv': {'sap_nr': sap_nr, 'identification': sap_idents[sap_nr],
-                                           'properties': {'uv_file_size': file_size, 'nr_of_uv_files': 1, # i.e. total nr_of_uv_files is resource_count times 1
-                                                          'start_sb_nr': total_files}}}}
+                   'output_files': {'uv': [{'sap_nr': sap_nr, 'identification': sap_idents[sap_nr],
+                                            'properties': {'uv_file_size': file_size, 'nr_of_uv_files': 1, # i.e. total nr_of_uv_files is resource_count times 1
+                                                           'start_sb_nr': total_files}}]}}
            total_files += nr_subbands
            estimates.append(est)

@@ -298,11 +299,11 @@ class ObservationResourceEstimator(BaseResourceEstimator):
            est = {'resource_types': {'storage': storage, 'bandwidth': bandwidth},
                   'resource_count': nr_coherent_tabs * nr_parts_per_tab,
                   'root_resource_group': root_resource_group,
-                   'output_files': {'cs': {'sap_nr': sap_nr, 'identification': sap_idents[sap_nr],
-                                           'properties': {'cs_file_size': file_size, 'nr_of_cs_files': nr_coherent,
-                                                          'nr_of_cs_stokes': nr_coherent, 'nr_of_cs_parts': nr_parts_per_tab}}}}
+                   'output_files': {'cs': [{'sap_nr': sap_nr, 'identification': sap_idents[sap_nr],
+                                            'properties': {'cs_file_size': file_size, 'nr_of_cs_files': nr_coherent,
+                                                           'nr_of_cs_stokes': nr_coherent, 'nr_of_cs_parts': nr_parts_per_tab}}]}}
            if is_tab_nr != -1: # translator to filenames needs to know: it may not have all CS+IS info in one claim
-                est['output_files']['cs']['properties']['is_tab_nr'] = is_tab_nr
+                est['output_files']['cs'][0]['properties']['is_tab_nr'] = is_tab_nr
            estimates.append(est)

        logger.debug("Coherent Stokes data estimates: {}".format(estimates))
@@ -384,9 +385,9 @@ class ObservationResourceEstimator(BaseResourceEstimator):
            est = {'resource_types': {'storage': storage, 'bandwidth': bandwidth},
                   'resource_count': nr_incoherent_tabs * nr_parts_per_tab,
                   'root_resource_group': root_resource_group,
-                   'output_files': {'is': {'sap_nr': sap_nr, 'identification': sap_idents[sap_nr],
-                                           'properties': {'is_file_size': file_size, 'nr_of_is_files': nr_incoherent,
-                                                          'nr_of_is_stokes': nr_incoherent, 'is_tab_nr': is_tab_nr}}}}
+                   'output_files': {'is': [{'sap_nr': sap_nr, 'identification': sap_idents[sap_nr],
+                                            'properties': {'is_file_size': file_size, 'nr_of_is_files': nr_incoherent,
+                                                           'nr_of_is_stokes': nr_incoherent, 'is_tab_nr': is_tab_nr}}]}}
            estimates.append(est)

        logger.debug("Incoherent Stokes data estimates: {}".format(estimates))
@@ -418,6 +419,20 @@ class ObservationResourceEstimator(BaseResourceEstimator):
        logger.info("number of virtual stations = {}".format(nr_virtual_stations))
        return nr_virtual_stations

+    def _extract_sap_nr(self, identification):
+        """ Return sap nr as int from identification or None if
+            no int xxx in '.SAPxxx.' in identification.
+        """
+        for s in identification.split('.'): # Find the SAP number, if present
+            if 'SAP' not in s:
+                continue
+            try:
+                return int(s[3:])
+            except:
+                pass
+
+        return None
+
     def _sap_identifications(self, identifications, nr_saps):
         """ Return list with identifications' identification for sap i at index i,
             or '' at index i if no such identification for sap i.
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
index 4a8b6b6a9031b8fd5e1063acc589b97ed19e2651..bf0cf0a89c0365c9637fd63893f14c9a4db1197f 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
@@ -68,74 +68,71 @@ class PulsarPipelineResourceEstimator(BasePipelineResourceEstimator):
        if not parset.getBool(DATAPRODUCTS + 'Output_Pulsar.enabled'):
            logger.error('Output_Pulsar is not enabled')
            result['errors'].append('Output_Pulsar is not enabled')
-        if result['errors']:
-            return result

        duration = self._getDuration(parset.getString('Observation.startTime'),
                                     parset.getString('Observation.stopTime'))

+        # The current XML generator can produce a pulsar pipeline step that operates on 1 SAP,
+        # however, pulsar astronomers produce an XML that works on all SAPs (CS+IS) of an obs.
+        # This is regular and going on for years, so we need to support multi-SAP pulp input.
+        # Note that when selecting obs SAP nr > 0 or a sparse SAP nr range, TAB nrs in pulp filenames do not
+        # match TAB nrs in obs filenames, because there is no pulp equiv of start_sb[g]_nr. Nobody cares.
+        # (Then there is the possibility of multi-obs pulp input. As-is that may turn out to work as well.)
+        input_files = {}
+        input_idents_cs = parset.getStringVector(DATAPRODUCTS + 'Input_CoherentStokes.identifications')
+        input_files_cs = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_cs, 'cs')
+        if input_files_cs:
+            input_files['cs'] = input_files_cs['cs']
+        input_idents_is = parset.getStringVector(DATAPRODUCTS + 'Input_IncoherentStokes.identifications')
+        input_files_is = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_is, 'is')
+        if input_files_is:
+            input_files['is'] = input_files_is['is']
+        if not input_files:
+            logger.error('Missing \'cs\' or \'is\' dataproducts in predecessor output_files')
+            result['errors'].append('Missing \'cs\' or \'is\' dataproducts in predecessor output_files')
+
+        if result['errors']:
+            return result
+
+        estimate = {'input_files': input_files}
+
        # NOTE: input bandwidth is not included in the resulting estimate atm.
        # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ...
        #input_cluster_cs = parset.getString(DATAPRODUCTS + 'Input_CoherentStokes.storageClusterName')
        #input_cluster_is = parset.getString(DATAPRODUCTS + 'Input_IncoherentStokes.storageClusterName')

-        input_idents_cs = parset.getStringVector(DATAPRODUCTS + 'Input_CoherentStokes.identifications')
-        input_idents_is = parset.getStringVector(DATAPRODUCTS + 'Input_IncoherentStokes.identifications')
        output_ident_pulp = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_Pulsar.identifications') )
        output_cluster_pulp = parset.getString(DATAPRODUCTS + 'Output_Pulsar.storageClusterName')

-        for pred_est in predecessor_estimates:
-            pred_output_files = pred_est.get('output_files')
-            if pred_output_files is None:
-                continue
-
-            idents = []
-            if 'cs' in pred_output_files:
-                idents.extend(input_idents_cs)
-            if 'is' in pred_output_files:
-                idents.extend(input_idents_is)
-            input_files = self._filterInputs(pred_output_files, idents)
-            if not input_files:
-                continue
-
-            if not 'cs' in input_files and not 'is' in input_files:
-                logger.error('Missing both CS and IS dataproducts in input_files')
-                result['errors'].append('Missing both CS and IS dataproducts in input_files')
-
-            estimate = {'input_files': input_files}
-
-            # It seems 1 pulp data product is produced per beam, also with stokes IQUV or with complex voltages (XXYY).
-            nr_output_files = 0
-            if 'cs' in input_files:
-                nr_output_files += input_files['cs']['properties']['nr_of_cs_files'] / \
-                                   input_files['cs']['properties']['nr_of_cs_stokes']
-            if 'is' in input_files:
-                nr_output_files += input_files['is']['properties']['nr_of_is_files'] / \
-                                   input_files['is']['properties']['nr_of_is_stokes']
-
-            logger.debug("calculate pulp data size")
-            pulp_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler
-
-            logger.info("pulsar_pipeline pulp: {}x {} files {} bytes each".format(pred_est['resource_count'], nr_output_files, pulp_file_size))
-            estimate['output_files'] = {'pulp': {'identification': output_ident_pulp,
-                                                 'properties': {'nr_of_pulp_files': nr_output_files, # times pred_est['resource_count'] in total
-                                                                'pulp_file_size': pulp_file_size}}}
-
-            # count total data size
-            data_size = nr_output_files * pulp_file_size
-            if data_size > 0:
-                bandwidth = int(ceil(8 * data_size / duration)) # bits/second
-                estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
-                estimate['resource_count'] = pred_est['resource_count']
-                estimate['root_resource_group'] = output_cluster_pulp
-            else:
-                logger.error('An estimate of zero was calculated!')
-                result['errors'].append('An estimate of zero was calculated!')
-
-            result['estimates'].append(estimate)
-
-        if not result['estimates'] and not result['errors']:
-            logger.error('pulsar pipeline estimator produced no estimates')
-            result['errors'].append('pulsar pipeline estimator produced no estimates')
+        # It seems 1 pulp data product is produced per beam, also with stokes IQUV or with complex voltages (XXYY).
+        nr_input_files = 0
+        if 'cs' in input_files:
+            nr_input_files += sum([cs_dict['properties']['nr_of_cs_files'] / \
+                                   cs_dict['properties']['nr_of_cs_stokes'] for cs_dict in input_files['cs']])
+        if 'is' in input_files:
+            nr_input_files += sum([is_dict['properties']['nr_of_is_files'] / \
+                                   is_dict['properties']['nr_of_is_stokes'] for is_dict in input_files['is']])
+
+        logger.debug("calculate pulp data size")
+        pulp_file_size = 1000 # TODO: 1 kB was hardcoded in the Scheduler
+
+        logger.info("pulsar_pipeline pulp: {} files {} bytes each".format(nr_input_files, pulp_file_size))
+        estimate['output_files'] = {'pulp': {'identification': output_ident_pulp,
+                                             'properties': {'nr_of_pulp_files': nr_input_files,
+                                                            'pulp_file_size': pulp_file_size}}}
+
+        # count total data size
+        data_size = nr_input_files * pulp_file_size
+        if data_size > 0:
+            bandwidth = int(ceil(8 * data_size / duration)) # bits/second
+            estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+            estimate['resource_count'] = 1
+            estimate['root_resource_group'] = output_cluster_pulp
+        else:
+            logger.error('An estimate of zero was calculated!')
+            result['errors'].append('An estimate of zero was calculated!')
+
+        result['estimates'].append(estimate)
        return result
+
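For review purposes, a minimal standalone Python sketch (hypothetical values, simplified from the new _add_predecessor_output() above) of how a predecessor observation estimate with resource_count > 1 is collapsed into the per-dptype input_files list that the pipeline estimators now consume:

    import copy
    import pprint

    def collapse_predecessor_output(input_files, predecessor_estimate, identification, dptype):
        # Simplified stand-in for _add_predecessor_output(): find the matching output element
        # and fold resource_count into the file count, since pipelines reserve no per-input resources.
        for dt_values in predecessor_estimate.get('output_files', {}).get(dptype, []):
            if dt_values['identification'] != identification:
                continue
            entry = copy.deepcopy(dt_values)
            entry['properties']['nr_of_' + dptype + '_files'] *= predecessor_estimate['resource_count']
            input_files.setdefault(dptype, []).append(entry)
            return True
        return False

    obs_estimate = {  # hypothetical SAP 2 'uv' estimate in the observation.py output format
        'resource_types': {'bandwidth': 35791395, 'storage': 1073741824},
        'resource_count': 20, 'root_resource_group': 'CEP4',
        'output_files': {'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',
                                 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1,
                                                'start_sb_nr': 0}}]}}

    input_files = {}
    collapse_predecessor_output(input_files, obs_estimate, 'mom.G777955.B2.1.C.SAP002.uv.dps', 'uv')
    pprint.pprint(input_files)  # nr_of_uv_files is now 1 * 20 = 20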
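Likewise, a worked example (with assumed input size, reduction factor and duration) of the output-size and bandwidth arithmetic the calibration/averaging estimator applies per estimate:

    from math import ceil

    uv_input_file_size = 1073741824  # bytes, taken from the collapsed predecessor input
    reduction_factor = 64            # assumed overall averaging reduction factor
    duration = 30.0                  # assumed pipeline duration in seconds
    nr_output_files = 20             # one output file per (collapsed) input file

    # Not a plain division: part of the metadata does not shrink (see the TODO about the old Scheduler formula).
    new_size = uv_input_file_size / float(reduction_factor)
    uv_output_file_size = int(new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0)

    data_size = uv_output_file_size * nr_output_files  # total bytes for the single estimate (resource_count 1)
    bandwidth = int(ceil(8 * data_size / duration))    # bits/second

    print("file size %d B, total %d B, bandwidth %d bit/s" % (uv_output_file_size, data_size, bandwidth))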