Commit eac55f17 authored by Adriaan Renting

Task #9192: first more or less working pipelines

parent 5ad4c286
@@ -108,7 +108,7 @@ def TaskGetIDs(input_dict, db_connection, return_tuple=True):
     # Task not found in any way return input values to the user
     return (None, otdb_id, mom_id) if return_tuple else [None, otdb_id, mom_id]

+# TODO have this return a Dict
 # Task Get Specification
 def TaskGetSpecification(input_dict, db_connection):
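Editor's note: the TODO above suggests returning a dict instead of a positional tuple/list. A minimal sketch of what that could look like; the helper and key names are assumptions, not part of this commit:

def task_ids_as_dict(task, otdb_id, mom_id):
    # Hypothetical shape for the TODO above; the real change has not been made yet.
    return {'task': task, 'otdb_id': otdb_id, 'mom_id': mom_id}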
...
@@ -64,12 +64,7 @@ class RAtoOTDBTranslator():
         locations = []
         filenames = []
         result = {}
-        if not 'saps' in storage_properties: ## It's a pipeline (no SAPs)
-            for _ in xrange(storage_properties['nr_of_uv_files']):
-                locations.append(self.locationPath(project_name, otdb_id))
-                filenames.append("L%d_SB%03d_uv.MS" % (otdb_id, sb_nr))
-                sb_nr += 1
-        else:
+        if 'saps' in storage_properties: ## It's an observation (has SAPs)
             for sap in storage_properties["saps"]: ##We might need to sort saps?
                 logging.debug('processing sap: %s' % sap)
                 if "nr_of_uv_files" in sap['properties']:
@@ -77,12 +72,16 @@ class RAtoOTDBTranslator():
                        locations.append(self.locationPath(project_name, otdb_id))
                        filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sap['sap_nr'], sb_nr))
                        sb_nr += 1
+        else: ## It's a pipeline (no SAPs)
+            for _ in xrange(storage_properties['nr_of_uv_files']):
+                locations.append(self.locationPath(project_name, otdb_id))
+                filenames.append("L%d_SB%03d_uv.MS" % (otdb_id, sb_nr))
+                sb_nr += 1
         result[PREFIX + 'DataProducts.Output_Correlated.locations'] = '[' + to_csv_string(locations) + ']'
         result[PREFIX + 'DataProducts.Output_Correlated.filenames'] = '[' + to_csv_string(filenames) + ']'
         return result
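Editor's note: a standalone sketch of the two filename schemes the reworked branch produces. The helper name and the example numbers are invented; the patterns come from the diff above:

def example_uv_filenames(otdb_id, storage_properties):
    # Mirrors the branching above: observations carry 'saps', pipelines do not.
    filenames = []
    sb_nr = 0
    if 'saps' in storage_properties:
        for sap in storage_properties['saps']:
            for _ in range(sap['properties'].get('nr_of_uv_files', 0)):
                filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sap['sap_nr'], sb_nr))
                sb_nr += 1
    else:
        for _ in range(storage_properties['nr_of_uv_files']):
            filenames.append("L%d_SB%03d_uv.MS" % (otdb_id, sb_nr))
            sb_nr += 1
    return filenames

print(example_uv_filenames(424242, {'nr_of_uv_files': 2}))
# ['L424242_SB000_uv.MS', 'L424242_SB001_uv.MS']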
     def CreateCoherentStokes(self, otdb_id, storage_properties, project_name):
-        SB_nr = 0
         locations = []
         filenames = []
         result = {}
@@ -102,7 +101,6 @@ class RAtoOTDBTranslator():
         return result

     def CreateIncoherentStokes(self, otdb_id, storage_properties, project_name):
-        SB_nr = 0
         locations = []
         filenames = []
         result = {}
@@ -122,29 +120,27 @@ class RAtoOTDBTranslator():
         return result
     def CreateInstrumentModel(self, otdb_id, storage_properties, project_name):
-        SB_nr = 0
+        sb_nr = 0
         locations = []
         filenames = []
         result = {}
-        for sap in storage_properties["saps"]: ##We might need to sort saps?
-            if "nr_of_im_files" in sap['properties']:
-                for _ in range(sap['properties']['nr_of_im_files']):
-                    locations.append(self.locationPath(project_name, otdb_id))
-                    filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sap['sap_nr'], sb_nr))
+        for _ in xrange(storage_properties['nr_of_im_files']):
+            locations.append(self.locationPath(project_name, otdb_id))
+            filenames.append("L%d_SB%03d_inst.INST" % (otdb_id, sb_nr))
+            sb_nr += 1
         result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = '[' + to_csv_string(locations) + ']'
         result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = '[' + to_csv_string(filenames) + ']'
         return result
     def CreateSkyImage(self, otdb_id, storage_properties, project_name):
-        SB_nr = 0
+        sbg_nr = 0
         locations = []
         filenames = []
         result = {}
-        for sap in storage_properties["saps"]: ##We might need to sort saps?
-            if "nr_of_img_files" in sap['properties']:
-                for _ in range(sap['properties']['nr_of_img_files']):
-                    locations.append(self.locationPath(project_name, otdb_id))
-                    filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sap['sap_nr'], sb_nr))
+        for _ in xrange(storage_properties['nr_of_img_files']):
+            locations.append(self.locationPath(project_name, otdb_id))
+            filenames.append("L%d_SBG%03d_sky.IM" % (otdb_id, sbg_nr))
+            sbg_nr += 1
         result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = '[' + to_csv_string(locations) + ']'
         result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = '[' + to_csv_string(filenames) + ']'
         return result
@@ -172,8 +168,8 @@ class RAtoOTDBTranslator():
             result.update(self.CreateCoherentStokes(otdb_id, storage_properties, project_name))
         if 'nr_of_is_files' in storage_properties:
             result.update(self.CreateIncoherentStokes(otdb_id, storage_properties, project_name))
-        #if 'nr_of_im_files' in storage_properties:
-        #    result.update(self.CreateInstrumentModel(otdb_id, storage_properties, project_name))
+        if 'nr_of_im_files' in storage_properties:
+            result.update(self.CreateInstrumentModel(otdb_id, storage_properties, project_name))
         if 'nr_of_img_files' in storage_properties:
             result.update(self.CreateSkyImage(otdb_id, storage_properties, project_name))
         if 'nr_of_pulp_files' in storage_properties:
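Editor's note: a sketch of the storage_properties shape this dispatch expects. The key names are taken from the diff; the counts are invented:

storage_properties = {
    'nr_of_uv_files': 244,   # -> CreateCorrelated
    'nr_of_im_files': 244,   # -> CreateInstrumentModel (re-enabled by this commit)
}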
@@ -220,8 +216,8 @@ class RAtoOTDBTranslator():
         parset.update(self.CreateStorageKeys(otdb_id, self.parseStorageProperties(ra_info['storage']), project_name))
         if 'stations' in ra_info:
             parset[PREFIX+'VirtualInstrument.stationList'] = ra_info["stations"]
-        parset[PREFIX+'ObservationControl.OnlineControl.inspectionHost'] = 'head01.cep4.control.lofar'
-        parset[PREFIX+'ObservationControl.OnlineControl.inspectionProgram'] = 'inspection-plots-observation.sh'
+        # parset[PREFIX+'ObservationControl.OnlineControl.inspectionHost'] = 'head01.cep4.control.lofar'
+        # parset[PREFIX+'ObservationControl.OnlineControl.inspectionProgram'] = 'inspection-plots-observation.sh'
         return parset
...
@@ -36,7 +36,7 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
     """
     def __init__(self):
         logger.info("init CalibrationPipelineResourceEstimator")
-        BaseResourceEstimator.__init__(self, name='pipeline')
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='calibration_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
@@ -95,8 +95,13 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
         logger.info("correlated_im: {} files {} bytes each".format(result['storage']['output_files']['im']['nr_of_im_files'], result['storage']['output_files']['im']['im_file_size']))

         # count total data size
-        total_data_size = result['storage']['output_files']['uv']['nr_of_uv_files'] * result['storage']['output_files']['uv']['uv_file_size'] + \
-                          result['storage']['output_files']['im']['nr_of_im_files'] * result['storage']['output_files']['im']['im_file_size'] # bytes
+        total_data_size = 0
+        for data_type in result['storage']['output_files']:
+            total_data_size += result['storage']['output_files'][data_type]['nr_of_' + data_type + '_files'] * result['storage']['output_files'][data_type][data_type + '_file_size'] # bytes
+        # FIXME I don't think this is technically correct, as the IM files are sometimes created and used, just not a required export?
+        # Need to split averaging pipeline and calibration pipeline
+        #total_data_size = result['storage']['output_files']['uv']['nr_of_uv_files'] * result['storage']['output_files']['uv']['uv_file_size'] + \
+        #                  result['storage']['output_files']['im']['nr_of_im_files'] * result['storage']['output_files']['im']['im_file_size'] # bytes
         total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
         result['storage']['total_size'] = total_data_size
         result['bandwidth']['total_size'] = total_bandwidth
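Editor's note: a self-contained sketch of the generic accumulation the new loop performs, assuming the 'nr_of_<type>_files' / '<type>_file_size' key convention visible above; the file counts and sizes are invented:

output_files = {
    'uv': {'nr_of_uv_files': 244, 'uv_file_size': 3617000},  # invented sizes
    'im': {'nr_of_im_files': 244, 'im_file_size': 44883},
}
total_data_size = 0
for data_type in output_files:
    props = output_files[data_type]
    total_data_size += props['nr_of_%s_files' % data_type] * props['%s_file_size' % data_type]
print(total_data_size)  # 893499452 bytes, summed over every product type present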
...
@@ -38,7 +38,7 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
     """
     def __init__(self):
         logger.info("init ImagePipelineResourceEstimator")
-        BaseResourceEstimator.__init__(self, name='imaging_pipeline')
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='imaging_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
@@ -62,7 +62,7 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
         """
         logger.debug("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
-        result = {'errors': []}
+        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
         duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
         slices_per_image = parset.getInt(PIPELINE + 'Imaging.slices_per_image', 0) #TODO, should these have defaults?
         subbands_per_image = parset.getInt(PIPELINE + 'Imaging.subbands_per_image', 0)
@@ -85,15 +85,15 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
             return result

         logger.debug("calculate sky image data size")
-        result['output_files'] = {}
+        result['storage']['output_files'] = {}
         nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)
-        result['output_files']['img'] = {'nr_of_img_files': nr_images, 'img_file_size': 1000} # 1 kB was hardcoded in the Scheduler
-        logger.debug("sky_images: {} files {} bytes each".format(result['output_files']['img']['nr_of_img_files'], result['output_files']['img']['img_file_size']))
+        result['storage']['output_files']['img'] = {'nr_of_img_files': nr_images, 'img_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        logger.debug("sky_images: {} files {} bytes each".format(result['storage']['output_files']['img']['nr_of_img_files'], result['storage']['output_files']['img']['img_file_size']))

         # count total data size
-        total_data_size = result['output_files']['img']['nr_of_img_files'] * result['output_files']['img']['img_file_size'] # bytes
+        total_data_size = result['storage']['output_files']['img']['nr_of_img_files'] * result['storage']['output_files']['img']['img_file_size'] # bytes
         total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
-        result['storage'] = {'total_size': total_data_size}
-        result['bandwidth'] = {'total_size': total_bandwidth}
+        result['storage']['total_size'] = total_data_size
+        result['bandwidth']['total_size'] = total_bandwidth
         return result
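Editor's note: the pre-initialized result skeleton introduced above means an early 'return result' on validation errors still yields well-formed keys. A sketch of what callers can now rely on:

result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
result['errors'].append('Missing UV Dataproducts in input_files')
if result['errors']:
    # Totals are present (zero) instead of raising a KeyError downstream.
    print(result['storage']['total_size'])  # 0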
@@ -36,7 +36,7 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
     """
     def __init__(self):
         logger.info("init LongBaselinePipelineResourceEstimator")
-        BaseResourceEstimator.__init__(self, name='longbaseline_pipeline')
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='longbaseline_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_Correlated.enabled',
@@ -60,7 +60,7 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
         """
         logger.debug("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
-        result = {'errors': []}
+        result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}}
         duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
         subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0) #TODO, should these have defaults?
         subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0)
@@ -76,21 +76,21 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
         if not subbandgroups_per_ms or not subbands_per_subbandgroup:
             logger.warning('subbandgroups_per_ms or subbands_per_subbandgroup are not valid')
             result['errors'].append('Missing UV Dataproducts in input_files')
-        if nr_input_files % (subbands_per_subband_group * subband_groups_per_ms) > 0:
+        if nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0:
             logger.warning('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
             result['errors'].append('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
         if result['errors']:
             return result

         logger.debug("calculate correlated data size")
-        result['output_files'] = {}
+        result['storage']['output_files'] = {}
         nr_output_files = nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms)
-        result['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, 'uv_file_size': 1000} # 1 kB was hardcoded in the Scheduler
-        logger.debug("correlated_uv: {} files {} bytes each".format(result['output_files']['uv']['nr_of_uv_files'], result['output_files']['uv']['uv_file_size']))
+        result['storage']['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, 'uv_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        logger.debug("correlated_uv: {} files {} bytes each".format(result['storage']['output_files']['uv']['nr_of_uv_files'], result['storage']['output_files']['uv']['uv_file_size']))

         # count total data size
-        total_data_size = result['output_files']['uv']['nr_of_uv_files'] * result['output_files']['uv']['uv_file_size'] # bytes
+        total_data_size = result['storage']['output_files']['uv']['nr_of_uv_files'] * result['storage']['output_files']['uv']['uv_file_size'] # bytes
         total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
-        result['storage'] = {'total_size': total_data_size}
-        result['bandwidth'] = {'total_size': total_bandwidth}
+        result['storage']['total_size'] = total_data_size
+        result['bandwidth']['total_size'] = total_bandwidth
         return result
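Editor's note: all estimators share the same bandwidth formula. A worked example with invented numbers (100 output files of 1 GB over a 2-hour run; duration kept as a float, as a division before ceil would otherwise truncate under Python 2):

from math import ceil

total_data_size = 100 * 1000000000               # bytes (invented)
duration = 2 * 3600.0                            # seconds
total_bandwidth = int(ceil((total_data_size * 8) / duration))
print(total_bandwidth)                           # 111111112 bits/second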
@@ -36,7 +36,7 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator):
     """
     def __init__(self):
         logger.info("init PulsarPipelineResourceEstimator")
-        BaseResourceEstimator.__init__(self, name='pipeline')
+        BaseResourceEstimator.__init__(self, name='pipeline') #FIXME name='pulsar_pipeline'
         self.required_keys = ('Observation.startTime',
                               'Observation.stopTime',
                               DATAPRODUCTS + 'Input_CoherentStokes.enabled',
...
@@ -49,7 +49,7 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
             if len(branch_estimates) > 1:
                 logger.error('Imaging pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys()))
             input_files = branch_estimates.values()[0].values()[0]['storage']['output_files']
-            return {str(otdb_id): self.calibration_pipeline.verify_and_estimate(parset, input_files)}
+            return {str(otdb_id): self.imaging_pipeline.verify_and_estimate(parset, input_files)}
         if specification_tree['task_subtype'] in ['long baseline pipeline']:
             if len(branch_estimates) > 1:
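Editor's note: a sketch (Python 2, matching the .values()[0] idiom above) of the nesting this handler unwraps to reach a predecessor's output files. The inner key name is an assumption; it is not visible in this diff:

branch_estimates = {
    '123456': {                           # predecessor otdb_id
        'estimate': {                     # assumed inner key, not shown in this diff
            'storage': {'output_files': {'uv': {'nr_of_uv_files': 244, 'uv_file_size': 1000}}},
        },
    },
}
input_files = branch_estimates.values()[0].values()[0]['storage']['output_files']
print(input_files['uv']['nr_of_uv_files'])  # 244 (dict.values() indexing works in Python 2 only)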
...