diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py
index acc981ab46c5554c6d5c6e72dc3c26d9643ef424..4d9bfacef5d84877d48fe7893d0a5ee11ba3b11b 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_pipeline_estimator.py
@@ -46,7 +46,7 @@ class BasePipelineResourceEstimator(BaseResourceEstimator):
         except Exception as e:
             logger.error(e)
             logger.info("Could not get duration from parset, returning default pipeline duration of 1 hour")
-            return 3600
+            return 3600.0
 
     def _getOutputIdentification(self, identifications):
         """ For pipeline output, there must be exactly 1 (non-duplicate) identification string per
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
index ee83de541ee2d444bf184038f65e0882ef72fd02..4a13c7deb62e10b50c162f638c8b6a1a58011d79 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
@@ -34,7 +34,7 @@ logger = logging.getLogger(__name__)
 
 class BaseResourceEstimator(object):
-    """ Base class for all other resource estimater classes
+    """ Base class for all other resource estimator classes
     """
     def __init__(self, name):
         self.name = name
@@ -51,11 +51,14 @@ class BaseResourceEstimator(object):
         return True
 
     def _getDuration(self, start, end):
+        """ Returns the number of (fractional) seconds between start and end
+            as a float (see totalSeconds()).
+        """
         startTime = parseDatetime(start)
         endTime = parseDatetime(end)
         if startTime >= endTime:
             logger.warning("startTime is not before endTime")
-            return 1 ##TODO To prevent divide by zero later
+            return 1.0 ##TODO To prevent divide by zero later
         return totalSeconds(endTime - startTime) #TODO check if this makes duration = int(parset.get('duration', 0)) as a key redundant?
 
@@ -81,7 +84,7 @@ class BaseResourceEstimator(object):
                     input_files[dptype].append(copy.deepcopy(dt_values))
 
                     # Observation estimates have resource_count > 1 to be able to assign each output to another resource,
-                    # but that is not supported atm for pipelines. We only use input params to produce parset filenames etc,
+                    # but that is currently not supported for pipelines. We only use input parameters to produce parset filenames etc.,
                     # but not to reserve resources (not covered by resource count). Collapse to implied resource_count of 1.
                     input_files[dptype][-1]['properties']['nr_of_' + dptype + '_files'] *= predecessor_estimate['resource_count']
         return True
 
@@ -90,7 +93,7 @@ class BaseResourceEstimator(object):
     def get_inputs_from_predecessors(self, predecessor_estimates, identifications, dptype):
         """ Return copy of parts with dptype in predecessor_estimates matching identifications
-            If any of any of identifications could not be found, the empty dict is returned.
+            If any of the identifications cannot be found, the empty dict is returned.
             dptype is one of the observation/pipeline data product types, e.g. 'uv', 'cs', 'pulp', ...
             No duplicates in the identifications iterable!
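The _getDuration() docstring added above pins down the float contract. A minimal standalone sketch of that contract, assuming parseDatetime() and totalSeconds() behave like datetime.strptime() with the parset's 'YYYY-MM-DD hh:mm:ss' format and timedelta.total_seconds() (both names here are stand-ins, not the real lofar.common helpers):

    from datetime import datetime

    def get_duration(start, end):
        # hypothetical stand-in for BaseResourceEstimator._getDuration()
        fmt = '%Y-%m-%d %H:%M:%S'  # assumed parset datetime format
        start_time = datetime.strptime(start, fmt)
        end_time = datetime.strptime(end, fmt)
        if start_time >= end_time:
            return 1.0  # like the code above: avoids a later divide by zero in bandwidth calculations
        return (end_time - start_time).total_seconds()  # float, like totalSeconds()

    print(get_duration('2017-01-01 12:00:00', '2017-01-01 12:04:00'))  # 240.0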
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
index 30a791c91fd8c78bbfd453664a0ab6892cb35908..790019962c6344b2b019c3beb18ab60455c21905 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
@@ -63,8 +63,15 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
           'resource_count': 20, 'root_resource_group': 'CEP4',
           'output_files': {
             'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',
-                    'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}},
-                   {'sap_nr': 3, 'identification': 'mom.G777955.B2.1.C.SAP003.uv.dps',
+                    'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}
+            ]
+          }
+        },
+        {
+          'resource_types': {'bandwidth': 286331153, 'storage': 1073741824}, # per 'uv' dict
+          'resource_count': 20, 'root_resource_group': 'CEP4',
+          'output_files': {
+            'uv': [{'sap_nr': 3, 'identification': 'mom.G777955.B2.1.C.SAP003.uv.dps',
                     'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 20}}
             ]
           }
@@ -81,7 +88,8 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
           'resource_types': {'bandwidth': 2236995 * 20, 'storage': 67109864 * 20},
           'resource_count': 1, 'root_resource_group': 'CEP4',
-          # input resources not (yet) allocated: bandwidth only, but coupled to specific storage resource
+          # Note that the 2 predecessor estimates have been converted into an input 'uv' list. This works,
+          # as long as input resources are not (yet) scheduled. Currently, resource_* values apply to output_files only.
           'input_files': {
             'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps', # w/ sap only if predecessor is an observation
                     'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 20, 'start_sb_nr': 0}},
@@ -110,19 +118,19 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
 
         For each estimate, the total output_files resources to be claimed is resource_count * resource_types.
         Thus resource_types is a total across all output_files content. The idea is to keep this
-        singular per data product type (inner list len 1), but for pipelines this is not possible atm.
+        singular per data product type (inner list size 1), but for pipelines this is currently not possible.
 
-        Note that atm input_files resources are not included or claimed.
-        However, input_files properties must be added to resource claims to later generate the parset.
+        Note that input_files resources are currently not included or claimed.
+        However, input_files properties must be added to resource claims to later generate parset values.
         This caveat must be fixed at some point, but until then, we cannot have input_files-only estimates.
-        (After it is fixed, we should not have that either; it makes no sense.)
+        (After it is fixed, we should not have input_files-only estimates either; it makes no sense.)
 
-        For pipelines we don't support output to multiple storage areas atm, so resource_count is 1.
+        For pipelines we currently do not support output to multiple storage areas, so resource_count is 1.
         We still have to deal with input_files from an observation with >1 SAP (used for the pulsar pipeline).
         For this case, we generate 1 estimate, but use a list per data product type (e.g. 'uv': [...]).
         Also, we may need multiple data product types in one pipeline estimate, but there the reason
-        is that e.g. 'uv' and 'im' file(s) belong together, so we must produce one estimate per pair,
-        (but again, it's a pipeline so atm it is collapsed to a single estimate, i.e. resource_count 1).
+        is that e.g. 'uv' and 'im' files belong together, so we produce one estimate per pair
+        (but again, it is a pipeline so currently it is collapsed to a single estimate, thus resource_count 1).
         The inner data product type list can be removed once pipelines also use resource_count > 1.
 
         Some RA_Services design aspects work well. Others fail to capture the underlying concepts close enough, hence inelegance.
@@ -147,10 +155,11 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
                                           parset.getString('Observation.stopTime'))
 
         input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications')
-        input_files = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_uv, 'uv')
-        if not input_files:
+        input_files_uv = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_uv, 'uv')
+        if not input_files_uv:
             logger.error('Missing uv dataproducts in predecessor output_files')
             result['errors'].append('Missing uv dataproducts in predecessor output_files')
+        input_files = input_files_uv
 
         have_im_input = parset.getBool(DATAPRODUCTS + 'Input_InstrumentModel.enabled')
         if have_im_input:
@@ -166,8 +175,8 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
 
         estimate = {'input_files': input_files}
 
-        # NOTE: input bandwidth is not included in the resulting estimate atm.
-        # Proper input bandwidth estimation has limited use atm and is tricky, because of pipeline duration est, tmp files,
+        # NOTE: input bandwidth is currently not included in the resulting estimate.
+        # Proper input bandwidth estimation currently has limited use and is tricky, because of pipeline duration estimation, tmp files,
         # multiple passes, nr nodes and caching, but for sure also because bandwidth must be tied to *predecessor* storage!
         #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName')
 
@@ -179,19 +188,20 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
             output_cluster_im = parset.getString(DATAPRODUCTS + 'Output_InstrumentModel.storageClusterName')
             if output_cluster_uv != output_cluster_im:
-                logger.warn('storageClusterName differs between uv: \'%s\' and im: \'%s\': to be packed in 1 estimate, so ignoring \'im\' storageClusterName',
-                            output_cluster_uv, output_cluster_im)
+                logger.error('Output_InstrumentModel is enabled, but its storageClusterName \'%s\' differs from Output_Correlated.storageClusterName \'%s\'',
+                             output_cluster_im, output_cluster_uv)
+                result['errors'].append('Output_InstrumentModel is enabled, but its storageClusterName \'%s\' differs from Output_Correlated.storageClusterName \'%s\'' % (output_cluster_im, output_cluster_uv))
 
-            # Observations can have multiple output estimates, but atm pipelines do not.
-            # (Reason: incomplete info avail and effective assigner claim merging is harder)
-            # As long as this is the case, try to do a best effort to map any predecessor (obs or pipeline) to single estimate output.
+            # Observations can have multiple output estimates, but currently pipelines do not.
+            # (Reason: incomplete info available and effective assigner claim merging is harder)
+            # As long as this is the case, make a best effort to map any predecessor (observation or pipeline) to a single estimate output.
             nr_input_files = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']])
 
             # Assume all uv files are the same size as in dict 0. For uv data, we never had pipelines with >1 dict,
             # but this could be meaningful when averaging multiple SAPs in 1 go (and no further processing steps).
             # (Never done, since subsequent pipeline steps must then also work on all SAPs. But averaging could be the last step.)
-            # The potential other case is >1 dict from different obs with different file sizes.
-            # In general, this requires >1 output est dict, which the estimate fmt allows, but atm is only used for observations.
+            # The potential other case is >1 dict from different observations with different file sizes.
+            # In general, this requires >1 output estimate dict, which the estimate format allows, but is currently only used for observations.
             uv_input_file_size = input_files['uv'][0]['properties']['uv_file_size']
 
             # For start_sb_nr, take the minimum of all start_sb_nr values.
@@ -201,8 +211,8 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
             start_sb_nr = min([uv_dict['properties']['start_sb_nr'] for uv_dict in input_files['uv']])
 
             # TODO: This output file size calculation comes from the (old) Scheduler without explaining comments.
-            # The reason why it isn't a simple div, is that parts of the metadata are not reduced in size (and casacore storage mgrs).
-            # With reduction_factor 1, computed output size increases by 53%. Casacore storage mgrs may change size, but that much?!?
+            # The reason why it isn't a simple division is that parts of the metadata are not reduced in size (and casacore storage managers).
+            # With reduction_factor 1, the computed output size increases by 53%... Casacore storage managers may change size, but that much?!?
             # If you can figure out what/how, please fix this calculation. Avoid unnamed magic values and document why!
logger.debug("calculate correlated data size") new_size = uv_input_file_size / float(reduction_factor) @@ -227,15 +237,15 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator): # Need to split averaging pipeline and calibration pipeline data_size += im_file_size - data_size *= nr_output_files # bytes - if data_size: - bandwidth = int(ceil(8 * data_size / duration)) # bits/second - estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size} + total_data_size = data_size * nr_output_files # bytes + if total_data_size: + bandwidth = int(ceil(8 * total_data_size / duration)) # bits/second + estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': total_data_size} estimate['resource_count'] = 1 estimate['root_resource_group'] = output_cluster_uv else: - logger.error('An estimate of zero was calculated!') - result['errors'].append('An estimate of zero was calculated!') + logger.error('Estimated total data size is zero!') + result['errors'].append('Estimated total data size is zero!') result['estimates'].append(estimate) diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py index 960c38330fe3940bce7dd7700b587bc0f2e8abef..ea0c31da256f0eae40d566a80314b50433c7c353 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py @@ -89,14 +89,14 @@ class ImagePipelineResourceEstimator(BasePipelineResourceEstimator): estimate = {'input_files': input_files} - # NOTE: input bandwidth is not included in the resulting estimate atm. + # NOTE: input bandwidth is currently not included in the resulting estimate. # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ... #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName') output_ident_img = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_SkyImage.identifications') ) output_cluster_img = parset.getString(DATAPRODUCTS + 'Output_SkyImage.storageClusterName') - # See the calibration pipeline estimator for why this is done in this way atm. + # See the calibration pipeline estimator for why this is currently done this way. 
         nr_input_subbands = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']])
         uv_file_size = input_files['uv'][0]['properties']['uv_file_size']
         if nr_input_subbands % (subbands_per_image * slices_per_image) > 0:
@@ -112,16 +112,15 @@ class ImagePipelineResourceEstimator(BasePipelineResourceEstimator):
                               'properties': {'nr_of_img_files': nr_images,
                                              'img_file_size': img_file_size}}]}
 
-        # count total data size
-        data_size = nr_images * img_file_size # bytes
-        if data_size:
-            bandwidth = int(ceil(8 * data_size / duration)) # bits/second
-            estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+        total_data_size = nr_images * img_file_size # bytes
+        if total_data_size:
+            bandwidth = int(ceil(8 * total_data_size / duration)) # bits/second
+            estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': total_data_size}
             estimate['resource_count'] = 1
             estimate['root_resource_group'] = output_cluster_img
         else:
-            logger.error('An estimate of zero was calculated!')
-            result['errors'].append('An estimate of zero was calculated!')
+            logger.error('Estimated total data size is zero!')
+            result['errors'].append('Estimated total data size is zero!')
 
         result['estimates'].append(estimate)
 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
index e74b8f8efa486bfaa9960d2974836058025f6511..0e17f26a0bba9d512590431c688d821bae206ab3 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
@@ -89,14 +89,14 @@ class LongBaselinePipelineResourceEstimator(BasePipelineResourceEstimator):
 
         estimate = {'input_files': input_files}
 
-        # NOTE: input bandwidth is not included in the resulting estimate atm.
+        # NOTE: input bandwidth is currently not included in the resulting estimate.
         # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ...
         #input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName')
 
         output_ident_uv = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications') )
         output_cluster_uv = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName')
 
-        # See the calibration pipeline estimator for why this is done in this way atm.
+        # See the calibration pipeline estimator for why this is currently done this way.
         nr_input_files = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']])
         uv_input_file_size = input_files['uv'][0]['properties']['uv_file_size']
         start_sb_nr = min([uv_dict['properties']['start_sb_nr'] for uv_dict in input_files['uv']])
@@ -119,16 +119,15 @@ class LongBaselinePipelineResourceEstimator(BasePipelineResourceEstimator):
                                              'uv_file_size': uv_output_file_size,
                                              'start_sbg_nr': start_sbg_nr}}]}
 
-        # count total data size
-        data_size = nr_output_files * uv_output_file_size # bytes
-        if data_size:
-            bandwidth = int(ceil(8 * data_size / duration)) # bits/second
-            estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+        total_data_size = nr_output_files * uv_output_file_size # bytes
+        if total_data_size:
+            bandwidth = int(ceil(8 * total_data_size / duration)) # bits/second
+            estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': total_data_size}
             estimate['resource_count'] = 1
             estimate['root_resource_group'] = output_cluster_uv
         else:
-            logger.error('An estimate of zero was calculated!')
-            result['errors'].append('An estimate of zero was calculated!')
+            logger.error('Estimated total data size is zero!')
+            result['errors'].append('Estimated total data size is zero!')
 
         result['estimates'].append(estimate)
 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
index f0f4753c2ad0a4ff7edcf5ea2197b6ba3636ab7a..11a348796e478344bb1d1bfb72c657d9fb3256f5 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
@@ -21,6 +21,7 @@
 # $Id$
 
 import logging
+import pprint
 from math import ceil
 from base_resource_estimator import BaseResourceEstimator
 from lofar.stationmodel.antennasets_parser import AntennaSetsParser
@@ -66,17 +67,17 @@ class ObservationResourceEstimator(BaseResourceEstimator):
 
     def _calculate(self, parset, predecessor_estimates=[]):
         """ Calculate the resources needed by the different data product types that can be in a single observation.
-            The predecessor_estimates arg is just to implement the same interface as pipelines. Observations have no predecessor.
+            The predecessor_estimates argument is just to implement the same interface as pipelines. Observations have no predecessor.
 
-            The following return value example is for an obs duration of 240.0 s and 3 data product types for 2 clusters.
+            The following return value example is for an observation duration of 240.0 s and 3 data product types for 2 clusters.
             NOTE: 'nr_of_XX_files' is for that SAP estimate. The total is thus times the 'resource_count'.
             'nr_of_cs_parts' is for a full CS TAB (per stokes component) in that SAP; not per estimate, which may still describe one part.
-            See the calibration pipeline estimator for some explanation on why parts of this format are needed atm. It also has input_files.
+            See the calibration pipeline estimator for some explanation on why parts of this format are currently needed. It also has input_files.
        { 'errors': [],
          'estimates': [{
-           'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # for each uv output data prod (thus the total is times the resource_count val)
+           'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # for each uv output data product (thus the total is times the resource_count value)
            'resource_count': 20, 'root_resource_group': 'CEP4',
            'output_files': {
              'uv': [{'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.uv.dps',
@@ -94,7 +95,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
              'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',
                      'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 80}}]
            }
-         }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # for each quad (4 stokes) of cs output tab part (thus the total is times the resource_count val)
+         }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # for each quad (4 stokes) of cs output tab part (thus the total is times the resource_count value)
            'resource_count': 34, 'root_resource_group': 'DRAGNET',
            'output_files': {
              'cs': [{'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.cs.dps',
@@ -108,79 +109,82 @@ class ObservationResourceEstimator(BaseResourceEstimator):
                      'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4,
                                     'nr_of_cs_parts': 1, 'is_tab_nr': 0}}] # parts per tab for this sap
            }
-         }, {'resource_types': {'bandwidth': 17895698, 'storage': 536870912}, # for each 'is' output tab part (thus the total is times the resource_count val)
+         }, {'resource_types': {'bandwidth': 17895698, 'storage': 536870912}, # for each 'is' output tab part (thus the total is times the resource_count value)
            'resource_count': 1, 'root_resource_group': 'DRAGNET',
            'output_files': {
              'is': [{'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.is.dps',
                      'properties': {'is_file_size': 536870912, 'nr_of_is_files': 1, 'nr_of_is_stokes': 1,
-                                    'is_tab_nr': 0}}] # IS can have >1 parts, but atm max 1 IS TAB per SAP
+                                    'is_tab_nr': 0}}] # IS can have >1 parts, but currently max 1 IS TAB per SAP
            }
          }]
        }
        """
        logger.info("start estimate '{}'".format(self.name))
        logger.info('parset: %s ' % parset)
 
-        # NOTE: obs estimates appear quite accurate. Diffs come mostly from Observation.stopTime
+        # NOTE: observation estimates appear quite accurate. Most of the difference comes from Observation.stopTime
        # being planned instead of real stop time, because of Cobalt block size not being exactly 1.0 s.
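The bandwidth values in this example can be checked against the storage values: every estimator in this diff derives bandwidth from storage as int(ceil(8 * storage / duration)) in bits/second. A quick check for the 240.0 s example duration above:

    from math import ceil

    duration = 240.0  # observation duration in (fractional) seconds, as returned by _getDuration()
    for storage in (1073741824, 2147483648, 536870912):  # bytes, from the example estimates
        print(int(ceil(8 * storage / duration)))  # 35791395, 71582789, 17895698 bits/second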
-        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
+        duration = self._getDuration(parset.getString('Observation.startTime'),
+                                     parset.getString('Observation.stopTime'))
         errors = []
         estimates = []
 
-        uv_estimates = self.correlated(parset, duration)
-        if uv_estimates is None:
-            errors.append('empty UV resource estimates!')
-            logger.error('resource estimates for UV data is empty!')
-        else:
-            estimates.extend(uv_estimates)
-        cs_estimates = self.coherentstokes(parset, duration)
-        if cs_estimates is None:
-            errors.append('empty CS resource estimate!')
-            logger.error('resource estimate for CS data is empty!')
-        else:
-            estimates.extend(cs_estimates)
-        is_estimates = self.incoherentstokes(parset, duration)
-        if is_estimates is None:
-            errors.append('empty IS resource estimate!')
-            logger.error('resource estimate for IS data is empty!')
-        else:
-            estimates.extend(is_estimates)
-        station_estimates = self.stations(parset)
-        if station_estimates is None:
-            errors.append('empty STATION resource estimate!')
-            logger.error('resource estimate for STATIONS is empty!')
-        else:
-            estimates.extend(station_estimates)
+        try:
+            if parset.getBool('Observation.DataProducts.Output_Correlated.enabled'):
+                estimates.extend(self.correlated(parset, duration))
+        except ValueError as exc:
+            logger.error(exc)
+            errors.append(str(exc))
+
+        try:
+            if parset.getBool('Observation.DataProducts.Output_CoherentStokes.enabled'):
+                estimates.extend(self.coherentstokes(parset, duration))
+        except ValueError as exc:
+            logger.error(exc)
+            errors.append(str(exc))
+
+        try:
+            if parset.getBool('Observation.DataProducts.Output_IncoherentStokes.enabled'):
+                estimates.extend(self.incoherentstokes(parset, duration))
+        except ValueError as exc:
+            logger.error(exc)
+            errors.append(str(exc))
 
         if not estimates:
-            errors.append('Produced observation resource estimate list is empty!')
-            logger.error('empty observation resource estimate list!')
+            logger.error('no data product estimates in observation resource estimate list!')
+            errors.append('Produced observation resource estimate list has no data product estimates!')
+
+        try:
+            estimates.extend(self.stations(parset))
+        except ValueError as exc:
+            logger.error(exc)
+            errors.append(str(exc))
 
-        logger.debug('Observation resource estimate(s): {}'.format(estimates))
+        logger.debug('Observation resource estimates:\n' + pprint.pformat(estimates))
         result = {'errors': errors, 'estimates': estimates}
         return result
 
     def correlated(self, parset, duration):
-        """ Estimate storage size and bandwidth needed for correlated (i.e. uv)
+        """ Estimate storage size and bandwidth needed for correlated ('uv')
             data products. Also add SAP properties needed by the propagator.
-            Return list of estimates, max 1 SAP per estimate (easier for assigner).
+            The duration argument is a float in (fractional) seconds.
+            Return list of estimates, max 1 SAP per estimate (easier for assigner),
+            or raise ValueError on error.
""" - if not parset.getBool('Observation.DataProducts.Output_Correlated.enabled'): - logger.info("correlated data output not enabled") - return [] + logger.info("calculating correlated data size") - logger.info("calculating correlated datasize") storage_unit = 512 # all sizes in bytes size_of_header = 512 size_of_overhead = 600000 # COBALT parset in MS HISTORY subtable + misc size_of_short = 2 - size_of_visib = 8 # a visibility is stored as a complex FP32 + size_of_visib = 8 # a visibility is stored as a std::complex<float> nr_polarizations = 2 channels_per_subband = parset.getInt(COBALT + 'Correlator.nrChannelsPerSubband', 64) # defaults as in COBALT integration_time = parset.getFloat(COBALT + 'Correlator.integrationTime', 1) nr_virtual_stations = self._virtual_stations(parset) # Reflects MeasurementSets produced by the casacore LOFAR storage manager (LofarStMan) + # The sub-expression '+ val-1) / val' computes a rounded (positive) integer division. integrated_seconds = int(duration / integration_time) nr_baselines = nr_virtual_stations * (nr_virtual_stations + 1) / 2 data_size = (nr_baselines * channels_per_subband * nr_polarizations * nr_polarizations * \ @@ -194,8 +198,7 @@ class ObservationResourceEstimator(BaseResourceEstimator): nr_saps = parset.getInt('Observation.nrBeams') if nr_saps < 1: - logger.error("Correlated data output enabled, but nrBeams < 1") - return None + raise ValueError("Correlated data output enabled, but nrBeams < 1") # Estimates may differ per SAP for CS/IS. Decided to always produce a separate estimate per SAP. # Hence, need to annotate each SAP with the right identifications for pipeline predecessor input filtering. @@ -204,44 +207,44 @@ class ObservationResourceEstimator(BaseResourceEstimator): total_files = 0 # sum of all subbands in all digital beams estimates = [] - for sap_nr in xrange(nr_saps): + for sap_nr in range(nr_saps): subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr) nr_subbands = len(subbandList) if nr_subbands == 0: # Replace here by 'continue' (+ check total_files > 0 at the end) once we support separate subband lists for UV, CS, IS - logger.error("Correlated data output enabled, but empty subband list for sap %d", sap_nr) - return None + raise ValueError("Correlated data output enabled, but empty subband list for sap %d" % sap_nr) est = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size}, 'resource_count': nr_subbands, 'root_resource_group': root_resource_group, 'output_files': {'uv': [{'sap_nr': sap_nr, 'identification': sap_idents[sap_nr], - 'properties': {'uv_file_size': file_size, 'nr_of_uv_files': 1, # i.e. total nr_of_uv_files is resource_count times 1 + 'properties': {'uv_file_size': file_size, 'nr_of_uv_files': 1, # thus total nr_of_uv_files is resource_count times 1 'start_sb_nr': total_files}}]}} total_files += nr_subbands estimates.append(est) - logger.debug("Correlated data estimates: {}".format(estimates)) + logger.debug("Correlated data estimates:\n" + pprint.pformat(estimates)) return estimates def coherentstokes(self, parset, duration): - """ Estimate storage size and bandwidth needed for Coherent Stokes (i.e. cs) + """ Estimate storage size and bandwidth needed for Coherent Stokes ('cs') data products. Also add SAP properties needed by the propagator. - Return list of estimates, max 1 SAP per estimate (easier for assigner), or None on error. + The duration argument is a float in (fractional) seconds. 
+            Return list of estimates, max 1 SAP per estimate (easier for assigner),
+            or raise ValueError on error.
         """
-        if not parset.getBool('Observation.DataProducts.Output_CoherentStokes.enabled'):
-            logger.info("coherent stokes data output not enabled")
-            return []
+        logger.info("calculate coherent stokes data size")
 
-        logger.info("calculate coherentstokes datasize")
-        size_of_sample = 4 # FP32
+        size_of_sample = 4 # single precision float
         coherent_type = parset.getString(COBALT + 'BeamFormer.CoherentStokes.which')
         subbands_per_file = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.subbandsPerFile', 512)
+        if subbands_per_file < 0:
+            raise ValueError('BeamFormer.CoherentStokes.subbandsPerFile may not be negative, but is %d' % subbands_per_file)
         if subbands_per_file == 0:
             subbands_per_file = 512
         samples_per_second = self._samples_per_second(parset)
         time_integration_factor = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor')
-        # Note that complex voltages (XXYY) cannot be meaningfully integrated (i.e. time_integration_factor 1)
+        # Note that complex voltages (XXYY) cannot be meaningfully integrated (time_integration_factor 1)
         size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor
         nr_coherent = len(coherent_type) # 'I' or 'IQUV' or 'XXYY'
 
@@ -250,8 +253,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
 
         nr_saps = parset.getInt('Observation.nrBeams')
         if nr_saps < 1:
-            logger.error("Coherent Stokes data output enabled, but nrBeams < 1")
-            return None
+            raise ValueError("Coherent Stokes data output enabled, but nrBeams < 1")
 
         # Estimates may differ per SAP for CS/IS. Decided to always produce a separate estimate per SAP.
         # Hence, need to annotate each SAP with the right identifications for pipeline predecessor input filtering.
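A worked instance of the size_per_subband formula above. The samples_per_second value is an assumption here (200 MHz station clock / 1024, i.e. 195312.5; _samples_per_second() is the authoritative source), as are the duration and integration factor:

    samples_per_second = 195312.5  # assumed: 200 MHz clock / 1024; see _samples_per_second()
    size_of_sample = 4             # single precision float, as above
    duration = 240.0               # seconds
    time_integration_factor = 6    # assumed; must be 1 for complex voltages (XXYY)

    size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor
    print(size_per_subband)        # 31250000.0 bytes per subband, before the subbands-per-file cap

    nr_coherent = len('IQUV')      # 4: one file per stokes component, as in the code above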
@@ -259,31 +261,29 @@ class ObservationResourceEstimator(BaseResourceEstimator):
         sap_idents = self._sap_identifications(identifications, nr_saps)
 
         estimates = []
-        for sap_nr in xrange(nr_saps):
+        for sap_nr in range(nr_saps):
             logger.info("checking SAP {}".format(sap_nr))
             subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
             nr_subbands = len(subbandList)
             if nr_subbands == 0:
-                logger.error("Coherent Stokes data output enabled, but empty subband list for sap %d", sap_nr)
-                return None
+                raise ValueError("Coherent Stokes data output enabled, but empty subband list for sap %d" % sap_nr)
             nr_subbands_per_file = min(subbands_per_file, nr_subbands)
 
             nr_coherent_tabs = 0
-            is_tab_nr = -1
+            is_tab_nr = None
 
             nr_tabs = parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)
-            for tab_nr in xrange(nr_tabs):
+            for tab_nr in range(nr_tabs):
                 if not parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)):
                     is_tab_nr = tab_nr
                     logger.info("coherentstokes: skipping incoherent tab")
                     continue
                 nr_coherent_tabs += 1
-            logger.info("added %d regular tabs", nr_coherent_tabs)
+            logger.info("added %d coherent tabs before considering tab rings and fly's eye tabs", nr_coherent_tabs)
 
             nr_tab_rings = parset.getInt('Observation.Beam[%d].nrTabRings' % sap_nr)
             if nr_tab_rings < 0:
-                logger.error("SAP %d: nr of tab rings is %d", sap_nr, nr_tab_rings)
-                return None
+                raise ValueError("SAP %d: nr of tab rings is < 0: %d" % (sap_nr, nr_tab_rings))
             elif nr_tab_rings > 0:
                 nr_tabs = (3 * nr_tab_rings * (nr_tab_rings + 1) + 1)
                 nr_coherent_tabs += nr_tabs
@@ -295,15 +295,14 @@ class ObservationResourceEstimator(BaseResourceEstimator):
                 logger.info("added %d fly's eye tabs", nr_tabs)
 
             if nr_coherent_tabs == 0:
-                logger.error("Coherent Stokes data output enabled, but no coherent tabs for sap %d", sap_nr)
-                return None
+                raise ValueError("Coherent Stokes data output enabled, but no coherent tabs for sap %d" % sap_nr)
 
             # Keep XXYY/IQUV together (>1 parts still possible).
             # Else translator to parset filenames cannot know which stokes (nr_of_XX_stokes property too coarse).
             # Also for complex voltages (XXYY) only: pipeline needs all 4 XXYY accessible from the same node.
             #
             # NOTE: If a TAB is split into parts, then the last TAB part may contain fewer subbands.
-            # Simplify: compute a single (i.e. max) file size for all TABs or TAB parts.
+            # Simplify: compute a single (max) file size for all TABs or TAB parts.
             file_size = int(nr_subbands_per_file * size_per_subband) # bytes
             storage = file_size * nr_coherent # bytes
             bandwidth = int(ceil(8 * storage / duration)) # bits/second
@@ -315,26 +314,27 @@ class ObservationResourceEstimator(BaseResourceEstimator):
                    'output_files': {'cs': [{'sap_nr': sap_nr, 'identification': sap_idents[sap_nr],
                                             'properties': {'cs_file_size': file_size, 'nr_of_cs_files': nr_coherent,
                                                            'nr_of_cs_stokes': nr_coherent, 'nr_of_cs_parts': nr_parts_per_tab}}]}}
-            if is_tab_nr != -1: # translator to filenames needs to know: it may not have all CS+IS info in one claim
+            if is_tab_nr is not None: # translator to filenames needs to know: it may not have all CS+IS info in one claim
                 est['output_files']['cs'][0]['properties']['is_tab_nr'] = is_tab_nr
             estimates.append(est)
 
-        logger.debug("Coherent Stokes data estimates: {}".format(estimates))
+        logger.debug("Coherent Stokes data estimates:\n" + pprint.pformat(estimates))
         return estimates
 
     def incoherentstokes(self, parset, duration):
-        """ Estimate storage size and bandwidth needed for Incoherent Stokes (i.e. is)
+        """ Estimate storage size and bandwidth needed for Incoherent Stokes ('is')
             data products. Also add SAP properties needed by the propagator.
-            Return list of estimates, max 1 SAP per estimate (easier for assigner).
+            The duration argument is a float in (fractional) seconds.
+            Return list of estimates, max 1 SAP per estimate (easier for assigner),
+            or raise ValueError on error.
         """
-        if not parset.getBool('Observation.DataProducts.Output_IncoherentStokes.enabled'):
-            logger.info("incoherent stokes data output not enabled")
-            return []
+        logger.info("calculate incoherent stokes data size")
 
-        logger.info("calculate incoherentstokes data size")
-        size_of_sample = 4 # FP32
+        size_of_sample = 4 # single precision float
         incoherent_type = parset.getString(COBALT + 'BeamFormer.IncoherentStokes.which')
         subbands_per_file = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.subbandsPerFile', 512)
+        if subbands_per_file < 0:
+            raise ValueError('BeamFormer.IncoherentStokes.subbandsPerFile may not be negative, but is %d' % subbands_per_file)
         if subbands_per_file == 0:
             subbands_per_file = 512
         samples_per_second = self._samples_per_second(parset)
@@ -346,8 +346,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
 
         nr_saps = parset.getInt('Observation.nrBeams')
         if nr_saps < 1:
-            logger.error("Incoherent Stokes data output enabled, but nrBeams < 1")
-            return None
+            raise ValueError("Incoherent Stokes data output enabled, but nrBeams < 1")
 
         # Estimates may differ per SAP for CS/IS. Decided to always produce a separate estimate per SAP.
         # Hence, need to annotate each SAP with the right identifications for pipeline predecessor input filtering.
@@ -355,13 +354,12 @@ class ObservationResourceEstimator(BaseResourceEstimator):
         sap_idents = self._sap_identifications(identifications, nr_saps)
 
         estimates = []
-        for sap_nr in xrange(nr_saps):
+        for sap_nr in range(nr_saps):
             logger.info("checking SAP {}".format(sap_nr))
             subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
             nr_subbands = len(subbandList)
             if nr_subbands == 0:
-                logger.error("Incoherent Stokes data output enabled, but empty subband list for sap %d", sap_nr)
-                return None
+                raise ValueError("Incoherent Stokes data output enabled, but empty subband list for sap %d" % sap_nr)
             nr_subbands_per_file = min(subbands_per_file, nr_subbands)
 
             # Atm can have 1 IS TAB per SAP, because its pointing is equal to the SAP pointing.
@@ -369,27 +367,25 @@ class ObservationResourceEstimator(BaseResourceEstimator):
             nr_incoherent_tabs = 0
 
             nr_tabs = parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)
-            for tab_nr in xrange(nr_tabs):
+            for tab_nr in range(nr_tabs):
                 if parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)):
                     continue
 
                 if nr_incoherent_tabs > 0:
                     # Could get here to produce >1 IS TAB copies, maybe for some software test
-                    logger.error("SAP %i: >1 incoherent TAB not supported: TAB nrs %i and %i" % (sap_nr, tab_nr, is_tab_nr))
-                    return None
+                    raise ValueError("SAP %i: >1 incoherent TAB not supported: TAB nrs %i and %i" % (sap_nr, tab_nr, is_tab_nr))
                 is_tab_nr = tab_nr
                 nr_incoherent_tabs += 1
             logger.info("added %d incoherent tab(s)", nr_incoherent_tabs)
 
             if nr_incoherent_tabs == 0:
-                logger.error("Incoherent Stokes data output enabled, but no incoherent tabs for sap %d", sap_nr)
-                return None
+                raise ValueError("Incoherent Stokes data output enabled, but no incoherent tabs for sap %d" % sap_nr)
 
             # Keep IQUV together (>1 parts still possible).
             # Else translator to parset filenames cannot know which stokes (nr_of_XX_stokes property too coarse).
             #
             # NOTE: If a TAB is split into parts, then the last TAB part may contain fewer subbands.
-            # Simplify: compute a single (i.e. max) file size for all TABs or TAB parts.
+            # Simplify: compute a single (max) file size for all TABs or TAB parts.
             file_size = int(nr_subbands_per_file * size_per_subband) # bytes
             storage = file_size * nr_incoherent # bytes
             bandwidth = int(ceil(8 * storage / duration)) # bits/second
@@ -403,7 +399,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
                                                            'nr_of_is_stokes': nr_incoherent, 'is_tab_nr': is_tab_nr}}]}}
             estimates.append(est)
 
-        logger.debug("Incoherent Stokes data estimates: {}".format(estimates))
+        logger.debug("Incoherent Stokes data estimates:\n" + pprint.pformat(estimates))
         return estimates
 
     def _samples_per_second(self, parset):
@@ -477,11 +473,13 @@ class ObservationResourceEstimator(BaseResourceEstimator):
         """ Estimate required RSPs and RCUs per station.
            One or two RSP boards are returned per station depending on antennaset.
            RCUs are encoded as a bitfield, to be able to tell which RCUs are actually needed.
-            Return list of estimates.
+            Return list of estimates, or raise ValueError on error.
         """
         estimates = []
         antennaset = parset.getString('Observation.antennaSet')
         stationset = parset.getStringVector('Observation.VirtualInstrument.stationList')
+        if not stationset:
+            raise ValueError("Observation.VirtualInstrument.stationList is empty")
         rculists = self.asp.get_receiver_units_configuration_per_station(antennaset, stationset)
 
         for station in stationset:
@@ -526,7 +524,8 @@ class ObservationResourceEstimator(BaseResourceEstimator):
 
     def _required_rsps(self, station, antennaset, parset):
         """ Takes station name and list of antennafields.
-            Returns list with one or both required rsps and number of channelbits.
+            Returns list with one or both required rsps and number of channelbits,
+            or raises ValueError on error.
""" if station.startswith('CS'): required_rsps = ['RSP0'] # default @@ -538,14 +537,20 @@ class ObservationResourceEstimator(BaseResourceEstimator): required_rsps = ['RSP'] # default for non-core stations nr_saps = parset.getInt('Observation.nrBeams') + if nr_saps < 1: + raise ValueError('Observation.nrBeams must be at least 1, but is %d' % nr_saps) subBandList = [] for nr in range(nr_saps): key = 'Observation.Beam['+str(nr)+'].subbandList' sblist = parset.getStringVector(key) + if not sblist: + raise ValueError("%s is empty" % key) subBandList.extend(sblist) nrSubbands = len(subBandList) nrBitsPerSample = parset.getInt('Observation.nrBitsPerSample') + if nrBitsPerSample != 16 and nrBitsPerSample != 8 and nrBitsPerSample != 4: + raise ValueError('Observation.nrBitsPerSample must be 16, 8, or 4, but is %d' % nrBitsPerSample) channelbits = nrSubbands * nrBitsPerSample return required_rsps, channelbits diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py index 05220b949ba9607227d1e38c5eb6a5769864f043..2f8f402dfae68f3ad03129038c48142bafa8413c 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py @@ -96,7 +96,7 @@ class PulsarPipelineResourceEstimator(BasePipelineResourceEstimator): estimate = {'input_files': input_files} - # NOTE: input bandwidth is not included in the resulting estimate atm. + # NOTE: input bandwidth is currently not included in the resulting estimate. # Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ... #input_cluster_cs = parset.getString(DATAPRODUCTS + 'Input_CoherentStokes.storageClusterName') #input_cluster_is = parset.getString(DATAPRODUCTS + 'Input_IncoherentStokes.storageClusterName') @@ -105,7 +105,7 @@ class PulsarPipelineResourceEstimator(BasePipelineResourceEstimator): output_cluster_pulp = parset.getString(DATAPRODUCTS + 'Output_Pulsar.storageClusterName') # The pulsar pipeline ('pulp') produces 1 data product per tied-array beam, it seems also for complex voltages (XXYY) and stokes IQUV(?). - # For XXYY it really needs all 4 components at once. For IQUV this is less important, but atm we treat it the same (1 obs output estimate). + # For XXYY it really needs all 4 components at once. For IQUV this is less important, but currently we treat it the same (1 obs output estimate). # Note that it also produces 1 additional "summary" data product per data product *type* (i.e. 1 for 'cs' and/or 1 for 'is'), # but the RA_Services sub-system does not know about it. Adding support may be a waste of time(?). # Currently, RO controlled pulp grabs all inputs given some project name/id(?) and obs id, not from rotspservice generated parset parts. 
@@ -126,15 +126,15 @@ class PulsarPipelineResourceEstimator(BasePipelineResourceEstimator):
                                               'pulp_file_size': pulp_file_size}}]}
 
         # count total data size
-        data_size = nr_input_files * pulp_file_size
-        if data_size > 0:
-            bandwidth = int(ceil(8 * data_size / duration)) # bits/second
-            estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+        total_data_size = nr_input_files * pulp_file_size
+        if total_data_size > 0:
+            bandwidth = int(ceil(8 * total_data_size / duration)) # bits/second
+            estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': total_data_size}
             estimate['resource_count'] = 1
             estimate['root_resource_group'] = output_cluster_pulp
         else:
-            logger.error('An estimate of zero was calculated!')
-            result['errors'].append('An estimate of zero was calculated!')
+            logger.error('Estimated total data size is zero!')
+            result['errors'].append('Estimated total data size is zero!')
 
         result['estimates'].append(estimate)
 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/reservation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/reservation.py
index 24b274ad7ffe53c8cebb258b00230e2665e48d8d..767bf799d72b965dc0081b5b2bc4f9a1e7b32528 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/reservation.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/reservation.py
@@ -50,7 +50,8 @@ class ReservationResourceEstimator(BaseResourceEstimator):
         logger.info("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
         # NOTE: Observation.stopTime may differ from real stop time, because of Cobalt block size not being exactly 1.0 s.
-        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
+        duration = self._getDuration(parset.getString('Observation.startTime'),
+                                     parset.getString('Observation.stopTime'))
 
         errors = []
         estimates = []
@@ -85,7 +86,7 @@ class ReservationResourceEstimator(BaseResourceEstimator):
 
         rsps, channelbits = self._max_rsps(station)
 
-        bitfield = len(rculists[station])*'1' # claim all RCUs irrespective of use in given antennaset, we actually only need the AntennasetsParser to obatin the numbe rof RCUs
+        bitfield = len(rculists[station])*'1' # claim all RCUs irrespective of use in given antennaset; we actually only need the AntennasetsParser to obtain the number of RCUs
 
         est = {'resource_types': {'rcu': bitfield},
                'resource_count': 1,
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
index ad62ad399684d23aa78ccaef48705c71aeae46fe..0f0428057c4b29df8f63c0cef881a9504bcbf5c1 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
@@ -89,13 +89,22 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
                 predecessor_estimates = []
                 for branch_otdb_id, branch_estimate in branch_estimates.items():
                     logger.info('Looking at predecessor %s' % branch_otdb_id)
+
                     estimates = branch_estimate.values()[0]['estimates']
-                    if any(['uv' in est['output_files'] and 'im' not in est['output_files'] for est in estimates if 'output_files' in est]): # Not a calibrator pipeline
-                        logger.info('found %s as the target of pipeline %s' % (branch_otdb_id, otdb_id))
-                        predecessor_estimates.extend(estimates)
-                    elif any(['im' in est['output_files'] for est in estimates if 'output_files' in est]):
-                        logger.info('found %s as the calibrator of pipeline %s' % (branch_otdb_id, otdb_id))
-                        predecessor_estimates.extend(estimates)
+                    for est in estimates:
+                        if 'output_files' not in est:
+                            continue
+                        has_uv = 'uv' in est['output_files']
+                        has_im = 'im' in est['output_files']
+                        if has_uv and not has_im: # Not a calibrator pipeline
+                            logger.info('found %s as the target of pipeline %s' % (branch_otdb_id, otdb_id))
+                            predecessor_estimates.extend(estimates)
+                            break
+                        elif has_im:
+                            logger.info('found %s as the calibrator of pipeline %s' % (branch_otdb_id, otdb_id))
+                            predecessor_estimates.extend(estimates)
+                            break
+
                 return {str(otdb_id): self.add_id(self.calibration_pipeline.verify_and_estimate(parset, predecessor_estimates), otdb_id)}
 
         if len(branch_estimates) > 1:
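For reference, the predecessor classification that the rewritten service.py loop implements reads as a small standalone predicate (a sketch; note also that branch_estimate.values()[0] above is Python 2 style — under Python 3 it would need to be list(branch_estimate.values())[0]):

    def classify_predecessor(estimates):
        """ Return 'target', 'calibrator', or None, mirroring the loop above. """
        for est in estimates:
            if 'output_files' not in est:
                continue
            if 'uv' in est['output_files'] and 'im' not in est['output_files']:
                return 'target'      # not a calibrator pipeline
            if 'im' in est['output_files']:
                return 'calibrator'  # instrument model output present
        return None

    print(classify_predecessor([{'output_files': {'uv': []}}]))            # target
    print(classify_predecessor([{'output_files': {'uv': [], 'im': []}}]))  # calibrator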