Skip to content
Snippets Groups Projects
Commit fc8079ea authored by Adriaan Renting's avatar Adriaan Renting
Browse files

Task #9192: Merged resource estimator service.py from...

Task #9192: Merged resource estimator service.py from RA1_0_Integration-Task9192 after a problem in the 34853 merge
parent 7278db89
No related branches found
No related tags found
No related merge requests found
...@@ -18,10 +18,10 @@ class ResourceEstimatorHandler(MessageHandlerInterface): ...@@ -18,10 +18,10 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
def __init__(self, **kwargs): def __init__(self, **kwargs):
super(ResourceEstimatorHandler, self).__init__(**kwargs) super(ResourceEstimatorHandler, self).__init__(**kwargs)
self.observation = ObservationResourceEstimator() self.observation = ObservationResourceEstimator()
#self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator() self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator()
#self.calibration_pipeline = CalibrationPipelineResourceEstimator() self.calibration_pipeline = CalibrationPipelineResourceEstimator()
#self.pulsar_pipeline = PulsarPipelineResourceEstimator() self.pulsar_pipeline = PulsarPipelineResourceEstimator()
#self.imaging_pipeline = ImagePipelineResourceEstimator() self.imaging_pipeline = ImagePipelineResourceEstimator()
def handle_message(self, content): def handle_message(self, content):
specification_tree = content["specification_tree"] specification_tree = content["specification_tree"]
...@@ -48,7 +48,6 @@ class ResourceEstimatorHandler(MessageHandlerInterface): ...@@ -48,7 +48,6 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
for branch in specification_tree['predecessors']: for branch in specification_tree['predecessors']:
branch_estimates.update(self.get_subtree_estimate(branch)) branch_estimates.update(self.get_subtree_estimate(branch))
logger.info(str(branch_estimates)) logger.info(str(branch_estimates))
if specification_tree['task_subtype'] in ['averaging pipeline', 'calibration pipeline']: if specification_tree['task_subtype'] in ['averaging pipeline', 'calibration pipeline']:
for id, estimate in branch_estimates.iteritems(): for id, estimate in branch_estimates.iteritems():
input_files = {} input_files = {}
...@@ -67,23 +66,21 @@ class ResourceEstimatorHandler(MessageHandlerInterface): ...@@ -67,23 +66,21 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
input_files = branch_estimates.values()[0].values()[0]['storage']['output_files'] input_files = branch_estimates.values()[0].values()[0]['storage']['output_files']
return {str(otdb_id): self.add_id(self.imaging_pipeline.verify_and_estimate(parset, input_files), otdb_id)} return {str(otdb_id): self.add_id(self.imaging_pipeline.verify_and_estimate(parset, input_files), otdb_id)}
if specification_tree['task_subtype'] in ['long baseline pipeline']: if specification_tree['task_subtype'] in ['long baseline pipeline']:
if len(branch_estimates) > 1: if len(branch_estimates) > 1:
logger.error('Long baseline pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys() ) ) logger.error('Long baseline pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys() ) )
input_files = branch_estimates.values()[0].values()[0]['storage']['output_files'] input_files = branch_estimates.values()[0].values()[0]['storage']['output_files']
return {str(otdb_id): self.add_id(self.longbaseline_pipeline.verify_and_estimate(parset, input_files), otdb_id)} return {str(otdb_id): self.add_id(self.longbaseline_pipeline.verify_and_estimate(parset, input_files), otdb_id)}
if specification_tree['task_subtype'] in ['pulsar pipeline']: if specification_tree['task_subtype'] in ['pulsar pipeline']:
if len(branch_estimates) > 1: if len(branch_estimates) > 1:
logger.error('Pulsar pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys() ) ) logger.error('Pulsar pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys() ) )
input_files = branch_estimates.values()[0].values()[0]['storage']['output_files'] input_files = branch_estimates.values()[0].values()[0]['storage']['output_files']
return {str(otdb_id): self.add_id(self.pulsar_pipeline.verify_and_estimate(parset, input_files), otdb_id)} return {str(otdb_id): self.add_id(self.pulsar_pipeline.verify_and_estimate(parset, input_files), otdb_id)}
#else: # reservation, maintenance, system tasks? else: # reservation, maintenance, system tasks?
#logger.info("It's not a pipeline or observation: %s" % otdb_id) logger.info("It's not a pipeline or observation: %s" % otdb_id)
#return {str(otdb_id): {}} return {str(otdb_id): {}}
def _get_estimated_resources(self, specification_tree): def _get_estimated_resources(self, specification_tree):
""" Input is like: """ Input is like:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment