#!/usr/bin/env python
# $Id: service.py $

'''
Simple service that listens for resource estimation requests.
'''
import logging
import pprint
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface
from lofar.sas.resourceassignment.resourceassignmentestimator.resource_estimators import *
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME

logger = logging.getLogger(__name__)


class ResourceEstimatorHandler(MessageHandlerInterface):
    def __init__(self, **kwargs):
        super(ResourceEstimatorHandler, self).__init__(**kwargs)
        self.observation = ObservationResourceEstimator()
        self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator()
        self.calibration_pipeline = CalibrationPipelineResourceEstimator()
        self.pulsar_pipeline = PulsarPipelineResourceEstimator()
        self.imaging_pipeline = ImagePipelineResourceEstimator()

    def handle_message(self, content):
        specification_tree = content["specification_tree"]
        return self._get_estimated_resources(specification_tree)  ##TODO also handle MoM tasks in RA 1.2

    ##FIXME dirty hack
    def add_id(self, estimate, otdb_id):
        if 'storage' in estimate.values()[0]:
            if 'output_files' in estimate.values()[0]['storage']:  # We only need to do output files, it will be someone else's input
                for data_type in estimate.values()[0]['storage']['output_files'].keys():
                    if data_type != 'saps':
                        estimate.values()[0]['storage']['output_files'][data_type][data_type + '_otdb_id'] = otdb_id
                        logger.info('added %s to %s' % (otdb_id, str(estimate.values()[0]['storage']['output_files'][data_type])))
        return estimate

    #TODO use something else than .values()[0]['storage'] ??
    def get_subtree_estimate(self, specification_tree):
        otdb_id = specification_tree['otdb_id']
        parset = specification_tree['specification']

        if specification_tree['task_type'] == 'observation':
            return {str(otdb_id): self.add_id(self.observation.verify_and_estimate(parset), otdb_id)}

        elif specification_tree['task_type'] == 'pipeline':
            if not 'predecessors' in specification_tree or not specification_tree['predecessors']:
                logger.warning("Could not estimate %s because the pipeline has no predecessors" % (otdb_id))
                return {str(otdb_id): {'pipeline': {'errors': ["Could not estimate %s because the pipeline has no predecessors" % (otdb_id)]}}}

            branch_estimates = {}
            for branch in specification_tree['predecessors']:
                subtree_estimate = self.get_subtree_estimate(branch)
                if subtree_estimate[str(branch['otdb_id'])][branch['task_type']]['errors']:
                    logger.warning("Could not estimate %s because predecessor %s has errors" % (otdb_id, branch))
                    return {str(otdb_id): {'pipeline': {'errors': ["Could not estimate %s because predecessor %s has errors" % (otdb_id, branch)]}}}
                branch_estimates.update(subtree_estimate)
            logger.info(("Branch estimates for %s\n" % otdb_id) + pprint.pformat(branch_estimates))

            if specification_tree['task_subtype'] in ['averaging pipeline', 'calibration pipeline']:
                input_files = {}
                for id, estimate in branch_estimates.iteritems():
                    logger.info('Looking at predecessor %s' % id)
                    predecessor_output = estimate.values()[0]['storage']['output_files']
                    if not 'im' in predecessor_output and 'uv' in predecessor_output:  # Not a calibrator pipeline
                        logger.info('found %s as the target of pipeline %s' % (id, otdb_id))
                        input_files['uv'] = predecessor_output['uv']
                        if 'saps' in predecessor_output:
                            input_files['saps'] = predecessor_output['saps']
                    elif 'im' in predecessor_output:
                        logger.info('found %s as the calibrator of pipeline %s' % (id, otdb_id))
                        input_files['im'] = predecessor_output['im']
                return {str(otdb_id): self.add_id(self.calibration_pipeline.verify_and_estimate(parset, input_files), otdb_id)}

            if len(branch_estimates) > 1:
                logger.warning('Pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys()))
                return {str(otdb_id): {'pipeline': {'errors': ['Pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys())]}}}
            predecessor_output = branch_estimates.values()[0].values()[0]['storage']['output_files']

            if specification_tree['task_subtype'] in ['imaging pipeline', 'imaging pipeline msss']:
                input_files = predecessor_output
                return {str(otdb_id): self.add_id(self.imaging_pipeline.verify_and_estimate(parset, input_files), otdb_id)}

            if specification_tree['task_subtype'] in ['long baseline pipeline']:
                input_files = predecessor_output
                return {str(otdb_id): self.add_id(self.longbaseline_pipeline.verify_and_estimate(parset, input_files), otdb_id)}

            if specification_tree['task_subtype'] in ['pulsar pipeline']:
                input_files = predecessor_output
                return {str(otdb_id): self.add_id(self.pulsar_pipeline.verify_and_estimate(parset, input_files), otdb_id)}

        else:  # reservation, maintenance, system tasks?
            logger.warning("ID %s is not a pipeline or observation." % otdb_id)
            return {str(otdb_id): {specification_tree['task_type']: {'errors': ["ID %s is not a pipeline or observation." % otdb_id]}}}

    def _get_estimated_resources(self, specification_tree):
        """ Input is something like:
            {'otdb_id': otdb_id,
             'state': 'prescheduled',
             'specification': ...,
             'task_type': 'pipeline',
             'task_subtype': 'long baseline pipeline',
             'predecessors': [...]}

            The reply is something along the lines of:
            {'452648': {'observation': {
                'bandwidth': {'total_size': 19021319494},
                'storage': {'total_size': 713299481024,
                            'output_files': {
                                'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104},
                                'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
                                         {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
                                         {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}]}}}}}
        """
        logger.info('get_estimated_resources on: %s' % specification_tree)
        return self.get_subtree_estimate(specification_tree)
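# Illustrative sketch only (not used by the service): the handler can be exercised
# locally without a message bus by calling it directly. The otdb id is taken from the
# docstring example above; the empty 'specification' dict is a placeholder, since a real
# request carries the full parset of the observation or pipeline.
#
#   handler = ResourceEstimatorHandler()
#   tree = {'otdb_id': 452648,
#           'state': 'prescheduled',
#           'task_type': 'observation',
#           'task_subtype': '',
#           'specification': {},      # placeholder for parset key/value pairs
#           'predecessors': []}
#   estimates = handler.handle_message({'specification_tree': tree})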
def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None):
    return Service(servicename=servicename,
                   servicehandler=ResourceEstimatorHandler,
                   busname=busname,
                   broker=broker,
                   numthreads=1,
                   verbose=True)
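# Client-side sketch (an assumption about the lofar.messaging RPC wrapper, not code used
# here): a caller would send the specification tree to this service over the same bus and
# receive the estimate dict documented in _get_estimated_resources(). The exact RPC call
# signature may differ per messaging version.
#
#   from lofar.messaging import RPC
#   with RPC(DEFAULT_SERVICENAME, busname=DEFAULT_BUSNAME) as rpc:
#       estimates = rpc(specification_tree=tree)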
def main():
    # make sure we run in UTC timezone
    import os
    os.environ['TZ'] = 'UTC'

    from optparse import OptionParser
    from lofar.messaging import setQpidLogLevel
    from lofar.common.util import waitForInterrupt

    # Check the invocation arguments
    parser = OptionParser("%prog [options]",
                          description='runs the resource assignment estimator service')
    parser.add_option('-q', '--broker', dest='broker', type='string', default=None,
                      help='Address of the qpid broker, default: localhost')
    parser.add_option('-b', '--busname', dest='busname', type='string', default=DEFAULT_BUSNAME,
                      help='Name of the bus exchange on the qpid broker, default: %s' % DEFAULT_BUSNAME)
    parser.add_option('-s', '--servicename', dest='servicename', type='string', default=DEFAULT_SERVICENAME,
                      help='Name for this service, default: %s' % DEFAULT_SERVICENAME)
    parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
    (options, args) = parser.parse_args()

    setQpidLogLevel(logging.INFO)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.DEBUG if options.verbose else logging.INFO)

    with createService(busname=options.busname,
                       servicename=options.servicename,
                       broker=options.broker):
        waitForInterrupt()


if __name__ == '__main__':
    main()
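# Typical invocation, using the options defined in main() (the broker host is a placeholder):
#
#   python service.py -b <busname> -q <broker-host> -V
#
# With no options the service attaches to the local broker using DEFAULT_BUSNAME and
# DEFAULT_SERVICENAME from the config module.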