diff --git a/SAS/OTDB_Services/TreeService.py b/SAS/OTDB_Services/TreeService.py
index 12e6793bf8194424dead20c7728f1fe8c19a191f..3a5b824220f87b0c19c1c8112c42ee8fe6fd3acb 100755
--- a/SAS/OTDB_Services/TreeService.py
+++ b/SAS/OTDB_Services/TreeService.py
@@ -108,7 +108,7 @@ def TaskGetIDs(input_dict, db_connection, return_tuple=True):
 
     # Task not found in any way return input values to the user
     return (None, otdb_id, mom_id) if return_tuple else [None, otdb_id, mom_id]
 
-
+# TODO have this return a Dict
 # Task Get Specification
 def TaskGetSpecification(input_dict, db_connection):
diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
index 53b4cbdff608a44f310a84ecf2a26a05f121dd02..2553dd80787f4fe30bee232a44936de5fd531676 100755
--- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
+++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
@@ -64,12 +64,7 @@ class RAtoOTDBTranslator():
         locations = []
         filenames = []
         result = {}
-        if not 'saps' in storage_properties: ## It's a pipeline (no SAPs)
-            for _ in xrange(storage_properties['nr_of_uv_files']):
-                locations.append(self.locationPath(project_name, otdb_id))
-                filenames.append("L%d_SB%03d_uv.MS" % (otdb_id, sb_nr))
-                sb_nr += 1
-        else:
+        if 'saps' in storage_properties: ## It's an observation (has SAPs)
             for sap in storage_properties["saps"]: ##We might need to sort saps?
                 logging.debug('processing sap: %s' % sap)
                 if "nr_of_uv_files" in sap['properties']:
@@ -77,12 +72,16 @@ class RAtoOTDBTranslator():
                     locations.append(self.locationPath(project_name, otdb_id))
                     filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sap['sap_nr'], sb_nr))
                     sb_nr += 1
+        else: ## It's a pipeline (no SAPs)
+            for _ in xrange(storage_properties['nr_of_uv_files']):
+                locations.append(self.locationPath(project_name, otdb_id))
+                filenames.append("L%d_SB%03d_uv.MS" % (otdb_id, sb_nr))
+                sb_nr += 1
         result[PREFIX + 'DataProducts.Output_Correlated.locations'] = '[' + to_csv_string(locations) + ']'
         result[PREFIX + 'DataProducts.Output_Correlated.filenames'] = '[' + to_csv_string(filenames) + ']'
         return result
 
     def CreateCoherentStokes(self, otdb_id, storage_properties, project_name):
-        SB_nr = 0
         locations = []
         filenames = []
         result = {}
@@ -102,7 +101,6 @@ class RAtoOTDBTranslator():
         return result
 
     def CreateIncoherentStokes(self, otdb_id, storage_properties, project_name):
-        SB_nr = 0
         locations = []
         filenames = []
         result = {}
@@ -122,29 +120,27 @@ class RAtoOTDBTranslator():
         return result
 
     def CreateInstrumentModel(self, otdb_id, storage_properties, project_name):
-        SB_nr = 0
+        sb_nr = 0
         locations = []
         filenames = []
         result = {}
-        for sap in storage_properties["saps"]: ##We might need to sort saps?
- if "nr_of_im_files" in sap['properties']: - for _ in range(sap['properties']['nr_of_im_files']): - locations.append(self.locationPath(project_name, otdb_id)) - filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sap['sap_nr'], sb_nr)) + for _ in xrange(storage_properties['nr_of_im_files']): + locations.append(self.locationPath(project_name, otdb_id)) + filenames.append("L%d_SB%03d_inst.INST" % (otdb_id, sb_nr)) + sb_nr += 1 result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = '[' + to_csv_string(filenames) + ']' return result def CreateSkyImage(self, otdb_id, storage_properties, project_name): - SB_nr = 0 + sbg_nr = 0 locations = [] filenames = [] result = {} - for sap in storage_properties["saps"]: ##We might need to sort saps? - if "nr_of_img_files" in sap['properties']: - for _ in range(sap['properties']['nr_of_img_files']): - locations.append(self.locationPath(project_name, otdb_id)) - filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sap['sap_nr'], sb_nr)) + for _ in xrange(storage_properties['nr_of_img_files']): + locations.append(self.locationPath(project_name, otdb_id)) + filenames.append("L%d_SBG%03d_sky.IM" % (otdb_id, sbg_nr)) + sbg_nr += 1 result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = '[' + to_csv_string(filenames) + ']' return result @@ -172,8 +168,8 @@ class RAtoOTDBTranslator(): result.update(self.CreateCoherentStokes(otdb_id, storage_properties, project_name)) if 'nr_of_is_files' in storage_properties: result.update(self.CreateIncoherentStokes(otdb_id, storage_properties, project_name)) - #if 'nr_of_im_files' in storage_properties: - # result.update(self.CreateInstrumentModel(otdb_id, storage_properties, project_name)) + if 'nr_of_im_files' in storage_properties: + result.update(self.CreateInstrumentModel(otdb_id, storage_properties, project_name)) if 'nr_of_img_files' in storage_properties: result.update(self.CreateSkyImage(otdb_id, storage_properties, project_name)) if 'nr_of_pulp_files' in storage_properties: @@ -220,8 +216,8 @@ class RAtoOTDBTranslator(): parset.update(self.CreateStorageKeys(otdb_id, self.parseStorageProperties(ra_info['storage']), project_name)) if 'stations' in ra_info: parset[PREFIX+'VirtualInstrument.stationList'] = ra_info["stations"] - parset[PREFIX+'ObservationControl.OnlineControl.inspectionHost'] = 'head01.cep4.control.lofar' - parset[PREFIX+'ObservationControl.OnlineControl.inspectionProgram'] = 'inspection-plots-observation.sh' +# parset[PREFIX+'ObservationControl.OnlineControl.inspectionHost'] = 'head01.cep4.control.lofar' +# parset[PREFIX+'ObservationControl.OnlineControl.inspectionProgram'] = 'inspection-plots-observation.sh' return parset diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py index a33526962f43ab7dd40e303a0f63571162073d9a..bd3b060c5c149636e2080a4fc5f24e626d9415bd 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py @@ -36,7 +36,7 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator): """ def __init__(self): logger.info("init CalibrationPipelineResourceEstimator") - 
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='calibration_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
@@ -95,8 +95,13 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
             logger.info("correlated_im: {} files {} bytes each".format(result['storage']['output_files']['im']['nr_of_im_files'], result['storage']['output_files']['im']['im_file_size']))
 
         # count total data size
-        total_data_size = result['storage']['output_files']['uv']['nr_of_uv_files'] * result['storage']['output_files']['uv']['uv_file_size'] + \
-                          result['storage']['output_files']['im']['nr_of_im_files'] * result['storage']['output_files']['im']['im_file_size'] # bytes
+        total_data_size = 0
+        for data_type in result['storage']['output_files']:
+            total_data_size += result['storage']['output_files'][data_type]['nr_of_' + data_type + '_files'] * result['storage']['output_files'][data_type][data_type + '_file_size'] # bytes
+        # FIXME I don't think this is technically correct, as the IM files are sometimes created and used, just not a required export?
+        # Need to split averaging pipeline and calibration pipeline
+        #total_data_size = result['storage']['output_files']['uv']['nr_of_uv_files'] * result['storage']['output_files']['uv']['uv_file_size'] + \
+        #                  result['storage']['output_files']['im']['nr_of_im_files'] * result['storage']['output_files']['im']['im_file_size'] # bytes
         total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
         result['storage']['total_size'] = total_data_size
         result['bandwidth']['total_size'] = total_bandwidth
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
index 2c4b60797efa5cbb7e54939135245fa7b14e8919..d05e28cf51d79cf81be2ca7aa85bf71b6bea218f 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
@@ -38,7 +38,7 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
     """
    def __init__(self):
         logger.info("init ImagePipelineResourceEstimator")
-        BaseResourceEstimator.__init__(self, name='imaging_pipeline')
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='imaging_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
@@ -62,7 +62,7 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
         """
         logger.debug("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
-        result = {'errors': []}
+        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
         duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
         slices_per_image = parset.getInt(PIPELINE + 'Imaging.slices_per_image', 0) #TODO, should these have defaults?
         subbands_per_image = parset.getInt(PIPELINE + 'Imaging.subbands_per_image', 0)
@@ -85,15 +85,15 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
             return result
 
         logger.debug("calculate sky image data size")
-        result['output_files'] = {}
+        result['storage']['output_files'] = {}
         nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)
-        result['output_files']['img'] = {'nr_of_img_files': nr_images, 'img_file_size': 1000} # 1 kB was hardcoded in the Scheduler
-        logger.debug("sky_images: {} files {} bytes each".format(result['output_files']['img']['nr_of_img_files'], result['output_files']['img']['img_file_size']))
+        result['storage']['output_files']['img'] = {'nr_of_img_files': nr_images, 'img_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        logger.debug("sky_images: {} files {} bytes each".format(result['storage']['output_files']['img']['nr_of_img_files'], result['storage']['output_files']['img']['img_file_size']))
 
         # count total data size
-        total_data_size = result['output_files']['img']['nr_of_img_files'] * result['output_files']['img']['img_file_size'] # bytes
+        total_data_size = result['storage']['output_files']['img']['nr_of_img_files'] * result['storage']['output_files']['img']['img_file_size'] # bytes
         total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
-        result['storage'] = {'total_size': total_data_size}
-        result['bandwidth'] = {'total_size': total_bandwidth}
+        result['storage']['total_size'] = total_data_size
+        result['bandwidth']['total_size'] = total_bandwidth
         return result
 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
index c8d82defa440a099e25188c3ac4ba3b46343e052..0219cf586af3fc81ab8794e8dd13195de7378dd7 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
@@ -36,7 +36,7 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
     """
     def __init__(self):
         logger.info("init LongBaselinePipelineResourceEstimator")
-        BaseResourceEstimator.__init__(self, name='longbaseline_pipeline')
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='longbaseline_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
@@ -60,7 +60,7 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
         """
         logger.debug("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
-        result = {'errors': []}
+        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
         duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
         subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0) #TODO, should these have defaults?
         subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0)
@@ -76,21 +76,21 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
         if not subbandgroups_per_ms or not subbands_per_subbandgroup:
             logger.warning('subbandgroups_per_ms or subbands_per_subbandgroup are not valid')
             result['errors'].append('Missing UV Dataproducts in input_files')
-        if nr_input_files % (subbands_per_subband_group * subband_groups_per_ms) > 0:
+        if nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0:
             logger.warning('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
             result['errors'].append('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
         if result['errors']:
             return result
 
         logger.debug("calculate correlated data size")
-        result['output_files'] = {}
+        result['storage']['output_files'] = {}
         nr_output_files = nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms)
-        result['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, 'uv_file_size': 1000} # 1 kB was hardcoded in the Scheduler
-        logger.debug("correlated_uv: {} files {} bytes each".format(result['output_files']['uv']['nr_of_uv_files'], result['output_files']['uv']['uv_file_size']))
+        result['storage']['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, 'uv_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        logger.debug("correlated_uv: {} files {} bytes each".format(result['storage']['output_files']['uv']['nr_of_uv_files'], result['storage']['output_files']['uv']['uv_file_size']))
 
         # count total data size
-        total_data_size = result['output_files']['uv']['nr_of_uv_files'] * result['output_files']['uv']['uv_file_size'] # bytes
+        total_data_size = result['storage']['output_files']['uv']['nr_of_uv_files'] * result['storage']['output_files']['uv']['uv_file_size'] # bytes
         total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
-        result['storage'] = {'total_size': total_data_size}
-        result['bandwidth'] = {'total_size': total_bandwidth}
+        result['storage']['total_size'] = total_data_size
+        result['bandwidth']['total_size'] = total_bandwidth
         return result
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
index 575d46c1fe5a6ab79bedcdf2b294f9c53288ea5d..26471d794eae200519430331b84ea7b002d4ba95 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
@@ -36,7 +36,7 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator):
     """
     def __init__(self):
         logger.info("init PulsarPipelineResourceEstimator")
-        BaseResourceEstimator.__init__(self, name='pipeline')
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='pulsar_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_CoherentStokes.enabled',
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
index c6fbc46d6d13456b443f7ed83505d1453d4b58a6..78c707874caad6cd1da80cab23971a8938ceffa7 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
@@ -49,7 +49,7 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
             if len(branch_estimates) > 1:
                 logger.error('Imaging pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys() ) )
             input_files = branch_estimates.values()[0].values()[0]['storage']['output_files']
-            return {str(otdb_id): self.calibration_pipeline.verify_and_estimate(parset, input_files)}
+            return {str(otdb_id): self.imaging_pipeline.verify_and_estimate(parset, input_files)}
 
         if specification_tree['task_subtype'] in ['long baseline pipeline']:
             if len(branch_estimates) > 1: