Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
service.py 7.32 KiB
#!/usr/bin/env python
# $Id: service.py $

'''
Simple service that listens on the message bus and replies with resource
estimates for the specification tree it receives.
'''

import logging
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface

from lofar.sas.resourceassignment.resourceassignmentestimator.resource_estimators import *
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

class ResourceEstimatorHandler(MessageHandlerInterface):
    """Message handler that turns a specification tree into resource estimates.

    One estimator object per supported task kind is created at construction
    time. A request is answered by walking the specification tree (a task
    plus, recursively, its predecessors) and handing every pipeline estimator
    the output files that were estimated for its predecessor(s).
    """

    def __init__(self, **kwargs):
        super(ResourceEstimatorHandler, self).__init__(**kwargs)
        self.observation = ObservationResourceEstimator()
        self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator()
        self.calibration_pipeline = CalibrationPipelineResourceEstimator()
        self.pulsar_pipeline = PulsarPipelineResourceEstimator()
        self.imaging_pipeline = ImagePipelineResourceEstimator()

    def handle_message(self, content):
        """Return the estimate for ``content["specification_tree"]``."""
        specification_tree = content["specification_tree"]
        return self._get_estimated_resources(specification_tree) ##TODO also handle MoM tasks in RA 1.2

    ##FIXME dirty hack
    def add_id(self, estimate, otdb_id):
        """Tag the output files in *estimate* with the producing task's id.

        For every data type under ``['storage']['output_files']`` except
        ``'saps'`` the key ``'<data_type>_otdb_id'`` is set to *otdb_id*.
        The estimate is modified in place and also returned.

        Both a bare estimate (``{'storage': ...}``) and an estimate wrapped
        in one extra level (``{'observation': {'storage': ...}}``) are
        accepted. NOTE(review): the wrapped form is what the rest of this
        class reads back via ``.values()[0]['storage']``; confirm against the
        estimators that this is indeed what verify_and_estimate() returns.
        """
        if 'storage' in estimate:
            tagged = [estimate]
        else:
            tagged = [entry for entry in estimate.values()
                      if isinstance(entry, dict) and 'storage' in entry]
        for entry in tagged:
            if 'output_files' in entry['storage']: #We only need to do output files, it will be someone else's input
                output_files = entry['storage']['output_files']
                for data_type in output_files:
                    if data_type != 'saps':
                        output_files[data_type][data_type + '_otdb_id'] = otdb_id
        return estimate

    @staticmethod
    def _first_value(mapping):
        """Return the first value of *mapping* (IndexError when it is empty)."""
        # list(...) keeps this working on dict views as well as on lists.
        return list(mapping.values())[0]

    #TODO use something else than .values()[0]['storage'] ??
    def _output_files(self, estimate):
        """Return the 'output_files' dict of a single task's estimate."""
        return self._first_value(estimate)['storage']['output_files']

    def _single_predecessor_pipeline(self, task_subtype):
        """Return (log label, estimator) for pipelines fed by one predecessor, else None."""
        if task_subtype in ['imaging pipeline', 'imaging pipeline msss']:
            return 'Imaging', self.imaging_pipeline
        if task_subtype in ['long baseline pipeline']:
            return 'Long baseline', self.longbaseline_pipeline
        if task_subtype in ['pulsar pipeline']:
            return 'Pulsar', self.pulsar_pipeline
        return None

    def get_subtree_estimate(self, specification_tree):
        """Estimate the task at the root of *specification_tree*.

        Predecessors of a pipeline are estimated first (recursively); their
        output files become the pipeline's input files.

        Returns a dict ``{str(otdb_id): estimate}``. The estimate is an empty
        dict for tasks that are neither observation nor pipeline. ``None`` is
        returned for a pipeline whose 'task_subtype' is not recognised.
        """
        otdb_id = specification_tree['otdb_id']
        parset = specification_tree['specification']
        task_type = specification_tree['task_type']

        if task_type == 'observation':
            return {str(otdb_id): self.add_id(self.observation.verify_and_estimate(parset), otdb_id)}

        if task_type != 'pipeline': # reservation, maintenance, system tasks?
            logger.info("It's not a pipeline or observation: %s" % otdb_id)
            return {str(otdb_id): {}}

        branch_estimates = {}
        for branch in specification_tree['predecessors']:
            branch_estimates.update(self.get_subtree_estimate(branch))
        logger.info(str(branch_estimates))

        task_subtype = specification_tree['task_subtype']

        if task_subtype in ['averaging pipeline', 'calibration pipeline']:
            # Collected across ALL predecessors: 'uv' comes from the target,
            # 'im' from a calibrator pipeline. Must live outside the loop so
            # one predecessor does not wipe what another contributed, and so
            # it exists even when there are no predecessors at all.
            input_files = {}
            for branch_id, estimate in branch_estimates.items():
                predecessor_output = self._output_files(estimate)
                if 'im' not in predecessor_output and 'uv' in predecessor_output: # Not a calibrator pipeline
                    logger.info('found %s as the target of pipeline %s' % (branch_id, otdb_id))
                    input_files['uv'] = predecessor_output['uv']
                elif 'im' in predecessor_output:
                    input_files['im'] = predecessor_output['im']
            return {str(otdb_id): self.add_id(self.calibration_pipeline.verify_and_estimate(parset, input_files), otdb_id)}

        single = self._single_predecessor_pipeline(task_subtype)
        if single is not None:
            label, estimator = single
            if len(branch_estimates) > 1:
                logger.error('%s pipeline %d should not have multiple predecessors: %s' % (label, otdb_id, list(branch_estimates)))
            # Only the first predecessor is used, also when there are several.
            input_files = self._output_files(self._first_value(branch_estimates))
            return {str(otdb_id): self.add_id(estimator.verify_and_estimate(parset, input_files), otdb_id)}

        # Unknown pipeline subtype: no estimator available. Make the
        # fall-through visible instead of returning None silently.
        logger.error('No resource estimator for pipeline %s with subtype %s' % (otdb_id, task_subtype))
        return None

    def _get_estimated_resources(self, specification_tree):
        """ Input is like:
            {"otdb_id": otdb_id, "state": 'prescheduled', 'specification': ...,
             'task_type': "pipeline", 'task_subtype': "long baseline pipeline",
            'predecessors': [...]}
        
            reply is something along the lines of:
            {'452648': 
              {'observation': 
                {'bandwidth': {'total_size': 19021319494},
                'storage': {'total_size': 713299481024,
                'output_files': 
                  {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104},
                  'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
                           {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}}, 
                           {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}
            ]}}}}}
        """
        logger.info('get_estimated_resources on: %s' % specification_tree)
        return self.get_subtree_estimate(specification_tree)

def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None):
    """Build the Service that answers requests with a ResourceEstimatorHandler.

    The handler class itself (not an instance) is handed to the Service,
    together with numthreads=1 and verbose=True.
    """
    service_settings = {
        'servicename': servicename,
        'servicehandler': ResourceEstimatorHandler,
        'busname': busname,
        'broker': broker,
        'numthreads': 1,
        'verbose': True,
    }
    return Service(**service_settings)

def main():
    """Parse the command line, set up logging and run the service.

    The service is used as a context manager around waitForInterrupt().
    """
    from optparse import OptionParser
    from lofar.messaging import setQpidLogLevel
    from lofar.common.util import waitForInterrupt

    # Check the invocation arguments
    option_parser = OptionParser("%prog [options]",
                                 description='runs the resourceassigner service')
    option_parser.add_option('-q', '--broker',
                             dest='broker', type='string', default=None,
                             help='Address of the qpid broker, default: localhost')
    option_parser.add_option("-b", "--busname",
                             dest="busname", type="string", default=DEFAULT_BUSNAME,
                             help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME)
    option_parser.add_option("-s", "--servicename",
                             dest="servicename", type="string", default=DEFAULT_SERVICENAME,
                             help="Name for this service, default: %s" % DEFAULT_SERVICENAME)
    option_parser.add_option('-V', '--verbose',
                             dest='verbose', action='store_true',
                             help='verbose logging')
    parsed, _ = option_parser.parse_args()

    # -V/--verbose only affects the Python logging level; qpid stays at INFO.
    if parsed.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO

    setQpidLogLevel(logging.INFO)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=log_level)

    service = createService(busname=parsed.busname,
                            servicename=parsed.servicename,
                            broker=parsed.broker)
    with service:
        waitForInterrupt()

# Run the service only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()