diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
index 45b18dc87c7d762be6509a695284bba48e4dc4ce..20604c62da09dbcb3b045402b4069a78a883b14b 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
@@ -63,6 +63,43 @@ class BaseResourceEstimator(object):
     def _calculate(self, parset, input_files={}):
         raise NotImplementedError('calculate() in base class is called. Please implement calculate() in your subclass')
 
+    def _filterInputs(self, input_files, identifications):
+        """Filter 'input_files' down to the entries that match one of the given 'identifications'.
+
+        Example 'input_files':
+        {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104,
+                'identifications': ['mom.G355415.B2.1.C.SAP000.uv.dps',
+                                    'mom.G355415.B2.1.C.SAP001.uv.dps',
+                                    'mom.G355415.B2.1.C.SAP002.uv.dps']},
+         'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
+                  {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
+                  {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}]}
+
+        Example 'identifications':
+        ['mom.G355415.B2.1.C.SAP000.uv.dps', 'mom.G355415.B2.1.C.SAP001.uv.dps']
+        """
+        output_files = {}
+        if 'saps' in input_files:
+            output_files['saps'] = []
+        for identification in identifications:
+            for data_type, data_properties in input_files.items():
+                if data_type == 'saps':
+                    continue
+                if identification in data_properties['identifications']:  # this data_type matches an input identification
+                    output_files[data_type] = deepcopy(data_properties)  # needs 'from copy import deepcopy' at module level
+                    if 'SAP' in identification:  # special case for identifications that contain a SAP number
+                        # This needs to become smarter if we can have data from multiple SAPs as input,
+                        # or different input sources of the same data_type.
+                        sap_nr = None
+                        for s in identification.split('.'):  # find the SAP number
+                            if 'SAP' in s:
+                                try:
+                                    sap_nr = int(s[3:])
+                                except ValueError:
+                                    sap_nr = None
+                        for sap in input_files['saps']:  # 'saps' is a list of dicts, not a dict
+                            if sap['sap_nr'] == sap_nr:
+                                for sap_data_type, sap_data_value in sap['properties'].items():
+                                    if sap_data_type in data_properties:  # we found this SAP's nr_of_<data_type>_files
+                                        output_files[data_type][sap_data_type] = sap_data_value  # only count the nr_of_files from this SAP
+                                        output_files['saps'].append({'sap_nr': sap_nr, 'properties': {sap_data_type: sap_data_value}})
+        return output_files
+
     def verify_and_estimate(self, parset, input_files={}):
         """ Create estimates for a single process based on its parset and input files"""
         if self._checkParsetForRequiredKeys(parset):
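A minimal standalone sketch of how the patched _filterInputs behaves on its docstring example; the function below is an illustrative copy for experimenting outside the class, not the module itself:

from copy import deepcopy

def filter_inputs(input_files, identifications):  # standalone copy of the patched method
    output_files = {}
    if 'saps' in input_files:
        output_files['saps'] = []
    for identification in identifications:
        for data_type, data_properties in input_files.items():
            if data_type == 'saps':
                continue
            if identification in data_properties['identifications']:
                output_files[data_type] = deepcopy(data_properties)
                if 'SAP' in identification:
                    sap_nr = None
                    for s in identification.split('.'):
                        if 'SAP' in s:
                            try:
                                sap_nr = int(s[3:])
                            except ValueError:
                                sap_nr = None
                    for sap in input_files['saps']:
                        if sap['sap_nr'] == sap_nr:
                            for sap_data_type, sap_data_value in sap['properties'].items():
                                if sap_data_type in data_properties:
                                    output_files[data_type][sap_data_type] = sap_data_value
                                    output_files['saps'].append({'sap_nr': sap_nr, 'properties': {sap_data_type: sap_data_value}})
    return output_files

example = {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104,
                  'identifications': ['mom.G355415.B2.1.C.SAP000.uv.dps',
                                      'mom.G355415.B2.1.C.SAP001.uv.dps',
                                      'mom.G355415.B2.1.C.SAP002.uv.dps']},
           'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
                    {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
                    {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}]}

print(filter_inputs(example, ['mom.G355415.B2.1.C.SAP000.uv.dps',
                              'mom.G355415.B2.1.C.SAP001.uv.dps']))
# 'uv' is kept with its nr_of_uv_files overwritten per matching SAP (the last match wins),
# and the output 'saps' list holds only SAP 0 and SAP 1.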
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
index a0a6e481ec8edb68192df741d4aff20f3cabf3cc..dcbbc995506e9e3aac18becfb405e2e4e139ad70 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
@@ -40,8 +40,11 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
+                              DATAPRODUCTS + 'Input_Correlated.identifications',
                               DATAPRODUCTS + 'Output_InstrumentModel.enabled',
+                              DATAPRODUCTS + 'Output_InstrumentModel.identifications',
                               DATAPRODUCTS + 'Output_Correlated.enabled',
+                              DATAPRODUCTS + 'Output_Correlated.identifications',
                               PIPELINE + 'DPPP.demixer.freqstep',
                               PIPELINE + 'DPPP.demixer.timestep')
 
@@ -62,7 +65,10 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
         """
         logger.debug("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
-        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
+        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
+        input_files = self._filterInputs(input_files, parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications'))
+        result['storage']['input_files'] = input_files
+
         duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
         freq_step = parset.getInt(PIPELINE + 'DPPP.demixer.freqstep', 1) #TODO, should these have defaults?
         time_step = parset.getInt(PIPELINE + 'DPPP.demixer.timestep', 1)
@@ -86,12 +92,12 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
         output_file_size = 0.0
         new_size = input_file_size / float(reduction_factor)
         output_file_size = new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0
-        result['storage']['output_files']['uv'] = {'nr_of_uv_files': input_files['uv']['nr_of_uv_files'], 'uv_file_size': int(output_file_size)}
+        result['storage']['output_files']['uv'] = {'nr_of_uv_files': input_files['uv']['nr_of_uv_files'], 'uv_file_size': int(output_file_size), 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')}
         logger.info("correlated_uv: {} files {} bytes each".format(result['storage']['output_files']['uv']['nr_of_uv_files'], result['storage']['output_files']['uv']['uv_file_size']))
 
         if parset.getBool(DATAPRODUCTS + 'Output_InstrumentModel.enabled'):
             logger.info("calculate instrument-model data size")
-            result['storage']['output_files']['im'] = {'nr_of_im_files': input_files['uv']['nr_of_uv_files'], 'im_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+            result['storage']['output_files']['im'] = {'nr_of_im_files': input_files['uv']['nr_of_uv_files'], 'im_file_size': 1000, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_InstrumentModel.identifications')} # 1 kB was hardcoded in the Scheduler
             logger.info("correlated_im: {} files {} bytes each".format(result['storage']['output_files']['im']['nr_of_im_files'], result['storage']['output_files']['im']['im_file_size']))
 
         # count total data size
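A quick worked example of the uv output-size formula above, using the uv_file_size from the _filterInputs docstring example and assumed demixer settings of freqstep=4 and timestep=2:

input_file_size = 1482951104                  # bytes, from the docstring example
reduction_factor = 4 * 2                      # freq_step * time_step (assumed values)
new_size = input_file_size / float(reduction_factor)
output_file_size = new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0
print(int(output_file_size))                  # 304120831 bytes, roughly 0.3 GB per output uv file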
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
index d05e28cf51d79cf81be2ca7aa85bf71b6bea218f..5314e3b81ecd5843da060da7a4c0a4855255c626 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
@@ -42,7 +42,9 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
+                              DATAPRODUCTS + 'Input_Correlated.identifications',
                               DATAPRODUCTS + 'Output_SkyImage.enabled',
+                              DATAPRODUCTS + 'Output_SkyImage.identifications',
                               PIPELINE + 'Imaging.slices_per_image',
                               PIPELINE + 'Imaging.subbands_per_image')
 
@@ -63,6 +65,9 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
         logger.debug("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
         result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
+        input_files = self._filterInputs(input_files, parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications'))
+        result['storage']['input_files'] = input_files
+
         duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
         slices_per_image = parset.getInt(PIPELINE + 'Imaging.slices_per_image', 0) #TODO, should these have defaults?
         subbands_per_image = parset.getInt(PIPELINE + 'Imaging.subbands_per_image', 0)
@@ -87,7 +92,7 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
         logger.debug("calculate sky image data size")
         result['storage']['output_files'] = {}
         nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)
-        result['storage']['output_files']['img'] = {'nr_of_img_files': nr_images, 'img_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        result['storage']['output_files']['img'] = {'nr_of_img_files': nr_images, 'img_file_size': 1000, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_SkyImage.identifications')} # 1 kB was hardcoded in the Scheduler
         logger.debug("sky_images: {} files {} bytes each".format(result['storage']['output_files']['img']['nr_of_img_files'], result['storage']['output_files']['img']['img_file_size']))
 
         # count total data size
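The sky-image count above is plain integer division; a sketch with assumed example values (480 input subbands, 10 subbands and 6 slices per image):

nr_input_subbands = 480    # assumed example value
subbands_per_image = 10    # assumed example value
slices_per_image = 6       # assumed example value
nr_images = nr_input_subbands // (subbands_per_image * slices_per_image)  # '//' mirrors Python 2's '/' on ints
print(nr_images)           # 8 img files, 1000 bytes each in this estimate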
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
index 0219cf586af3fc81ab8794e8dd13195de7378dd7..7aa75217988391f0816515649fb0df9e707bd10a 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
@@ -40,7 +40,9 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
+                              DATAPRODUCTS + 'Input_Correlated.identifications',
                               DATAPRODUCTS + 'Output_Correlated.enabled',
+                              DATAPRODUCTS + 'Output_Correlated.identifications',
                               PIPELINE + 'LongBaseline.subbandgroups_per_ms',
                               PIPELINE + 'LongBaseline.subbands_per_subbandgroup')
 
@@ -61,6 +63,9 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
         logger.debug("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
         result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
+        input_files = self._filterInputs(input_files, parset.getStringVector(DATAPRODUCTS + 'Input_Correlated.identifications'))
+        result['storage']['input_files'] = input_files
+
         duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
         subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0) #TODO, should these have defaults?
         subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0)
@@ -85,7 +90,7 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
         logger.debug("calculate correlated data size")
         result['storage']['output_files'] = {}
         nr_output_files = nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms)
-        result['storage']['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, 'uv_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        result['storage']['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, 'uv_file_size': 1000, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')} # 1 kB was hardcoded in the Scheduler
         logger.debug("correlated_uv: {} files {} bytes each".format(result['storage']['output_files']['uv']['nr_of_uv_files'], result['storage']['output_files']['uv']['uv_file_size']))
 
         # count total data size
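The long-baseline pipeline concentrates its input count the same way; a sketch with assumed example values (244 input uv files, 8 subbands per group, 2 groups per MS):

nr_input_files = 244             # assumed example value
subbands_per_subbandgroup = 8    # assumed example value
subbandgroups_per_ms = 2         # assumed example value
nr_output_files = nr_input_files // (subbands_per_subbandgroup * subbandgroups_per_ms)
print(nr_output_files)           # 15; integer division silently drops the 4 leftover files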
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
index e3e8e1622c7f1e28172caad8d2ceb804b77af5ff..0f7359d46d27f06f6264d7f98a02a8ce6888571e 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
@@ -26,6 +26,7 @@ from base_resource_estimator import BaseResourceEstimator
 
 logger = logging.getLogger(__name__)
 
+DATAPRODUCTS = "Observation.DataProducts."
 COBALT = "Observation.ObservationControl.OnlineControl.Cobalt."
 
 class ObservationResourceEstimator(BaseResourceEstimator):
@@ -44,9 +45,13 @@ class ObservationResourceEstimator(BaseResourceEstimator):
                               COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor',
                               COBALT + 'BeamFormer.IncoherentStokes.timeIntegrationFactor',
                               'Observation.VirtualInstrument.stationList',
-                              'Observation.DataProducts.Output_CoherentStokes.enabled',
+                              DATAPRODUCTS + 'Output_Correlated.enabled',
+                              DATAPRODUCTS + 'Output_Correlated.identifications',
+                              DATAPRODUCTS + 'Output_CoherentStokes.enabled',
+                              DATAPRODUCTS + 'Output_CoherentStokes.identifications',
                               COBALT + 'BeamFormer.CoherentStokes.which',
-                              'Observation.DataProducts.Output_IncoherentStokes.enabled',
+                              DATAPRODUCTS + 'Output_IncoherentStokes.enabled',
+                              DATAPRODUCTS + 'Output_IncoherentStokes.identifications',
                               COBALT + 'BeamFormer.IncoherentStokes.which'
                               )
 
@@ -123,10 +128,10 @@ class ObservationResourceEstimator(BaseResourceEstimator):
             subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
             nr_files = len(subbandList)
             total_files += nr_files
-            sap_files[sap_nr]= {'nr_of_uv_files': nr_files}
+            sap_files[sap_nr] = {'nr_of_uv_files': nr_files}
 
         file_size = int((data_size + n_sample_size + size_of_header) * integrated_seconds + size_of_overhead)
-        output_files = {'nr_of_uv_files': total_files, 'uv_file_size': file_size}
+        output_files = {'nr_of_uv_files': total_files, 'uv_file_size': file_size, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')}
         logger.info("correlated_uv: {} files {} bytes each".format(total_files, file_size))
 
         total_data_size = int(ceil(file_size * total_files)) # bytes
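The correlated file_size above is the per-integration payload times the number of integrations, plus a fixed overhead. A sketch with made-up placeholder numbers; none of these are real LOFAR constants, which come from surrounding code this diff does not touch:

data_size = 4000000        # bytes of visibilities per integration (placeholder)
n_sample_size = 100000     # bytes of sample counts per integration (placeholder)
size_of_header = 512       # bytes (placeholder)
size_of_overhead = 600000  # bytes (placeholder)
integrated_seconds = 3600  # integrations written over the observation (placeholder)
file_size = int((data_size + n_sample_size + size_of_header) * integrated_seconds + size_of_overhead)
print(file_size)           # 14762443200 bytes with these placeholders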
@@ -196,7 +201,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
             nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
         size_per_file = int(nr_subbands_per_file * size_per_subband)
 
-        output_files = {'nr_of_cs_files': total_files, 'nr_of_cs_stokes': nr_coherent, 'cs_file_size': size_per_file}
+        output_files = {'nr_of_cs_files': total_files, 'nr_of_cs_stokes': nr_coherent, 'cs_file_size': size_per_file, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_CoherentStokes.identifications')}
         logger.info("coherentstokes: {} files {} bytes each".format(total_files, size_per_file))
 
         total_data_size = int(ceil(total_nr_stokes * max_nr_subbands * size_per_subband))
@@ -251,7 +256,7 @@ class ObservationResourceEstimator(BaseResourceEstimator):
             size_per_subband = int((samples_per_second * 4) / time_integration_factor / channel_integration_factor * duration)
         size_per_file = nr_subbands_per_file * size_per_subband
 
-        output_files = {'nr_of_is_files': total_files, 'nr_of_is_stokes': nr_incoherent, 'is_file_size': int(size_per_file)}
+        output_files = {'nr_of_is_files': total_files, 'nr_of_is_stokes': nr_incoherent, 'is_file_size': int(size_per_file), 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_IncoherentStokes.identifications')}
         logger.info("incoherentstokes: {} files {} bytes each".format(total_files, size_per_file))
 
         total_data_size = int(ceil(total_nr_stokes * max_nr_subbands * size_per_subband)) # bytes
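The incoherent-stokes size_per_subband divides a 4-byte sample stream by both integration factors; a sketch with assumed values chosen to divide evenly, so Python 2 and Python 3 division agree:

samples_per_second = 12288      # assumed example value
time_integration_factor = 6     # assumed example value
channel_integration_factor = 4  # assumed example value
duration = 600                  # seconds (assumed)
size_per_subband = int((samples_per_second * 4) / time_integration_factor / channel_integration_factor * duration)
print(size_per_subband)         # 1228800 bytes per subband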
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
index 26471d794eae200519430331b84ea7b002d4ba95..a024a9db3c34167516b8fa9bacbecb787d2b5846 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
@@ -40,8 +40,11 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator):
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_CoherentStokes.enabled',
+                              DATAPRODUCTS + 'Input_CoherentStokes.identifications',
                               DATAPRODUCTS + 'Input_IncoherentStokes.enabled',
-                              DATAPRODUCTS + 'Output_Pulsar.enabled')
+                              DATAPRODUCTS + 'Input_IncoherentStokes.identifications',
+                              DATAPRODUCTS + 'Output_Pulsar.enabled',
+                              DATAPRODUCTS + 'Output_Pulsar.identifications')
 
     def _calculate(self, parset, input_files):
         """ Estimate for Pulsar Pipeline
@@ -60,6 +63,10 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator):
         logger.debug("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
         result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
+        input_files = self._filterInputs(input_files, parset.getStringVector(DATAPRODUCTS + 'Input_CoherentStokes.identifications') +
+                                                      parset.getStringVector(DATAPRODUCTS + 'Input_IncoherentStokes.identifications'))
+        result['storage']['input_files'] = input_files
+
         duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
 
         if not parset.getBool(DATAPRODUCTS + 'Output_Pulsar.enabled'):
@@ -85,7 +92,7 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator):
             nr_input_files = input_files['is']['nr_of_is_files']
             nr_output_files += nr_input_files
 
-        result['storage']['output_files']['pulp'] = {'nr_of_pulp_files': nr_output_files, 'pulp_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        result['storage']['output_files']['pulp'] = {'nr_of_pulp_files': nr_output_files, 'pulp_file_size': 1000, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Pulsar.identifications')} # 1 kB was hardcoded in the Scheduler
         logger.info(result)
         logger.info("pulsar_pipeline pulp: {} files {} bytes each".format(result['storage']['output_files']['pulp']['nr_of_pulp_files'], result['storage']['output_files']['pulp']['pulp_file_size']))
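Finally, a sketch of the pulp output count, assuming one pulp product per input beamformed file, as the hunk above shows for 'is' inputs and, by analogy, for 'cs' inputs; the input counts are assumed example values:

input_files = {'cs': {'nr_of_cs_files': 20}, 'is': {'nr_of_is_files': 1}}  # assumed example
nr_output_files = input_files['cs']['nr_of_cs_files'] + input_files['is']['nr_of_is_files']
print(nr_output_files)  # 21 pulp files, 1000 bytes each in this estimate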