Commit 69e69194 authored by Adriaan Renting

Task #9192: added identification handling for supporting pipelines

parent 45c20874
@@ -63,6 +63,43 @@ class BaseResourceEstimator(object):
    def _calculate(self, parset, input_files={}):
        raise NotImplementedError('calculate() in base class is called. Please implement calculate() in your subclass')

    def _filterInputs(self, input_files, identifications):
        """Filter input_files down to the entries matching the given identifications, e.g.:
        'input_files':
        {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104,
                'identifications': ['mom.G355415.B2.1.C.SAP000.uv.dps', 'mom.G355415.B2.1.C.SAP001.uv.dps', 'mom.G355415.B2.1.C.SAP002.uv.dps']},
         'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
                  {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
                  {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}]}
        'identifications': ['mom.G355415.B2.1.C.SAP000.uv.dps', 'mom.G355415.B2.1.C.SAP001.uv.dps']
        """
        output_files = {}
        if 'saps' in input_files:
            output_files['saps'] = []
        for identification in identifications:
            for data_type, data_properties in input_files.items():
                if data_type == 'saps':
                    continue
                if identification in data_properties['identifications']: # This data_type matches an input identification
                    output_files[data_type] = deepcopy(data_properties) # needs 'from copy import deepcopy' at module level
                    if 'SAP' in identification and 'saps' in input_files: # Special case for identifications that contain a SAP number
                        # We would need to make this smarter if we can have the data from multiple SAPs as input,
                        # or different input sources of the same data_type.
                        sap_nr = None
                        for s in identification.split('.'): # Find the SAP number
                            if 'SAP' in s:
                                try:
                                    sap_nr = int(s[3:])
                                except ValueError:
                                    sap_nr = None
                        for sap in input_files['saps']: # 'saps' is a list of {'sap_nr': ..., 'properties': ...} dicts
                            if sap['sap_nr'] == sap_nr:
                                for sap_data_type, sap_data_value in sap['properties'].items():
                                    if sap_data_type in data_properties: # We found this SAP's nr_of_<data_type>_files
                                        output_files[data_type][sap_data_type] = sap_data_value # We only count the nr_of_files from this SAP
                                        output_files['saps'].append({'sap_nr': sap_nr, 'properties': {sap_data_type: sap_data_value}})
        return output_files
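
To illustrate the intended behaviour, here is a minimal sketch (not part of the commit) that applies the docstring's example; 'estimator' stands for any concrete BaseResourceEstimator subclass instance and is hypothetical:

    # Minimal sketch, assuming 'estimator' is an instance of a concrete subclass.
    input_files = {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104,
                          'identifications': ['mom.G355415.B2.1.C.SAP000.uv.dps',
                                              'mom.G355415.B2.1.C.SAP001.uv.dps',
                                              'mom.G355415.B2.1.C.SAP002.uv.dps']},
                   'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
                            {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
                            {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}]}
    filtered = estimator._filterInputs(input_files, ['mom.G355415.B2.1.C.SAP000.uv.dps'])
    # 'uv' is kept, with nr_of_uv_files narrowed to SAP000's 319 files:
    # {'uv': {'nr_of_uv_files': 319, 'uv_file_size': 1482951104, 'identifications': [...]},
    #  'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}}]}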

    def verify_and_estimate(self, parset, input_files={}):
        """ Create estimates for a single process based on its parset and input files"""
        if self._checkParsetForRequiredKeys(parset):
...
@@ -40,8 +40,11 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
        self.required_keys = ('Observation.startTime',
                              'Observation.stopTime',
                              DATAPRODUCTS + 'Input_Correlated.enabled',
                              DATAPRODUCTS + 'Input_Correlated.identifications',
                              DATAPRODUCTS + 'Output_InstrumentModel.enabled',
                              DATAPRODUCTS + 'Output_InstrumentModel.identifications',
                              DATAPRODUCTS + 'Output_Correlated.enabled',
                              DATAPRODUCTS + 'Output_Correlated.identifications',
                              PIPELINE + 'DPPP.demixer.freqstep',
                              PIPELINE + 'DPPP.demixer.timestep')
@@ -63,6 +66,9 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
        logger.debug("start estimate '{}'".format(self.name))
        logger.info('parset: %s ' % parset)
        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
        input_files = self._filterInputs(input_files, parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications'))
        result['storage']['input_files'] = input_files
        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
        freq_step = parset.getInt(PIPELINE + 'DPPP.demixer.freqstep', 1) #TODO, should these have defaults?
        time_step = parset.getInt(PIPELINE + 'DPPP.demixer.timestep', 1)
@@ -86,12 +92,12 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
        output_file_size = 0.0
        new_size = input_file_size / float(reduction_factor)
        output_file_size = new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0
        result['storage']['output_files']['uv'] = {'nr_of_uv_files': input_files['uv']['nr_of_uv_files'], 'uv_file_size': int(output_file_size), 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')}
        logger.info("correlated_uv: {} files {} bytes each".format(result['storage']['output_files']['uv']['nr_of_uv_files'], result['storage']['output_files']['uv']['uv_file_size']))
        if parset.getBool(DATAPRODUCTS + 'Output_InstrumentModel.enabled'):
            logger.info("calculate instrument-model data size")
            result['storage']['output_files']['im'] = {'nr_of_im_files': input_files['uv']['nr_of_uv_files'], 'im_file_size': 1000, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_InstrumentModel.identifications')} # 1 kB was hardcoded in the Scheduler
            logger.info("correlated_im: {} files {} bytes each".format(result['storage']['output_files']['im']['nr_of_im_files'], result['storage']['output_files']['im']['im_file_size']))
        # count total data size
...
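
For a feel of the numbers, a short worked example of the output-size formula above; it assumes reduction_factor = freq_step * time_step, which is computed outside the shown hunk:

    # Worked example (sketch); reduction_factor = freq_step * time_step is an assumption.
    input_file_size = 1482951104                          # bytes, one input uv file
    freq_step, time_step = 16, 4
    reduction_factor = freq_step * time_step              # 64
    new_size = input_file_size / float(reduction_factor)  # ~2.32e7 bytes after averaging
    output_file_size = new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0
    # -> ~5.83e7 bytes per output uv file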
@@ -42,7 +42,9 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
        self.required_keys = ('Observation.startTime',
                              'Observation.stopTime',
                              DATAPRODUCTS + 'Input_Correlated.enabled',
                              DATAPRODUCTS + 'Input_Correlated.identifications',
                              DATAPRODUCTS + 'Output_SkyImage.enabled',
                              DATAPRODUCTS + 'Output_SkyImage.identifications',
                              PIPELINE + 'Imaging.slices_per_image',
                              PIPELINE + 'Imaging.subbands_per_image')
@@ -63,6 +65,9 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
        logger.debug("start estimate '{}'".format(self.name))
        logger.info('parset: %s ' % parset)
        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
        input_files = self._filterInputs(input_files, parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications'))
        result['storage']['input_files'] = input_files
        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
        slices_per_image = parset.getInt(PIPELINE + 'Imaging.slices_per_image', 0) #TODO, should these have defaults?
        subbands_per_image = parset.getInt(PIPELINE + 'Imaging.subbands_per_image', 0)
@@ -87,7 +92,7 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
        logger.debug("calculate sky image data size")
        result['storage']['output_files'] = {}
        nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)
        result['storage']['output_files']['img'] = {'nr_of_img_files': nr_images, 'img_file_size': 1000, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_SkyImage.identifications')} # 1 kB was hardcoded in the Scheduler
        logger.debug("sky_images: {} files {} bytes each".format(result['storage']['output_files']['img']['nr_of_img_files'], result['storage']['output_files']['img']['img_file_size']))
        # count total data size
...
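
One caveat in the nr_images line above: assuming this module runs under Python 2 (as the surrounding codebase appears to), the division is integer division, so a partial image's worth of subbands is silently dropped from the count. A small sketch with hypothetical values:

    # Sketch of the rounding behaviour (assumed Python 2 division semantics):
    nr_input_subbands = 240
    slices_per_image, subbands_per_image = 7, 10
    nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)
    # 240 / 70 -> 3 images; the 30 leftover subbands are not counted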
@@ -40,7 +40,9 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
        self.required_keys = ('Observation.startTime',
                              'Observation.stopTime',
                              DATAPRODUCTS + 'Input_Correlated.enabled',
                              DATAPRODUCTS + 'Input_Correlated.identifications',
                              DATAPRODUCTS + 'Output_Correlated.enabled',
                              DATAPRODUCTS + 'Output_Correlated.identifications',
                              PIPELINE + 'LongBaseline.subbandgroups_per_ms',
                              PIPELINE + 'LongBaseline.subbands_per_subbandgroup')
@@ -61,6 +63,9 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
        logger.debug("start estimate '{}'".format(self.name))
        logger.info('parset: %s ' % parset)
        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
        input_files = self._filterInputs(input_files, parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications'))
        result['storage']['input_files'] = input_files
        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
        subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0) #TODO, should these have defaults?
        subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0)
@@ -85,7 +90,7 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
        logger.debug("calculate correlated data size")
        result['storage']['output_files'] = {}
        nr_output_files = nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms)
        result['storage']['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, 'uv_file_size': 1000, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')} # 1 kB was hardcoded in the Scheduler
        logger.debug("correlated_uv: {} files {} bytes each".format(result['storage']['output_files']['uv']['nr_of_uv_files'], result['storage']['output_files']['uv']['uv_file_size']))
        # count total data size
...
@@ -26,6 +26,7 @@ from base_resource_estimator import BaseResourceEstimator
logger = logging.getLogger(__name__)

DATAPRODUCTS = "Observation.DataProducts."
COBALT = "Observation.ObservationControl.OnlineControl.Cobalt."

class ObservationResourceEstimator(BaseResourceEstimator):
@@ -44,9 +45,13 @@ class ObservationResourceEstimator(BaseResourceEstimator):
                              COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor',
                              COBALT + 'BeamFormer.IncoherentStokes.timeIntegrationFactor',
                              'Observation.VirtualInstrument.stationList',
                              DATAPRODUCTS + 'Output_Correlated.enabled',
                              DATAPRODUCTS + 'Output_Correlated.identifications',
                              DATAPRODUCTS + 'Output_CoherentStokes.enabled',
                              DATAPRODUCTS + 'Output_CoherentStokes.identifications',
                              COBALT + 'BeamFormer.CoherentStokes.which',
                              DATAPRODUCTS + 'Output_IncoherentStokes.enabled',
                              DATAPRODUCTS + 'Output_IncoherentStokes.identifications',
                              COBALT + 'BeamFormer.IncoherentStokes.which'
                              )
@@ -126,7 +131,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
            sap_files[sap_nr] = {'nr_of_uv_files': nr_files}
        file_size = int((data_size + n_sample_size + size_of_header) * integrated_seconds + size_of_overhead)
        output_files = {'nr_of_uv_files': total_files, 'uv_file_size': file_size, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')}
        logger.info("correlated_uv: {} files {} bytes each".format(total_files, file_size))
        total_data_size = int(ceil(file_size * total_files)) # bytes
@@ -196,7 +201,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
        nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
        size_per_file = int(nr_subbands_per_file * size_per_subband)
        output_files = {'nr_of_cs_files': total_files, 'nr_of_cs_stokes': nr_coherent, 'cs_file_size': size_per_file, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_CoherentStokes.identifications')}
        logger.info("coherentstokes: {} files {} bytes each".format(total_files, size_per_file))
        total_data_size = int(ceil(total_nr_stokes * max_nr_subbands * size_per_subband))
@@ -251,7 +256,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
        size_per_subband = int((samples_per_second * 4) / time_integration_factor / channel_integration_factor * duration)
        size_per_file = nr_subbands_per_file * size_per_subband
        output_files = {'nr_of_is_files': total_files, 'nr_of_is_stokes': nr_incoherent, 'is_file_size': int(size_per_file), 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_IncoherentStokes.identifications')}
        logger.info("incoherentstokes: {} files {} bytes each".format(total_files, size_per_file))
        total_data_size = int(ceil(total_nr_stokes * max_nr_subbands * size_per_subband)) # bytes
...
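
The 'identifications' entries added to each output_files dict above are what let a downstream pipeline's _filterInputs select exactly the dataproducts it consumes. A minimal sketch of that chaining, reusing the identification string from the base-class docstring ('estimator' is again hypothetical):

    # Observation output (as produced above) becomes pipeline input:
    observation_output = {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104,
                                 'identifications': ['mom.G355415.B2.1.C.SAP000.uv.dps']},
                          'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 481}}]}
    # The pipeline parset lists what it wants under Input_Correlated.identifications:
    wanted = ['mom.G355415.B2.1.C.SAP000.uv.dps']
    filtered = estimator._filterInputs(observation_output, wanted)
    # filtered keeps only the matching 'uv' entry, narrowed to SAP000's file count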
@@ -40,8 +40,11 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator):
        self.required_keys = ('Observation.startTime',
                              'Observation.stopTime',
                              DATAPRODUCTS + 'Input_CoherentStokes.enabled',
                              DATAPRODUCTS + 'Input_CoherentStokes.identifications',
                              DATAPRODUCTS + 'Input_IncoherentStokes.enabled',
                              DATAPRODUCTS + 'Input_IncoherentStokes.identifications',
                              DATAPRODUCTS + 'Output_Pulsar.enabled',
                              DATAPRODUCTS + 'Output_Pulsar.identifications')

    def _calculate(self, parset, input_files):
        """ Estimate for Pulsar Pipeline
@@ -60,6 +63,10 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator):
        logger.debug("start estimate '{}'".format(self.name))
        logger.info('parset: %s ' % parset)
        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
        input_files = self._filterInputs(input_files, parset.getStringVector(DATAPRODUCTS + 'Input_CoherentStokes.identifications') +
                                         parset.getStringVector(DATAPRODUCTS + 'Input_IncoherentStokes.identifications'))
        result['storage']['input_files'] = input_files
        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))

        if not parset.getBool(DATAPRODUCTS + 'Output_Pulsar.enabled'):
@@ -85,7 +92,7 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator):
            nr_input_files = input_files['is']['nr_of_is_files']
            nr_output_files += nr_input_files
        result['storage']['output_files']['pulp'] = {'nr_of_pulp_files': nr_output_files, 'pulp_file_size': 1000, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Pulsar.identifications')} # 1 kB was hardcoded in the Scheduler
        logger.info(result)
        logger.info("pulsar_pipeline pulp: {} files {} bytes each".format(result['storage']['output_files']['pulp']['nr_of_pulp_files'], result['storage']['output_files']['pulp']['pulp_file_size']))
...
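
Since the pulsar pipeline can consume both coherent and incoherent stokes data, the two identification vectors are concatenated before filtering, and the output count sums both input counts. A sketch of the accounting implied by the hunk above, with hypothetical values (the coherent-stokes branch lies outside the shown context):

    # Hypothetical counts after filtering:
    input_files = {'cs': {'nr_of_cs_files': 244},
                   'is': {'nr_of_is_files': 61}}
    nr_output_files = 0
    if 'cs' in input_files:
        nr_output_files += input_files['cs']['nr_of_cs_files']
    if 'is' in input_files:
        nr_output_files += input_files['is']['nr_of_is_files']
    # -> 305 pulp output files, estimated at 1000 bytes each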