Commit 82a89225 authored by Alexander van Amesfoort

Task #9939: RA estimator: address Adriaan's review comments + a few small cleanups of my own

parent 8862c83f
Showing 188 additions and 162 deletions
@@ -46,7 +46,7 @@ class BasePipelineResourceEstimator(BaseResourceEstimator):
except Exception as e:
logger.error(e)
logger.info("Could not get duration from parset, returning default pipeline duration of 1 hour")
-return 3600
+return 3600.0
def _getOutputIdentification(self, identifications):
""" For pipeline output, there must be exactly 1 (non-duplicate) identification string per
......
@@ -34,7 +34,7 @@ logger = logging.getLogger(__name__)
class BaseResourceEstimator(object):
""" Base class for all other resource estimater classes
""" Base class for all other resource estimator classes
"""
def __init__(self, name):
self.name = name
@@ -51,11 +51,14 @@ class BaseResourceEstimator(object):
return True
def _getDuration(self, start, end):
""" Returns number of fractional seconds as a float(!) (as totalSeconds())
between start and end.
"""
startTime = parseDatetime(start)
endTime = parseDatetime(end)
if startTime >= endTime:
logger.warning("startTime is not before endTime")
-return 1 ##TODO To prevent divide by zero later
+return 1.0 ##TODO To prevent divide by zero later
return totalSeconds(endTime - startTime)
#TODO check if this makes duration = int(parset.get('duration', 0)) as a key reduntant?
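For reference, a minimal sketch of what _getDuration() above does, assuming totalSeconds() wraps timedelta.total_seconds() (the timestamp format below is an assumption, not taken from this commit):

from datetime import datetime

def get_duration(start, end):
    # Hypothetical stand-in for _getDuration(): parse the parset timestamps
    # and return the fractional number of seconds between them as a float.
    fmt = '%Y-%m-%d %H:%M:%S'  # assumed timestamp format
    start_time = datetime.strptime(start, fmt)
    end_time = datetime.strptime(end, fmt)
    if start_time >= end_time:
        return 1.0  # mirrors the divide-by-zero guard above
    return (end_time - start_time).total_seconds()

print(get_duration('2017-01-01 12:00:00', '2017-01-01 12:04:00'))  # 240.0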
@@ -81,7 +84,7 @@ class BaseResourceEstimator(object):
input_files[dptype].append(copy.deepcopy(dt_values))
# Observation estimates have resource_count > 1 to be able to assign each output to another resource,
-# but that is not supported atm for pipelines. We only use input params to produce parset filenames etc,
+# but that is currently not supported for pipelines. We only use input parameters to produce parset filenames etc,
# but not to reserve resources (not covered by resource count). Collapse to implied resource_count of 1.
input_files[dptype][-1]['properties']['nr_of_' + dptype + '_files'] *= predecessor_estimate['resource_count']
return True
@@ -90,7 +93,7 @@ class BaseResourceEstimator(object):
def get_inputs_from_predecessors(self, predecessor_estimates, identifications, dptype):
""" Return copy of parts with dptype in predecessor_estimates matching identifications
-If any of any of identifications could not be found, the empty dict is returned.
+If any of identifications could not be found, the empty dict is returned.
dptype is one of the observation/pipeline data product types, e.g. 'uv', 'cs', 'pulp', ...
No duplicates in the identifications iterable!
......
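A minimal sketch of the identification matching described above, assuming each output_files entry carries an 'identification' string (the real method also handles per-SAP lists and resource_count collapsing):

import copy

def inputs_from_predecessors(predecessor_estimates, identifications, dptype):
    # Simplified: collect dptype entries whose identification matches; any
    # identification without a match yields the empty dict.
    input_files = {dptype: []}
    for ident in identifications:
        matches = [entry for est in predecessor_estimates
                   for entry in est.get('output_files', {}).get(dptype, [])
                   if entry['identification'] == ident]
        if not matches:
            return {}
        input_files[dptype].extend(copy.deepcopy(m) for m in matches)
    return input_files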
@@ -63,8 +63,15 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
'resource_count': 20, 'root_resource_group': 'CEP4',
'output_files': {
'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',
'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}},
-{'sap_nr': 3, 'identification': 'mom.G777955.B2.1.C.SAP003.uv.dps',
-'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0}}
]
}
+},
+{
+'resource_types': {'bandwidth': 286331153, 'storage': 1073741824}, # per 'uv' dict
+'resource_count': 20, 'root_resource_group': 'CEP4',
+'output_files': {
+'uv': [{'sap_nr': 3, 'identification': 'mom.G777955.B2.1.C.SAP003.uv.dps',
+'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 20}}
+]
+}
@@ -81,7 +88,8 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
'resource_types': {'bandwidth': 2236995 * 20, 'storage': 67109864 * 20},
'resource_count': 1, 'root_resource_group': 'CEP4',
-# input resources not (yet) allocated: bandwidth only, but coupled to specific storage resource
+# Note that the 2 predecessor estimates have been converted into an input 'uv' list. This works,
+# as long as input resources are not (yet) scheduled. Currently, resource_* values apply to output_files only.
'input_files': {
'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps', # w/ sap only if predecessor is an observation
'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 20, 'start_sb_nr': 0}},
@@ -110,19 +118,19 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
For each estimate, the total output_files resources to be claimed is resource_count * resources_types.
Thus resource_types is a total across all output_files content. The idea is to keep this
-singular per data product type (inner list len 1), but for pipelines this is not possible atm.
+singular per data product type (inner list size 1), but for pipelines this is currently not possible.
-Note that atm input_files resources are not included or claimed.
-However, input_files properties must be added to resource claims to later generate the parset.
+Note that input_files resources are currently not included or claimed.
+However, input_files properties must be added to resource claims to later generate parset values.
This caveat must be fixed at some point, but until then, we cannot have input_files-only estimates.
-(After it is fixed, we should not have that either; it makes no sense.)
+(After it is fixed, we should not have input_files-only estimates either; it makes no sense.)
-For pipelines we don't support output to multiple storage areas atm, so resource_count is 1.
+For pipelines we currently do not support output to multiple storage areas, so resource_count is 1.
We still have to deal with input_files from an observation with >1 SAP (used for the pulsar pipeline).
For this case, we generate 1 estimate, but use a list per data product type (e.g. 'uv': [...]).
Also, we may need multiple data product types in one pipeline estimate, but there the reason
-is that e.g. 'uv' and 'im' file(s) belong together, so we must produce one estimate per pair,
-(but again, it's a pipeline so atm it is collapsed to a single estimate, i.e. resource_count 1).
+is that e.g. 'uv' and 'im' files belong together, so we produce one estimate per pair,
+(but again, it is a pipeline so currently it is collapsed to a single estimate, thus resource_count 1).
The inner data product type list can be removed once pipelines also use resource_count > 1.
Some RA_Services design aspects work well. Others fail to capture the underlying concepts close enough, hence inelegance.
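As a worked example of the total-claim rule above, an editorial sketch using the numbers from the docstring example:

# Totals to be claimed are resource_count * resource_types.
resource_types = {'bandwidth': 286331153, 'storage': 1073741824}  # per 'uv' dict
resource_count = 20
total_storage = resource_count * resource_types['storage']      # 21474836480 bytes (20 GiB)
total_bandwidth = resource_count * resource_types['bandwidth']  # 5726623060 bits/s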
@@ -147,10 +155,11 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
parset.getString('Observation.stopTime'))
input_idents_uv = parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications')
-input_files = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_uv, 'uv')
-if not input_files:
+input_files_uv = self.get_inputs_from_predecessors(predecessor_estimates, input_idents_uv, 'uv')
+if not input_files_uv:
logger.error('Missing uv dataproducts in predecessor output_files')
result['errors'].append('Missing uv dataproducts in predecessor output_files')
+input_files = input_files_uv
have_im_input = parset.getBool(DATAPRODUCTS + 'Input_InstrumentModel.enabled')
if have_im_input:
@@ -166,8 +175,8 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
estimate = {'input_files': input_files}
-# NOTE: input bandwidth is not included in the resulting estimate atm.
-# Proper input bandwidth estimation has limited use atm and is tricky, because of pipeline duration est, tmp files,
+# NOTE: input bandwidth is currently not included in the resulting estimate.
+# Proper input bandwidth estimation has limited use currently and is tricky, because of pipeline duration estimation, tmp files,
# multiple passes, nr nodes and caching, but for sure also because bandwidth must be tied to *predecessor* storage!
#input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName')
@@ -179,19 +188,20 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
output_cluster_im = parset.getString(DATAPRODUCTS + 'Output_InstrumentModel.storageClusterName')
if output_cluster_uv != output_cluster_im:
-logger.warn('storageClusterName differs between uv: \'%s\' and im: \'%s\': to be packed in 1 estimate, so ignoring \'im\' storageClusterName',
+logger.error('Output_InstrumentModel is enabled, but its storageClusterName \'%s\' differs from Output_Correlated.storageClusterName \'%s\'',
output_cluster_uv, output_cluster_im)
+result['errors'].append('Output_InstrumentModel is enabled, but its storageClusterName \'%s\' differs from Output_Correlated.storageClusterName \'%s\'' % (output_cluster_im, output_cluster_uv))
-# Observations can have multiple output estimates, but atm pipelines do not.
-# (Reason: incomplete info avail and effective assigner claim merging is harder)
-# As long as this is the case, try to do a best effort to map any predecessor (obs or pipeline) to single estimate output.
+# Observations can have multiple output estimates, but currently pipelines do not.
+# (Reason: incomplete info available and effective assigner claim merging is harder)
+# As long as this is the case, try to do a best effort to map any predecessor (observation or pipeline) to single estimate output.
nr_input_files = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']])
# Assume all uv file sizes are the same size as in dict 0. For uv data, we never had pipelines with >1 dict,
# but this could be meaningful when averaging multiple SAPs in 1 go (and no further processing steps).
# (Never done, since subsequent pipeline steps must then also work on all SAPs. But averaging could be the last step.)
-# The potential other case is >1 dict from different obs with different file sizes.
-# In general, this requires >1 output est dict, which the estimate fmt allows, but atm is only used for observations.
+# The potential other case is >1 dict from different observations with different file sizes.
+# In general, this requires >1 output estimate dict, which the estimate format allows, but is currently only used for observations.
uv_input_file_size = input_files['uv'][0]['properties']['uv_file_size']
# For start_sb_nr, take the minimum of all start_sb_nr values.
@@ -201,8 +211,8 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
start_sb_nr = min([uv_dict['properties']['start_sb_nr'] for uv_dict in input_files['uv']])
# TODO: This output file size calculation comes from the (old) Scheduler without explaining comments.
-# The reason why it isn't a simple div, is that parts of the metadata are not reduced in size (and casacore storage mgrs).
-# With reduction_factor 1, computed output size increases by 53%. Casacore storage mgrs may change size, but that much?!?
+# The reason why it isn't a simple division, is that parts of the metadata are not reduced in size (and casacore storage managers).
+# With reduction_factor 1, computed output size increases by 53%... Casacore storage managers may change size, but that much?!?
# If you can figure out what/how, please fix this calculation. Avoid unnamed magic values and document why!
logger.debug("calculate correlated data size")
new_size = uv_input_file_size / float(reduction_factor)
@@ -227,15 +237,15 @@ class CalibrationPipelineResourceEstimator(BasePipelineResourceEstimator):
# Need to split averaging pipeline and calibration pipeline
data_size += im_file_size
-data_size *= nr_output_files # bytes
-if data_size:
-bandwidth = int(ceil(8 * data_size / duration)) # bits/second
-estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+total_data_size = data_size * nr_output_files # bytes
+if total_data_size:
+bandwidth = int(ceil(8 * total_data_size / duration)) # bits/second
+estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': total_data_size}
estimate['resource_count'] = 1
estimate['root_resource_group'] = output_cluster_uv
else:
-logger.error('An estimate of zero was calculated!')
-result['errors'].append('An estimate of zero was calculated!')
+logger.error('Estimated total data size is zero!')
+result['errors'].append('Estimated total data size is zero!')
result['estimates'].append(estimate)
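The data size and bandwidth bookkeeping above follows one pattern in all pipeline estimators; a condensed sketch with illustrative numbers (values assumed, loosely based on the docstring example):

from math import ceil

duration = 3600.0               # pipeline duration in fractional seconds
nr_output_files = 20
uv_output_file_size = 67109864  # bytes per output file (illustrative)

total_data_size = nr_output_files * uv_output_file_size  # bytes
if total_data_size:
    bandwidth = int(ceil(8 * total_data_size / duration))  # bits/second
    estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': total_data_size},
                'resource_count': 1, 'root_resource_group': 'CEP4'}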
......
@@ -89,14 +89,14 @@ class ImagePipelineResourceEstimator(BasePipelineResourceEstimator):
estimate = {'input_files': input_files}
-# NOTE: input bandwidth is not included in the resulting estimate atm.
+# NOTE: input bandwidth is currently not included in the resulting estimate.
# Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ...
#input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName')
output_ident_img = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_SkyImage.identifications') )
output_cluster_img = parset.getString(DATAPRODUCTS + 'Output_SkyImage.storageClusterName')
-# See the calibration pipeline estimator for why this is done in this way atm.
+# See the calibration pipeline estimator for why this is currently done this way.
nr_input_subbands = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']])
uv_file_size = input_files['uv'][0]['properties']['uv_file_size']
if nr_input_subbands % (subbands_per_image * slices_per_image) > 0:
@@ -112,16 +112,15 @@ class ImagePipelineResourceEstimator(BasePipelineResourceEstimator):
'properties': {'nr_of_img_files': nr_images,
'img_file_size': img_file_size}}]}
-# count total data size
-data_size = nr_images * img_file_size # bytes
-if data_size:
-bandwidth = int(ceil(8 * data_size / duration)) # bits/second
-estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+total_data_size = nr_images * img_file_size # bytes
+if total_data_size:
+bandwidth = int(ceil(8 * total_data_size / duration)) # bits/second
+estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': total_data_size}
estimate['resource_count'] = 1
estimate['root_resource_group'] = output_cluster_img
else:
-logger.error('An estimate of zero was calculated!')
-result['errors'].append('An estimate of zero was calculated!')
+logger.error('Estimated total data size is zero!')
+result['errors'].append('Estimated total data size is zero!')
result['estimates'].append(estimate)
......
@@ -89,14 +89,14 @@ class LongBaselinePipelineResourceEstimator(BasePipelineResourceEstimator):
estimate = {'input_files': input_files}
-# NOTE: input bandwidth is not included in the resulting estimate atm.
+# NOTE: input bandwidth is currently not included in the resulting estimate.
# Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ...
#input_cluster_uv = parset.getString(DATAPRODUCTS + 'Input_Correlated.storageClusterName')
output_ident_uv = self._getOutputIdentification( parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications') )
output_cluster_uv = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName')
-# See the calibration pipeline estimator for why this is done in this way atm.
+# See the calibration pipeline estimator for why this is currently done this way.
nr_input_files = sum([uv_dict['properties']['nr_of_uv_files'] for uv_dict in input_files['uv']])
uv_input_file_size = input_files['uv'][0]['properties']['uv_file_size']
start_sb_nr = min([uv_dict['properties']['start_sb_nr'] for uv_dict in input_files['uv']])
@@ -119,16 +119,15 @@ class LongBaselinePipelineResourceEstimator(BasePipelineResourceEstimator):
'uv_file_size': uv_output_file_size,
'start_sbg_nr': start_sbg_nr}}]}
-# count total data size
-data_size = nr_output_files * uv_output_file_size # bytes
-if data_size:
-bandwidth = int(ceil(8 * data_size / duration)) # bits/second
-estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+total_data_size = nr_output_files * uv_output_file_size # bytes
+if total_data_size:
+bandwidth = int(ceil(8 * total_data_size / duration)) # bits/second
+estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': total_data_size}
estimate['resource_count'] = 1
estimate['root_resource_group'] = output_cluster_uv
else:
-logger.error('An estimate of zero was calculated!')
-result['errors'].append('An estimate of zero was calculated!')
+logger.error('Estimated total data size is zero!')
+result['errors'].append('Estimated total data size is zero!')
result['estimates'].append(estimate)
......
@@ -21,6 +21,7 @@
# $Id$
import logging
+import pprint
from math import ceil
from base_resource_estimator import BaseResourceEstimator
from lofar.stationmodel.antennasets_parser import AntennaSetsParser
@@ -66,17 +67,17 @@ class ObservationResourceEstimator(BaseResourceEstimator):
def _calculate(self, parset, predecessor_estimates=[]):
""" Calculate the resources needed by the different data product types that can be in a single observation.
-The predecessor_estimates arg is just to implement the same interface as pipelines. Observations have no predecessor.
+The predecessor_estimates argument is just to implement the same interface as pipelines. Observations have no predecessor.
-The following return value example is for an obs duration of 240.0 s and 3 data product types for 2 clusters.
+The following return value example is for an observation duration of 240.0 s and 3 data product types for 2 clusters.
NOTE: 'nr_of_XX_files' is for that SAP estimate. The total is thus times the 'resource_count'.
'nr_of_cs_parts' is for a full CS TAB (per stokes component) in that SAP; not per estimate, which may still describe one part.
-See the calibration pipeline estimator for some explanation on why parts of this format are needed atm. It also has input_files.
+See the calibration pipeline estimator for some explanation on why parts of this format are currently needed. It also has input_files.
{
'errors': [],
'estimates': [{
-'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # for each uv output data prod (thus the total is times the resource_count val)
+'resource_types': {'bandwidth': 35791395, 'storage': 1073741824}, # for each uv output data product (thus the total is times the resource_count value)
'resource_count': 20, 'root_resource_group': 'CEP4',
'output_files': {
'uv': [{'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.uv.dps',
@@ -94,7 +95,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
'uv': [{'sap_nr': 2, 'identification': 'mom.G777955.B2.1.C.SAP002.uv.dps',
'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 80}}]
}
-}, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # for each quad (4 stokes) of cs output tab part (thus the total is times the resource_count val)
+}, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # for each quad (4 stokes) of cs output tab part (thus the total is times the resource_count value)
'resource_count': 34, 'root_resource_group': 'DRAGNET',
'output_files': {
'cs': [{'sap_nr': 0, 'identification': 'mom.G777955.B2.1.C.SAP000.cs.dps',
@@ -108,79 +109,82 @@ class ObservationResourceEstimator(BaseResourceEstimator):
'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, 'nr_of_cs_stokes': 4,
'nr_of_cs_parts': 1, 'is_tab_nr': 0}}] # parts per tab for this sap
}
-}, {'resource_types': {'bandwidth': 17895698, 'storage': 536870912}, # for each 'is' output tab part (thus the total is times the resource_count val)
+}, {'resource_types': {'bandwidth': 17895698, 'storage': 536870912}, # for each 'is' output tab part (thus the total is times the resource_count value)
'resource_count': 1, 'root_resource_group': 'DRAGNET',
'output_files': {
'is': [{'sap_nr': 1, 'identification': 'mom.G777955.B2.1.C.SAP001.is.dps',
'properties': {'is_file_size': 536870912, 'nr_of_is_files': 1, 'nr_of_is_stokes': 1,
-'is_tab_nr': 0}}] # IS can have >1 parts, but atm max 1 IS TAB per SAP
+'is_tab_nr': 0}}] # IS can have >1 parts, but currently max 1 IS TAB per SAP
}
}]
}
"""
logger.info("start estimate '{}'".format(self.name))
logger.info('parset: %s ' % parset)
-# NOTE: obs estimates appear quite accurate. Diffs come mostly from Observation.stopTime
+# NOTE: observation estimates appear quite accurate. Most of the difference comes from Observation.stopTime
# being planned instead of real stop time, because of Cobalt block size not being exactly 1.0 s.
-duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
+duration = self._getDuration(parset.getString('Observation.startTime'),
+parset.getString('Observation.stopTime'))
errors = []
estimates = []
-uv_estimates = self.correlated(parset, duration)
-if uv_estimates is None:
-errors.append('empty UV resource estimates!')
-logger.error('resource estimates for UV data is empty!')
-else:
-estimates.extend(uv_estimates)
-cs_estimates = self.coherentstokes(parset, duration)
-if cs_estimates is None:
-errors.append('empty CS resource estimate!')
-logger.error('resource estimate for CS data is empty!')
-else:
-estimates.extend(cs_estimates)
-is_estimates = self.incoherentstokes(parset, duration)
-if is_estimates is None:
-errors.append('empty IS resource estimate!')
-logger.error('resource estimate for IS data is empty!')
-else:
-estimates.extend(is_estimates)
-station_estimates = self.stations(parset)
-if station_estimates is None:
-errors.append('empty STATION resource estimate!')
-logger.error('resource estimate for STATIONS is empty!')
-else:
-estimates.extend(station_estimates)
+try:
+if parset.getBool('Observation.DataProducts.Output_Correlated.enabled'):
+estimates.extend(self.correlated(parset, duration))
+except ValueError as exc:
+logger.error(exc)
+errors.append(str(exc))
+try:
+if parset.getBool('Observation.DataProducts.Output_CoherentStokes.enabled'):
+estimates.extend(self.coherentstokes(parset, duration))
+except ValueError as exc:
+logger.error(exc)
+errors.append(str(exc))
+try:
+if parset.getBool('Observation.DataProducts.Output_IncoherentStokes.enabled'):
+estimates.extend(self.incoherentstokes(parset, duration))
+except ValueError as exc:
+logger.error(exc)
+errors.append(str(exc))
if not estimates:
-errors.append('Produced observation resource estimate list is empty!')
-logger.error('empty observation resource estimate list!')
+logger.error('no data product estimates in observation resource estimate list!')
+errors.append('Produced observation resource estimate list has no data product estimates!')
+try:
+estimates.extend(self.stations(parset))
+except ValueError as exc:
+logger.error(exc)
+errors.append(str(exc))
-logger.debug('Observation resource estimate(s): {}'.format(estimates))
+logger.debug('Observation resource estimates:\n' + pprint.pformat(estimates))
result = {'errors': errors, 'estimates': estimates}
return result
def correlated(self, parset, duration):
""" Estimate storage size and bandwidth needed for correlated (i.e. uv)
""" Estimate storage size and bandwidth needed for correlated ('uv')
data products. Also add SAP properties needed by the propagator.
Return list of estimates, max 1 SAP per estimate (easier for assigner).
The duration argument is a float in (fractional) seconds.
Return list of estimates, max 1 SAP per estimate (easier for assigner),
or raise ValueError on error.
"""
if not parset.getBool('Observation.DataProducts.Output_Correlated.enabled'):
logger.info("correlated data output not enabled")
return []
logger.info("calculating correlated data size")
storage_unit = 512 # all sizes in bytes
size_of_header = 512
size_of_overhead = 600000 # COBALT parset in MS HISTORY subtable + misc
size_of_short = 2
-size_of_visib = 8 # a visibility is stored as a complex FP32
+size_of_visib = 8 # a visibility is stored as a std::complex<float>
nr_polarizations = 2
channels_per_subband = parset.getInt(COBALT + 'Correlator.nrChannelsPerSubband', 64) # defaults as in COBALT
integration_time = parset.getFloat(COBALT + 'Correlator.integrationTime', 1)
nr_virtual_stations = self._virtual_stations(parset)
# Reflects MeasurementSets produced by the casacore LOFAR storage manager (LofarStMan)
# The sub-expression '+ val-1) / val' computes a rounded (positive) integer division.
integrated_seconds = int(duration / integration_time)
nr_baselines = nr_virtual_stations * (nr_virtual_stations + 1) / 2
data_size = (nr_baselines * channels_per_subband * nr_polarizations * nr_polarizations * \
@@ -194,8 +198,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
nr_saps = parset.getInt('Observation.nrBeams')
if nr_saps < 1:
logger.error("Correlated data output enabled, but nrBeams < 1")
return None
raise ValueError("Correlated data output enabled, but nrBeams < 1")
# Estimates may differ per SAP for CS/IS. Decided to always produce a separate estimate per SAP.
# Hence, need to annotate each SAP with the right identifications for pipeline predecessor input filtering.
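The '(x + val-1) / val' rounding noted a few lines up is the standard integer ceiling division; a tiny illustration (written with // so it also holds on Python 3):

def ceil_div(x, val):
    # Rounded-up (positive) integer division.
    return (x + val - 1) // val

assert ceil_div(10, 4) == 3  # 10/4 = 2.5 rounds up to 3
assert ceil_div(8, 4) == 2   # exact division is unchanged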
@@ -204,44 +207,44 @@ class ObservationResourceEstimator(BaseResourceEstimator):
total_files = 0 # sum of all subbands in all digital beams
estimates = []
-for sap_nr in xrange(nr_saps):
+for sap_nr in range(nr_saps):
subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
nr_subbands = len(subbandList)
if nr_subbands == 0:
# Replace here by 'continue' (+ check total_files > 0 at the end) once we support separate subband lists for UV, CS, IS
logger.error("Correlated data output enabled, but empty subband list for sap %d", sap_nr)
return None
raise ValueError("Correlated data output enabled, but empty subband list for sap %d" % sap_nr)
est = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
'resource_count': nr_subbands,
'root_resource_group': root_resource_group,
'output_files': {'uv': [{'sap_nr': sap_nr, 'identification': sap_idents[sap_nr],
-'properties': {'uv_file_size': file_size, 'nr_of_uv_files': 1, # i.e. total nr_of_uv_files is resource_count times 1
+'properties': {'uv_file_size': file_size, 'nr_of_uv_files': 1, # thus total nr_of_uv_files is resource_count times 1
'start_sb_nr': total_files}}]}}
total_files += nr_subbands
estimates.append(est)
logger.debug("Correlated data estimates: {}".format(estimates))
logger.debug("Correlated data estimates:\n" + pprint.pformat(estimates))
return estimates
def coherentstokes(self, parset, duration):
""" Estimate storage size and bandwidth needed for Coherent Stokes (i.e. cs)
""" Estimate storage size and bandwidth needed for Coherent Stokes ('cs')
data products. Also add SAP properties needed by the propagator.
Return list of estimates, max 1 SAP per estimate (easier for assigner), or None on error.
The duration argument is a float in (fractional) seconds.
Return list of estimates, max 1 SAP per estimate (easier for assigner),
or raise ValueError on error.
"""
if not parset.getBool('Observation.DataProducts.Output_CoherentStokes.enabled'):
logger.info("coherent stokes data output not enabled")
return []
logger.info("calculate coherent stokes data size")
-size_of_sample = 4 # FP32
+size_of_sample = 4 # single precision float
coherent_type = parset.getString(COBALT + 'BeamFormer.CoherentStokes.which')
subbands_per_file = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.subbandsPerFile', 512)
+if subbands_per_file < 0:
+raise ValueError('BeamFormer.CoherentStokes.subbandsPerFile may not be negative, but is %d' % subbands_per_file)
+if subbands_per_file == 0:
+subbands_per_file = 512
samples_per_second = self._samples_per_second(parset)
time_integration_factor = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor')
-# Note that complex voltages (XXYY) cannot be meaningfully integrated (i.e. time_integration_factor 1)
+# Note that complex voltages (XXYY) cannot be meaningfully integrated (time_integration_factor 1)
size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor
nr_coherent = len(coherent_type) # 'I' or 'IQUV' or 'XXYY'
@@ -250,8 +253,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
nr_saps = parset.getInt('Observation.nrBeams')
if nr_saps < 1:
logger.error("Coherent Stokes data output enabled, but nrBeams < 1")
return None
raise ValueError("Coherent Stokes data output enabled, but nrBeams < 1")
# Estimates may differ per SAP for CS/IS. Decided to always produce a separate estimate per SAP.
# Hence, need to annotate each SAP with the right identifications for pipeline predecessor input filtering.
@@ -259,31 +261,29 @@ class ObservationResourceEstimator(BaseResourceEstimator):
sap_idents = self._sap_identifications(identifications, nr_saps)
estimates = []
-for sap_nr in xrange(nr_saps):
+for sap_nr in range(nr_saps):
logger.info("checking SAP {}".format(sap_nr))
subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
nr_subbands = len(subbandList)
if nr_subbands == 0:
logger.error("Coherent Stokes data output enabled, but empty subband list for sap %d", sap_nr)
return None
raise ValueError("Coherent Stokes data output enabled, but empty subband list for sap %d" % sap_nr)
nr_subbands_per_file = min(subbands_per_file, nr_subbands)
nr_coherent_tabs = 0
-is_tab_nr = -1
+is_tab_nr = None
nr_tabs = parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)
-for tab_nr in xrange(nr_tabs):
+for tab_nr in range(nr_tabs):
if not parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)):
is_tab_nr = tab_nr
logger.info("coherentstokes: skipping incoherent tab")
continue
nr_coherent_tabs += 1
logger.info("added %d regular tabs", nr_coherent_tabs)
logger.info("added %d coherent tabs before considering tab rings and fly's eye tabs", nr_coherent_tabs)
nr_tab_rings = parset.getInt('Observation.Beam[%d].nrTabRings' % sap_nr)
if nr_tab_rings < 0:
logger.error("SAP %d: nr of tab rings is %d", sap_nr, nr_tab_rings)
return None
raise ValueError("SAP %d: nr of tab rings is < 0: %d" % (sap_nr, nr_tab_rings))
elif nr_tab_rings > 0:
nr_tabs = (3 * nr_tab_rings * (nr_tab_rings + 1) + 1)
nr_coherent_tabs += nr_tabs
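The ring formula above counts TABs in a hexagonal pattern: ring k adds 6*k beams around the central one, so n rings give 1 + sum(6*k for k in 1..n) = 3*n*(n+1) + 1 beams. A quick check:

def tabs_in_rings(nr_tab_rings):
    # Central TAB plus 6*k TABs in each hexagonal ring k.
    return 3 * nr_tab_rings * (nr_tab_rings + 1) + 1

assert tabs_in_rings(1) == 7   # 1 + 6
assert tabs_in_rings(2) == 19  # 1 + 6 + 12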
@@ -295,15 +295,14 @@ class ObservationResourceEstimator(BaseResourceEstimator):
logger.info("added %d fly's eye tabs", nr_tabs)
if nr_coherent_tabs == 0:
logger.error("Coherent Stokes data output enabled, but no coherent tabs for sap %d", sap_nr)
return None
raise ValueError("Coherent Stokes data output enabled, but no coherent tabs for sap %d" % sap_nr)
# Keep XXYY/IQUV together (>1 parts still possible).
# Else translator to parset filenames cannot know which stokes (nr_of_XX_stokes property too coarse).
# Also for complex voltages (XXYY) only: pipeline needs all 4 XXYY accessible from the same node.
#
# NOTE: If a TAB is split into parts, then the last TAB part may contain fewer subbands.
-# Simplify: compute a single (i.e. max) file size for all TABs or TAB parts.
+# Simplify: compute a single (max) file size for all TABs or TAB parts.
file_size = int(nr_subbands_per_file * size_per_subband) # bytes
storage = file_size * nr_coherent # bytes
bandwidth = int(ceil(8 * storage / duration)) # bits/second
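Putting the coherent-stokes sizing together with illustrative numbers (all values below are assumptions for the sake of the example; in the real code samples_per_second comes from _samples_per_second(parset)):

from math import ceil

duration = 600.0              # seconds
samples_per_second = 195313   # assumed: ~200 MHz clock / 1024
size_of_sample = 4            # single precision float
time_integration_factor = 1   # e.g. complex voltages
nr_subbands_per_file = 488
nr_coherent = 4               # 'IQUV'

size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor
file_size = int(nr_subbands_per_file * size_per_subband)  # bytes per TAB part, per stokes
storage = file_size * nr_coherent                         # bytes per estimate
bandwidth = int(ceil(8 * storage / duration))             # bits/second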
@@ -315,26 +314,27 @@ class ObservationResourceEstimator(BaseResourceEstimator):
'output_files': {'cs': [{'sap_nr': sap_nr, 'identification': sap_idents[sap_nr],
'properties': {'cs_file_size': file_size, 'nr_of_cs_files': nr_coherent,
'nr_of_cs_stokes': nr_coherent, 'nr_of_cs_parts': nr_parts_per_tab}}]}}
-if is_tab_nr != -1: # translator to filenames needs to know: it may not have all CS+IS info in one claim
+if is_tab_nr is not None: # translator to filenames needs to know: it may not have all CS+IS info in one claim
est['output_files']['cs'][0]['properties']['is_tab_nr'] = is_tab_nr
estimates.append(est)
logger.debug("Coherent Stokes data estimates: {}".format(estimates))
logger.debug("Coherent Stokes data estimates:\n" + pprint.pformat(estimates))
return estimates
def incoherentstokes(self, parset, duration):
""" Estimate storage size and bandwidth needed for Incoherent Stokes (i.e. is)
""" Estimate storage size and bandwidth needed for Incoherent Stokes ('is')
data products. Also add SAP properties needed by the propagator.
Return list of estimates, max 1 SAP per estimate (easier for assigner).
The duration argument is a float in (fractional) seconds.
Return list of estimates, max 1 SAP per estimate (easier for assigner),
or raise ValueError on error.
"""
if not parset.getBool('Observation.DataProducts.Output_IncoherentStokes.enabled'):
logger.info("incoherent stokes data output not enabled")
return []
logger.info("calculate incoherent stokes data size")
-size_of_sample = 4 # FP32
+size_of_sample = 4 # single precision float
incoherent_type = parset.getString(COBALT + 'BeamFormer.IncoherentStokes.which')
subbands_per_file = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.subbandsPerFile', 512)
+if subbands_per_file < 0:
+raise ValueError('BeamFormer.IncoherentStokes.subbandsPerFile may not be negative, but is %d' % subbands_per_file)
+if subbands_per_file == 0:
+subbands_per_file = 512
samples_per_second = self._samples_per_second(parset)
@@ -346,8 +346,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
nr_saps = parset.getInt('Observation.nrBeams')
if nr_saps < 1:
logger.error("Incoherent Stokes data output enabled, but nrBeams < 1")
return None
raise ValueError("Incoherent Stokes data output enabled, but nrBeams < 1")
# Estimates may differ per SAP for CS/IS. Decided to always produce a separate estimate per SAP.
# Hence, need to annotate each SAP with the right identifications for pipeline predecessor input filtering.
@@ -355,13 +354,12 @@ class ObservationResourceEstimator(BaseResourceEstimator):
sap_idents = self._sap_identifications(identifications, nr_saps)
estimates = []
-for sap_nr in xrange(nr_saps):
+for sap_nr in range(nr_saps):
logger.info("checking SAP {}".format(sap_nr))
subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
nr_subbands = len(subbandList)
if nr_subbands == 0:
logger.error("Incoherent Stokes data output enabled, but empty subband list for sap %d", sap_nr)
return None
raise ValueError("Incoherent Stokes data output enabled, but empty subband list for sap %d" % sap_nr)
nr_subbands_per_file = min(subbands_per_file, nr_subbands)
# Atm can have 1 IS TAB per SAP, because its pointing is equal to the SAP pointing.
@@ -369,27 +367,25 @@ class ObservationResourceEstimator(BaseResourceEstimator):
nr_incoherent_tabs = 0
nr_tabs = parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)
-for tab_nr in xrange(nr_tabs):
+for tab_nr in range(nr_tabs):
if parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)):
continue
if nr_incoherent_tabs > 0:
# Could get here to produce >1 IS TAB copies, maybe for some software test
logger.error("SAP %i: >1 incoherent TAB not supported: TAB nrs %i and %i" % (sap_nr, tab_nr, is_tab_nr))
return None
raise ValueError("SAP %i: >1 incoherent TAB not supported: TAB nrs %i and %i" % (sap_nr, tab_nr, is_tab_nr))
is_tab_nr = tab_nr
nr_incoherent_tabs += 1
logger.info("added %d incoherent tab(s)", nr_incoherent_tabs)
if nr_incoherent_tabs == 0:
logger.error("Incoherent Stokes data output enabled, but no incoherent tabs for sap %d", sap_nr)
return None
raise ValueError("Incoherent Stokes data output enabled, but no incoherent tabs for sap %d" % sap_nr)
# Keep IQUV together (>1 parts still possible).
# Else translator to parset filenames cannot know which stokes (nr_of_XX_stokes property too coarse).
#
# NOTE: If a TAB is split into parts, then the last TAB part may contain fewer subbands.
-# Simplify: compute a single (i.e. max) file size for all TABs or TAB parts.
+# Simplify: compute a single (max) file size for all TABs or TAB parts.
file_size = int(nr_subbands_per_file * size_per_subband) # bytes
storage = file_size * nr_incoherent # bytes
bandwidth = int(ceil(8 * storage / duration)) # bits/second
@@ -403,7 +399,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
'nr_of_is_stokes': nr_incoherent, 'is_tab_nr': is_tab_nr}}]}}
estimates.append(est)
logger.debug("Incoherent Stokes data estimates: {}".format(estimates))
logger.debug("Incoherent Stokes data estimates:\n" + pprint.pformat(estimates))
return estimates
def _samples_per_second(self, parset):
@@ -477,11 +473,13 @@ class ObservationResourceEstimator(BaseResourceEstimator):
""" Estimate required RSPs and RCUs per station.
One or two RSP boards are returned per station depending on antennaset.
RCUs are encoded as a bitfield, to be able to tell which RCUs are actually needed.
-Return list of estimates.
+Return list of estimates, or raise ValueError on error.
"""
estimates = []
antennaset = parset.getString('Observation.antennaSet')
stationset = parset.getStringVector('Observation.VirtualInstrument.stationList')
+if not stationset:
+raise ValueError("Observation.VirtualInstrument.stationList is empty")
rculists = self.asp.get_receiver_units_configuration_per_station(antennaset, stationset)
for station in stationset:
@@ -526,7 +524,8 @@ class ObservationResourceEstimator(BaseResourceEstimator):
def _required_rsps(self, station, antennaset, parset):
"""
Takes station name and list of antennafields.
-Returns list with one or both required rsps and number of channelbits.
+Returns list with one or both required rsps and number of channelbits,
+or raises ValueError on error.
"""
if station.startswith('CS'):
required_rsps = ['RSP0'] # default
@@ -538,14 +537,20 @@ class ObservationResourceEstimator(BaseResourceEstimator):
required_rsps = ['RSP'] # default for non-core stations
nr_saps = parset.getInt('Observation.nrBeams')
+if nr_saps < 1:
+raise ValueError('Observation.nrBeams must be at least 1, but is %d' % nr_saps)
subBandList = []
for nr in range(nr_saps):
key = 'Observation.Beam['+str(nr)+'].subbandList'
sblist = parset.getStringVector(key)
+if not sblist:
+raise ValueError("%s is empty" % key)
subBandList.extend(sblist)
nrSubbands = len(subBandList)
nrBitsPerSample = parset.getInt('Observation.nrBitsPerSample')
+if nrBitsPerSample != 16 and nrBitsPerSample != 8 and nrBitsPerSample != 4:
+raise ValueError('Observation.nrBitsPerSample must be 16, 8, or 4, but is %d' % nrBitsPerSample)
channelbits = nrSubbands * nrBitsPerSample
return required_rsps, channelbits
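A worked example of the channelbits budget computed above; the subbands-per-bit-mode pairs below are the common LOFAR station limits (stated here as an assumption, the limit check itself happens elsewhere):

for nr_subbands, nr_bits in [(244, 16), (488, 8), (976, 4)]:
    channelbits = nr_subbands * nr_bits
    print(nr_subbands, nr_bits, channelbits)  # 3904 channel bits in each case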
......
@@ -96,7 +96,7 @@ class PulsarPipelineResourceEstimator(BasePipelineResourceEstimator):
estimate = {'input_files': input_files}
-# NOTE: input bandwidth is not included in the resulting estimate atm.
+# NOTE: input bandwidth is currently not included in the resulting estimate.
# Proper input bandwidth est has limited use and is tricky, because of pipeline duration est, tmp files, multiple passes, nr nodes and caching, ...
#input_cluster_cs = parset.getString(DATAPRODUCTS + 'Input_CoherentStokes.storageClusterName')
#input_cluster_is = parset.getString(DATAPRODUCTS + 'Input_IncoherentStokes.storageClusterName')
@@ -105,7 +105,7 @@ class PulsarPipelineResourceEstimator(BasePipelineResourceEstimator):
output_cluster_pulp = parset.getString(DATAPRODUCTS + 'Output_Pulsar.storageClusterName')
# The pulsar pipeline ('pulp') produces 1 data product per tied-array beam, it seems also for complex voltages (XXYY) and stokes IQUV(?).
-# For XXYY it really needs all 4 components at once. For IQUV this is less important, but atm we treat it the same (1 obs output estimate).
+# For XXYY it really needs all 4 components at once. For IQUV this is less important, but currently we treat it the same (1 obs output estimate).
# Note that it also produces 1 additional "summary" data product per data product *type* (i.e. 1 for 'cs' and/or 1 for 'is'),
# but the RA_Services sub-system does not know about it. Adding support may be a waste of time(?).
# Currently, RO controlled pulp grabs all inputs given some project name/id(?) and obs id, not from rotspservice generated parset parts.
@@ -126,15 +126,15 @@ class PulsarPipelineResourceEstimator(BasePipelineResourceEstimator):
'pulp_file_size': pulp_file_size}}]}
# count total data size
-data_size = nr_input_files * pulp_file_size
-if data_size > 0:
-bandwidth = int(ceil(8 * data_size / duration)) # bits/second
-estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': data_size}
+total_data_size = nr_input_files * pulp_file_size
+if total_data_size > 0:
+bandwidth = int(ceil(8 * total_data_size / duration)) # bits/second
+estimate['resource_types'] = {'bandwidth': bandwidth, 'storage': total_data_size}
estimate['resource_count'] = 1
estimate['root_resource_group'] = output_cluster_pulp
else:
-logger.error('An estimate of zero was calculated!')
-result['errors'].append('An estimate of zero was calculated!')
+logger.error('Estimated total data size is zero!')
+result['errors'].append('Estimated total data size is zero!')
result['estimates'].append(estimate)
......
@@ -50,7 +50,8 @@ class ReservationResourceEstimator(BaseResourceEstimator):
logger.info("start estimate '{}'".format(self.name))
logger.info('parset: %s ' % parset)
# NOTE: Observation.stopTime may differ from real stop time, because of Cobalt block size not being exactly 1.0 s.
-duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
+duration = self._getDuration(parset.getString('Observation.startTime'),
+parset.getString('Observation.stopTime'))
errors = []
estimates = []
@@ -85,7 +86,7 @@ class ReservationResourceEstimator(BaseResourceEstimator):
rsps, channelbits = self._max_rsps(station)
-bitfield = len(rculists[station])*'1' # claim all RCUs irrespective of use in given antennaset, we actually only need the AntennasetsParser to obatin the numbe rof RCUs
+bitfield = len(rculists[station])*'1' # claim all RCUs irrespective of use in given antennaset, we actually only need the AntennasetsParser to obtain the number of RCUs
est = {'resource_types': {'rcu': bitfield},
'resource_count': 1,
......
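A sketch of the RCU bitfield encoding used above: one character per RCU, '1' meaning claimed. An observation claims only the RCUs its antennaset uses, while the reservation claims all of them (the rculist content below is invented for illustration):

rculist = [True, True, False, False, True, True, False, False]  # invented example

obs_bitfield = ''.join('1' if used else '0' for used in rculist)  # '11001100'
reservation_bitfield = len(rculist) * '1'                         # '11111111'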
@@ -89,13 +89,22 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
predecessor_estimates = []
for branch_otdb_id, branch_estimate in branch_estimates.items():
logger.info('Looking at predecessor %s' % branch_otdb_id)
estimates = branch_estimate.values()[0]['estimates']
-if any(['uv' in est['output_files'] and 'im' not in est['output_files'] for est in estimates if 'output_files' in est]): # Not a calibrator pipeline
+for est in estimates:
+if 'output_files' not in est:
+continue
+has_uv = 'uv' in est['output_files']
+has_im = 'im' in est['output_files']
+if has_uv and not has_im: # Not a calibrator pipeline
logger.info('found %s as the target of pipeline %s' % (branch_otdb_id, otdb_id))
predecessor_estimates.extend(estimates)
-elif any(['im' in est['output_files'] for est in estimates if 'output_files' in est]):
+break
+elif has_im:
logger.info('found %s as the calibrator of pipeline %s' % (branch_otdb_id, otdb_id))
predecessor_estimates.extend(estimates)
+break
return {str(otdb_id): self.add_id(self.calibration_pipeline.verify_and_estimate(parset, predecessor_estimates), otdb_id)}
if len(branch_estimates) > 1:
......
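The rewritten predecessor loop above boils down to a small classification; a condensed editorial sketch:

def classify_predecessor(estimates):
    # Decide whether a predecessor acts as the target or the calibrator of a
    # calibration pipeline, based on its output data product types.
    for est in estimates:
        if 'output_files' not in est:
            continue
        has_uv = 'uv' in est['output_files']
        has_im = 'im' in est['output_files']
        if has_uv and not has_im:
            return 'target'      # uv without im: not a calibrator pipeline
        elif has_im:
            return 'calibrator'
    return None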