#!/usr/bin/env python
# $Id: service.py $

'''
Simple service that listens on a message bus for specification trees and
replies with the estimated resources for the task at the root of each tree.
'''

import logging
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface

from lofar.sas.resourceassignment.resourceassignmentestimator.resource_estimators import *
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME

logger = logging.getLogger(__name__)


def _first_value(mapping):
    '''Return an arbitrary ("first") value of a dict.

    Equivalent to the Python 2 idiom ``mapping.values()[0]`` but also valid
    on Python 3, where ``values()`` returns a non-indexable view.
    Raises IndexError when the dict is empty, like the original idiom.
    '''
    return list(mapping.values())[0]


class ResourceEstimatorHandler(MessageHandlerInterface):
    '''Message handler that converts a specification tree into resource estimates.'''

    def __init__(self, **kwargs):
        super(ResourceEstimatorHandler, self).__init__(**kwargs)
        # One estimator instance per supported task (sub)type.
        self.observation = ObservationResourceEstimator()
        self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator()
        self.calibration_pipeline = CalibrationPipelineResourceEstimator()
        self.pulsar_pipeline = PulsarPipelineResourceEstimator()
        self.imaging_pipeline = ImagePipelineResourceEstimator()

    def handle_message(self, content):
        '''Estimate the resources for the tree found under content["specification_tree"].'''
        specification_tree = content["specification_tree"]
        return self._get_estimated_resources(specification_tree) ##TODO also handle MoM tasks in RA 1.2

    ##FIXME dirty hack
    @staticmethod
    def add_id(estimate, otdb_id):
        '''Tag the output files of an estimate with the otdb id of the producing task.

        For every data type under ['storage']['output_files'] (except 'saps')
        the key '<data_type>_otdb_id' is set to otdb_id. The estimate is
        modified in place and returned.

        This is a staticmethod because it never uses the instance; it used to
        be declared without ``self`` while being called as ``self.add_id(...)``,
        which raised a TypeError on every call.

        The example reply in _get_estimated_resources shows the estimate
        wrapped under a task-type key (e.g. {'observation': {'storage': ...}})
        and get_subtree_estimate reads it that way, so 'storage' is looked up
        both in the estimate itself and in its nested dict values.
        '''
        candidates = [estimate]
        candidates.extend(value for value in estimate.values() if isinstance(value, dict))
        for candidate in candidates:
            if 'storage' not in candidate or 'output_files' not in candidate['storage']:
                continue
            # We only need to do output files, it will be someone else's input
            output_files = candidate['storage']['output_files']
            for data_type in output_files:
                if data_type != 'saps':
                    output_files[data_type][data_type + '_otdb_id'] = otdb_id
        return estimate

    #TODO use something else than .values()[0]['storage'] ??
    def get_subtree_estimate(self, specification_tree):
        '''Recursively estimate the resources of a task and its predecessors.

        specification_tree is a dict with at least the keys 'otdb_id',
        'specification' and 'task_type'; pipelines additionally need
        'task_subtype' and 'predecessors' (a list of specification trees).

        Returns {str(otdb_id): estimate} for the root task only. The estimate
        is an empty dict for task types or pipeline subtypes that are not
        supported.
        '''
        otdb_id = specification_tree['otdb_id']
        parset = specification_tree['specification']
        task_type = specification_tree['task_type']

        if task_type == 'observation':
            estimate = self.observation.verify_and_estimate(parset)
            return {str(otdb_id): self.add_id(estimate, otdb_id)}

        if task_type != 'pipeline':
            # reservation, maintenance, system tasks?
            logger.info("It's not a pipeline or observation: %s", otdb_id)
            return {str(otdb_id): {}}

        # A pipeline consumes the output of its predecessors, so estimate those first.
        branch_estimates = {}
        for branch in specification_tree['predecessors']:
            branch_estimates.update(self.get_subtree_estimate(branch))
        logger.info('%s', branch_estimates)

        task_subtype = specification_tree['task_subtype']

        if task_subtype in ('averaging pipeline', 'calibration pipeline'):
            # Collect the inputs over ALL predecessors; the dict must be
            # created before the loop or only the last predecessor counts.
            input_files = {}
            for branch_id, branch_estimate in branch_estimates.items():
                output_files = _first_value(branch_estimate)['storage']['output_files']
                if 'im' in output_files:
                    input_files['im'] = output_files['im']
                elif 'uv' in output_files:  # Not a calibrator pipeline
                    logger.info('found %s as the target of pipeline %s', branch_id, otdb_id)
                    input_files['uv'] = output_files['uv']
            estimate = self.calibration_pipeline.verify_and_estimate(parset, input_files)
            return {str(otdb_id): self.add_id(estimate, otdb_id)}

        # Pipelines that take the output files of exactly one predecessor:
        # subtype -> (name used in log messages, estimator).
        single_input_pipelines = {
            'imaging pipeline': ('Imaging pipeline', self.imaging_pipeline),
            'imaging pipeline msss': ('Imaging pipeline', self.imaging_pipeline),
            'long baseline pipeline': ('Long baseline pipeline', self.longbaseline_pipeline),
            'pulsar pipeline': ('Pulsar pipeline', self.pulsar_pipeline),
        }
        if task_subtype in single_input_pipelines:
            label, estimator = single_input_pipelines[task_subtype]
            if len(branch_estimates) > 1:
                logger.error('%s %s should not have multiple predecessors: %s',
                             label, otdb_id, list(branch_estimates))
            input_files = _first_value(_first_value(branch_estimates))['storage']['output_files']
            estimate = estimator.verify_and_estimate(parset, input_files)
            return {str(otdb_id): self.add_id(estimate, otdb_id)}

        # Unknown pipeline subtype: previously this fell through and returned
        # None, which made the parent's branch_estimates.update() fail with an
        # unrelated TypeError. Report it and return an empty estimate instead,
        # like for other unsupported tasks.
        logger.error('Unknown pipeline subtype %s for task %s', task_subtype, otdb_id)
        return {str(otdb_id): {}}

    def _get_estimated_resources(self, specification_tree):
        """ Input is like:
            {"otdb_id": otdb_id, "state": 'prescheduled', 'specification': ...,
             'task_type': "pipeline", 'task_subtype': "long baseline pipeline",
             'predecessors': [...]}

            reply is something along the lines of:
            {'452648':
              {'observation':
                {'bandwidth': {'total_size': 19021319494},
                 'storage': {'total_size': 713299481024,
                  'output_files':
                    {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104},
                     'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
                              {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
                              {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}
            ]}}}}}
        """
        logger.info('get_estimated_resources on: %s', specification_tree)
        return self.get_subtree_estimate(specification_tree)


def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None):
    '''Create the Service that dispatches incoming messages to ResourceEstimatorHandler.'''
    return Service(servicename=servicename,
                   servicehandler=ResourceEstimatorHandler,
                   busname=busname,
                   broker=broker,
                   numthreads=1,
                   verbose=True)


def main():
    '''Parse the command line, configure logging and run the service until interrupted.'''
    from optparse import OptionParser
    from lofar.messaging import setQpidLogLevel
    from lofar.common.util import waitForInterrupt

    # Check the invocation arguments
    parser = OptionParser("%prog [options]",
                          description='runs the resourceassigner service')
    parser.add_option('-q', '--broker', dest='broker', type='string', default=None,
                      help='Address of the qpid broker, default: localhost')
    parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME,
                      help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME)
    parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME,
                      help="Name for this service, default: %s" % DEFAULT_SERVICENAME)
    parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
    (options, args) = parser.parse_args()

    setQpidLogLevel(logging.INFO)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.DEBUG if options.verbose else logging.INFO)

    # The service runs for as long as the context is open.
    with createService(busname=options.busname,
                       servicename=options.servicename,
                       broker=options.broker):
        waitForInterrupt()


if __name__ == '__main__':
    main()