#!/usr/bin/env python

# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $

"""
ResourceAssigner inserts/updates tasks and assigns resources to them based on the incoming parset.
"""

import logging
from datetime import datetime, timedelta
import time
import collections

from lofar.common.util import humanreadablesize
from lofar.messaging.messages import EventMessage
from lofar.messaging.messagebus import ToBus
from lofar.messaging.RPC import RPC, RPCException
from lofar.parameterset import parameterset

from lofar.sas.resourceassignment.resourceassigner.schedulechecker import movePipelineAfterItsPredecessors

from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME

from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME

from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME

from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX

from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME

from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME

logger = logging.getLogger(__name__)

class ResourceAssigner(object):
    def __init__(self,
                 radb_busname=RADB_BUSNAME,
                 radb_servicename=RADB_SERVICENAME,
                 re_busname=RE_BUSNAME,
                 re_servicename=RE_SERVICENAME,
                 otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
                 otdb_servicename=DEFAULT_OTDB_SERVICENAME,
                 cleanup_busname=DEFAULT_CLEANUP_BUSNAME,
                 cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME,
                 ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME,
                 ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 broker=None):
        """
        ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset.
        :param radb_busname: busname on which the radb service listens (default: lofar.ra.command)
        :param radb_servicename: servicename of the radb service (default: RADBService)
        :param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command)
        :param re_servicename: servicename of the resource estimator service (default: ResourceEstimation)
        :param broker: Valid Qpid broker host (default: None, which means localhost)
        """
        self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker, timeout=180)
        self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True, timeout=180)
        self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
        self.momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180)
        self.curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker)
        self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker)
        self.ra_notification_prefix = ra_notification_prefix

    def __enter__(self):
        """Internal use only. (handles scope 'with')"""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Internal use only. (handles scope 'with')"""
        self.close()

    def open(self):
        """Open rpc connections to radb service and resource estimator service"""
        self.radbrpc.open()
        self.rerpc.open()
        self.otdbrpc.open()
        self.momrpc.open()
        self.curpc.open()
        self.ra_notification_bus.open()

    def close(self):
        """Close rpc connections to radb service and resource estimator service"""
        self.radbrpc.close()
        self.rerpc.close()
        self.otdbrpc.close()
        self.momrpc.close()
        self.curpc.close()
        self.ra_notification_bus.close()

    def doAssignment(self, specification_tree):
        logger.info('doAssignment: specification_tree=%s' % (specification_tree))

        otdb_id = specification_tree['otdb_id']
        taskType = specification_tree.get('task_type', '').lower()
        status = specification_tree.get('state', '').lower()

        if status not in ['approved', 'prescheduled']: # cep2 accepts both, cep4 only prescheduled, see below
            logger.info('skipping specification for otdb_id=%s because status=%s', otdb_id, status)
            return

        #parse main parset...
        mainParset = parameterset(specification_tree['specification'])

        momId = mainParset.getInt('Observation.momID', -1)

        clusterIsCEP4 = self.checkClusterIsCEP4(mainParset)
        clusterName = 'CEP4' if clusterIsCEP4 else 'CEP2'

        # parse the start/end time; fall back to sane defaults when they cannot be parsed
        try:
            startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S')
            endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S')
        except ValueError:
            logger.warning('cannot parse start/end time from specification for otdb_id=%s, searching for sane defaults...', otdb_id)
            maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree)

            taskDuration = mainParset.getInt('Observation.Scheduler.taskDuration', -1)
            taskDuration = timedelta(seconds=taskDuration) if taskDuration > 0 else timedelta(hours=1)

            if maxPredecessorEndTime:
                startTime = maxPredecessorEndTime + timedelta(minutes=1)
                endTime = startTime + taskDuration
                logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s based on maxPredecessorEndTime (%s)',
                               startTime, endTime, otdb_id, maxPredecessorEndTime)
            else:
                startTime = datetime.utcnow() + timedelta(hours=1) + timedelta(minutes=1)
                endTime = startTime + taskDuration
                logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s, one hour from now',
                               startTime, endTime, otdb_id)

            try:
                logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', startTime, endTime, otdb_id)
                self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.startTime': startTime.strftime('%Y-%m-%d %H:%M:%S'),
                                                             'LOFAR.ObsSW.Observation.stopTime': endTime.strftime('%Y-%m-%d %H:%M:%S')})
            except Exception as e:
                logger.error(e)

        if clusterIsCEP4:
            try:
                # fix for MoM bug introduced before NV's holiday
                # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4
                # so, override it here if needed, and update to otdb
                processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '')
                if processingClusterName != clusterName:
                    logger.info('overwriting and uploading processingClusterName to otdb from %s to %s for otdb_id=%s',
                                 processingClusterName, clusterName, otdb_id)
                    self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName })
            except Exception as e:
                logger.error(e)

        # insert new task and specification in the radb
        # any existing specification and task with same otdb_id will be deleted automatically
        logger.info('doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s cluster=%s' %
                    (momId, otdb_id, status, taskType, startTime, endTime, clusterName))
        result = self.radbrpc.insertSpecificationAndTask(momId, otdb_id, status, taskType, startTime, endTime, str(mainParset), clusterName)

        if not result['inserted']:
            logger.error('could not insert specification and task')
            return

        specificationId = result['specification_id']
        taskId = result['task_id']
        logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specificationId,taskId))

        task = self.radbrpc.getTask(taskId)
        self.processPredecessors(task)
        self.processSuccessors(task)

        # do not assign resources to tasks for clusters other than cep4
        if not clusterIsCEP4:
            return

        if status != 'prescheduled':
            logger.info('skipping resource assignment for CEP4 task otdb_id=%s because status=%s' % (otdb_id, status))
            return

        needed = self.getNeededResources(specification_tree)
        logger.info('doAssignment: getNeededResources=%s' % (needed,))

        if str(otdb_id) not in needed:
            logger.error("no otdb_id %s found in estimator results %s" % (otdb_id, needed))
            return

        if taskType not in needed[str(otdb_id)]:
            logger.error("no task type %s found in estimator results %s" % (taskType, needed[str(otdb_id)]))
            return

        # claim the resources for this task.
        # while inserting the claims they are automatically validated;
        # if not enough resources are available, they are put into conflict status,
        # and if any claim is in conflict status, the task is put into conflict status as well
        main_needed = needed[str(otdb_id)]

        if 'errors' in main_needed and main_needed['errors']:
            for error in main_needed['errors']:
                logger.error("Error in estimator: %s", error)

            logger.error("Error(s) in estimator for otdb_id=%s radb_id=%s, setting task status to 'error'", otdb_id, taskId)
            self.radbrpc.updateTask(taskId, status='error')
            self._sendNotification(task, 'error')
        else:
            claimed, claim_ids = self.claimResources(main_needed, task)
            if claimed:
                conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict')

                if conflictingClaims:
                    # radb sets the task status to conflict automatically
                    logger.warning('doAssignment: %s conflicting claims detected. Task cannot be scheduled. %s' %
                                (len(conflictingClaims), conflictingClaims))
                    self._sendNotification(task, 'conflict')
                else:
                    logger.info('doAssignment: all claims for task %s were successfully claimed. Setting claim statuses to allocated' % (taskId,))
                    self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated')

                    # remove any output and/or intermediate data for restarting CEP4 pipelines
                    if task['type'] == 'pipeline':
                        path_result = self.curpc.getPathForOTDBId(task['otdb_id'])
                        if path_result['found']:
                            logger.info("removing data on disk from previous run for otdb_id %s", otdb_id)
                            result = self.curpc.removeTaskData(task['otdb_id'])

                            if not result['deleted']:
                                logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", otdb_id, result['message'])

                    # send notification that the task was scheduled;
                    # another service sets the parset spec in otdb and updates the otdb task status to scheduled, which is then synced back to the radb
                    self._sendNotification(task, 'scheduled')
            else:
                logger.warning('doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % (taskId))
                self.radbrpc.updateTask(taskId, status='conflict')
                self._sendNotification(task, 'conflict')

    def _sendNotification(self, task, status):
        try:
            if status in ('scheduled', 'conflict', 'error'):
                content = {'radb_id': task['id'], 'otdb_id': task['otdb_id'], 'mom_id': task['mom_id']}
                subject = 'Task' + status[0].upper() + status[1:]
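                # e.g. status 'scheduled' yields subject 'TaskScheduled'; the message context below becomes ra_notification_prefix + subject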
                msg = EventMessage(context=self.ra_notification_prefix + subject, content=content)
                logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' ')))
                self.ra_notification_bus.send(msg)
        except Exception as e:
            logger.error(str(e))

    def processPredecessors(self, task):
        try:
            predecessor_mom_ids = self.momrpc.getPredecessorIds(task['mom_id'])[str(task['mom_id'])]

            if predecessor_mom_ids:
                logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], task['otdb_id'])

                for predecessor_mom_id in predecessor_mom_ids:
                    #check if the predecessor needs to be linked to this task
                    predecessor_task = self.radbrpc.getTask(mom_id=predecessor_mom_id)
                    if predecessor_task:
                        if predecessor_task['id'] not in task['predecessor_ids']:
                            logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s',
                                        predecessor_task['mom_id'], predecessor_task['otdb_id'], task['mom_id'], task['otdb_id'])
                            self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id'])
                    else:
                        logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', predecessor_mom_id, task['otdb_id'])
            else:
                logger.info('no predecessors for otdb_id=%s', task['otdb_id'])

        except Exception as e:
            logger.error(e)

    def processSuccessors(self, task):
        try:
            successor_mom_ids = self.momrpc.getSuccessorIds(task['mom_id'])[str(task['mom_id'])]

            if successor_mom_ids:
                logger.info('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], task['otdb_id'])

                for successor_mom_id in successor_mom_ids:
                    #check if the successor needs to be linked to this task
                    successor_task = self.radbrpc.getTask(mom_id=successor_mom_id)
                    if successor_task:
                        if successor_task['id'] not in task['successor_ids']:
                            logger.info('connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s',
                                        successor_task['mom_id'], successor_task['otdb_id'], task['mom_id'], task['otdb_id'])
                            self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id'])
                        movePipelineAfterItsPredecessors(successor_task, self.radbrpc)
                    else:
                        logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s', successor_mom_id, task['otdb_id'])
            else:
                logger.info('no successors for otdb_id=%s', task['otdb_id'])

        except Exception as e:
            logger.error(e)

    def getMaxPredecessorEndTime(self, specification_tree):
        try:
            predecessor_specs = [parameterset(tree['specification']) for tree in specification_tree['predecessors']]
            predecessor_endTimes = [datetime.strptime(spec.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S') for spec in predecessor_specs]

            if predecessor_endTimes:
                return max(predecessor_endTimes)
        except Exception as e:
            logger.error(e)
        return None

    def checkClusterIsCEP4(self, parset):
        # check storageClusterName for enabled DataProducts
        # if any storageClusterName is not CEP4, we do not accept this parset
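        # e.g. (illustrative parset fragment that would pass this check):
        #   Observation.DataProducts.Output_Correlated.enabled=true
        #   Observation.DataProducts.Output_Correlated.storageClusterName=CEP4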
        keys = ['Output_Correlated',
                'Output_IncoherentStokes',
                'Output_CoherentStokes',
                'Output_InstrumentModel',
                'Output_SkyImage',
                'Output_Pulsar']
        for key in keys:
            if parset.getBool('Observation.DataProducts.%s.enabled' % key, False):
                if parset.getString('Observation.DataProducts.%s.storageClusterName' % key, '') != 'CEP4':
                    logger.warn("storageClusterName not CEP4, rejecting specification.")
                    return False

        logger.info("all enabled storageClusterName's are CEP4, accepting specification.")
        return True


    def getNeededResources(self, specification_tree):
        replymessage, status = self.rerpc({"specification_tree":specification_tree}, timeout=10)
        logger.info('getNeededResources: %s' % replymessage)
        return replymessage

    def claimResources(self, needed_resources, task):
        logger.info('claimResources: task %s needed_resources=%s' % (task, needed_resources))

        # get the needed resources for the task type
        needed_resources_for_task_type = needed_resources[task['type']]

        # get db lists
        rc_property_types = {rcpt['name']:rcpt['id'] for rcpt in self.radbrpc.getResourceClaimPropertyTypes()}
        resource_types = {rt['name']:rt['id'] for rt in self.radbrpc.getResourceTypes()}
        resources = self.radbrpc.getResources()

        # loop over needed_resources -> resource_type -> claim (and props)
        # flatten the tree dict to a list of claims (with props)
        claims = []
        for resource_type_name, needed_claim_for_resource_type in needed_resources_for_task_type.items():
            if resource_type_name in resource_types:
                logger.info('claimResources: processing resource_type: %s contents: %s' % (resource_type_name, needed_claim_for_resource_type))
                db_resource_type_id = resource_types[resource_type_name]
                db_resources_for_type = [r for r in resources if r['type_id'] == db_resource_type_id]

                # needed_claim_for_resource_type is a dict containing exactly one kvp of which the value is an int
                # that value is the value for the claim
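                # e.g. (illustrative only; the actual key names come from the estimator):
                #   {'total_size': 1024, 'output_files': {...}} -> needed_claim_value is 1024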
                needed_claim_value = next((v for k,v in needed_claim_for_resource_type.items() if isinstance(v, int)))

                # FIXME: right now we just pick the first resource from the 'cep4' resources.
                # estimator will deliver this info in the future
                db_cep4_resources_for_type = [r for r in db_resources_for_type if 'cep4' in r['name'].lower()]

                if db_cep4_resources_for_type:
                    claim = {'resource_id':db_cep4_resources_for_type[0]['id'],
                            'starttime':task['starttime'],
                            'endtime':task['endtime'],
                            'status':'claimed',
                            'claim_size':needed_claim_value}

                    #FIXME: find proper way to extend storage time with a month
                    if 'storage' in db_cep4_resources_for_type[0]['name']:
                        claim['endtime'] += timedelta(days=31)

                    # if the needed_claim_for_resource_type dict contains more kvp's,
                    # then the subdicts contain groups of properties for the claim
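                    # e.g. (illustrative shape only; real key names come from the estimator):
                    #   {'total_size': 1024,
                    #    'output_files': {'saps': [{'sap_nr': 0, 'properties': {...}}],
                    #                     'uv': {...}}}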
                    if len(needed_claim_for_resource_type) > 1:
                        claim['properties'] = []

                        def processProperties(propertiesDict, sap_nr=None, is_input=False):
                            for prop_type_name, prop_value in propertiesDict.items():
                                if prop_type_name in rc_property_types:
                                    rc_property_type_id = rc_property_types[prop_type_name]
                                    prop = {'type': rc_property_type_id,
                                            'value': prop_value,
                                            'io_type': 'input' if is_input else 'output'}
                                    if sap_nr is not None:
                                        prop['sap_nr'] = sap_nr
                                    claim['properties'].append(prop)
                                else:
                                    logger.error('claimResources: unknown prop_type:%s' % prop_type_name)

                        subdicts = {k:v for k,v in needed_claim_for_resource_type.items() if isinstance(v, dict)}
                        for subdict_name, subdict in subdicts.items():
                            logger.info('claimResources: processing resource_type: %s subdict_name: \'%s\' subdict_contents: %s' % (resource_type_name, subdict_name, subdict))
                            is_input = 'input' in subdict_name.lower()
                            for group_name, needed_prop_group in subdict.items():
                                if group_name == 'saps':
                                    for sap_dict in needed_prop_group:
                                        processProperties(sap_dict['properties'], sap_dict['sap_nr'], is_input)
                                else:
                                    processProperties(needed_prop_group, None, is_input)

                    logger.info('claimResources: created claim:%s' % claim)
                    claims.append(claim)
            else:
                logger.error('claimResources: unknown resource_type:%s' % resource_type_name)

        logger.info('claimResources: inserting %d claims in the radb' % len(claims))
        claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids']
        logger.info('claimResources: %d claims were inserted in the radb' % len(claim_ids))
        return len(claim_ids) == len(claims), claim_ids