-
Adriaan Renting authoredAdriaan Renting authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
service.py 8.09 KiB
#!/usr/bin/env python
# $Id: service.py $
'''
Simple Service listening
'''
import logging, pprint
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface
from lofar.sas.resourceassignment.resourceassignmentestimator.resource_estimators import *
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME
logger = logging.getLogger(__name__)
class ResourceEstimatorHandler(MessageHandlerInterface):
def __init__(self, **kwargs):
super(ResourceEstimatorHandler, self).__init__(**kwargs)
self.observation = ObservationResourceEstimator()
self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator()
self.calibration_pipeline = CalibrationPipelineResourceEstimator()
self.pulsar_pipeline = PulsarPipelineResourceEstimator()
self.imaging_pipeline = ImagePipelineResourceEstimator()
def handle_message(self, content):
specification_tree = content["specification_tree"]
return self._get_estimated_resources(specification_tree) ##TODO also handle MoM tasks in RA 1.2
##FIXME dirty hack
def add_id(self, estimate, otdb_id):
if 'storage' in estimate.values()[0]:
if 'output_files' in estimate.values()[0]['storage']: #We only need to do output files, it will be someone else's input
for data_type in estimate.values()[0]['storage']['output_files'].keys():
if data_type != 'saps':
estimate.values()[0]['storage']['output_files'][data_type][data_type + '_otdb_id'] = otdb_id
logger.info('added %s to %s' % (otdb_id, str(estimate.values()[0]['storage']['output_files'][data_type])))
return estimate
#TODO use something else than .values()[0]['storage'] ??
def get_subtree_estimate(self, specification_tree):
otdb_id = specification_tree['otdb_id']
parset = specification_tree['specification']
if specification_tree['task_type'] == 'observation':
return {str(otdb_id): self.add_id(self.observation.verify_and_estimate(parset), otdb_id)}
elif specification_tree['task_type'] == 'pipeline':
if not specification_tree['predecessors']:
logger.warning("Could not estimate %s because the pipeline has no predecessors" % (otdb_id))
return {str(otdb_id): {'errors': ["Could not estimate %s because the pipeline has no predecessors" % (otdb_id)]}}
branch_estimates = {}
for branch in specification_tree['predecessors']:
subtree_estimate = self.get_subtree_estimate(branch)
if subtree_estimate[str(branch)]['errors']:
logger.warning("Could not estimate %s because predecessor %s has errors" % (otdb_id, branch))
return {str(otdb_id): {'errors': ["Could not estimate %s because predecessor %s has errors" % (otdb_id, branch)]}}
branch_estimates.update(subtree_estimate)
logger.info(("Branch estimates for %s\n" % otdb_id) + pprint.pformat(branch_estimates))
if specification_tree['task_subtype'] in ['averaging pipeline', 'calibration pipeline']:
for id, estimate in branch_estimates.iteritems():
input_files = {}
predecessor_output = estimate.values()[0]['storage']['output_files']
if not 'im' in predecessor_output and 'uv' in predecessor_output: # Not a calibrator pipeline
logger.info('found %s as the target of pipeline %s' % (id, otdb_id))
input_files['uv'] = predecessor_output['uv']
if 'saps' in predecessor_output:
input_files['saps'] = predecessor_output['saps']
elif 'im' in predecessor_output:
input_files['im'] = predecessor_output['im']
return {str(otdb_id): self.add_id(self.calibration_pipeline.verify_and_estimate(parset, input_files), otdb_id)}
if len(branch_estimates) > 1:
logger.warning('Pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys()))
return {str(otdb_id): {'errors': ['Pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys())]}}
predecessor_output = branch_estimates.values()[0].values()[0]['storage']['output_files']
if specification_tree['task_subtype'] in ['imaging pipeline', 'imaging pipeline msss']:
input_files = predecessor_output
return {str(otdb_id): self.add_id(self.imaging_pipeline.verify_and_estimate(parset, input_files), otdb_id)}
if specification_tree['task_subtype'] in ['long baseline pipeline']:
input_files = predecessor_output
return {str(otdb_id): self.add_id(self.longbaseline_pipeline.verify_and_estimate(parset, input_files), otdb_id)}
if specification_tree['task_subtype'] in ['pulsar pipeline']:
input_files = predecessor_output
return {str(otdb_id): self.add_id(self.pulsar_pipeline.verify_and_estimate(parset, input_files), otdb_id)}
else: # reservation, maintenance, system tasks?
logger.warning("ID %s is not a pipeline or observation." % otdb_id)
return {str(otdb_id): {'errors': ["ID %s is not a pipeline or observation." % otdb_id]}}
def _get_estimated_resources(self, specification_tree):
""" Input is like:
{"otdb_id": otdb_id, "state": 'prescheduled', 'specification': ...,
'task_type': "pipeline", 'task_subtype': "long baseline pipeline",
'predecessors': [...]}
reply is something along the lines of:
{'452648':
{'observation':
{'bandwidth': {'total_size': 19021319494},
'storage': {'total_size': 713299481024,
'output_files':
{'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104},
'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
{'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
{'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}
]}}}}}
"""
logger.info('get_estimated_resources on: %s' % specification_tree)
return self.get_subtree_estimate(specification_tree)
def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None):
return Service(servicename=servicename,
servicehandler=ResourceEstimatorHandler,
busname=busname,
broker=broker,
numthreads=1,
verbose=True)
def main():
from optparse import OptionParser
from lofar.messaging import setQpidLogLevel
from lofar.common.util import waitForInterrupt
# Check the invocation arguments
parser = OptionParser("%prog [options]",
description='runs the resourceassigner service')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME)
parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: %s" % DEFAULT_SERVICENAME)
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args()
setQpidLogLevel(logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
with createService(busname=options.busname, servicename=options.servicename, broker=options.broker):
waitForInterrupt()
if __name__ == '__main__':
main()