From eac55f17960f5f36281da4c252a40c82d70a16be Mon Sep 17 00:00:00 2001
From: Adriaan Renting <renting@astron.nl>
Date: Wed, 1 Jun 2016 15:27:59 +0000
Subject: [PATCH] Task #9192: first more or less working pipelines

---
 SAS/OTDB_Services/TreeService.py              |  2 +-
 .../lib/translator.py                         | 44 +++++++++----------
 .../calibration_pipeline.py                   | 11 +++--
 .../resource_estimators/image_pipeline.py     | 16 +++----
 .../longbaseline_pipeline.py                  | 18 ++++----
 .../resource_estimators/pulsar_pipeline.py    |  2 +-
 .../ResourceAssignmentEstimator/service.py    |  2 +-
 7 files changed, 48 insertions(+), 47 deletions(-)

diff --git a/SAS/OTDB_Services/TreeService.py b/SAS/OTDB_Services/TreeService.py
index 12e6793bf81..3a5b824220f 100755
--- a/SAS/OTDB_Services/TreeService.py
+++ b/SAS/OTDB_Services/TreeService.py
@@ -108,7 +108,7 @@ def TaskGetIDs(input_dict, db_connection, return_tuple=True):
     # Task not found in any way return input values to the user
     return (None, otdb_id, mom_id) if return_tuple else [None, otdb_id, mom_id]
 
-
+# TODO have this return a Dict
 # Task Get Specification
 def TaskGetSpecification(input_dict, db_connection):
diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
index 53b4cbdff60..2553dd80787 100755
--- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
+++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
@@ -64,12 +64,7 @@ class RAtoOTDBTranslator():
         locations = []
         filenames = []
         result = {}
-        if not 'saps' in storage_properties: ## It's a pipeline (no SAPs)
-            for _ in xrange(storage_properties['nr_of_uv_files']):
-                locations.append(self.locationPath(project_name, otdb_id))
-                filenames.append("L%d_SB%03d_uv.MS" % (otdb_id, sb_nr))
-                sb_nr += 1
-        else:
+        if 'saps' in storage_properties: ## It's an observation (has SAPs)
             for sap in storage_properties["saps"]: ##We might need to sort saps?
                 logging.debug('processing sap: %s' % sap)
                 if "nr_of_uv_files" in sap['properties']:
@@ -77,12 +72,16 @@
                     locations.append(self.locationPath(project_name, otdb_id))
                     filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sap['sap_nr'], sb_nr))
                     sb_nr += 1
+        else: ## It's a pipeline (no SAPs)
+            for _ in xrange(storage_properties['nr_of_uv_files']):
+                locations.append(self.locationPath(project_name, otdb_id))
+                filenames.append("L%d_SB%03d_uv.MS" % (otdb_id, sb_nr))
+                sb_nr += 1
         result[PREFIX + 'DataProducts.Output_Correlated.locations'] = '[' + to_csv_string(locations) + ']'
         result[PREFIX + 'DataProducts.Output_Correlated.filenames'] = '[' + to_csv_string(filenames) + ']'
         return result
 
     def CreateCoherentStokes(self, otdb_id, storage_properties, project_name):
-        SB_nr = 0
         locations = []
         filenames = []
         result = {}
@@ -102,7 +101,6 @@
         return result
 
     def CreateIncoherentStokes(self, otdb_id, storage_properties, project_name):
-        SB_nr = 0
         locations = []
         filenames = []
         result = {}
@@ -122,29 +120,27 @@
         return result
 
     def CreateInstrumentModel(self, otdb_id, storage_properties, project_name):
-        SB_nr = 0
+        sb_nr = 0
         locations = []
         filenames = []
         result = {}
-        for sap in storage_properties["saps"]: ##We might need to sort saps?
-            if "nr_of_im_files" in sap['properties']:
-                for _ in range(sap['properties']['nr_of_im_files']):
-                    locations.append(self.locationPath(project_name, otdb_id))
-                    filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sap['sap_nr'], sb_nr))
+        for _ in xrange(storage_properties['nr_of_im_files']):
+            locations.append(self.locationPath(project_name, otdb_id))
+            filenames.append("L%d_SB%03d_inst.INST" % (otdb_id, sb_nr))
+            sb_nr += 1
         result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = '[' + to_csv_string(locations) + ']'
         result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = '[' + to_csv_string(filenames) + ']'
         return result
 
     def CreateSkyImage(self, otdb_id, storage_properties, project_name):
-        SB_nr = 0
+        sbg_nr = 0
         locations = []
         filenames = []
         result = {}
-        for sap in storage_properties["saps"]: ##We might need to sort saps?
-            if "nr_of_img_files" in sap['properties']:
-                for _ in range(sap['properties']['nr_of_img_files']):
-                    locations.append(self.locationPath(project_name, otdb_id))
-                    filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sap['sap_nr'], sb_nr))
+        for _ in xrange(storage_properties['nr_of_img_files']):
+            locations.append(self.locationPath(project_name, otdb_id))
+            filenames.append("L%d_SBG%03d_sky.IM" % (otdb_id, sbg_nr))
+            sbg_nr += 1
         result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = '[' + to_csv_string(locations) + ']'
         result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = '[' + to_csv_string(filenames) + ']'
         return result
@@ -172,8 +168,8 @@ class RAtoOTDBTranslator():
             result.update(self.CreateCoherentStokes(otdb_id, storage_properties, project_name))
         if 'nr_of_is_files' in storage_properties:
             result.update(self.CreateIncoherentStokes(otdb_id, storage_properties, project_name))
-        #if 'nr_of_im_files' in storage_properties:
-        #    result.update(self.CreateInstrumentModel(otdb_id, storage_properties, project_name))
+        if 'nr_of_im_files' in storage_properties:
+            result.update(self.CreateInstrumentModel(otdb_id, storage_properties, project_name))
         if 'nr_of_img_files' in storage_properties:
             result.update(self.CreateSkyImage(otdb_id, storage_properties, project_name))
         if 'nr_of_pulp_files' in storage_properties:
@@ -220,8 +216,8 @@ class RAtoOTDBTranslator():
         parset.update(self.CreateStorageKeys(otdb_id, self.parseStorageProperties(ra_info['storage']), project_name))
         if 'stations' in ra_info:
             parset[PREFIX+'VirtualInstrument.stationList'] = ra_info["stations"]
-        parset[PREFIX+'ObservationControl.OnlineControl.inspectionHost'] = 'head01.cep4.control.lofar'
-        parset[PREFIX+'ObservationControl.OnlineControl.inspectionProgram'] = 'inspection-plots-observation.sh'
+#        parset[PREFIX+'ObservationControl.OnlineControl.inspectionHost'] = 'head01.cep4.control.lofar'
+#        parset[PREFIX+'ObservationControl.OnlineControl.inspectionProgram'] = 'inspection-plots-observation.sh'
         return parset
 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
index a33526962f4..bd3b060c5c1 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
@@ -36,7 +36,7 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
     """
     def __init__(self):
         logger.info("init CalibrationPipelineResourceEstimator")
-        BaseResourceEstimator.__init__(self, name='pipeline')
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='calibration_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
@@ -95,8 +95,13 @@
             logger.info("correlated_im: {} files {} bytes each".format(result['storage']['output_files']['im']['nr_of_im_files'], result['storage']['output_files']['im']['im_file_size']))
 
         # count total data size
-        total_data_size = result['storage']['output_files']['uv']['nr_of_uv_files'] * result['storage']['output_files']['uv']['uv_file_size'] + \
-                          result['storage']['output_files']['im']['nr_of_im_files'] * result['storage']['output_files']['im']['im_file_size']  # bytes
+        total_data_size = 0
+        for data_type in result['storage']['output_files']:
+            total_data_size += result['storage']['output_files'][data_type]['nr_of_' + data_type + '_files'] * result['storage']['output_files'][data_type][data_type + '_file_size']  # bytes
+        # FIXME I don't think this is technically correct, as the IM files are sometimes created and used, just not a required export?
+        # Need to split averaging pipeline and calibration pipeline
+        #total_data_size = result['storage']['output_files']['uv']['nr_of_uv_files'] * result['storage']['output_files']['uv']['uv_file_size'] + \
+        #                  result['storage']['output_files']['im']['nr_of_im_files'] * result['storage']['output_files']['im']['im_file_size']  # bytes
         total_bandwidth = int(ceil((total_data_size * 8) / duration))  # bits/second
         result['storage']['total_size'] = total_data_size
         result['bandwidth']['total_size'] = total_bandwidth
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
index 2c4b60797ef..d05e28cf51d 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
@@ -38,7 +38,7 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
     """
     def __init__(self):
         logger.info("init ImagePipelineResourceEstimator")
-        BaseResourceEstimator.__init__(self, name='imaging_pipeline')
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='imaging_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
@@ -62,7 +62,7 @@
         """
         logger.debug("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
-        result = {'errors': []}
+        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
         duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
         slices_per_image = parset.getInt(PIPELINE + 'Imaging.slices_per_image', 0) #TODO, should these have defaults?
         subbands_per_image = parset.getInt(PIPELINE + 'Imaging.subbands_per_image', 0)
@@ -85,15 +85,15 @@
             return result
 
         logger.debug("calculate sky image data size")
-        result['output_files'] = {}
+        result['storage']['output_files'] = {}
         nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)
-        result['output_files']['img'] = {'nr_of_img_files': nr_images, 'img_file_size': 1000}  # 1 kB was hardcoded in the Scheduler
-        logger.debug("sky_images: {} files {} bytes each".format(result['output_files']['img']['nr_of_img_files'], result['output_files']['img']['img_file_size']))
+        result['storage']['output_files']['img'] = {'nr_of_img_files': nr_images, 'img_file_size': 1000}  # 1 kB was hardcoded in the Scheduler
+        logger.debug("sky_images: {} files {} bytes each".format(result['storage']['output_files']['img']['nr_of_img_files'], result['storage']['output_files']['img']['img_file_size']))
 
         # count total data size
-        total_data_size = result['output_files']['img']['nr_of_img_files'] * result['output_files']['img']['img_file_size']  # bytes
+        total_data_size = result['storage']['output_files']['img']['nr_of_img_files'] * result['storage']['output_files']['img']['img_file_size']  # bytes
         total_bandwidth = int(ceil((total_data_size * 8) / duration))  # bits/second
-        result['storage'] = {'total_size': total_data_size}
-        result['bandwidth'] = {'total_size': total_bandwidth}
+        result['storage']['total_size'] = total_data_size
+        result['bandwidth']['total_size'] = total_bandwidth
 
         return result
 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
index c8d82defa44..0219cf586af 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
@@ -36,7 +36,7 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
     """
     def __init__(self):
         logger.info("init LongBaselinePipelineResourceEstimator")
-        BaseResourceEstimator.__init__(self, name='longbaseline_pipeline')
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='longbaseline_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
@@ -60,7 +60,7 @@
         """
         logger.debug("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
-        result = {'errors': []}
+        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
         duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
         subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0) #TODO, should these have defaults?
         subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0)
@@ -76,21 +76,21 @@
         if not subbandgroups_per_ms or not subbands_per_subbandgroup:
             logger.warning('subbandgroups_per_ms or subbands_per_subbandgroup are not valid')
             result['errors'].append('Missing UV Dataproducts in input_files')
-        if nr_input_files % (subbands_per_subband_group * subband_groups_per_ms) > 0:
+        if nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0:
             logger.warning('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
             result['errors'].append('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
         if result['errors']:
             return result
 
         logger.debug("calculate correlated data size")
-        result['output_files'] = {}
+        result['storage']['output_files'] = {}
         nr_output_files = nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms)
-        result['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, 'uv_file_size': 1000}  # 1 kB was hardcoded in the Scheduler
-        logger.debug("correlated_uv: {} files {} bytes each".format(result['output_files']['uv']['nr_of_uv_files'], result['output_files']['uv']['uv_file_size']))
+        result['storage']['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, 'uv_file_size': 1000}  # 1 kB was hardcoded in the Scheduler
+        logger.debug("correlated_uv: {} files {} bytes each".format(result['storage']['output_files']['uv']['nr_of_uv_files'], result['storage']['output_files']['uv']['uv_file_size']))
 
         # count total data size
-        total_data_size = result['output_files']['uv']['nr_of_uv_files'] * result['output_files']['uv']['uv_file_size']  # bytes
+        total_data_size = result['storage']['output_files']['uv']['nr_of_uv_files'] * result['storage']['output_files']['uv']['uv_file_size']  # bytes
         total_bandwidth = int(ceil((total_data_size * 8) / duration))  # bits/second
-        result['storage'] = {'total_size': total_data_size}
-        result['bandwidth'] = {'total_size': total_bandwidth}
+        result['storage']['total_size'] = total_data_size
+        result['bandwidth']['total_size'] = total_bandwidth
 
         return result
 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
index 575d46c1fe5..26471d794ea 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
@@ -36,7 +36,7 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator):
     """
     def __init__(self):
         logger.info("init PulsarPipelineResourceEstimator")
-        BaseResourceEstimator.__init__(self, name='pipeline')
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='pulsar_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_CoherentStokes.enabled',
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
index c6fbc46d6d1..78c707874ca 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
@@ -49,7 +49,7 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
             if len(branch_estimates) > 1:
                 logger.error('Imaging pipeline %d should not have multiple predecessors: %s' %
                              (otdb_id, branch_estimates.keys() ) )
             input_files = branch_estimates.values()[0].values()[0]['storage']['output_files']
-            return {str(otdb_id): self.calibration_pipeline.verify_and_estimate(parset, input_files)}
+            return {str(otdb_id): self.imaging_pipeline.verify_and_estimate(parset, input_files)}
 
         if specification_tree['task_subtype'] in ['long baseline pipeline']:
             if len(branch_estimates) > 1:
-- 
GitLab
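
The generic size accounting this patch introduces in calibration_pipeline.py can be tried standalone. Below is a minimal sketch, assuming the 'output_files' layout used throughout the estimators ({'uv': {'nr_of_uv_files': N, 'uv_file_size': S}, ...}); the helper name and the example figures are hypothetical, and float() is added only so the division gives the same result under Python 2 and 3.

    # Sketch (not part of the patch) of the per-data-type size loop added in
    # calibration_pipeline.py. The dict layout and the key naming scheme
    # ('nr_of_<type>_files', '<type>_file_size') are taken from the diff;
    # the function name and the example numbers are made up.
    from math import ceil

    def total_size_and_bandwidth(output_files, duration):
        total_data_size = 0  # bytes
        for data_type, props in output_files.items():
            total_data_size += props['nr_of_%s_files' % data_type] * props['%s_file_size' % data_type]
        # same formula as the patch; float() avoids Python 2 integer truncation
        total_bandwidth = int(ceil((total_data_size * 8) / float(duration)))  # bits/second
        return total_data_size, total_bandwidth

    # Example: 80 uv files and 80 im files of the 1 kB placeholder size over one hour.
    print(total_size_and_bandwidth({'uv': {'nr_of_uv_files': 80, 'uv_file_size': 1000},
                                    'im': {'nr_of_im_files': 80, 'im_file_size': 1000}},
                                   3600))  # -> (160000, 356)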
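
The dataproduct naming that translator.py settles on after this patch can likewise be exercised in isolation. The format strings below are quoted from the diff (per-SAP names for observations, plain subband or subband-group names for pipelines); the helper function names are illustrative only and do not exist in the codebase.

    # Illustrative helpers mirroring the filename patterns in translator.py.
    def observation_uv_name(otdb_id, sap_nr, sb_nr):
        return "L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sap_nr, sb_nr)  # observations (per SAP)

    def pipeline_uv_name(otdb_id, sb_nr):
        return "L%d_SB%03d_uv.MS" % (otdb_id, sb_nr)                  # pipelines (no SAPs)

    def sky_image_name(otdb_id, sbg_nr):
        return "L%d_SBG%03d_sky.IM" % (otdb_id, sbg_nr)               # one image per subband group

    print(observation_uv_name(123456, 0, 7))  # L123456_SAP000_SB007_uv.MS
    print(pipeline_uv_name(123456, 7))        # L123456_SB007_uv.MS
    print(sky_image_name(123456, 0))          # L123456_SBG000_sky.IM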