Commit 2e8b733b authored by Alexander van Amesfoort

Task #9939: rework observation estimate format. Other estimators and tests will be adjusted later when it turns out this is fine.
parent d937e613
# observation.py
#
# Copyright (C) 2016-2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
@@ -21,7 +21,7 @@
# $Id: observation.py 33534 2016-02-08 14:28:26Z schaap $

import logging
from math import ceil

from base_resource_estimator import BaseResourceEstimator

logger = logging.getLogger(__name__)
@@ -39,6 +39,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
        'Observation.startTime',
        'Observation.stopTime',
        'Observation.antennaSet',
        'Observation.nrBeams',
        COBALT + 'Correlator.nrChannelsPerSubband',
        COBALT + 'Correlator.integrationTime',
        COBALT + 'BeamFormer.flysEye',
@@ -47,137 +48,186 @@ class ObservationResourceEstimator(BaseResourceEstimator):
        'Observation.VirtualInstrument.stationList',
        DATAPRODUCTS + 'Output_Correlated.enabled',
        DATAPRODUCTS + 'Output_Correlated.identifications',
        DATAPRODUCTS + 'Output_Correlated.storageClusterName',
        DATAPRODUCTS + 'Output_CoherentStokes.enabled',
        DATAPRODUCTS + 'Output_CoherentStokes.identifications',
        DATAPRODUCTS + 'Output_CoherentStokes.storageClusterName',
        COBALT + 'BeamFormer.CoherentStokes.which',
        DATAPRODUCTS + 'Output_IncoherentStokes.enabled',
        DATAPRODUCTS + 'Output_IncoherentStokes.identifications',
        DATAPRODUCTS + 'Output_IncoherentStokes.storageClusterName',
        COBALT + 'BeamFormer.IncoherentStokes.which'
    )
    def _calculate(self, parset, input_files={}):
""" Calculate the combined resources needed by the different observation types that """ Calculate the resources needed by the different data product types that can be in a single observation.
can be in a single observation.
reply is something along the lines of: The following return value example is for an obs duration of 240.0 s and 3 data product types for 2 clusters.
{'bandwidth': {'total_size': 19021319494}, Here we have UV data for CEP4 (100 files), and we have for DRAGNET 2 IS (IQUV) and 7 CS TABs (also IQUV),
'storage': {'total_size': 713299481024, each split across 5 parts (i.e. 7*4*5 + 2*4*5 = 180 files).
'output_files': If estimate sizes and root_resource_groups are the same, estimates can be combined into 1 estimate (partly done here).
{'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, (Not shown here, but note that for CS complex voltage data (XXYY), we must produce 1 estimate per 4 files (XXYY),
'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}}, such that the XXYY files for a part end up on the same storage. Such a beam can still be split across >1 parts.)
{'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
{'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}} For a global filesystem and enough network bandwidth (e.g. CEP4), we produce 1 big estimate having count 1.
]}}} For other clusters (e.g. DRAGNET) or multi cluster observations, we produce multiple estimates, as also shown.
More examples at scu001:/opt/lofar/var/log/resourceassigner.log
{
'errors': [],
'estimates': [{
'resource_types': {'bandwidth': 536870912, 'storage': 128849018880},
'count': 1, 'root_resource_groups': ['CEP4'], # count is per root resource group!
'output_files': {
'uv': {'nr_of_uv_files': 100, 'uv_file_size': 1073741824, 'identifications': [...]},
'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 80, 'start_sb_nr': 0}},
{'sap_nr': 1, 'properties': {'nr_of_uv_files': 20, 'start_sb_nr': 80}},
{'sap_nr': 2, 'properties': {'nr_of_uv_files': 20, 'start_sb_nr': 100}}
]
}
}, {'resources': {'bandwidth': 8947849, 'storage': 2147483648},
'count': 180, 'root_resource_groups': ['DRAGNET'], # count is per root resource group!
'output_files': {
'cs': {'nr_of_cs_files': 140, 'cs_file_size': 2147483648, 'nr_of_cs_stokes': 4, 'identifications': [...]},
'is': {'nr_of_is_files': 40, 'is_file_size': 2147483648, 'nr_of_is_stokes': 4, 'identifications': [...]},
'saps': [{'sap_nr': 0, 'properties': {'nr_of_is_files': 20, 'is_tab_nr': 0, # 1 IS: IQUV, 5 parts,
'nr_of_cs_files': 140, 'nr_of_tabs': 8}}, # + 7 CS: IQUV, 5 parts
{'sap_nr': 2, 'properties': {'nr_of_is_files': 20, 'is_tab_nr': 0}} # 1 IS: IQUV, 5 parts
]
}
}]
}
The base_resource_estimator adds an {'observation': } around this. The base_resource_estimator adds an {'observation': } around this.
""" """
logger.info("start estimate '{}'".format(self.name)) logger.info("start estimate '{}'".format(self.name))
logger.info('parset: %s ' % parset) logger.info('parset: %s ' % parset)
# NOTE: obs estimates appear quite accurate. Diffs come mostly from Observation.stopTime
# being planned instead of real stop time, because of Cobalt block size not being exactly 1.0 s.
duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime')) duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
        errors = []
        estimates = []

        uv_estimate = self.correlated(parset, duration)
        if uv_estimate is not None:
            if uv_estimate['count'] == 0 or uv_estimate['resource_types']['bandwidth'] == 0 or \
               uv_estimate['resource_types']['storage'] == 0:
                errors.append('empty UV resource estimate!')
                logger.error('resource estimate for UV data is empty!')
            else:
                estimates.append(uv_estimate)

        cs_estimate = self.coherentstokes(parset, duration)
        if cs_estimate is not None:
            if cs_estimate['count'] == 0 or cs_estimate['resource_types']['bandwidth'] == 0 or \
               cs_estimate['resource_types']['storage'] == 0:
                errors.append('empty CS resource estimate!')
                logger.error('resource estimate for CS data is empty!')
            else:
                estimates.append(cs_estimate)

        is_estimate = self.incoherentstokes(parset, duration)
        if is_estimate is not None:
            if is_estimate['count'] == 0 or is_estimate['resource_types']['bandwidth'] == 0 or \
               is_estimate['resource_types']['storage'] == 0:
                errors.append('empty IS resource estimate!')
                logger.error('resource estimate for IS data is empty!')
            else:
                estimates.append(is_estimate)

        if not estimates:
            errors.append('Produced observation resource estimate list is empty!')
            logger.error('empty observation resource estimate list!')
        logger.info('Observation resource estimate(s) before merge: {}'.format(estimates))

        # Merge estimates if possible to alleviate the Resource Assigner and RADB.
        self._merge_estimates(estimates)

        result = {'errors': errors, 'estimates': estimates}
        return result
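
A consumer-side sketch of walking the new return format (hypothetical caller code; `estimator`, `parset` and the handling below are illustrative assumptions, not part of this commit):

    result = estimator._calculate(parset)
    for error in result['errors']:
        logger.error("estimator error: %s", error)
    for est in result['estimates']:
        # The total claim per root resource group is the per-file value times count.
        total_storage = est['resource_types']['storage'] * est['count']
        total_bandwidth = est['resource_types']['bandwidth'] * est['count']
        logger.info("claim %d bytes and %d bits/s on %s",
                    total_storage, total_bandwidth, est['root_resource_groups'])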
    def correlated(self, parset, duration):
        """ Estimate storage size and bandwidth needed for correlated (i.e. uv)
        data products. Also add SAP properties needed by the propagator.
        """
        if not parset.getBool('Observation.DataProducts.Output_Correlated.enabled'):
            logger.info("correlated data output not enabled")
            return None
        logger.info("calculating correlated datasize")

        storage_unit = 512  # all sizes in bytes
        size_of_header = 512
        size_of_overhead = 600000  # COBALT parset in MS HISTORY subtable + misc
        size_of_short = 2
        size_of_visib = 8  # a visibility is stored as a complex FP32
        nr_polarizations = 2
        channels_per_subband = parset.getInt(COBALT + 'Correlator.nrChannelsPerSubband', 64)  # defaults as in COBALT
        integration_time = parset.getFloat(COBALT + 'Correlator.integrationTime', 1)
        nr_virtual_stations = self._virtual_stations(parset)

        # Reflects MeasurementSets produced by the casacore LOFAR storage manager (LofarStMan)
        integrated_seconds = int(duration / integration_time)
        nr_baselines = nr_virtual_stations * (nr_virtual_stations + 1) / 2
        data_size = (nr_baselines * channels_per_subband * nr_polarizations * nr_polarizations * \
                     size_of_visib + storage_unit-1) / storage_unit * storage_unit
        n_sample_size = (nr_baselines * channels_per_subband * size_of_short + storage_unit-1) / \
                        storage_unit * storage_unit
        file_size = (data_size + n_sample_size + size_of_header) * integrated_seconds + size_of_overhead  # bytes

        # sum of all subbands in all digital beams
        total_files = 0
        sap_files = []
        for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
            subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
            nr_files = len(subbandList)
            sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_uv_files': nr_files,
                                                               'start_sb_nr': total_files}})
            total_files += nr_files
        if total_files == 0:
            logger.warn("Correlated data output enabled, but no UV files across all SAPs")
            return None

        bandwidth = int(ceil(8 * file_size / duration))  # bits/second
        idents = parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')
        output_files = {'uv': {'nr_of_uv_files': total_files, 'uv_file_size': file_size,
                               'identifications': idents},
                        'saps': sap_files}

        root_resource_group = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName')
        if self._hasGlobalStorage(root_resource_group):
            # global fs and enough bandwidth: collapsed estimate is easier for the Resource Assigner
            file_size *= total_files
            bandwidth *= total_files
            total_files = 1
        estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
                    'count': total_files, 'root_resource_groups': [root_resource_group],
                    'output_files': output_files}
        logger.debug("Correlated data estimate: {}".format(estimate))
        return estimate
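
For concreteness, here is the uv file-size arithmetic above worked through once, standalone. All input values are assumed for the example (they do not come from any real parset); `//` makes the integer division of the Python 2 code above explicit:

    storage_unit, size_of_header, size_of_overhead = 512, 512, 600000
    size_of_short, size_of_visib, nr_polarizations = 2, 8, 2
    channels_per_subband, integration_time = 64, 1.0
    duration, nr_virtual_stations = 3600.0, 50  # assumed: 1 h obs, 50 virtual stations

    integrated_seconds = int(duration / integration_time)                # 3600
    nr_baselines = nr_virtual_stations * (nr_virtual_stations + 1) // 2  # 1275
    # visibilities per integration, rounded up to the 512-byte storage unit
    data_size = (nr_baselines * channels_per_subband * nr_polarizations**2 *
                 size_of_visib + storage_unit - 1) // storage_unit * storage_unit  # 2611200
    # nr-of-samples weights per integration, likewise rounded up
    n_sample_size = (nr_baselines * channels_per_subband * size_of_short +
                     storage_unit - 1) // storage_unit * storage_unit              # 163328
    file_size = (data_size + n_sample_size + size_of_header) * integrated_seconds \
                + size_of_overhead  # 9990744000 bytes, i.e. ~9.3 GiB per subband file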
    def coherentstokes(self, parset, duration):
        """ Estimate storage size and bandwidth needed for Coherent Stokes (i.e. cs)
        data products. Also add SAP properties needed by the propagator.
        """
        if not parset.getBool('Observation.DataProducts.Output_CoherentStokes.enabled'):
            logger.info("coherent stokes data output not enabled")
            return None
        logger.info("calculate coherentstokes datasize")

        size_of_sample = 4  # FP32
        coherent_type = parset.getString(COBALT + 'BeamFormer.CoherentStokes.which')
        subbands_per_file = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.subbandsPerFile', 512)
        samples_per_second = self._samples_per_second(parset)
        time_integration_factor = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor')
        # Note that complex voltages (XXYY) cannot be meaningfully integrated (i.e. time_integration_factor 1)
        size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor
        nr_coherent = len(coherent_type)  # 'I' or 'IQUV' or 'XXYY'

        total_nr_stokes = 0
        total_files = 0
        max_nr_subbands = 0
        sap_files = []

        doFlysEye = parset.getBool(COBALT + 'BeamFormer.flysEye')
        for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
            logger.info("checking SAP {}".format(sap_nr))
@@ -205,7 +255,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
            total_nr_stokes += nr_stokes
            nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file)))

            if doFlysEye:
                logger.info("adding flys eye data size")
                nr_tabs = self._virtual_stations(parset)
                total_nr_tabs += nr_tabs
@@ -214,40 +264,59 @@ class ObservationResourceEstimator(BaseResourceEstimator):
                nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file)))

            if nr_files:
                sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_cs_files': nr_files,
                                                                   'nr_of_tabs': total_nr_tabs}})
                total_files += nr_files
        if total_files == 0:
            logger.warn("Coherent Stokes data output enabled, but no CS files across all SAPs")
            return None

        # NOTE: TABs in different SAPs can have different subband lists and if a TAB is split into parts (i.e. multiple files),
        # the last TAB part may contain fewer subbands. But simplify: compute a single (i.e. max) file size for all TABs or TAB parts.
        nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
        file_size = int(nr_subbands_per_file * size_per_subband)  # bytes
        bandwidth = int(ceil(8 * file_size / duration))  # bits/second

        idents = parset.getStringVector(DATAPRODUCTS + 'Output_CoherentStokes.identifications')
        output_files = {'cs': {'nr_of_cs_files': total_files, 'cs_file_size': file_size,
                               'identifications': idents},
                        'saps': sap_files}

        root_resource_group = parset.getString(DATAPRODUCTS + 'Output_CoherentStokes.storageClusterName')
        if self._hasGlobalStorage(root_resource_group):
            # global fs and enough bandwidth: collapsed estimate is easier for the Resource Assigner
            file_size *= total_files
            bandwidth *= total_files
            total_files = 1
        estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
                    'count': total_files, 'root_resource_groups': [root_resource_group],
                    'output_files': output_files}
        estimate['output_files']['cs']['nr_of_cs_stokes'] = nr_coherent
        logger.debug("Coherent Stokes data estimate: {}".format(estimate))
        return estimate
    def incoherentstokes(self, parset, duration):
        """ Estimate storage size and bandwidth needed for Incoherent Stokes (i.e. is)
        data products. Also add SAP properties needed by the propagator.
        """
        if not parset.getBool('Observation.DataProducts.Output_IncoherentStokes.enabled'):
            logger.info("incoherent stokes data output not enabled")
            return None
        logger.info("calculate incoherentstokes data size")

        size_of_sample = 4  # FP32
        incoherent_type = parset.getString(COBALT + 'BeamFormer.IncoherentStokes.which')
        subbands_per_file = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.subbandsPerFile', 512)
        samples_per_second = self._samples_per_second(parset)
        time_integration_factor = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.timeIntegrationFactor')
        size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor
        nr_incoherent = len(incoherent_type)  # 'I' or 'IQUV' ('XXYY' only possible for coherent stokes)

        total_nr_stokes = 0
        total_files = 0
        max_nr_subbands = 0
        sap_files = []

        for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
            logger.info("checking SAP {}".format(sap_nr))
@@ -255,40 +324,113 @@ class ObservationResourceEstimator(BaseResourceEstimator):
            nr_subbands = len(subbandList)
            max_nr_subbands = max(nr_subbands, max_nr_subbands)

            nr_files = 0
            # Atm can have 1 IS TAB per SAP, because its pointing is equal to the SAP pointing.
            # (When we support online coh dedisp and on multiple DMs, we can have >1 IS per SAP.)
            is_tab_nr = -1
            for tab_nr in xrange(parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)):
                logger.info("checking TAB {}".format(tab_nr))
                if not parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)):  # not coherent is incoherent
                    logger.info("Found incoherent stokes TAB: %i" % tab_nr)
                    if is_tab_nr >= 0:
                        # Could get here to produce >1 IS TAB copies, maybe for some software test
                        logger.error("SAP %i: >1 incoherent TAB not supported: TAB nrs %i and %i" % (sap_nr, tab_nr, is_tab_nr))
                        return None
                    is_tab_nr = tab_nr
                    total_nr_stokes += nr_incoherent
                    nr_files += int(nr_incoherent * ceil(nr_subbands / float(subbands_per_file)))

            if nr_files:
                sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_is_files': nr_files,
                                                                   'nr_of_tabs': 1,
                                                                   'is_tab_nr': is_tab_nr}})
                total_files += nr_files
        if total_files == 0:
            logger.warn("Incoherent Stokes data output enabled, but no IS files across all SAPs")
            return None

        # NOTE: TABs in different SAPs can have different subband lists and if a TAB is split into parts (i.e. multiple files),
        # the last TAB part may contain fewer subbands. But simplify: compute a single (i.e. max) file size for all TABs or TAB parts.
        nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
        file_size = int(nr_subbands_per_file * size_per_subband)  # bytes
        bandwidth = int(ceil(8 * file_size / duration))  # bits/second

        idents = parset.getStringVector(DATAPRODUCTS + 'Output_IncoherentStokes.identifications')
        output_files = {'is': {'nr_of_is_files': total_files, 'is_file_size': file_size,
                               'identifications': idents},
                        'saps': sap_files}

        root_resource_group = parset.getString(DATAPRODUCTS + 'Output_IncoherentStokes.storageClusterName')
        if self._hasGlobalStorage(root_resource_group):
            # global fs and enough bandwidth: collapsed estimate is easier for the Resource Assigner
            file_size *= total_files
            bandwidth *= total_files
            total_files = 1
        estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
                    'count': total_files, 'root_resource_groups': [root_resource_group],
                    'output_files': output_files}
        estimate['output_files']['is']['nr_of_is_stokes'] = nr_incoherent
        logger.debug("Incoherent Stokes data estimate: {}".format(estimate))
        return estimate
    def _merge_estimates(self, estimates):
        """ Estimates can only be merged if same root_resource_groups and bandwidth and storage,
        or if the root_resource_groups have a (single) global filesystem.
        NOTE: assumed good enough to only merge conseq pairs, not all pairs.
        """
        i = 1
        while i < len(estimates):  # careful iterating while modifying
            if estimates[i-1]['root_resource_groups'] == estimates[i]['root_resource_groups'] and \
               ((estimates[i-1]['resource_types']['bandwidth'] == estimates[i]['resource_types']['bandwidth'] and \
                 estimates[i-1]['resource_types']['storage'] == estimates[i]['resource_types']['storage']) or \
                all(self._hasGlobalStorage(rg) for rg in estimates[i]['root_resource_groups'])
               ):
                # Mergeable. Add uv, cs, is from estimates[i] into estimates[i-1]
                if 'uv' in estimates[i]['output_files']:
                    estimates[i-1]['output_files']['uv'] = estimates[i]['output_files']['uv']
                if 'cs' in estimates[i]['output_files']:
                    estimates[i-1]['output_files']['cs'] = estimates[i]['output_files']['cs']
                if 'is' in estimates[i]['output_files']:
                    estimates[i-1]['output_files']['is'] = estimates[i]['output_files']['is']

                # Merge saps. Saps are listed in order, but a sap_nr may be missing.
                # Extend one list by the other, sort, then merge any conseq duplicates.
                estimates[i-1]['output_files']['saps'].extend(estimates[i]['output_files']['saps'])
                estimates[i-1]['output_files']['saps'].sort(key=lambda sap: sap['sap_nr'])
                j = 1
                while j < len(estimates[i-1]['output_files']['saps']):
                    sap0 = estimates[i-1]['output_files']['saps'][j-1]
                    sap1 = estimates[i-1]['output_files']['saps'][j]
                    if sap0['sap_nr'] == sap1['sap_nr']:
                        # Sum nr_of_tabs, other keys must be from another data product type
                        if 'nr_of_tabs' in sap0['properties'] and \
                           'nr_of_tabs' in sap1['properties']:
                            sap1['properties']['nr_of_tabs'] += sap0['properties']['nr_of_tabs']
                        sap0['properties'].update(sap1['properties'])  # includes nr_of_tabs
                        estimates[i-1]['output_files']['saps'].pop(j)
                    else:
                        j += 1

                if all(self._hasGlobalStorage(rg) for rg in estimates[i]['root_resource_groups']):
                    # for global fs, collapse regardless
                    estimates[i-1]['resource_types']['bandwidth'] *= estimates[i-1]['count']  # *= 1, but to be robust and clear
                    estimates[i-1]['resource_types']['bandwidth'] += estimates[i]['resource_types']['bandwidth'] * estimates[i]['count']
                    estimates[i-1]['resource_types']['storage'] *= estimates[i-1]['count']  # *= 1, but to be robust and clear
                    estimates[i-1]['resource_types']['storage'] += estimates[i]['resource_types']['storage'] * estimates[i]['count']
                    estimates[i-1]['count'] = 1  # already 1, but to be robust and clear
                else:
                    # root_resource_groups and values of bandwidth and storage are equal for both estimates
                    estimates[i-1]['count'] += estimates[i]['count']

                logger.info('Merged observation resource estimate {} into {}'.format(i, i-1))
                estimates.pop(i)
            else:
                i += 1

        return estimates
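
To illustrate the merge above, a standalone sketch with two assumed DRAGNET estimates (format-conforming values modeled on the _calculate docstring example; `estimator` is an assumed ObservationResourceEstimator instance):

    # Illustrative inputs only: a CS and an IS estimate with equal root resource
    # groups and equal per-file bandwidth/storage, so the pair is mergeable.
    estimates = [
        {'resource_types': {'bandwidth': 8947849, 'storage': 2147483648},
         'count': 140, 'root_resource_groups': ['DRAGNET'],
         'output_files': {'cs': {'nr_of_cs_files': 140, 'cs_file_size': 2147483648,
                                 'nr_of_cs_stokes': 4, 'identifications': []},
                          'saps': [{'sap_nr': 0, 'properties': {'nr_of_cs_files': 140,
                                                                'nr_of_tabs': 7}}]}},
        {'resource_types': {'bandwidth': 8947849, 'storage': 2147483648},
         'count': 40, 'root_resource_groups': ['DRAGNET'],
         'output_files': {'is': {'nr_of_is_files': 40, 'is_file_size': 2147483648,
                                 'nr_of_is_stokes': 4, 'identifications': []},
                          'saps': [{'sap_nr': 0, 'properties': {'nr_of_is_files': 20,
                                                                'nr_of_tabs': 1, 'is_tab_nr': 0}},
                                   {'sap_nr': 2, 'properties': {'nr_of_is_files': 20,
                                                                'nr_of_tabs': 1, 'is_tab_nr': 0}}]}}]
    estimator._merge_estimates(estimates)
    # The pair collapses into one estimate: count 140 + 40 = 180, output_files
    # now holds both 'cs' and 'is', and SAP 0 ends up with nr_of_tabs 8
    # (7 coherent TABs plus the incoherent TAB).

Merging consecutive pairs only is a deliberate simplification: estimates arrive grouped per data product type, so equal-sized neighbors are the common case.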
    def _samples_per_second(self, parset):
        """ set samples per second
@@ -316,3 +458,6 @@ class ObservationResourceEstimator(BaseResourceEstimator):
        logger.info("number of virtual stations = {}".format(nr_virtual_stations))
        return nr_virtual_stations
    def _hasGlobalStorage(self, resource_group_name):
        # CEP4 is currently the only cluster with a global filesystem; other
        # clusters (e.g. DRAGNET) have per-node storage, hence per-file estimates.
        return resource_group_name == 'CEP4'