Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
resource_assigner.py 17.46 KiB
#!/usr/bin/env python

# Copyright (C) 2015-2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: resource_assigner.py 1580 2015-09-30 14:18:57Z loose $

"""
ResourceAssigner inserts/updates tasks in the RADB and assigns resources to them based on the incoming task specification (parset).
"""

import logging

from lofar.common.cache import cache
from lofar.messaging.messages import EventMessage
from lofar.messaging.messagebus import ToBus
from lofar.messaging.RPC import RPC

from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME

from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME

from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME

from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX

from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import ResourceAvailabilityChecker
from lofar.sas.resourceassignment.resourceassigner.schedulers import BasicScheduler, DwellScheduler, PriorityScheduler

from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME

from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC
from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFAULT_STORAGEQUERY_BUSNAME
from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME

from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME

from lofar.sas.resourceassignment.common.specification import Specification

# Module-level logger; it is also handed to the Specification objects created in ResourceAssigner.do_assignment.
logger = logging.getLogger(__name__)


class ResourceAssigner(object):
    """
    The ResourceAssigner inserts new tasks or updates existing tasks in the RADB and assigns resources to them based
    on a task's specification tree.

    It owns a set of RPC/bus connections (RADB, resource estimator, OTDB, MoM, storage query, cleanup and the RA
    notification bus). These must be opened before use, either explicitly via open()/close() or by using the
    instance as a context manager ('with ResourceAssigner(...) as assigner: ...').
    """

    def __init__(self,
                 radb_busname=RADB_BUSNAME,
                 radb_servicename=RADB_SERVICENAME,
                 re_busname=RE_BUSNAME,
                 re_servicename=RE_SERVICENAME,
                 otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
                 otdb_servicename=DEFAULT_OTDB_SERVICENAME,
                 storagequery_busname=DEFAULT_STORAGEQUERY_BUSNAME,
                 storagequery_servicename=DEFAULT_STORAGEQUERY_SERVICENAME,
                 cleanup_busname=DEFAULT_CLEANUP_BUSNAME,
                 cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME,
                 ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME,
                 ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 broker=None,
                 radb_dbcreds=None):
        """
        Creates a ResourceAssigner instance. No connection is opened here; call open() or use a 'with' block.

        :param radb_busname: name of the bus on which the radb service listens (default: lofar.ra.command)
        :param radb_servicename: name of the radb service (default: RADBService)
        :param re_busname: name of the bus on which the resource estimator service listens (default: lofar.ra.command)
        :param re_servicename: name of the resource estimator service (default: ResourceEstimation)
        :param otdb_busname: name of the bus on which OTDB listens (default: lofar.otdb.command)
        :param otdb_servicename: name of the OTDB service (default: OTDBService)
        :param storagequery_busname: name of the bus on which the StorageQueryService listens
                                     (default: lofar.dm.command)
        :param storagequery_servicename: name of the StorageQueryService (default: StorageQueryService)
        :param cleanup_busname: name of the bus on which the cleanup service listens (default: lofar.dm.command)
        :param cleanup_servicename: name of the CleanupService (default: CleanupService)
        :param ra_notification_busname: name of the bus on which the ResourceAssigner notifies registered parties
                                        (default: lofar.ra.notification)
        :param ra_notification_prefix: prefix used in notification message subject (default: ResourceAssigner.)
        :param mom_busname: name of the bus on which MOM listens for queries (default: lofar.ra.command)
        :param mom_servicename: name of the MOMQueryService (default: momqueryservice)
        :param broker: Valid Qpid broker host (default: None, which means localhost)
        :param radb_dbcreds: the credentials to be used for accessing the RADB (default: None, which means default)
        """

        self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker, timeout=180)
        self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True, timeout=180)
        self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180)
        self.momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180)
        self.sqrpc = StorageQueryRPC(busname=storagequery_busname, servicename=storagequery_servicename, broker=broker)
        self.curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker)
        self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker)
        self.ra_notification_prefix = ra_notification_prefix

        self.resource_availability_checker = ResourceAvailabilityChecker(self.radbrpc)

        # Handed to every scheduler (Dwell/Priority/Basic) that _schedule_resources creates at run-time.
        self.radb_creds = radb_dbcreds
        # NOTE(review): broker is kept on the instance but not used elsewhere in this class - confirm it is needed.
        self.broker = broker

    def __enter__(self):
        """Open all connections when entering a 'with' block and return this instance."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close all connections when leaving a 'with' block. Exceptions from the block are not suppressed."""
        self.close()

    def _connections(self):
        """Return all RPC/bus connections owned by this instance, in the order in which they are opened."""
        return [self.radbrpc, self.rerpc, self.otdbrpc, self.momrpc, self.sqrpc, self.curpc,
                self.ra_notification_bus]

    def open(self):
        """
        Open all RPC connections and the RA notification bus.

        If opening any of them fails, the connections that were already opened are closed again (best effort)
        before the original exception propagates. A failed open() therefore does not leak connections. This
        matters because __exit__ is never called when __enter__ raises.
        """
        opened = []
        succeeded = False
        try:
            for connection in self._connections():
                connection.open()
                opened.append(connection)
            succeeded = True
        finally:
            if not succeeded:
                # The pending exception from open() is re-raised automatically when this finally block ends.
                for connection in reversed(opened):
                    try:
                        connection.close()
                    except Exception as e:
                        logger.error("Error while closing %s after a failed open(): %s", connection, e)

    def close(self):
        """
        Close all RPC connections and the RA notification bus.

        Every connection is closed even if closing an earlier one fails. Each failure is logged, and the first
        one is re-raised once all connections have been handled.
        """
        first_error = None
        for connection in self._connections():
            try:
                connection.close()
            except Exception as e:
                logger.error("Error while closing %s: %s", connection, e)
                if first_error is None:
                    first_error = e

        if first_error is not None:
            raise first_error

    @property
    @cache
    def resource_types(self):
        """
        Return a dict of all the resource types known to the RADB, mapping name -> id (to convert name->id).

        The result is wrapped by lofar.common.cache's @cache decorator, presumably so that the RADB is queried
        only once per instance - TODO confirm the exact cache semantics.
        """
        return {rt['name']: rt['id'] for rt in self.radbrpc.getResourceTypes()}

    def do_assignment(self, otdb_id, specification_tree):
        """
        Makes the given task known to RADB and attempts to assign (schedule) its requested resources.

        If no list of requested resources could be determined for the task, its status will be set to "error" in RADB.
        If such list can be obtained but it is impossible to assign the requested resources, the task is in conflict
        with other tasks, hence its status will be set to "conflict" in RADB. If all requested resources are
        successfully assigned, its status will be put to "scheduled" in RADB. Tasks whose status is "approved" are
        only inserted into the RADB; no scheduling is attempted for them.

        In every case where a status is set, a matching notification is sent on the RA notification bus.

        :param otdb_id: OTDB ID of the main task which resources need to be assigned
        :param specification_tree: the specification tree containing the main task and its resources

        :raises an Exception if something unforeseen happened while scheduling
        """
        logger.info('do_assignment: otdb_id=%s specification_tree=%s', otdb_id, specification_tree)

        spec = Specification(logger, self.otdbrpc, self.momrpc, self.radbrpc)
        spec.from_dict(specification_tree)
        spec.insert_into_radb()  # TODO Move this to TaskSpecified?

        # Don't perform any scheduling for tasks that are only approved. Do this check after insertion of
        # specification, task and predecessor/successor relations, so approved tasks appear correctly in the web
        # scheduler.
        if spec.status == 'approved':  # TODO should this even still happen?
            logger.info('Task otdb_id=%s is only approved, no resource assignment needed yet', otdb_id)
            return

        # TODO have Specification propagate to the estimator?
        if self._schedule_resources(spec, specification_tree):
            # Cleanup the data of any previous run of the task
            self._cleanup_earlier_generated_data(otdb_id, spec)

            # Scheduling of resources for this task succeeded, so change task status to "scheduled" and notify
            # our subscribers
            spec.set_status('scheduled')
            self._send_task_status_notification(spec, 'scheduled')
            return

        # Scheduling of resources for this task failed. Re-read the task from the RADB to check whether any of the
        # claims has status conflict, and hence (by the radb triggers) the task has status conflict as well.
        # If the task is not in conflict, there was a specification/scheduling error, so force the task status to
        # error (not conflict!).
        spec.read_from_radb(spec.radb_id)  # TODO cleanup
        final_status = 'conflict' if spec.status == 'conflict' else 'error'

        # Re-setting 'conflict' on a task that is already in 'conflict' seems unnecessary, but is kept to satisfy
        # the existing unit tests.
        # TODO: discuss if this can be removed
        spec.set_status(final_status)
        self._send_task_status_notification(spec, final_status)

    def _send_task_status_notification(self, spec, new_status):
        """
        Sends a message about the task's status on the RA notification bus.

        The message subject is 'Task' followed by new_status with only its first letter upper-cased
        (e.g. 'scheduled' -> 'TaskScheduled'). The subject is prefixed with self.ra_notification_prefix to form
        the message context.

        :param spec:    the task concerned (its radb_id, otdb_id and mom_id are sent as content)
        :param new_status:  the task's status

        :raises Exception if sending the notification fails
        """
        # TODO can maybe move to Specification in the future? Logically it should be resource_assigner that sends
        # the notification
        content = {
            'radb_id': spec.radb_id,
            'otdb_id': spec.otdb_id,
            'mom_id': spec.mom_id
        }
        # Upper-case only the first character; str.capitalize() would also lower-case the remainder.
        # Slicing ([:1]) instead of indexing ([0]) avoids an IndexError on an empty status string.
        subject = 'Task' + new_status[:1].upper() + new_status[1:]
        event_message = EventMessage(context=self.ra_notification_prefix + subject, content=content)

        logger.info('Sending notification %s: %s', subject, str(content).replace('\n', ' '))
        self.ra_notification_bus.send(event_message)

    def _get_resource_estimates(self, specification_tree):
        """
        Obtains the resource estimates from the Resource Estimator for the main task in the specification tree and
        validates them. This method is handed to the schedulers as their resource_estimator callback.

        :param specification_tree: the task's specification tree (must contain 'otdb_id')

        :return A list of resource estimates for the given task. Each estimate has a 'resource_types' dict whose
                values are all positive.

        :raises ValueError if the estimator reports errors, if its reply lacks 'estimates' or 'resource_types', or
                if any estimated value is not positive
        """
        otdb_id = specification_tree['otdb_id']

        # NOTE(review): the per-call timeout of 10s presumably overrides the 180s given to RPC() in __init__ -
        # confirm that this is intended.
        reply, _status = self.rerpc({"specification_tree": specification_tree}, timeout=10)
        logger.info('Resource Estimator reply = %s', reply)

        errors = reply.get('errors')
        if errors:
            for error in errors:
                logger.error("Error from Resource Estimator: %s", error)
            raise ValueError("Error(s) in estimator for otdb_id=%s" % (otdb_id, ))

        estimates = reply.get('estimates')
        if estimates is None:
            raise ValueError("missing 'estimates' in estimator results: %s" % reply)

        if any('resource_types' not in est for est in estimates):
            raise ValueError("missing 'resource_types' in 'estimates' in estimator results: %s" % reply)

        if not all(est_val > 0 for est in estimates for est_val in est['resource_types'].values()):
            # Avoid div by 0 and inf looping from estimate <= 0 later on.
            raise ValueError("at least one of the estimates is not a positive number")

        return estimates

    def _create_scheduler(self, spec, specification_tree):
        """
        Create the primary scheduler for a task: a DwellScheduler for triggered tasks, a PriorityScheduler otherwise.

        :param spec: the task's specification
        :param specification_tree: the task's specification tree, passed on to the scheduler
        :returns: the scheduler instance (resources are not allocated yet)
        """
        if spec.isTriggered():
            min_starttime, max_starttime, duration = spec.calculate_dwell_values(spec.starttime, spec.duration,
                                                                                spec.min_starttime, spec.max_endtime)
            return DwellScheduler(task_id=spec.radb_id,
                                  specification_tree=specification_tree,
                                  resource_estimator=self._get_resource_estimates,
                                  resource_availability_checker=self.resource_availability_checker,
                                  radbcreds=self.radb_creds,
                                  min_starttime=min_starttime,
                                  max_starttime=max_starttime,
                                  duration=duration)

        return PriorityScheduler(task_id=spec.radb_id,
                                 specification_tree=specification_tree,
                                 resource_estimator=self._get_resource_estimates,
                                 resource_availability_checker=self.resource_availability_checker,
                                 radbcreds=self.radb_creds)

    def _schedule_resources(self, spec, specification_tree):
        """
        Schedule the requested resources for a task.

        First the scheduler from _create_scheduler is tried. If it cannot allocate the resources, a BasicScheduler is
        run so that the task ends up with the 'normal' conflicting claims, which users can then evaluate. The resource
        estimates are obtained lazily by the schedulers through _get_resource_estimates.

        :param spec:  the task's specification
        :param specification_tree: the task's specification tree

        :returns: True if successful, or False otherwise (errors are logged, not raised)
        """
        # Estimates have not been obtained yet at this point; the scheduler fetches them itself.
        logger.info("Scheduling resources for task %s", spec.radb_id)

        try:
            scheduler = self._create_scheduler(spec, specification_tree)
        except Exception as e:
            logger.exception('Error in scheduler._schedule_resources: %s', e)
            return False

        try:
            if not scheduler.allocate_resources():
                # try again with basic scheduler to end up with a situation with the 'normal' conflicting resources,
                # which can then be evaluated by users
                fallback_scheduler = BasicScheduler(task_id=spec.radb_id,
                                                    specification_tree=specification_tree,
                                                    resource_estimator=self._get_resource_estimates,
                                                    resource_availability_checker=self.resource_availability_checker,
                                                    radbcreds=self.radb_creds)

                return fallback_scheduler.allocate_resources()
        except Exception as e:
            logger.exception('Error in calling scheduler.allocate_resources: %s', e)
            return False

        logger.info('Resources successfully allocated task_id=%s', spec.radb_id)
        return True

    def _cleanup_earlier_generated_data(self, otdb_id, spec):
        """
        Remove any output and/or intermediate data from any previous run of the task (pipelines only).

        This is best effort. Failures are logged and never raised, so they do not prevent the task from being
        scheduled.

        :param otdb_id: the task's OTDB ID (used in log messages)
        :param spec:  the task's specification (its type and otdb_id are used)
        """
        # Only needed for pipeline tasks
        if spec.type != 'pipeline':
            return

        try:
            du_result = self.sqrpc.getDiskUsageForOTDBId(spec.otdb_id,
                                                         include_scratch_paths=True,
                                                         force_update=True)

            # 'disk_usage' may be absent or None; treat both as "nothing on disk" instead of failing on None > 0.
            if du_result['found'] and (du_result.get('disk_usage') or 0) > 0:
                logger.info("removing data on disk from previous run for otdb_id %s", otdb_id)
                result = self.curpc.removeTaskData(spec.otdb_id)
                if not result.get('deleted'):
                    logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s",
                                   otdb_id, result.get('message'))
        except Exception as e:
            # in line with failure as warning just above: allow going to scheduled state here too
            logger.exception("Exception in cleaning up earlier data: %s", str(e))