diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
index 99e78ecd1d1af4abf701c413768ea360076c25f9..226626131e3886944f7c1053e28818787f353685 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
@@ -1,6 +1,6 @@
 # observation.py
 #
-# Copyright (C) 2016
+# Copyright (C) 2016-2017
 # ASTRON (Netherlands Institute for Radio Astronomy)
 # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
 #
@@ -21,7 +21,7 @@
 # $Id: observation.py 33534 2016-02-08 14:28:26Z schaap $
 
 import logging
-from math import ceil, floor
+from math import ceil
 from base_resource_estimator import BaseResourceEstimator
 
 logger = logging.getLogger(__name__)
@@ -39,6 +39,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
                               'Observation.startTime',
                               'Observation.stopTime',
                               'Observation.antennaSet',
+                              'Observation.nrBeams',
                               COBALT + 'Correlator.nrChannelsPerSubband',
                               COBALT + 'Correlator.integrationTime',
                               COBALT + 'BeamFormer.flysEye',
@@ -47,138 +48,187 @@ class ObservationResourceEstimator(BaseResourceEstimator):
                               'Observation.VirtualInstrument.stationList',
                               DATAPRODUCTS + 'Output_Correlated.enabled',
                               DATAPRODUCTS + 'Output_Correlated.identifications',
+                              DATAPRODUCTS + 'Output_Correlated.storageClusterName',
                               DATAPRODUCTS + 'Output_CoherentStokes.enabled',
                               DATAPRODUCTS + 'Output_CoherentStokes.identifications',
+                              DATAPRODUCTS + 'Output_CoherentStokes.storageClusterName',
                               COBALT + 'BeamFormer.CoherentStokes.which',
                               DATAPRODUCTS + 'Output_IncoherentStokes.enabled',
                               DATAPRODUCTS + 'Output_IncoherentStokes.identifications',
+                              DATAPRODUCTS + 'Output_IncoherentStokes.storageClusterName',
                               COBALT + 'BeamFormer.IncoherentStokes.which'
                              )
 
     def _calculate(self, parset, input_files={}):
-        """ Calculate the combined resources needed by the different observation types that
-        can be in a single observation.
-        reply is something along the lines of:
-        {'bandwidth': {'total_size': 19021319494},
-         'storage': {'total_size': 713299481024,
-                     'output_files':
-                      {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104},
-                       'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
-                                {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
-                                {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}
-                               ]}}}
+        """ Calculate the resources needed by the different data product types that can be in a single observation.
+
+        The following return value example is for an obs duration of 240.0 s and 3 data product types for 2 clusters.
+        Here we have UV data for CEP4 (120 files), and we have for DRAGNET 2 IS (IQUV) and 7 CS TABs (also IQUV),
+        each split across 5 parts (i.e. 7*4*5 + 2*4*5 = 180 files).
+        If estimate sizes and root_resource_groups are the same, estimates can be combined into 1 estimate (partly done here).
+        (Not shown here, but note that for CS complex voltage data (XXYY), we must produce 1 estimate per 4 files (XXYY),
+         such that the XXYY files for a part end up on the same storage. Such a beam can still be split across >1 parts.)
+
+        For a global filesystem and enough network bandwidth (e.g. CEP4), we produce 1 big estimate having count 1.
+        For other clusters (e.g. DRAGNET) or multi-cluster observations, we produce multiple estimates, as also shown.
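+
+        (Illustrative arithmetic, not from any parset: in the DRAGNET estimate below, each
+         2147483648 byte file is written over 240.0 s, so each file needs
+         ceil(8 * 2147483648 / 240.0) = 71582789 bits/s of bandwidth.)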
+
+        More examples at scu001:/opt/lofar/var/log/resourceassigner.log
+        {
+        'errors': [],
+        'estimates': [{
+            'resource_types': {'bandwidth': 4294967400, 'storage': 128849018880},
+            'count': 1, 'root_resource_groups': ['CEP4'],  # count is per root resource group!
+            'output_files': {
+                'uv': {'nr_of_uv_files': 120, 'uv_file_size': 1073741824, 'identifications': [...]},
+                'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 80, 'start_sb_nr': 0}},
+                         {'sap_nr': 1, 'properties': {'nr_of_uv_files': 20, 'start_sb_nr': 80}},
+                         {'sap_nr': 2, 'properties': {'nr_of_uv_files': 20, 'start_sb_nr': 100}}
+                        ]
+            }
+        }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648},
+            'count': 180, 'root_resource_groups': ['DRAGNET'],  # count is per root resource group!
+            'output_files': {
+                'cs': {'nr_of_cs_files': 140, 'cs_file_size': 2147483648, 'nr_of_cs_stokes': 4, 'identifications': [...]},
+                'is': {'nr_of_is_files': 40, 'is_file_size': 2147483648, 'nr_of_is_stokes': 4, 'identifications': [...]},
+                'saps': [{'sap_nr': 0, 'properties': {'nr_of_is_files': 20, 'is_tab_nr': 0,      # 1 IS: IQUV, 5 parts,
+                                                      'nr_of_cs_files': 140, 'nr_of_tabs': 8}},  # + 7 CS: IQUV, 5 parts
+                         {'sap_nr': 2, 'properties': {'nr_of_is_files': 20, 'nr_of_tabs': 1,
+                                                      'is_tab_nr': 0}}                           # 1 IS: IQUV, 5 parts
+                        ]
+            }
+        }]
+        }
+        The base_resource_estimator adds an {'observation': } around this.
         """
         logger.info("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
 
+        # NOTE: obs estimates appear quite accurate. Diffs come mostly from Observation.stopTime
+        # being planned instead of real stop time, because of Cobalt block size not being exactly 1.0 s.
         duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
-
-        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
-        output_files = {}
-        correlated_size, correlated_bandwidth, output_files_uv, correlated_saps = self.correlated(parset, duration)
-        coherentstokes_size, coherentstokes_bandwidth, output_files_cs, coherentstokes_saps = self.coherentstokes(parset, duration)
-        incoherentstokes_size, incoherentstokes_bandwidth, output_files_is, incoherentstokes_saps = self.incoherentstokes(parset, duration)
-
-        if output_files_uv:
-            output_files['uv'] = output_files_uv
-        if output_files_cs:
-            output_files['cs'] = output_files_cs
-        if output_files_is:
-            output_files['is'] = output_files_is
-
-        output_files['saps'] = []
-        for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
-            sap = {'sap_nr': sap_nr, 'properties': {}}
-            if sap_nr in correlated_saps:
-                sap['properties'].update(correlated_saps[sap_nr])
-            if sap_nr in coherentstokes_saps:
-                sap['properties'].update(coherentstokes_saps[sap_nr])
-            if sap_nr in incoherentstokes_saps:
-                sap['properties'].update(incoherentstokes_saps[sap_nr])
-            if 'nr_of_tabs' in sap['properties']: # These are coherent TABs
-                sap['properties']['nr_of_tabs'] = sap['properties']['nr_of_tabs'] + 1
-            else:
-                sap['properties']['nr_of_tabs'] = 1 # Only an incoherent TAB for this SAP
-            output_files['saps'].append(sap)
-
-        total_data_size = correlated_size + coherentstokes_size + incoherentstokes_size
-        if total_data_size and output_files:
-            result['storage'] = {'total_size': total_data_size, 'output_files': output_files}
-            result['bandwidth'] = {'total_size': correlated_bandwidth + coherentstokes_bandwidth + incoherentstokes_bandwidth}
-        else:
-            if not total_data_size:
-                result['errors'].append('Total data size is zero!')
-                logger.warning('ERROR: A datasize of zero was calculated!')
-            if not output_files:
-                result['errors'].append('No output files!')
-                logger.warning('ERROR: No output files were calculated!')
+        errors = []
+        estimates = []
+        uv_estimate = self.correlated(parset, duration)
+        if uv_estimate is not None:
+            if uv_estimate['count'] == 0 or uv_estimate['resource_types']['bandwidth'] == 0 or \
+               uv_estimate['resource_types']['storage'] == 0:
+                errors.append('empty UV resource estimate!')
+                logger.error('resource estimate for UV data is empty!')
+            else:
+                estimates.append(uv_estimate)
+        cs_estimate = self.coherentstokes(parset, duration)
+        if cs_estimate is not None:
+            if cs_estimate['count'] == 0 or cs_estimate['resource_types']['bandwidth'] == 0 or \
+               cs_estimate['resource_types']['storage'] == 0:
+                errors.append('empty CS resource estimate!')
+                logger.error('resource estimate for CS data is empty!')
+            else:
+                estimates.append(cs_estimate)
+        is_estimate = self.incoherentstokes(parset, duration)
+        if is_estimate is not None:
+            if is_estimate['count'] == 0 or is_estimate['resource_types']['bandwidth'] == 0 or \
+               is_estimate['resource_types']['storage'] == 0:
+                errors.append('empty IS resource estimate!')
+                logger.error('resource estimate for IS data is empty!')
+            else:
+                estimates.append(is_estimate)
+        if not estimates:
+            errors.append('Produced observation resource estimate list is empty!')
+            logger.error('empty observation resource estimate list!')
+
+        logger.info('Observation resource estimate(s) before merge: {}'.format(estimates))
+
+        # Merge estimates if possible to alleviate the Resource Assigner and RADB.
+        self._merge_estimates(estimates)
+
+        result = {'errors': errors, 'estimates': estimates}
         return result
 
     def correlated(self, parset, duration):
-        """ Estimate number of files, file size and bandwidth needed for correlated data
+        """ Estimate storage size and bandwidth needed for correlated (i.e. uv)
+        data products. Also add SAP properties needed by the propagator.
         """
         if not parset.getBool('Observation.DataProducts.Output_Correlated.enabled'):
-            logger.info("No correlated data")
-            return (0,0, {}, {})
+            logger.info("correlated data output not enabled")
+            return None
 
         logger.info("calculating correlated datasize")
-        size_of_header = 512 #TODO More magic numbers (probably from Alwin). ScS needs to check these. They look ok though.
-        size_of_overhead = 600000
+        storage_unit = 512  # all sizes in bytes
+        size_of_header = 512
+        size_of_overhead = 600000  # COBALT parset in MS HISTORY subtable + misc
         size_of_short = 2
-        size_of_complex = 8
+        size_of_visib = 8  # a visibility is stored as a complex FP32
         nr_polarizations = 2
-        channels_per_subband = parset.getInt(COBALT + 'Correlator.nrChannelsPerSubband', 64) #TODO should these have defaults?
-        intergration_time = parset.getFloat(COBALT + 'Correlator.integrationTime', 1)
+        channels_per_subband = parset.getInt(COBALT + 'Correlator.nrChannelsPerSubband', 64)  # defaults as in COBALT
+        integration_time = parset.getFloat(COBALT + 'Correlator.integrationTime', 1)
         nr_virtual_stations = self._virtual_stations(parset)
-        integrated_seconds = floor(duration / intergration_time)
-        nr_baselines = nr_virtual_stations * (nr_virtual_stations + 1.0) / 2.0 #TODO Why is this done in float?
-        data_size = ceil((nr_baselines * channels_per_subband * nr_polarizations**2 * size_of_complex) / 512.0) * 512.0 #TODO What's the 512.0 magic numbers?
-        n_sample_size = ceil((nr_baselines * channels_per_subband * size_of_short) / 512.0) * 512.0
+        # Reflects MeasurementSets produced by the casacore LOFAR storage manager (LofarStMan)
+        integrated_seconds = int(duration / integration_time)
+        nr_baselines = nr_virtual_stations * (nr_virtual_stations + 1) / 2
+        data_size = (nr_baselines * channels_per_subband * nr_polarizations * nr_polarizations * \
+                     size_of_visib + storage_unit-1) / storage_unit * storage_unit
+        n_sample_size = (nr_baselines * channels_per_subband * size_of_short + storage_unit-1) / \
+                        storage_unit * storage_unit
+        file_size = (data_size + n_sample_size + size_of_header) * integrated_seconds + size_of_overhead  # bytes
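+        # (Illustrative, not from any parset: 50 virtual stations give 1275 baselines;
+        #  with 64 channels, data_size = 1275 * 64 * 4 * 8 = 2611200 bytes (a multiple of 512),
+        #  n_sample_size rounds up to 163328 bytes, and 240 s at 1 s integration gives
+        #  (2611200 + 163328 + 512) * 240 + 600000 = 666609600 bytes per subband file.)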
 
         # sum of all subbands in all digital beams
         total_files = 0
-        sap_files = {}
-
+        sap_files = []
         for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
             subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
             nr_files = len(subbandList)
-            sap_files[sap_nr] = {'nr_of_uv_files': nr_files, 'start_sb_nr': total_files}
+            sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_uv_files': nr_files,
+                                                               'start_sb_nr': total_files}})
             total_files += nr_files
-
-        file_size = int((data_size + n_sample_size + size_of_header) * integrated_seconds + size_of_overhead)
-        output_files = {'nr_of_uv_files': total_files, 'uv_file_size': file_size, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')}
-        logger.info("correlated_uv: {} files {} bytes each".format(total_files, file_size))
-
-        total_data_size = int(ceil(file_size * total_files)) # bytes
-        total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
-        return (total_data_size, total_bandwidth, output_files, sap_files)
+        if total_files == 0:
+            logger.warning("Correlated data output enabled, but no UV files across all SAPs")
+            return None
+        bandwidth = int(ceil(8 * file_size / duration))  # bits/second
+
+        idents = parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')
+        output_files = {'uv': {'nr_of_uv_files': total_files, 'uv_file_size': file_size,
+                               'identifications': idents},
+                        'saps': sap_files}
+
+        root_resource_group = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName')
+        if self._hasGlobalStorage(root_resource_group):
+            # global fs and enough bandwidth: collapsed estimate is easier for Resource Assigner
+            file_size *= total_files
+            bandwidth *= total_files
+            total_files = 1
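+            # (Illustrative: 120 UV files of 1073741824 bytes each collapse into a single
+            #  estimate with count 1, storage 120 * 1073741824 = 128849018880 bytes and
+            #  bandwidth 120 * ceil(8 * 1073741824 / 240.0) = 4294967400 bits/s,
+            #  matching the example in the _calculate docstring.)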
""" if not parset.getBool('Observation.DataProducts.Output_CoherentStokes.enabled'): - logger.info("No coherent stokes data") - return (0,0, {}, {}) - + logger.info("coherent stokes data output not enabled") + return None + logger.info("calculate coherentstokes datasize") + size_of_sample = 4 # FP32 coherent_type = parset.getString(COBALT + 'BeamFormer.CoherentStokes.which') subbands_per_file = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.subbandsPerFile', 512) samples_per_second = self._samples_per_second(parset) - integration_factor = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor') - - nr_coherent = 4 if coherent_type in ('XXYY', 'IQUV') else 1 - if coherent_type in ('XXYY',): - size_per_subband = samples_per_second * 4.0 * duration - else: - size_per_subband = (samples_per_second * 4.0 * duration) / integration_factor + time_integration_factor = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor') + # Note that complex voltages (XXYY) cannot be meaningfully integrated (i.e. time_integration_factor 1) + size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor + nr_coherent = len(coherent_type) # 'I' or 'IQUV' or 'XXYY' total_nr_stokes = 0 total_files = 0 max_nr_subbands = 0 - sap_files = {} - + sap_files = [] + + doFlysEye = parset.getBool(COBALT + 'BeamFormer.flysEye') + for sap_nr in xrange(parset.getInt('Observation.nrBeams')): logger.info("checking SAP {}".format(sap_nr)) subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr) @@ -186,7 +236,7 @@ class ObservationResourceEstimator(BaseResourceEstimator): max_nr_subbands = max(nr_subbands, max_nr_subbands) nr_files = 0 total_nr_tabs = 0 - + for tab_nr in xrange(parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)): logger.info("checking TAB {}".format(tab_nr)) if parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)): @@ -205,49 +255,68 @@ class ObservationResourceEstimator(BaseResourceEstimator): total_nr_stokes += nr_stokes nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file))) - if parset.getBool(COBALT + 'BeamFormer.flysEye'): + if doFlysEye: logger.info("adding flys eye data size") nr_tabs = self._virtual_stations(parset) total_nr_tabs += nr_tabs nr_stokes = nr_tabs * nr_coherent total_nr_stokes += nr_stokes nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file))) - + if nr_files: - sap_files[sap_nr]= {'nr_of_cs_files': nr_files, 'nr_of_tabs': total_nr_tabs} + sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_cs_files': nr_files, + 'nr_of_tabs': total_nr_tabs}}) total_files += nr_files - + if total_files == 0: + logger.warn("Coherent Stokes data output enabled, but no CS files across all SAPs") + return None + # NOTE: TABs in different SAPs can have different subband lists and if a TAB is split into parts (i.e. multiple files), + # the last TAB part may contain fewer subbands. But simplify: compute a single (i.e. max) file size for all TABs or TAB parts. 
+
+        idents = parset.getStringVector(DATAPRODUCTS + 'Output_CoherentStokes.identifications')
+        output_files = {'cs': {'nr_of_cs_files': total_files, 'cs_file_size': file_size,
+                               'identifications': idents},
+                        'saps': sap_files}
 
-        output_files = {'nr_of_cs_files': total_files, 'nr_of_cs_stokes': nr_coherent, 'cs_file_size': size_per_file, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_CoherentStokes.identifications')}
-        logger.info("coherentstokes: {} files {} bytes each".format(total_files, size_per_file))
+        root_resource_group = parset.getString(DATAPRODUCTS + 'Output_CoherentStokes.storageClusterName')
+        if self._hasGlobalStorage(root_resource_group):
+            # global fs and enough bandwidth: collapsed estimate is easier for Resource Assigner
+            file_size *= total_files
+            bandwidth *= total_files
+            total_files = 1
 
-        total_data_size = int(ceil(total_nr_stokes * max_nr_subbands * size_per_subband))
-        total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
-        return (total_data_size, total_bandwidth, output_files, sap_files)
+        estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
+                    'count': total_files, 'root_resource_groups': [root_resource_group],
+                    'output_files': output_files}
+        estimate['output_files']['cs']['nr_of_cs_stokes'] = nr_coherent
+
+        logger.debug("Coherent Stokes data estimate: {}".format(estimate))
+        return estimate
 
     def incoherentstokes(self, parset, duration):
-        """ Estimate number of files, file size and bandwidth needed for incoherentstokes
+        """ Estimate storage size and bandwidth needed for Incoherent Stokes (i.e. is)
+        data products. Also add SAP properties needed by the propagator.
         """
         if not parset.getBool('Observation.DataProducts.Output_IncoherentStokes.enabled'):
-            logger.info("No incoherent stokes data")
-            return (0,0, {}, {})
-
+            logger.info("incoherent stokes data output not enabled")
+            return None
+
+        logger.info("calculate incoherentstokes data size")
+        size_of_sample = 4  # FP32
         incoherent_type = parset.getString(COBALT + 'BeamFormer.IncoherentStokes.which')
         subbands_per_file = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.subbandsPerFile', 512)
         samples_per_second = self._samples_per_second(parset)
         time_integration_factor = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.timeIntegrationFactor')
-        channels_per_subband = parset.getInt(COBALT + 'Correlator.nrChannelsPerSubband', 64) #TODO should these have defaults?
-        incoherent_channels_per_subband = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.nrChannelsPerSubband', 0)
-
-        nr_incoherent = 4 if incoherent_type in ('IQUV',) else 1 # Should this also include XXYY ?
+        size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor
+        nr_incoherent = len(incoherent_type)  # 'I' or 'IQUV' ('XXYY' only possible for coherent stokes)
 
         total_nr_stokes = 0
         total_files = 0
         max_nr_subbands = 0
-        sap_files = {}
+        sap_files = []
 
         for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
             logger.info("checking SAP {}".format(sap_nr))
@@ -255,40 +324,113 @@ class ObservationResourceEstimator(BaseResourceEstimator):
             nr_subbands = len(subbandList)
             max_nr_subbands = max(nr_subbands, max_nr_subbands)
             nr_files = 0
+
+            # Atm we can have at most 1 IS TAB per SAP, because its pointing equals the SAP pointing.
+            # (When we support online coh dedisp on multiple DMs, we can have >1 IS TABs per SAP.)
             is_tab_nr = -1
-            total_nr_tabs = parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)
-            for tab_nr in xrange(total_nr_tabs):
+            for tab_nr in xrange(parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)):
                 logger.info("checking TAB {}".format(tab_nr))
                 if not parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)): #not coherent is incoherent
                     logger.info("Found incoherent stokes TAB: %i" % tab_nr)
                     if is_tab_nr >= 0:
-                        logger.warning("TAB nr %i can't be incoherent as %i already is!" % (tab_nr, is_tab_nr))
-                        # TODO We need to generate an error here, or preferably check before we get here
-                    else:
-                        is_tab_nr = tab_nr
-                        total_nr_stokes += nr_incoherent
-                        nr_files += int(nr_incoherent * ceil(nr_subbands / float(subbands_per_file)))
-
+                        # Could get here to produce >1 IS TAB copies, maybe for some software test
+                        logger.error("SAP %i: >1 incoherent TAB not supported: TAB nrs %i and %i" % (sap_nr, tab_nr, is_tab_nr))
+                        return None
+                    is_tab_nr = tab_nr
+                    total_nr_stokes += nr_incoherent
+                    nr_files += int(nr_incoherent * ceil(nr_subbands / float(subbands_per_file)))
+
             if nr_files:
-                sap_files[sap_nr] = {'nr_of_is_files': nr_files, 'is_tab_nr': is_tab_nr}
+                sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_is_files': nr_files,
+                                                                   'nr_of_tabs': 1,
+                                                                   'is_tab_nr': is_tab_nr}})
                 total_files += nr_files
 
+        if total_files == 0:
+            logger.warning("Incoherent Stokes data output enabled, but no IS files across all SAPs")
+            return None
+
+        # NOTE: TABs in different SAPs can have different subband lists and if a TAB is split into parts (i.e. multiple files),
+        # the last TAB part may contain fewer subbands. But simplify: compute a single (i.e. max) file size for all TABs or TAB parts.
+        nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
+        file_size = int(nr_subbands_per_file * size_per_subband)  # bytes
+        bandwidth = int(ceil(8 * file_size / duration))  # bits/second
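+        # (Illustrative: a 629145600 byte file over a 240.0 s observation needs
+        #  ceil(8 * 629145600 / 240.0) = 20971520 bits/s of bandwidth.)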
+
+        idents = parset.getStringVector(DATAPRODUCTS + 'Output_IncoherentStokes.identifications')
+        output_files = {'is': {'nr_of_is_files': total_files, 'is_file_size': file_size,
+                               'identifications': idents},
+                        'saps': sap_files}
+
+        root_resource_group = parset.getString(DATAPRODUCTS + 'Output_IncoherentStokes.storageClusterName')
+        if self._hasGlobalStorage(root_resource_group):
+            # global fs and enough bandwidth: collapsed estimate is easier for Resource Assigner
+            file_size *= total_files
+            bandwidth *= total_files
+            total_files = 1
+
+        estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
+                    'count': total_files, 'root_resource_groups': [root_resource_group],
+                    'output_files': output_files}
+        estimate['output_files']['is']['nr_of_is_stokes'] = nr_incoherent
+
+        logger.debug("Incoherent Stokes data estimate: {}".format(estimate))
+        return estimate
+
+    def _merge_estimates(self, estimates):
+        """ Estimates can only be merged if they have the same root_resource_groups and
+        equal bandwidth and storage values, or if the root_resource_groups have a (single)
+        global filesystem.
+
+        NOTE: assumed good enough to only merge consecutive pairs, not all pairs.
+        """
+        i = 1
+        while i < len(estimates):  # careful iterating while modifying
+            if estimates[i-1]['root_resource_groups'] == estimates[i]['root_resource_groups'] and \
+               ((estimates[i-1]['resource_types']['bandwidth'] == estimates[i]['resource_types']['bandwidth'] and \
+                 estimates[i-1]['resource_types']['storage'] == estimates[i]['resource_types']['storage']) or \
+                all(self._hasGlobalStorage(rg) for rg in estimates[i]['root_resource_groups'])
+               ):
+                # Mergeable. Add uv, cs, is from estimates[i] into estimates[i-1]
+                if 'uv' in estimates[i]['output_files']:
+                    estimates[i-1]['output_files']['uv'] = estimates[i]['output_files']['uv']
+                if 'cs' in estimates[i]['output_files']:
+                    estimates[i-1]['output_files']['cs'] = estimates[i]['output_files']['cs']
+                if 'is' in estimates[i]['output_files']:
+                    estimates[i-1]['output_files']['is'] = estimates[i]['output_files']['is']
+
+                # Merge saps. Saps are listed in order, but a sap_nr may be missing.
+                # Extend one list by the other, sort, then merge any consecutive duplicates.
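+                # (E.g., illustrative: a cs sap {'sap_nr': 0, ..., 'nr_of_tabs': 7} and an is sap
+                #  {'sap_nr': 0, ..., 'nr_of_tabs': 1} merge into a single sap 0 entry with both
+                #  file counts and 'nr_of_tabs': 8, as in the _calculate docstring example.)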
+                estimates[i-1]['output_files']['saps'].extend(estimates[i]['output_files']['saps'])
+                estimates[i-1]['output_files']['saps'].sort(key=lambda sap: sap['sap_nr'])
+                j = 1
+                while j < len(estimates[i-1]['output_files']['saps']):
+                    sap0 = estimates[i-1]['output_files']['saps'][j-1]
+                    sap1 = estimates[i-1]['output_files']['saps'][j]
+                    if sap0['sap_nr'] == sap1['sap_nr']:
+                        # Sum nr_of_tabs; other keys must come from another data product type
+                        if 'nr_of_tabs' in sap0['properties'] and \
+                           'nr_of_tabs' in sap1['properties']:
+                            sap1['properties']['nr_of_tabs'] += sap0['properties']['nr_of_tabs']
+                        sap0['properties'].update(sap1['properties'])  # includes nr_of_tabs
+
+                        estimates[i-1]['output_files']['saps'].pop(j)
+                    else:
+                        j += 1
+
+                if all(self._hasGlobalStorage(rg) for rg in estimates[i]['root_resource_groups']):
+                    # for global fs, collapse regardless
+                    estimates[i-1]['resource_types']['bandwidth'] *= estimates[i-1]['count']  # *= 1, but to be robust and clear
+                    estimates[i-1]['resource_types']['bandwidth'] += estimates[i]['resource_types']['bandwidth'] * estimates[i]['count']
+                    estimates[i-1]['resource_types']['storage'] *= estimates[i-1]['count']  # *= 1, but to be robust and clear
+                    estimates[i-1]['resource_types']['storage'] += estimates[i]['resource_types']['storage'] * estimates[i]['count']
+                    estimates[i-1]['count'] = 1  # already 1, but to be robust and clear
+                else:
+                    # root_resource_groups and values of bandwidth and storage are equal for both estimates
+                    estimates[i-1]['count'] += estimates[i]['count']
+
+                logger.info('Merged observation resource estimate {} into {}'.format(i, i-1))
+                estimates.pop(i)
+            else:
+                i += 1
 
-        if incoherent_channels_per_subband > 0:
-            channel_integration_factor = channels_per_subband / incoherent_channels_per_subband
-        else:
-            channel_integration_factor = 1
-
-        if total_files > 0:
-            nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
-            size_per_subband = int((samples_per_second * 4) / time_integration_factor / channel_integration_factor * duration)
-            size_per_file = nr_subbands_per_file * size_per_subband
-
-        output_files = {'nr_of_is_files': total_files, 'nr_of_is_stokes': nr_incoherent, 'is_file_size': int(size_per_file), 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_IncoherentStokes.identifications')}
-        logger.info("incoherentstokes: {} files {} bytes each".format(total_files, size_per_file))
-
-        total_data_size = int(ceil(total_nr_stokes * max_nr_subbands * size_per_subband)) # bytes
-        total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/sec
-        return (total_data_size, total_bandwidth, output_files, sap_files)
+        return estimates
 
     def _samples_per_second(self, parset):
         """ set samples per second
@@ -316,3 +458,6 @@ class ObservationResourceEstimator(BaseResourceEstimator):
         logger.info("number of virtual stations = {}".format(nr_virtual_stations))
         return nr_virtual_stations
 
+    def _hasGlobalStorage(self, resource_group_name):
+        return resource_group_name == 'CEP4'
+