diff --git a/.gitattributes b/.gitattributes index 5cae2e0d5626a17886d8445b423054530b1bafcd..0e38df4e06b9890b9e1896b02d355f0abfc1bc56 100644 --- a/.gitattributes +++ b/.gitattributes @@ -4882,10 +4882,10 @@ SAS/ResourceAssignment/ResourceAssigner/bin/resourceassigner -text SAS/ResourceAssignment/ResourceAssigner/bin/resourceassigner.ini -text SAS/ResourceAssignment/ResourceAssigner/lib/CMakeLists.txt -text SAS/ResourceAssignment/ResourceAssigner/lib/__init__.py -text -SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py -text SAS/ResourceAssignment/ResourceAssigner/lib/config.py -text SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py -text SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py -text +SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py -text SAS/ResourceAssignment/ResourceAssigner/lib/resource_availability_checker.py -text SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py -text SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py -text diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py index 3533ea43844e4782c2fc6ce7ae3188ef34c47646..2d2f2ce5f920a1631c58ef3015a17cf600f060ac 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py @@ -18,7 +18,7 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -# $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $ +# $Id: resource_assigner.py 1580 2015-09-30 14:18:57Z loose $ """ RAtoOTDBTaskSpecificationPropagator gets a task to be scheduled in OTDB, diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py index 0fa0b7585136c0cda18ee3c3e5283cc039ddcdb1..e686a0950808e260155dbccd485e3afa43eb148f 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py @@ -18,7 +18,7 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. 
# -# $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $ +# $Id: resource_assigner.py 1580 2015-09-30 14:18:57Z loose $ """ RAtoOTDBTaskSpecificationPropagator gets a task to be scheduled in OTDB, diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/CMakeLists.txt b/SAS/ResourceAssignment/ResourceAssigner/lib/CMakeLists.txt index 3ff856e4ed1bcf8eeccf797d0ecef7f317a8c85c..a0bdc997103cd89e0b439b214d6ae217e903292e 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/CMakeLists.txt +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/CMakeLists.txt @@ -3,7 +3,7 @@ python_install( __init__.py raservice.py - assignment.py + resource_assigner.py resource_availability_checker.py rabuslistener.py schedulechecker.py diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py deleted file mode 100755 index 3bf756caf4fae56125de0a30e3556005069e2f03..0000000000000000000000000000000000000000 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ /dev/null @@ -1,571 +0,0 @@ -#!/usr/bin/env python - -# Copyright (C) 2015-2017 -# ASTRON (Netherlands Institute for Radio Astronomy) -# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands -# -# This file is part of the LOFAR software suite. -# The LOFAR software suite is free software: you can redistribute it -# and/or modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# The LOFAR software suite is distributed in the hope that it will be -# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. -# -# $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $ - -""" -ResourceAssigner inserts/updates tasks and assigns resources to it based on incoming parset. 
-""" - -import logging -from datetime import datetime, timedelta - -from lofar.common.cache import cache -from lofar.common.datetimeutils import totalSeconds, parseDatetime -from lofar.messaging.messages import EventMessage -from lofar.messaging.messagebus import ToBus -from lofar.messaging.RPC import RPC -from lofar.parameterset import parameterset - -from lofar.sas.resourceassignment.resourceassigner.schedulechecker import movePipelineAfterItsPredecessors - -from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC -from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME -from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME - -from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME -from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME - -from lofar.sas.otdb.otdbrpc import OTDBRPC -from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME - -from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME -from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX - -from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import ResourceAvailabilityChecker -from lofar.sas.resourceassignment.resourceassigner.schedulers import DwellScheduler - -from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC -from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME - -from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC -from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFAULT_STORAGEQUERY_BUSNAME -from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME - -from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC -from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME -from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME - -logger = logging.getLogger(__name__) - - -class ResourceAssigner(object): - def __init__(self, - radb_busname=RADB_BUSNAME, - radb_servicename=RADB_SERVICENAME, - re_busname=RE_BUSNAME, - re_servicename=RE_SERVICENAME, - otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME, - otdb_servicename=DEFAULT_OTDB_SERVICENAME, - storagequery_busname=DEFAULT_STORAGEQUERY_BUSNAME, - storagequery_servicename=DEFAULT_STORAGEQUERY_SERVICENAME, - cleanup_busname=DEFAULT_CLEANUP_BUSNAME, - cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME, - ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME, - ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX, - mom_busname=DEFAULT_MOMQUERY_BUSNAME, - mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, - broker=None, - radb_dbcreds=None): - """ - ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset. 
- - :param radb_busname: busname on which the radb service listens (default: lofar.ra.command) - :param radb_servicename: servicename of the radb service (default: RADBService) - :param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command) - :param re_servicename: servicename of the resource estimator service (default: ResourceEstimation) - :param broker: Valid Qpid broker host (default: None, which means localhost) - """ - - self.radb_creds = radb_dbcreds - - self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker, timeout=180) - self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True, timeout=180) - self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180) ## , ForwardExceptions=True hardcoded in RPCWrapper right now - self.momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180) - self.sqrpc = StorageQueryRPC(busname=storagequery_busname, servicename=storagequery_servicename, broker=broker) - self.curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker) - self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker) - self.ra_notification_prefix = ra_notification_prefix - - self.resource_availability_checker = ResourceAvailabilityChecker(self.radbrpc) - - def __enter__(self): - """Internal use only. (handles scope 'with')""" - self.open() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Internal use only. (handles scope 'with')""" - self.close() - - def open(self): - """Open rpc connections to radb service and resource estimator service""" - self.radbrpc.open() - self.rerpc.open() - self.otdbrpc.open() - self.momrpc.open() - self.sqrpc.open() - self.curpc.open() - self.ra_notification_bus.open() - - def close(self): - """Close rpc connections to radb service and resource estimator service""" - self.radbrpc.close() - self.rerpc.close() - self.otdbrpc.close() - self.momrpc.close() - self.sqrpc.close() - self.curpc.close() - self.ra_notification_bus.close() - - @property - @cache - def resource_types(self): - """ Returns a dict of all the resource types, to convert name->id. """ - - return {rt['name']: rt['id'] for rt in self.radbrpc.getResourceTypes()} - - def do_assignment(self, otdb_id, specification_tree): - """ Attempts to assign the specified resources - - :param otdb_id: OTDB ID of the main task which resources need to be assigned - :param specification_tree: the specification tree containing the main task and its resources - """ - - task_id, task_type, task_status, task = self._insert_specification_into_radb(otdb_id, specification_tree) - - if task_status == 'approved': - # Do this check after insertion of specification, task and predecessor/successor relations, so approved - # tasks appear correctly in the web scheduler. 
- logger.info('Task otdb_id=%s is already approved, no resource assignment needed' % otdb_id) - return - - requested_resources = self._get_resource_estimates(specification_tree, otdb_id, task_type, task_id, task) - if requested_resources is None: - self._finish_resource_assignment(task, 'error') - return - - if not self._schedule_resources(task_id, task, requested_resources): - self._finish_resource_assignment(task, 'conflict') - return - - self._cleanup_generated_pipeline_data(otdb_id, task) - self._finish_resource_assignment(task, 'scheduled') - - def _insert_specification_into_radb(self, otdb_id, specification_tree): - """ - Inserts the main task's specification into RADB along with any predecessors and successors it has. - - :param otdb_id: the main task's OTDB ID - :param specification_tree: the main task's specification - :return: True if specification is successfully inserted into RADB, or False if not - """ - - task_status = self._get_is_assignable(otdb_id, specification_tree) - - task_type, start_time, end_time, cluster_name = self._prepare_to_insert_main_task(otdb_id, specification_tree) - - task_id, task = self._insert_main_task(specification_tree, start_time, end_time, cluster_name) - - self._process_task_predecessors(task) - self._process_task_successors(task) - - logger.info('Successfully inserted main task and its predecessors and successors into RADB: task=%s', task) - - return task_id, task_type, task_status, task - - def _get_is_assignable(self, otdb_id, specification_tree): - """ - Verifies if a task can actually be assigned by looking at its status. Raises an exception if the task is not - assignable. - - :param otdb_id: ORDB ID of the task - :param specification_tree: the specification tree of the task - - :returns the task's status if it is assignable (and raises an exception if it is not) - """ - - assignable_task_states = ['approved', 'prescheduled'] - status = specification_tree.get('state', '').lower() - if status in assignable_task_states: - logger.info('Task otdb_id=%s with status \'%s\' is assignable' % (otdb_id, status)) - else: - assignable_task_states_str = ', '.join(assignable_task_states) - logger.warn('Task otdb_id=%s with status \'%s\' is not assignable. Allowed statuses are %s' % - (otdb_id, status, assignable_task_states_str)) - - message = "doAssignment: Unsupported status %s of task with OTDB ID: %s" % (status, otdb_id) - raise Exception(message) - - return status - - def _prepare_to_insert_main_task(self, otdb_id, specification_tree): - """ - Prepares for insertion of the main task by extracting start_time, end_time, and cluster_name from its - specification. 
- - :param otdb_id: the main task's OTDB ID - :param specification_tree: the main task's specification - :return: 3-tuple containing the main task's start_time, end_time, and cluster_name respectively - """ - - main_parset = self._get_main_parset(specification_tree) - task_type, task_subtype = self._get_task_type(specification_tree) - cluster_name = self._get_clustername(otdb_id, main_parset, task_type, task_subtype) - start_time, end_time = self._get_main_task_start_and_end_times(specification_tree) - - logger.info('preparations for inserting main task into RADB successful') - - return task_type, start_time, end_time, cluster_name - - def _finish_resource_assignment(self, task, new_task_status): - """ - Takes care of the needed RADB task administration and status change notification before generating an exception - """ - - if task is not None and new_task_status in ('conflict', 'error', 'scheduled'): - logger.info('Finishing resource assignment for task_id=%s, status=%s' % (task["task_id"], new_task_status)) - - # another service sets the parset spec in OTDB, and updated otdb task status to scheduled, which is then - # synced to RADB - self.radbrpc.updateTask(task['id'], task_status=new_task_status) - - content = { - 'radb_id': task['id'], - 'otdb_id': task['otdb_id'], - 'mom_id': task['mom_id'] - } - subject = 'Task' + new_task_status[0].upper() + new_task_status[1:] - event_message = EventMessage(context=self.ra_notification_prefix + subject, content=content) - - logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) - self.ra_notification_bus.send(event_message) - - def _get_main_parset(self, specification_tree): - """ Extracts the main parset from a specification tree - - :param specification_tree the specification tree of the task - - :returns the main parset - """ - - return parameterset(specification_tree['specification']) - - def _get_task_type(self, specification_tree): - taskType = specification_tree['task_type'] # is required item - if 'task_subtype' in specification_tree: # is optional item - taskSubtype = specification_tree['task_subtype'] - else: - taskSubtype = '' - - return taskType, taskSubtype - - def _get_clustername(self, otdb_id, mainParset, taskType, taskSubtype): - """ Determines the name of the cluster to which to store the task's output - if it produces output at all. - - :param otdb_id: the ORDB ID of the (main) task - :param mainParset: the parset of the main task - :param taskType: the task's type - :param taskSubtype: the task's subtype - - :returns The name of the output cluster, or an empty string if none is applicable - """ - - clusterName = '' - if taskType not in ('reservation',): - # Only assign resources for task output to known clusters - clusterNameSet = self._get_cluster_names(mainParset) - - if str() in clusterNameSet or len(clusterNameSet) != 1: - # Empty set or name is always an error. - # TODO: To support >1 cluster per obs, - # self.radbrpc.insertSpecificationAndTask() as called below and the radb would need to take >1 cluster name - # Also, there is only 1 processingClusterName in the parset, but we do not always want to pipeline process all obs outputs, or not on 1 cluster - logger.error( - 'clusterNameSet must have a single non-empty name for all enabled DataProducts, but is: %s' % clusterNameSet) - else: - clusterName = clusterNameSet.pop() - - # Retrieve known cluster names (not all may be a valid storage target, but we cannot know...) 
- knownClusterSet = {clus['name'] for clus in self.radbrpc.getResourceGroupNames('cluster')} - logger.info('known clusters: %s', knownClusterSet) - if clusterName not in knownClusterSet: - raise Exception("skipping resource assignment for task with cluster name '" + clusterName + - "' not in known clusters " + str(knownClusterSet)) - else: - # fix for MoM bug introduced before NV's holiday - # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4 - # so, override it here if needed, and update to otdb - processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '') - if processingClusterName != clusterName: - logger.info('overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s', - processingClusterName, clusterName, otdb_id) - self.otdbrpc.taskSetSpecification(otdb_id, { - 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName}) - - return clusterName - - def _get_cluster_names(self, parset): - """ Return set of storage cluster names for all enabled output data product types in parset, - or raise for an enabled output data product type without storage cluster name. - """ - clusterNames = set() - - keys = ['Output_Correlated', - 'Output_IncoherentStokes', - 'Output_CoherentStokes', - 'Output_InstrumentModel', - 'Output_SkyImage', - 'Output_Pulsar'] - for key in keys: - if parset.getBool('Observation.DataProducts.%s.enabled' % key, False): - name = parset.getString('Observation.DataProducts.%s.storageClusterName' % key) # may raise; don't pass default arg - clusterNames.add(name) - - return clusterNames - - def _get_main_task_start_and_end_times(self, specification_tree): - """ Get the start time and end time of the main task adapted such that (a) there's a period of 3 minutes between - tasks and (b) the start time and end time are actually in the future. - - :param specification_tree: specification tree for the main task - :returns 2-tuple (start_time, end_time) - """ - - def apply_sane_start_and_end_time(orig_start_time, orig_end_time, otdb_id): - start_time = datetime.utcnow() + timedelta(minutes=3) - - max_predecessor_end_time = self._get_max_predecessor_end_time(specification_tree) - if max_predecessor_end_time and max_predecessor_end_time > start_time: - start_time = max_predecessor_end_time + timedelta(minutes=3) - - if orig_end_time > orig_start_time: - task_duration = timedelta(seconds=totalSeconds(orig_end_time - orig_start_time)) - else: - timedelta(hours=1) - - end_time = start_time + task_duration - - logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', - start_time, end_time, otdb_id) - - logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', start_time, - end_time, otdb_id) - self.otdbrpc.taskSetSpecification(otdb_id, - {'LOFAR.ObsSW.Observation.startTime': start_time.strftime('%Y-%m-%d %H:%M:%S'), - 'LOFAR.ObsSW.Observation.stopTime': end_time.strftime('%Y-%m-%d %H:%M:%S')}) - return start_time, end_time - - # TODO: don't fix this crap here. Bad start/stop time has to go to error, like any other bad spec part. - # TODO: Fix the cause! Idem for MoM fix up below. 
- - main_parset = self._get_main_parset(specification_tree) - start_time = parseDatetime(main_parset.getString('Observation.startTime')) - end_time = parseDatetime(main_parset.getString('Observation.stopTime')) - - if start_time < datetime.utcnow(): - otdb_id = specification_tree['otdb_id'] - start_time, end_time = apply_sane_start_and_end_time(start_time, end_time, otdb_id) - - return start_time, end_time - - def _insert_main_task(self, specification_tree, start_time, end_time, cluster_name): - """ Inserts the main task and its specification into the RADB. Any existing specification and task with same - otdb_id will be deleted automatically. - - :param specification_tree: specification tree for the main task - :return: 2-tuple (task_id, task) - """ - - task_type, _ = self._get_task_type(specification_tree) - main_parset = self._get_main_parset(specification_tree) - mom_id = main_parset.getInt('Observation.momID', -1) - status = specification_tree.get('state', '').lower() - otdb_id = specification_tree['otdb_id'] - logger.info( - 'doAssignment: insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, end_time=%s cluster=%s' % - (mom_id, otdb_id, status, task_type, start_time, end_time, cluster_name)) - - result = self.radbrpc.insertSpecificationAndTask(mom_id, otdb_id, status, task_type, start_time, end_time, - str(main_parset), cluster_name) - - specification_id = result['specification_id'] - task_id = result['task_id'] - logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id)) - - task = self.radbrpc.getTask(task_id) - - return task_id, task - - def _get_resource_estimates(self, specification_tree, otdb_id, taskType, taskId, task): - """ Request and return checked estimates of needed resources from Resource Estimator. """ - - estimates = None - try: - reReply, rerpcStatus = self.rerpc({"specification_tree" : specification_tree}, timeout=10) - logger.info('doAssignment: Resource Estimator reply = %s', reReply) - - if str(otdb_id) not in reReply: - raise ValueError("no otdb_id %s found in estimator results %s" % (otdb_id, reReply)) - estimates = reReply[str(otdb_id)] - - if taskType not in estimates: - raise ValueError("no task type %s found in estimator results %s" % (taskType, estimates)) - estimates = estimates[taskType] - - if 'errors' in estimates and estimates['errors']: - for error in estimates['errors']: - logger.error("Error from Resource Estimator: %s", error) - raise ValueError("Error(s) in estimator for otdb_id=%s radb_id=%s" % (otdb_id, taskId)) - - if 'estimates' not in estimates or any('resource_types' not in est for est in estimates['estimates']): - raise ValueError("missing 'resource_types' in 'estimates' in estimator results: %s" % estimates) - estimates = estimates['estimates'] - - if not all(est_val > 0 for est in estimates for est_val in est['resource_types'].values()): - # Avoid div by 0 and inf looping from estimate <= 0 later on. - raise ValueError("at least one of the estimates is not a positive number") - except Exception as e: - estimates = None - - logger.error('An exception occurred while obtaining resource estimates. 
Exception=%s' % str(e)) - - return estimates - - def _schedule_resources(self, task_id, specification_tree, requested_resources): - start_time, end_time = self._get_main_task_start_and_end_times(specification_tree) - - scheduler = DwellScheduler(task_id = task_id, - resource_availability_checker=self.resource_availability_checker, - radbcreds=self.radb_creds, - # For now dwell-behavior is disabled by setting min_starttime/max_starttime to - # start_time, because the specification doesn't yet support this. - # TODO: enable dwell-scheduling once min_starttime/max_starttime are propagated - min_starttime=start_time, - max_starttime=start_time, - duration=end_time - start_time) - - result = scheduler.allocate_resources(requested_resources) - - if result: - logger.info('Resources successfully allocated task_id=%s' % task_id) - else: - logger.info('No resources allocated task_id=%s' % task_id) - - return result - - def _cleanup_generated_pipeline_data(self, otdb_id, task): - """ - Remove any output and/or intermediate data for restarting pipelines - - :return: - """ - - if task['type'] == 'pipeline': - try: - du_result = self.sqrpc.getDiskUsageForOTDBId(task['otdb_id'], include_scratch_paths=True, force_update=True) - if du_result['found'] and du_result.get('disk_usage', 0) > 0: - logger.info("removing data on disk from previous run for otdb_id %s", otdb_id) - result = self.curpc.removeTaskData(task['otdb_id']) - if not result['deleted']: - logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", - otdb_id, result['message']) - except Exception as e: - # in line with failure as warning just above: allow going to scheduled state here too - logger.error(str(e)) - - def _announceStateChange(self, task, status): - if status == 'scheduled' or status == 'conflict' or status == 'error': - content={'radb_id': task['id'], 'otdb_id': task['otdb_id'], 'mom_id': task['mom_id']} - subject= 'Task' + status[0].upper() + status[1:] - else: # should not end up here (bug) - logger.error('_announceStateChange(): bug: Not sending notification as status is %s' % status) - return - - try: - if status != 'scheduled': - # another service sets the parset spec in otdb, and updated otdb task status to scheduled, which is then synced to radb - self.radbrpc.updateTask(task['id'], task_status=status) - - msg = EventMessage(context=self.ra_notification_prefix + subject, content=content) - logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) - self.ra_notification_bus.send(msg) - except Exception as e: - logger.error(str(e)) - - def _process_task_predecessors(self, task): - mom_id = task['mom_id'] - - predecessor_ids = self.momrpc.getPredecessorIds(mom_id) - if str(mom_id) not in predecessor_ids or not predecessor_ids[str(mom_id)]: - logger.info('no predecessors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id) - return - predecessor_mom_ids = predecessor_ids[str(mom_id)] - - logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], task['otdb_id']) - - for predecessor_mom_id in predecessor_mom_ids: - # check if the predecessor needs to be linked to this task - predecessor_task = self.radbrpc.getTask(mom_id=predecessor_mom_id) - if predecessor_task: - if predecessor_task['id'] not in task['predecessor_ids']: - logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s', - predecessor_task['mom_id'], predecessor_task['otdb_id'], task['mom_id'], 
task['otdb_id']) - self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id']) - else: - # Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond approved, - # which is in principle valid. The link in the radb will be made later via processSuccessors() below. - # Alternatively, a predecessor could have been deleted. - logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', predecessor_mom_id, task['otdb_id']) - - - def _process_task_successors(self, task): - mom_id = task['mom_id'] - - successor_ids = self.momrpc.getSuccessorIds(mom_id) - if str(mom_id) not in successor_ids or not successor_ids[str(mom_id)]: - logger.info('no successors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id) - return - successor_mom_ids = successor_ids[str(mom_id)] - - logger.info('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], task['otdb_id']) - - for successor_mom_id in successor_mom_ids: - # check if the successor needs to be linked to this task - successor_task = self.radbrpc.getTask(mom_id=successor_mom_id) - if successor_task: - if successor_task['id'] not in task['successor_ids']: - logger.info('connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s', - successor_task['mom_id'], successor_task['otdb_id'], task['mom_id'], task['otdb_id']) - self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id']) - movePipelineAfterItsPredecessors(successor_task, self.radbrpc) - else: - # Occurs when settings a obs or task to prescheduled while a successor has e.g. not yet been beyond approved, - # which is quite normal. The link in the radb will be made later via processPredecessors() above. - # Alternatively, a successor could have been deleted. 
- logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s', successor_mom_id, task['otdb_id']) - - def _get_max_predecessor_end_time(self, specification_tree): - predecessor_specs = [parameterset(tree['specification']) for tree in specification_tree['predecessors']] - predecessor_endTimes = [parseDatetime(spec.getString('Observation.stopTime')) for spec in predecessor_specs] - if predecessor_endTimes: - return max(predecessor_endTimes) - return None diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py index 9fbc73851f2a1bba10a1b379903da6beb823d754..09e0413ad52ebef815d71a998f26781d050e2d83 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py @@ -33,11 +33,12 @@ from lofar.common import dbcredentials from lofar.sas.resourceassignment.rataskspecified.RABusListener import RATaskSpecifiedBusListener from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT -from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner +from lofar.sas.resourceassignment.resourceassigner.resource_assigner import ResourceAssigner from lofar.sas.resourceassignment.resourceassigner.schedulechecker import ScheduleChecker logger = logging.getLogger(__name__) + class SpecifiedTaskListener(RATaskSpecifiedBusListener): def __init__(self, busname=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME, @@ -65,12 +66,13 @@ class SpecifiedTaskListener(RATaskSpecifiedBusListener): logger.info('onTaskSpecified: otdb_id=%s' % otdb_id) try: - self.assigner.do_assignment(specification_tree) + self.assigner.do_assignment(otdb_id, specification_tree) except Exception as e: logger.error(str(e)) __all__ = ["SpecifiedTaskListener"] + def main(): # make sure we run in UTC timezone import os diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py new file mode 100755 index 0000000000000000000000000000000000000000..3f0b14b65145b3239a4e40ae04256145cc5b0719 --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py @@ -0,0 +1,774 @@ +#!/usr/bin/env python + +# Copyright (C) 2015-2017 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id: resource_assigner.py 1580 2015-09-30 14:18:57Z loose $ + +""" +ResourceAssigner inserts/updates tasks and assigns resources to it based on incoming parset. 
+""" + +import logging +from datetime import datetime, timedelta + +from lofar.common.cache import cache +from lofar.common.datetimeutils import parseDatetime +from lofar.messaging.messages import EventMessage +from lofar.messaging.messagebus import ToBus +from lofar.messaging.RPC import RPC +from lofar.parameterset import parameterset + +from lofar.sas.resourceassignment.resourceassigner.schedulechecker import movePipelineAfterItsPredecessors + +from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC +from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME +from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME + +from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME +from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME + +from lofar.sas.otdb.otdbrpc import OTDBRPC +from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME + +from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME +from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX + +from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import ResourceAvailabilityChecker +from lofar.sas.resourceassignment.resourceassigner.schedulers import DwellScheduler + +from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC +from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME + +from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC +from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFAULT_STORAGEQUERY_BUSNAME +from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME + +from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC +from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME +from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME + +logger = logging.getLogger(__name__) + + +class ResourceAssigner(object): + """ + The ResourceAssigner inserts new tasks or updates existing tasks in the RADB and assigns resources to it based on + a task's parset. 
+ """ + + def __init__(self, + radb_busname=RADB_BUSNAME, + radb_servicename=RADB_SERVICENAME, + re_busname=RE_BUSNAME, + re_servicename=RE_SERVICENAME, + otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME, + otdb_servicename=DEFAULT_OTDB_SERVICENAME, + storagequery_busname=DEFAULT_STORAGEQUERY_BUSNAME, + storagequery_servicename=DEFAULT_STORAGEQUERY_SERVICENAME, + cleanup_busname=DEFAULT_CLEANUP_BUSNAME, + cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME, + ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME, + ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX, + mom_busname=DEFAULT_MOMQUERY_BUSNAME, + mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, + broker=None, + radb_dbcreds=None): + """ + Creates a ResourceAssigner instance + + :param radb_busname: name of the bus on which the radb service listens (default: lofar.ra.command) + :param radb_servicename: name of the radb service (default: RADBService) + :param re_busname: name of the bus on which the resource estimator service listens (default: lofar.ra.command) + :param re_servicename: name of the resource estimator service (default: ResourceEstimation) + :param otdb_busname: name of the bus on which OTDB listens (default: lofar.otdb.command) + :param otdb_servicename: name of the OTDB service (default: OTDBService) + :param storagequery_busname: name of the bus on which the StorageQueryService listens + (default: lofar.dm.command) + :param storagequery_servicename: name of the StorageQueryService (default: StorageQueryService) + :param cleanup_busname: name of the bus on which the cleanup service listens (default: lofar.dm.command) + :param cleanup_servicename: name of the CleanupService (default: CleanupService) + :param ra_notification_busname: name of the bus on which the ResourceAssigner notifies registered parties + (default: lofar.ra.notification) + :param ra_notification_prefix: prefix used in notification message subject (default: ResourceAssigner.) + :param mom_busname: name of the bus on which MOM listens for queries (default: lofar.ra.command) + :param mom_servicename: name of the MOMQueryService (default: momqueryservice) + :param broker: Valid Qpid broker host (default: None, which means localhost) + :param radb_dbcreds: the credentials to be used for accessing the RADB (default: None, which means default) + """ + + self.radb_creds = radb_dbcreds + + self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker, timeout=180) + self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True, timeout=180) + self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180) + self.momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180) + self.sqrpc = StorageQueryRPC(busname=storagequery_busname, servicename=storagequery_servicename, broker=broker) + self.curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker) + self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker) + self.ra_notification_prefix = ra_notification_prefix + + self.resource_availability_checker = ResourceAvailabilityChecker(self.radbrpc) + + def __enter__(self): + """Internal use only. (handles scope 'with')""" + self.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Internal use only. 
(handles scope 'with')""" + self.close() + + def open(self): + """Open rpc connections to radb service and resource estimator service""" + self.radbrpc.open() + self.rerpc.open() + self.otdbrpc.open() + self.momrpc.open() + self.sqrpc.open() + self.curpc.open() + self.ra_notification_bus.open() + + def close(self): + """Close rpc connections to radb service and resource estimator service""" + self.radbrpc.close() + self.rerpc.close() + self.otdbrpc.close() + self.momrpc.close() + self.sqrpc.close() + self.curpc.close() + self.ra_notification_bus.close() + + @property + @cache + def resource_types(self): + """ Returns a dict of all the resource types, to convert name->id. """ + + return {rt['name']: rt['id'] for rt in self.radbrpc.getResourceTypes()} + + def do_assignment(self, otdb_id, specification_tree): + """ + Makes the given task known to RADB and attempts to assign (schedule) its requested resources. + + If no list of requested resources could be determined for the task, its status will be set to "error" in RADB. + If such a list can be obtained but it is impossible to assign the requested resources, the task is in conflict + with other tasks, hence its status will be set to "conflict" in RADB. If all requested resources are + successfully assigned, its status will be put to "scheduled" in RADB. + + :param otdb_id: OTDB ID of the main task which resources need to be assigned + :param specification_tree: the specification tree containing the main task and its resources + + :raises an Exception if something unforeseen happened while scheduling + """ + + logger.info(('do_assignment: otdb_id=%s specification_tree=%s' % (otdb_id, specification_tree))) + + # Make the task known to RADB + task_id, task_type, task_status, task = self._insert_specification_into_radb(otdb_id, specification_tree) + + # Don't perform any scheduling for tasks that are already approved. Do this check after insertion of + # specification, task and predecessor/successor relations, so approved tasks appear correctly in the web + # scheduler. + if task_status == 'approved': + logger.info('Task otdb_id=%s is already approved, no resource assignment needed' % otdb_id) + else: + requested_resources = self._get_resource_estimates(specification_tree, otdb_id, task_type, task_id) + if requested_resources is None: + # No resource requests available, so change task status to "error" + self._finish_resource_assignment(task, 'error') + else: + if self._schedule_resources(task_id, specification_tree, requested_resources): + # Cleanup the data of any previous run of the task + self._cleanup_earlier_generated_data(otdb_id, task) + + # Scheduling of resources for this task succeeded, so change task status to "scheduled" + self._finish_resource_assignment(task, 'scheduled') + else: + # Scheduling of resources for this task failed, so change task status to "conflict" + self._finish_resource_assignment(task, 'conflict') + + def _insert_specification_into_radb(self, otdb_id, specification_tree): + """ + Tries to insert the task's specification into RADB along with any of its predecessors and successors. + + :param otdb_id: the main task's OTDB ID + :param specification_tree: the main task's specification + + :return: A 4-tuple (task_id, task_type, task_status, task) if the task's specification is successfully inserted + into RADB.
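A minimal usage sketch of the flow implemented by do_assignment() above, assuming the caller (as in raservice.py) supplies the broker, RADB credentials, otdb_id and specification_tree:

# Sketch only; broker, radb_dbcreds, otdb_id and specification_tree come from the calling service.
with ResourceAssigner(broker=broker, radb_dbcreds=radb_dbcreds) as assigner:  # opens all RPC connections
    # Inserts the task into the RADB and leaves it in 'scheduled', 'conflict' or 'error'
    # (already 'approved' tasks are only inserted, not scheduled).
    assigner.do_assignment(otdb_id, specification_tree)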
+ + :raises Exception if a task can't be inserted into RADB + """ + + task_status = self._get_is_assignable(otdb_id, specification_tree) + + task_type, start_time, end_time, cluster_name = self._prepare_to_insert_main_task(otdb_id, specification_tree) + + task_id, task = self._insert_main_task(specification_tree, start_time, end_time, cluster_name) + + self._link_predecessors_to_task_in_radb(task) + self._link_successors_to_task_in_radb(task) + + logger.info('Successfully inserted main task and its predecessors and successors into RADB: task=%s', task) + + return task_id, task_type, task_status, task + + def _get_is_assignable(self, otdb_id, specification_tree): + """ + Verifies if a task can actually be assigned by looking at its status. Raises an exception if the task is not + assignable. + + :param otdb_id: ORDB ID of the task + :param specification_tree: the specification tree of the task + + :returns the task's status if it is assignable + :raises Exception if it can't be assigned + """ + + assignable_task_states = ['approved', 'prescheduled'] + status = specification_tree.get('state', '').lower() + if status in assignable_task_states: + logger.info('Task otdb_id=%s with status \'%s\' is assignable' % (otdb_id, status)) + else: + assignable_task_states_str = ', '.join(assignable_task_states) + logger.warn('Task otdb_id=%s with status \'%s\' is not assignable. Allowed statuses are %s' % + (otdb_id, status, assignable_task_states_str)) + + message = "doAssignment: Unsupported status '%s' of task with OTDB ID: %s" % (status, otdb_id) + raise Exception(message) + + return status + + def _prepare_to_insert_main_task(self, otdb_id, specification_tree): + """ + Prepares for insertion of the main task by extracting start_time, end_time, and cluster_name from its + specification. + + :param otdb_id: the main task's OTDB ID + :param specification_tree: the main task's specification + + :return: 4-tuple (task_type, start_time, end_time, cluster_name) of the task prepared for RADB insertion + """ + + main_parset = self._get_main_parset(specification_tree) + task_type, _ = self._get_task_type(specification_tree) + cluster_name = self._get_clustername(otdb_id, main_parset, task_type) + start_time, end_time = self._get_main_task_start_and_end_times(specification_tree) + + logger.info('preparations for inserting main task into RADB successful') + + return task_type, start_time, end_time, cluster_name + + def _finish_resource_assignment(self, task, new_task_status): + """ + Finishes the resource assignment by updating a task's status in RADB and sending out a corresponding + notification to registered parties on the Resource Assigner notification bus. 
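For illustration, a task that ends up 'scheduled' produces a notification like the one below (IDs are invented; the prefix is assumed to be the constructor default 'ResourceAssigner.' mentioned in the __init__ docstring):

# Sketch of the message built further down in this method, for new_task_status='scheduled'.
subject = 'ResourceAssigner.' + 'Task' + 'Scheduled'  # ra_notification_prefix + 'Task' + capitalised status
content = {'radb_id': 42, 'otdb_id': 123456, 'mom_id': 654321}
event_message = EventMessage(context=subject, content=content)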
+ + :param task: the task at hand + :param new_task_status: the new status to set the task to in RADB + + :raises Exception if updating RADB fails, or if sending the notification fails + """ + + if task is not None and new_task_status in ('conflict', 'error', 'scheduled'): + logger.info('Finishing resource assignment for task_id=%s, status=%s' % (task['id'], new_task_status)) + + # another service sets the parset spec in OTDB, and updated otdb task status to scheduled, which is then + # synced to RADB + self.radbrpc.updateTask(task['id'], task_status=new_task_status) + + content = { + 'radb_id': task['id'], + 'otdb_id': task['otdb_id'], + 'mom_id': task['mom_id'] + } + subject = 'Task' + new_task_status[0].upper() + new_task_status[1:] + event_message = EventMessage(context=self.ra_notification_prefix + subject, content=content) + + logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) + self.ra_notification_bus.send(event_message) + + def _get_main_parset(self, specification_tree): + """ + Extracts the main task's parset from a specification tree + + :param specification_tree: the task's specification tree + + :returns the main parset + """ + + return parameterset(specification_tree['specification']) + + def _get_task_type(self, specification_tree): + """ + Extracts the task's type and subtype (if applicable) from a specification tree + + :param specification_tree: specification_tree: the task's specification tree + + :return: 2-tuple (task_type, task_subtype) + """ + + task_type = specification_tree['task_type'] # is required item + if 'task_subtype' in specification_tree: # is optional item + task_subtype = specification_tree['task_subtype'] + else: + task_subtype = '' + + return task_type, task_subtype + + def _get_clustername(self, otdb_id, parset, task_type): + """ + Determines the name of the cluster to which to store the task's output - if it produces output at all that is. + + :param otdb_id: the ORDB ID of the task + :param parset: the parset of the task + :param task_type: the task's type + + :returns The name of the output cluster, or an empty string if none is applicable + :raises Exception if the storage cluster required by the task is unknown to the system + """ + + cluster_name = '' + if task_type not in ('reservation',): + # Only assign resources for task output to known clusters + cluster_name_set = self._get_cluster_names(parset) + + if str() in cluster_name_set or len(cluster_name_set) != 1: + # Empty set or name is always an error. + # TODO: To support >1 cluster per obs, + # self.radbrpc.insertSpecificationAndTask() as called below and the radb would need to take >1 + # cluster name/ Also, there is only 1 processingClusterName in the parset, but we do not always want to + # pipeline process all obs outputs, or not on 1 cluster + logger.error( + 'clusterNameSet must have a single non-empty name for all enabled DataProducts, but is: %s' % + cluster_name_set + ) + else: + cluster_name = cluster_name_set.pop() + + # Retrieve known cluster names (not all may be a valid storage target, but we cannot know...) 
+ known_cluster_set = {cluster['name'] for cluster in self.radbrpc.getResourceGroupNames('cluster')} + logger.info('known clusters: %s', known_cluster_set) + if cluster_name not in known_cluster_set: + raise Exception("skipping resource assignment for task with cluster name '" + cluster_name + + "' not in known clusters " + str(known_cluster_set)) + else: + # fix for MoM bug introduced before NV's holiday + # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4 + # so, override it here if needed, and update to otdb + processing_cluster_name = parset.getString('Observation.Cluster.ProcessingCluster.clusterName', + '') + if processing_cluster_name != cluster_name: + logger.info('overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' ' + 'for otdb_id=%s', processing_cluster_name, cluster_name, otdb_id) + self.otdbrpc.taskSetSpecification( + otdb_id, + {'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': cluster_name} + ) + + return cluster_name + + def _get_cluster_names(self, parset): + """ + Get the storage cluster names for all enabled output data product types in parset + + :param parset: the task's parset + + :raises Exception if an enabled output data product type has no storage cluster name specified. + """ + cluster_names = set() + + keys = ['Output_Correlated', + 'Output_IncoherentStokes', + 'Output_CoherentStokes', + 'Output_InstrumentModel', + 'Output_SkyImage', + 'Output_Pulsar'] + for key in keys: + if parset.getBool('Observation.DataProducts.%s.enabled' % key, False): + # may raise; don't pass default arg + name = parset.getString('Observation.DataProducts.%s.storageClusterName' % key) + cluster_names.add(name) + + return cluster_names + + def _get_main_task_start_and_end_times(self, specification_tree): + """ + Get the start time and end time of the main task modified such that (a) there's a period of 3 minutes between + tasks and (b) the start time and end time are actually in the future. + + If the start time lies in the past or is not specified it is set to 3 minutes from the current time. The new end + time in that case is calculated using the specified duration or, if that is not specified, from the original + difference between start and end time. When a duration can't be determined the end time will be set to 1 hour + after the start time. + + :param specification_tree: specification tree for the main task + + :returns 2-tuple (start_time, end_time) + """ + + def _get_start_and_end_times_from_parset(_parset): + """ + Extract the start and end times from a parset + + :param _parset: the parset + :return: A 2-tuple (start_time, end_time). start_time and end_time are returned as None when they were not + specified, or were specified in a wrong format. + """ + + try: + parset_start_time = parseDatetime(_parset.getString('Observation.startTime')) + except (ValueError, KeyError): + # Too bad no valid start time is specified! + parset_start_time = None + + try: + parset_end_time = parseDatetime(_parset.getString('Observation.stopTime')) + except (ValueError, KeyError): + # Too bad no valid end time is specified! + parset_end_time = None + + return parset_start_time, parset_end_time + + def _get_duration_from_parset(_parset): + """ + Preferably use the duration specified by the parset. If that's not available, calculate the duration from + the difference between start/end times.
If that's also impossible, fall back to a default duration + + :param _parset: the task's parset containing start/end times and durations (usually) + + :returns the obtained, calculated, or default duration + """ + + try: + duration = timedelta(seconds=_parset.getInt('Observation.Scheduler.taskDuration')) + except Exception: + _start_time, _end_time = _get_start_and_end_times_from_parset(_parset) + + if _start_time is not None and _end_time is not None and _start_time < _end_time: + duration = _end_time - _start_time + else: + duration = timedelta(hours=1) + + return duration + + # TODO: add unit tests that verify the task_types logic + def _get_need_to_push_back_start_and_end_times(_start_time, _end_time): + """ + Determines whether or not a task's start/end times need to be pushed back in time + + :param _start_time: the task's start time + :param _end_time: the task's end time + + :return: True if start/end times need to be pushed back, False otherwise + """ + + task_type, _ = self._get_task_type(specification_tree) + + # The start time of reservations and maintenance tasks are allowed to lie in the past + if task_type in ['reservation', 'maintenance']: + do_push_back = False + else: + do_push_back = _start_time is None or \ + _end_time is None or \ + _start_time < datetime.utcnow() + + return do_push_back + + def _push_back_start_time_to_not_overlap_predecessors(_start_time, _specification_tree): + """ + Determines a new start time for a task when the current start time of that task overlaps with its + predecessors. + + :param _start_time: the task's start time + :param _specification_tree: the specification tree holding both the task's information and information about + its predecessors/successors etcetera. + + :return: The updated start time + """ + + pushed_back_start_time = _start_time + + # Make sure the start time lies past the end time of the task's predecessors + max_predecessor_end_time = self._get_maximum_predecessor_end_time(_specification_tree) + if max_predecessor_end_time and max_predecessor_end_time > _start_time: + pushed_back_start_time = max_predecessor_end_time + timedelta(minutes=3) + + return pushed_back_start_time + + def _store_changed_start_and_end_times_to_otdb(_start_time, _end_time, _otdb_id): + """ + Stores the modified start/end times to the OTDB + + :param _start_time: the task's start time + :param _end_time: the task's end time + :param _otdb_id: the task's OTDB ID + """ + + logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', + _start_time, _end_time, _otdb_id) + + self.otdbrpc.taskSetSpecification( + _otdb_id, { + 'LOFAR.ObsSW.Observation.startTime': _start_time.strftime('%Y-%m-%d %H:%M:%S'), + 'LOFAR.ObsSW.Observation.stopTime': _end_time.strftime('%Y-%m-%d %H:%M:%S') + } + ) + + main_parset = self._get_main_parset(specification_tree) + start_time, end_time = _get_start_and_end_times_from_parset(main_parset) + + # TODO: don't fix this crap here. Bad start/stop time has to go to error, like any other bad spec part. 
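# Worked example of the sanitisation applied below (all values invented): a spec with a start time in the past,
# e.g. start=2017-01-01 12:00, stop=2017-01-01 13:00 and no explicit taskDuration, ends up with
#   start_time = utcnow() + 3 minutes (pushed further back if a predecessor ends later),
#   end_time   = start_time + 1 hour (duration recovered from the original start/stop difference),
# after which the corrected times are written back to OTDB via taskSetSpecification().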
+ if _get_need_to_push_back_start_and_end_times(start_time, end_time): + # Make sure the start time lies in the future and doesn't overlap with any predecessors + if start_time is None or start_time < datetime.utcnow(): + start_time = datetime.utcnow() + timedelta(minutes=3) + start_time = _push_back_start_time_to_not_overlap_predecessors(start_time, specification_tree) + + end_time = start_time + _get_duration_from_parset(main_parset) + + otdb_id = specification_tree['otdb_id'] + logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', + start_time, end_time, otdb_id) + + _store_changed_start_and_end_times_to_otdb(start_time, end_time, otdb_id) + + return start_time, end_time + + def _insert_main_task(self, specification_tree, start_time, end_time, cluster_name): + """ + Inserts the main task and its specification into the RADB. Any existing specification and task with same + otdb_id will be deleted automatically. + + :param specification_tree: the task's specification tree + :param start_time: the task's start time + :param end_time: the task's end time + :param cluster_name: the task's cluster name + + :return: 2-tuple (task_id, task) of the inserted task + :raises Exception if there's an unforeseen problem while inserting the task and its specifications into RADB + """ + + task_type, _ = self._get_task_type(specification_tree) + main_parset = self._get_main_parset(specification_tree) + mom_id = main_parset.getInt('Observation.momID', -1) + status = specification_tree.get('state', '').lower() + otdb_id = specification_tree['otdb_id'] + logger.info( + 'insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, end_time=%s ' + 'cluster=%s' % (mom_id, otdb_id, status, task_type, start_time, end_time, cluster_name) + ) + + result = self.radbrpc.insertSpecificationAndTask(mom_id, otdb_id, status, task_type, start_time, end_time, + str(main_parset), cluster_name) + + specification_id = result['specification_id'] + task_id = result['task_id'] + logger.info('inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id)) + + task = self.radbrpc.getTask(task_id) # if task_id is not None else None + + return task_id, task + + def _get_resource_estimates(self, specification_tree, otdb_id, task_type, task_id): + """ + Obtains the resource estimates from the Resource Estimator for the main task in the specification tree and + validates them. + + :param specification_tree: the task's specification tree + :param otdb_id: the task's OTDB ID + :param task_type: the task's type + :param task_id: the task's ID + + :return A list of resource estimates for the given task or None in case none could be obtained or if the + validation failed. 
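As a sketch, an estimator reply that passes the checks performed below would look roughly like this (the IDs, task type and resource type names are invented):

# Expected shape of the Resource Estimator reply, keyed by str(otdb_id) and then by task type.
re_reply = {'123456': {'observation': {'errors': [],
                                       'estimates': [{'resource_types': {'bandwidth': 1000,
                                                                         'storage': 50000}}]}}}
# Every value under 'resource_types' must be a positive number; otherwise this method returns None
# and do_assignment() moves the task to 'error'.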
+ """ + + try: + re_reply, rerpc_status = self.rerpc({"specification_tree": specification_tree}, timeout=10) + logger.info('Resource Estimator reply = %s', re_reply) + + if str(otdb_id) not in re_reply: + raise ValueError("no otdb_id %s found in estimator results %s" % (otdb_id, re_reply)) + estimates = re_reply[str(otdb_id)] + + if task_type not in estimates: + raise ValueError("no task type %s found in estimator results %s" % (task_type, estimates)) + estimates = estimates[task_type] + + if 'errors' in estimates and estimates['errors']: + for error in estimates['errors']: + logger.error("Error from Resource Estimator: %s", error) + raise ValueError("Error(s) in estimator for otdb_id=%s radb_id=%s" % (otdb_id, task_id)) + + if 'estimates' not in estimates or any('resource_types' not in est for est in estimates['estimates']): + raise ValueError("missing 'resource_types' in 'estimates' in estimator results: %s" % estimates) + estimates = estimates['estimates'] + + if not all(est_val > 0 for est in estimates for est_val in est['resource_types'].values()): + # Avoid div by 0 and inf looping from estimate <= 0 later on. + raise ValueError("at least one of the estimates is not a positive number") + except Exception as e: + estimates = None + + logger.error('An exception occurred while obtaining resource estimates. Exception=%s' % str(e)) + + return estimates + + def _schedule_resources(self, task_id, specification_tree, requested_resources): + """ + Schedule the requested resources for a task + + :param task_id: the task's ID + :param specification_tree: the task's specification tree + :param requested_resources: the resources requested by the task + + :returns: True if successful, or False otherwise + """ + start_time, end_time = self._get_main_task_start_and_end_times(specification_tree) + + # For now dwell-behavior is disabled by setting min_starttime/max_starttime to + # start_time, because the specification doesn't yet support this. 
+ # TODO: enable dwell-scheduling once min_starttime/max_starttime are propagated + scheduler = DwellScheduler(task_id=task_id, + resource_availability_checker=self.resource_availability_checker, + radbcreds=self.radb_creds, + min_starttime=start_time, + max_starttime=start_time, + duration=end_time - start_time) + + result = scheduler.allocate_resources(requested_resources) + + if result: + logger.info('Resources successfully allocated task_id=%s' % task_id) + else: + logger.info('No resources allocated task_id=%s' % task_id) + + return result + + def _cleanup_earlier_generated_data(self, otdb_id, task): + """ + Remove any output and/or intermediate data from any previous run of the task + + :param otdb_id: the task's OTDB ID + :param task: the task object + """ + + # Only needed for pipeline tasks + if task['type'] == 'pipeline': + try: + du_result = self.sqrpc.getDiskUsageForOTDBId(task['otdb_id'], + include_scratch_paths=True, + force_update=True) + + if du_result['found'] and du_result.get('disk_usage', 0) > 0: + logger.info("removing data on disk from previous run for otdb_id %s", otdb_id) + result = self.curpc.removeTaskData(task['otdb_id']) + if not result['deleted']: + logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", + otdb_id, result['message']) + except Exception as e: + # in line with failure as warning just above: allow going to scheduled state here too + logger.error(str(e)) + + def _link_predecessors_to_task_in_radb(self, task): + """ + Links a task to its predecessors in RADB + + :param task: the task at hand + """ + + mom_id = task['mom_id'] + + predecessor_ids = self.momrpc.getPredecessorIds(mom_id) + if str(mom_id) not in predecessor_ids or not predecessor_ids[str(mom_id)]: + logger.info('no predecessors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id) + return + predecessor_mom_ids = predecessor_ids[str(mom_id)] + + logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], + task['otdb_id']) + + for predecessor_mom_id in predecessor_mom_ids: + # check if the predecessor needs to be linked to this task + predecessor_task = self.radbrpc.getTask(mom_id=predecessor_mom_id) + if predecessor_task: + if predecessor_task['id'] not in task['predecessor_ids']: + logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s ' + 'otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], task['mom_id'], + task['otdb_id']) + self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id']) + else: + # Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond + # approved, which is in principle valid. The link in the radb will be made later via processSuccessors() + # below. Alternatively, a predecessor could have been deleted. 
+                logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s',
+                               predecessor_mom_id, task['otdb_id'])
+
+    def _link_successors_to_task_in_radb(self, task):
+        """
+        Links a task to its successors in RADB
+
+        :param task: the task at hand
+        """
+        mom_id = task['mom_id']
+
+        successor_ids = self.momrpc.getSuccessorIds(mom_id)
+        if str(mom_id) not in successor_ids or not successor_ids[str(mom_id)]:
+            logger.info('no successors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id)
+            return
+        successor_mom_ids = successor_ids[str(mom_id)]
+
+        logger.info('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'],
+                    task['otdb_id'])
+
+        for successor_mom_id in successor_mom_ids:
+            # check if the successor needs to be linked to this task
+            successor_task = self.radbrpc.getTask(mom_id=successor_mom_id)
+            if successor_task:
+                if successor_task['id'] not in task['successor_ids']:
+                    logger.info(
+                        'connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s'
+                        ' otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], task['mom_id'],
+                        task['otdb_id']
+                    )
+
+                    self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id'])
+
+                movePipelineAfterItsPredecessors(successor_task, self.radbrpc)
+            else:
+                # Occurs when setting an obs or task to prescheduled while a successor has e.g. not yet been beyond
+                # approved, which is quite normal. The link in the radb will be made later via processPredecessors()
+                # above. Alternatively, a successor could have been deleted.
+                logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s',
+                               successor_mom_id, task['otdb_id'])
+
+    def _get_maximum_predecessor_end_time(self, specification_tree):
+        """
+        Determine the highest end time of all predecessors of a task
+
+        :param specification_tree: the task's specification tree
+
+        :return: the maximum predecessor end time found, or None in case no predecessors are specified
+        """
+
+        predecessor_specs = [parameterset(tree['specification']) for tree in specification_tree['predecessors']]
+        predecessor_end_times = [parseDatetime(spec.getString('Observation.stopTime')) for spec in predecessor_specs]
+        if predecessor_end_times:
+            return max(predecessor_end_times)
+        return None
diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_availability_checker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_availability_checker.py
index ee0f6152d9ad9f466492160e12c0e81da605f12c..87c66aa3c117699461f6ad1d228eb3a2d788a3d9 100644
--- a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_availability_checker.py
+++ b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_availability_checker.py
@@ -76,12 +76,16 @@ class ResourceAvailabilityChecker(object):
     # 4) annotate claims with claim["requested_resources"] == [requested_resources for this claim]
     # for each claim (one claim can cover multiple estimates)
     def get_is_claimable(self, requested_resources, available_resources):
-        """ Verifies if the requested resources can be claimed and tries to get a tentative claim on them
+        """
+        Verify if the requested resources can be claimed and construct tentative claim objects for them. Note that these
+        objects are not inserted into the RADB - this is left to the caller.
:param requested_resources: The requested resources :param available_resources: The available resources - :returns A 2-tuple containing the resource requests that are claimable and those that are not respectively + :returns A list of tentative resource claim objects if all requested resources are claimable + + :raises CouldNotFindClaimException exception if one or more of the requested resources are not claimable """ # This function selects resources for a task (i.e. obs or pipeline). Keep it side-effect free! @@ -104,8 +108,7 @@ class ResourceAvailabilityChecker(object): logger.debug('get_is_claimable: current_resource_usage: %s', available_resources) # big! - claimable_requests = [] - unclaimable_requests = [] + claims = [] for requested_resource in requested_resources: needed_resources_by_type_id = self._get_resource_types_by_type_id(requested_resource) @@ -117,12 +120,12 @@ class ResourceAvailabilityChecker(object): file_properties = self._get_resources_files_properties(requested_resource) - self._collapse_requested_resources(requested_resource, needed_resources_by_type_id, - claimable_resources, file_properties) + self._collapse_requested_resources(requested_resource, needed_resources_by_type_id, claimable_resources, + file_properties) - more_claims = self._get_claims_for_multiple_resources(needed_resources_by_type_id, - requested_resource['resource_count'], - claimable_resources) + more_claims = self._get_tentative_claim_objects_for_multiple_resources(needed_resources_by_type_id, + requested_resource['resource_count'], + claimable_resources) # add resource properties for claim in more_claims: @@ -135,11 +138,11 @@ class ResourceAvailabilityChecker(object): claim['properties'] = file_properties # add to the list of claims - claimable_requests.extend(more_claims) + claims.extend(more_claims) - self._merge_claims(claimable_requests) + self._merge_claims(claims) - return claimable_requests, unclaimable_requests + return claims def _get_current_resource_usage(self): db_resource_list = self.radbrpc.getResources(include_availability=True) @@ -159,6 +162,7 @@ class ResourceAvailabilityChecker(object): return needed_resources_by_type_id + # TODO: look at claimable capacity instead of available capacity? def _get_availability_of_requested_resources(self, root_resource_group, needed_resources_by_type_id, current_resource_usage): # Find group id ('gid') of needed_resources['root_resource_group'], # then get list of claimable resources at root_gid and its children @@ -228,15 +232,26 @@ class ResourceAvailabilityChecker(object): res['claimable_capacity'] = min(res['claimable_capacity'], int(ratio * res['total_capacity'])) logger.info('applyMaxFillRatios: applied %s = %f', ratio_dict['name'], ratio) - def _get_claims_for_multiple_resources(self, needed_resources_by_type_id, resource_count, claimable_resources_list): - """ Find a fit for multiple needed resources. Modifies claimable_resources_list with a lower resource - availability with respect to the claims made (also if no claims are made!). """ + def _get_tentative_claim_objects_for_multiple_resources(self, needed_resources_by_type_id, resource_count, + claimable_resources_list): + """ + Find a fit for multiple needed resources and create a tentative claim object for them. Modifies + claimable_resources_list with a lower resource availability with respect to the claims made (also if no claims + are made!). 
-        claims = []
+        :param needed_resources_by_type_id: a dict mapping resource type IDs to the amount needed of each type
+        :param resource_count: the number of times the set of needed resources should be claimed
+        :param claimable_resources_list: The current list of available/claimable resources
+        :returns A list of tentative claim objects for the given needed resources
+
+        :raises CouldNotFindClaimException if no tentative claim object could be made
+        """
+
+        claims = []
         for _ in xrange(resource_count):
             # try to fit a single resource set
-            more_claims = self._get_claims_for_single_resource(needed_resources_by_type_id, claimable_resources_list)
+            more_claims = self._get_tentative_claim_objects_for_single_resource(needed_resources_by_type_id, claimable_resources_list)
             logger.debug('fit_multiple_resources: added claim: %s', more_claims)
@@ -247,13 +262,18 @@ class ResourceAvailabilityChecker(object):
         return claims
-    def _get_claims_for_single_resource(self, needed_resources_by_type_id, claimable_resources_list):
-        """ Find a fit for a single needed resource set. Reorders claimable_resources_list,
-        and reduces the resource availability in claimable_resources_list with the size
-        of the resulting claims.
+    def _get_tentative_claim_objects_for_single_resource(self, needed_resources_by_type_id, claimable_resources_list):
+        """
+        Find a fit for a single needed resource set. Reorders claimable_resources_list and reduces the resource
+        availability in claimable_resources_list with the size of the resulting claims.
+
+        :param needed_resources_by_type_id: a dict mapping resource type IDs to the amount needed of each type
+        :param claimable_resources_list: a list of all resources we are allowed to claim, e.g. all DRAGNET disks or
+                                         all stations.
-        :param claimable_resources_list: a list of all resources we are allowed to claim,
-                                         f.e. all DRAGNET disks or all stations.
+        :return A list of the created tentative claim objects
+
+        :raises CouldNotFindClaimException if no tentative claim object could be made
         """
         # If no resources are available, we cannot return any claim
@@ -268,9 +288,9 @@ class ResourceAvailabilityChecker(object):
         # Try to fit first where there is the most space. We first look for space within the unclaimed
         # resources (=free - claimed - our claims), we then look for a fit if no tasks were running
         # (=free - our claims), allowing conflict resolution to help with that later on.
-        tentative_claims = None
+        claims = None
-        for capacity_type in ['claimable_capacity', 'available_capacity']:
+        for capacity_type in ('claimable_capacity', 'available_capacity'):
             # Sorting on every change may be slow, but for 100s of DPs, insertion of merged claims is still 3-5x slower.
             # A heapq was not faster, yet solving the lack of total ordering more elaborate.
             # Of course, big-O complexity here is terrible, but we are nowhere near (too) big.
@@ -279,50 +299,30 @@ class ResourceAvailabilityChecker(object):
             # Almost always iterates once. Still needed to match >1 resource types. For example, if we schedule
             # storage and bandwidth simultaneously, our sorting may not put a usable combination in the first slot,
             # as we sort on only one of their capacities (storage).
- for claimable_resources_dict in claimable_resources_list: - tentative_claims = self._try_make_tentative_claim(needed_resources_by_type_id, claimable_resources_dict, capacity_type) - - if tentative_claims is not None: + if self._is_claimable_capacity_wise(needed_resources_by_type_id, + claimable_resources_dict, + capacity_type, + ignore_type_ids=[self.resource_types['rcu']]): + claims = self._construct_tentative_claim_object(needed_resources_by_type_id, + claimable_resources_dict) + + if claims is not None: # Found a fit break - if tentative_claims is not None: + if claims is not None: # Found a fit break - if tentative_claims is None: + if claims is None: # Could not find a fit in any way raise CouldNotFindClaimException("No resources available of the given type with sufficient capacity") - logger.debug('fit_single_resources: created claim: %s', tentative_claims) - - self._reduce_resource_availability(claimable_resources_dict, tentative_claims) - return tentative_claims - - - def _try_make_tentative_claim(self, needed_resources_by_type_id, claimable_resources_dict, capacity_type): - """ - Verify if all claims fit, and return them on a per resource type basis it if so - - :param needed_resources_by_type_id: a dict containing resource type IDs as keys and their requested allocation - as values. - :param claimable_resources_dict: a dict containing the currently available resources - :param capacity_type type of capacity to consider ('available_capacity' or 'claimable_capacity') - """ + logger.debug('fit_single_resources: created claim: %s', claims) - # Ignore check on claimable capacity of RCUs - is_claimable = self._is_claimable_capacity_wise(needed_resources_by_type_id, - claimable_resources_dict, - capacity_type, - ignore_type_ids=[self.resource_types['rcu']]) - - if is_claimable: - return self._make_tentative_claim(needed_resources_by_type_id, - claimable_resources_dict) - - # Claim does not fit - return None + self._reduce_resource_availability(claimable_resources_dict, claims) + return claims def _get_resource_group_id_by_name(self, name): """ Returns group id of resource group named name, or raises a ValueError if name was not found. @@ -387,7 +387,7 @@ class ResourceAvailabilityChecker(object): return is_claimable - def _make_tentative_claim(self, needed_resources, claimable_resources): + def _construct_tentative_claim_object(self, needed_resources, claimable_resources): """ Returns list of claims for a data product (one for each needed resource type). Format needed_resources: {resource_type_id: size, ...} @@ -404,9 +404,8 @@ class ResourceAvailabilityChecker(object): # We do this to separate responsibilities. The scheduling functions (get_is_claimable and helpers) # only depend on the available resources (between start and end time) and the # resources required by the task, but not on the actual task. - claim = {'starttime': None, 'endtime': None, 'properties': [], 'status': 'tentative'} - claim['resource_id'] = claimable_resources[res_type]['id'] - claim['resource_type_id'] = res_type # used internally, not propagated to radb + claim = {'starttime': None, 'endtime': None, 'properties': [], 'status': 'tentative', + 'resource_id': claimable_resources[res_type]['id'], 'resource_type_id': res_type} # RCU claim size as returned by the ResourceEstimator is actually a bit pattern (encoding which of a # station's RCUs are requested to take part in a measurement and which not). 
In order to have it countable
diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py
index 3ca2aa54d05fa77ef834a9db97bb7716de649c3d..c0bc8c84041f40e84142940409b6d77f491a58d8 100644
--- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py
+++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py
@@ -7,7 +7,7 @@ from lofar.sas.resourceassignment.database import radb
 from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
 from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
-from lofar.sas.resourceassignment.resourceassigner.assignment import CouldNotFindClaimException
+from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import CouldNotFindClaimException
 from lofar.mac.config import DEFAULT_OBSERVATION_CONTROL_BUS_NAME, DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME
 from lofar.mac.observation_control_rpc import ObservationControlRPCClient
@@ -117,22 +117,24 @@ class BasicScheduler:
         available_resources = self._get_resource_availability()
         try:
-            claims = self.resource_availability_checker.get_is_claimable(requested_resources, available_resources)
+            tentative_claims = self.resource_availability_checker.get_is_claimable(requested_resources,
+                                                                                   available_resources)
         except CouldNotFindClaimException:
             raise ScheduleException("Could not schedule")
         # add static info to all claims
-        self._finalise_claims(claims)
+        self._finalise_claims(tentative_claims)
         # insert all claims to reserve the resources in the next call to findfit and to find the conflicts according to
         # the DB
-        claim_ids = self.radb.insertResourceClaims(self.task_id, claims, _, _, commit=False)
+        claim_ids = self.radb.insertResourceClaims(self.task_id, tentative_claims, _, _, commit=False)
         # tie the claim ids to the estimates
-        claim_to_estimates = {cid: claims[cid]["requested_resources"] for cid in claim_ids}
+        claim_to_estimates = {cid: tentative_claims[cid]["requested_resources"] for cid in claim_ids}
-        # handle any conflicts. We need NOT resolve ALL conflicts: removing one conflict can free up more resources as
-        # a by-product, in which case other conflicts can simply be shifted to those newly freed resources.
+        # try to resolve as many conflicts as possible. We need NOT resolve ALL conflicts: removing one conflict can
+        # free up more resources as a by-product, in which case other conflicts can simply be shifted to those newly
+        # freed resources.
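For orientation, each element of tentative_claims handed to insertResourceClaims above is a plain dict; a rough sketch of its shape is given below. The field names are taken from the unit test expectations elsewhere in this change, while the values are purely illustrative.

    # Illustrative only: approximate shape of one tentative claim produced by get_is_claimable()
    example_tentative_claim = {'status': 'tentative', 'resource_type_id': 3, 'resource_id': 0,
                               'claim_size': 1000, 'starttime': None, 'endtime': None,
                               'used_rcus': None, 'properties': []}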
conflict_claims = self.radb.getResourceClaims(task_ids=self.task_id, status="conflict") if not any([self._resolve_conflict(c) for c in conflict_claims]): # Could not resolve any conflict @@ -175,7 +177,7 @@ class PriorityScheduler(BasicScheduler): radbcreds=None, mom_busname=DEFAULT_MOMQUERY_BUSNAME, mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, - observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUSNAME, + observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUS_NAME, observation_control_servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, broker=None): diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/CMakeLists.txt b/SAS/ResourceAssignment/ResourceAssigner/test/CMakeLists.txt index 4c6edf24a445664e7efe600963ceffa7192e00b5..d027157190e03bbc9dbe5fa71be08049de3aa056 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/test/CMakeLists.txt +++ b/SAS/ResourceAssignment/ResourceAssigner/test/CMakeLists.txt @@ -4,5 +4,6 @@ include(LofarCTest) lofar_add_test(t_resourceassigner) lofar_add_test(t_schedulechecker) lofar_add_test(t_schedulers) +lofar_add_test(t_resource_availability_checker) diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.py index 58329628f5fedf53d089487fa8264d38e21d2d77..5d30711f65b13f46a4340136dc3644a5c40bd14c 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.py @@ -24,7 +24,9 @@ from mock import MagicMock import datetime import sys -from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAvailabilityChecker +from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import ResourceAvailabilityChecker +from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import CouldNotFindClaimException + class ResourceAvailabilityCheckerTest(unittest.TestCase): @@ -1214,11 +1216,11 @@ class ResourceAvailabilityCheckerTest(unittest.TestCase): """ needed_resources_by_type_id = { 5: 500 } - claimable_resources_list = [ { 5: { 'id': 1, 'available_capacity': 1000 } } ] + claimable_resources_list = [ { 5: { 'id': 1, 'claimable_capacity': 1000, 'available_capacity': 1000 } } ] uut = ResourceAvailabilityChecker(self.rarpc_mock) - claims = self.uut._get_claims_for_single_resource(needed_resources_by_type_id, claimable_resources_list) + claims = self.uut._get_tentative_claim_objects_for_single_resource(needed_resources_by_type_id, claimable_resources_list) self.assertIsNotNone(claims) @@ -1229,11 +1231,10 @@ class ResourceAvailabilityCheckerTest(unittest.TestCase): """ needed_resources_by_type_id = { 5: 500 } - claimable_resources_list = [ { 5: { 'id': 1, 'available_capacity': 400 } } ] - - claims = self.uut._get_claims_for_single_resource(needed_resources_by_type_id, claimable_resources_list) + claimable_resources_list = [ { 5: { 'id': 1, 'claimable_capacity': 400, 'available_capacity': 400 } } ] - self.assertIsNone(claims) + with self.assertRaises(CouldNotFindClaimException): + self.uut._get_tentative_claim_objects_for_single_resource(needed_resources_by_type_id, claimable_resources_list) def test_fit_single_resources_fit_multiple_disks(self): """ @@ -1243,10 +1244,10 @@ class ResourceAvailabilityCheckerTest(unittest.TestCase): needed_resources_by_type_id = { 5: 500 } claimable_resources_list = [ - {5: {'id': 1, 'available_capacity': 400}}, - {5: {'id': 1, 
'available_capacity': 1000}}] + {5: {'id': 1, 'claimable_capacity': 400, 'available_capacity': 400}}, + {5: {'id': 1, 'claimable_capacity': 1000, 'available_capacity': 1000}}] - claims = self.uut._get_claims_for_single_resource(needed_resources_by_type_id, claimable_resources_list) + claims = self.uut._get_tentative_claim_objects_for_single_resource(needed_resources_by_type_id, claimable_resources_list) self.assertIsNotNone(claims) @@ -1258,12 +1259,13 @@ class ResourceAvailabilityCheckerTest(unittest.TestCase): needed_resources_by_type_id = { 3: 3000, 5: 500 } claimable_resources_list = [ - {3: {'id': 0, 'available_capacity': 3000}, 5: {'id': 1, 'available_capacity': 400}}, # type 5 does not fit - {3: {'id': 0, 'available_capacity': 1000}, 5: {'id': 1, 'available_capacity': 1000}}] # type 3 does not fit + {3: {'id': 0, 'claimable_capacity': 3000, 'available_capacity': 3000}, + 5: {'id': 1, 'claimable_capacity': 400, 'available_capacity': 400}}, # type 5 does not fit + {3: {'id': 0, 'claimable_capacity': 1000, 'available_capacity': 1000}, + 5: {'id': 1, 'claimable_capacity': 1000, 'available_capacity': 1000}}] # type 3 does not fit - claims = self.uut._get_claims_for_single_resource(needed_resources_by_type_id, claimable_resources_list) - - self.assertIsNone(claims) + with self.assertRaises(CouldNotFindClaimException): + self.uut._get_tentative_claim_objects_for_single_resource(needed_resources_by_type_id, claimable_resources_list) def test_fit_single_resources_fit_multiple_resources(self): """ @@ -1273,10 +1275,12 @@ class ResourceAvailabilityCheckerTest(unittest.TestCase): needed_resources_by_type_id = { 3: 3000, 5: 500 } claimable_resources_list = [ - {3: {'id': 0, 'available_capacity': 3000}, 5: {'id': 1, 'available_capacity': 400}}, # type 5 does not fit - {3: {'id': 0, 'available_capacity': 3000}, 5: {'id': 1, 'available_capacity': 1000}}] # both fit + {3: {'id': 0, 'claimable_capacity': 3000, 'available_capacity': 3000}, + 5: {'id': 1, 'claimable_capacity': 400, 'available_capacity': 400}}, # type 5 does not fit + {3: {'id': 0, 'claimable_capacity': 3000, 'available_capacity': 3000}, + 5: {'id': 1, 'claimable_capacity': 1000, 'available_capacity': 1000}}] # both fit - claims = self.uut._get_claims_for_single_resource(needed_resources_by_type_id, claimable_resources_list) + claims = self.uut._get_tentative_claim_objects_for_single_resource(needed_resources_by_type_id, claimable_resources_list) self.assertIsNotNone(claims) @@ -1288,12 +1292,13 @@ class ResourceAvailabilityCheckerTest(unittest.TestCase): needed_resources_by_type_id = {3: 1000, 5: 100} claimable_resources_list = [ - {3: {'id': 0, 'available_capacity': 3000}, 5: {'id': 1, 'available_capacity': 200}}, # fits 2x - {3: {'id': 0, 'available_capacity': 1000}, 5: {'id': 1, 'available_capacity': 1000}}] # fits 1x - - claims = self.uut._get_claims_for_multiple_resources(needed_resources_by_type_id, 4, claimable_resources_list) + {3: {'id': 0, 'claimable_capacity': 3000, 'available_capacity': 3000}, + 5: {'id': 1, 'claimable_capacity': 200, 'available_capacity': 200}}, # fits 2x + {3: {'id': 0, 'claimable_capacity': 1000, 'available_capacity': 1000}, + 5: {'id': 1, 'claimable_capacity': 1000, 'available_capacity': 1000}}] # fits 1x - self.assertIsNone(claims) + with self.assertRaises(CouldNotFindClaimException): + self.uut._get_tentative_claim_objects_for_multiple_resources(needed_resources_by_type_id, 4, claimable_resources_list) def test_fit_multiple_resources_fit(self): """ @@ -1303,10 +1308,12 @@ class 
ResourceAvailabilityCheckerTest(unittest.TestCase): needed_resources_by_type_id = {3: 1000, 5: 100} claimable_resources_list = [ - {3: {'id': 0, 'available_capacity': 3000}, 5: {'id': 1, 'available_capacity': 200}}, # fits 2x - {3: {'id': 0, 'available_capacity': 2000}, 5: {'id': 1, 'available_capacity': 1000}}] # fits 2x + {3: {'id': 0, 'claimable_capacity': 3000, 'available_capacity': 3000}, + 5: {'id': 1, 'claimable_capacity': 200, 'available_capacity': 200}}, # fits 2x + {3: {'id': 0, 'claimable_capacity': 2000, 'available_capacity': 2000}, + 5: {'id': 1, 'claimable_capacity': 1000, 'available_capacity': 1000}}] # fits 2x - claims = self.uut._get_claims_for_multiple_resources(needed_resources_by_type_id, 4, claimable_resources_list) + claims = self.uut._get_tentative_claim_objects_for_multiple_resources(needed_resources_by_type_id, 4, claimable_resources_list) self.assertIsNotNone(claims) @@ -1318,10 +1325,12 @@ class ResourceAvailabilityCheckerTest(unittest.TestCase): needed_resources_by_type_id = {3: 1000, 5: 100} claimable_resources_list = [ - {3: {'id': 0, 'available_capacity': 3000}, 5: {'id': 1, 'available_capacity': 200}}, # fits 2x - {3: {'id': 0, 'available_capacity': 2000}, 5: {'id': 1, 'available_capacity': 1000}}] # fits 2x + {3: {'id': 0, 'claimable_capacity': 3000, 'available_capacity': 3000}, + 5: {'id': 1, 'claimable_capacity': 200, 'available_capacity': 200}}, # fits 2x + {3: {'id': 0, 'claimable_capacity': 2000, 'available_capacity': 2000}, + 5: {'id': 1, 'claimable_capacity': 1000, 'available_capacity': 1000}}] # fits 2x - self.uut._get_claims_for_multiple_resources(needed_resources_by_type_id, 4, claimable_resources_list) + self.uut._get_tentative_claim_objects_for_multiple_resources(needed_resources_by_type_id, 4, claimable_resources_list) resource_type_3_dict = {'status': 'tentative', 'resource_type_id': 3, 'resource_id': 0, 'claim_size': 1000, 'starttime': None, 'used_rcus': None, 'endtime': None, 'properties': []} @@ -1336,13 +1345,25 @@ class ResourceAvailabilityCheckerTest(unittest.TestCase): def test_get_is_claimable_invalid_resource_group(self): """ If we try to find claims with a non-existing root_resource_group, get_is_claimable should fail. 
""" - estimates = [ { 'root_resource_group': 'MIDDLE EARTH', 'resource_count': 1, 'resource_types': { 'storage': 100 } } ] - claimable_resources_list = { self.cep4storage_resource_id: { 'id': self.cep4storage_resource_id, 'type_id': 5, 'available_capacity': 400, 'active': True } } - - self.uut._get_current_resource_usage = MagicMock(return_value=claimable_resources_list) + estimates = [{ + 'root_resource_group': 'MIDDLE EARTH', + 'resource_count': 1, + 'resource_types': { + 'storage': 100 + } + }] + claimable_resources_list = { + self.cep4storage_resource_id: { + 'id': self.cep4storage_resource_id, + 'type_id': 5, + 'claimable_capacity': 400, + 'available_capacity': 400, + 'active': True + } + } with self.assertRaises(ValueError): - _, _ = self.uut.get_is_claimable(estimates) + _, _ = self.uut.get_is_claimable(estimates, claimable_resources_list) def test_get_is_claimable_fit(self): """ @@ -1362,51 +1383,94 @@ class ResourceAvailabilityCheckerTest(unittest.TestCase): self.cep4bandwidth_resource_id: { 'id': self.cep4bandwidth_resource_id, 'type_id': 3, + 'claimable_capacity': 4000, 'available_capacity': 4000, 'active': True }, self.cep4storage_resource_id: { 'id': self.cep4storage_resource_id, 'type_id': 5, + 'claimable_capacity': 400, 'available_capacity': 400, 'active': True }} - self.uut._get_current_resource_usage = MagicMock(return_value=claimable_resources_list) - - claimable_resources, unclaimable_resources = self.uut.get_is_claimable(estimates) + claimable_resources = self.uut.get_is_claimable(estimates, claimable_resources_list) self.assertEqual(len(claimable_resources), len(claimable_resources_list)) - self.assertEqual(unclaimable_resources, []) def test_get_is_claimable_not_fit(self): - """ Given 2 needed resources (which we need 4 times), and 2 claimable resource sets, 3 out of 4 fit, get_is_claimable should return failure. """ - - estimates = [ { 'root_resource_group': 'CEP4', 'resource_count': 4, 'resource_types': { 'bandwidth': 1000, 'storage': 100 } } ] - claimable_resources_list = { self.cep4bandwidth_resource_id: { 'id': self.cep4bandwidth_resource_id, 'type_id': 3, 'available_capacity': 4000, 'active': True }, - self.cep4storage_resource_id: { 'id': self.cep4storage_resource_id, 'type_id': 5, 'available_capacity': 300, 'active': True } } + """ Given 2 needed resources (which we need 4 times), and 2 claimable resource sets, 3 out of 4 fit, + get_is_claimable should return failure. 
""" - self._get_current_resource_usage = MagicMock(return_value=claimable_resources_list) - - claimable_resources, unclaimable_resources = self.uut.get_is_claimable(estimates) + estimates = [{ + 'root_resource_group': 'CEP4', + 'resource_count': 4, + 'resource_types': { + 'bandwidth': 1000, + 'storage': 100 + } + }] + claimable_resources_list = { + self.cep4bandwidth_resource_id: { + 'id': self.cep4bandwidth_resource_id, + 'type_id': 3, + 'claimable_capacity': 4000, + 'available_capacity': 4000, 'active': True + }, + self.cep4storage_resource_id: { + 'id': self.cep4storage_resource_id, + 'type_id': 5, + 'claimable_capacity': 300, + 'available_capacity': 300, + 'active': True + } + } - self.assertEqual(claimable_resources, []) - self.assertEqual(unclaimable_resources, estimates) + with self.assertRaises(CouldNotFindClaimException): + self.uut.get_is_claimable(estimates, claimable_resources_list) def test_get_is_claimable_partial_fit(self): - """ Given 2 sets of 2 needed resources (which we need 4 times), and 2 claimable resource sets, only one set fits, get_is_claimable should return partial success. """ + """ Given 2 sets of 2 needed resources (which we need 4 times), and 2 claimable resource sets, only one set + fits, get_is_claimable should return partial success. """ - estimates = [ { 'root_resource_group': 'CEP4', 'resource_count': 4, 'resource_types': { 'bandwidth': 1000, 'storage': 100 } }, - { 'root_resource_group': 'CEP4', 'resource_count': 4, 'resource_types': { 'bandwidth': 1000, 'storage': 100 } } ] - claimable_resources_list = { self.cep4bandwidth_resource_id: { 'id': self.cep4bandwidth_resource_id, 'type_id': 3, 'available_capacity': 5000, 'active': True }, - self.cep4storage_resource_id: { 'id': self.cep4storage_resource_id, 'type_id': 5, 'available_capacity': 500, 'active': True } } - - self.uut._get_current_resource_usage = MagicMock(return_value=claimable_resources_list) + estimates = [{ + 'root_resource_group': 'CEP4', + 'resource_count': 4, + 'resource_types': { + 'bandwidth': 1000, + 'storage': 100 + }}, { + 'root_resource_group': 'CEP4', + 'resource_count': 4, + 'resource_types': { + 'bandwidth': 1000, + 'storage': 100 + }}] + claimable_resources_list = { + self.cep4bandwidth_resource_id: { + 'id': self.cep4bandwidth_resource_id, + 'type_id': 3, + 'claimable_capacity': 5000, + 'available_capacity': 5000, + 'active': True + }, + self.cep4storage_resource_id: { + 'id': self.cep4storage_resource_id, + 'type_id': 5, + 'claimable_capacity': 500, + 'available_capacity': 500, + 'active': True + }} - claimable_resources, unclaimable_resources = self.uut.get_is_claimable(estimates) + # TODO: verify with Jan David whether this test case (returning a partial fit) should still succeed or whether + # an exception is expected to be raised + with self.assertRaises(CouldNotFindClaimException): + self.uut.get_is_claimable(estimates, claimable_resources_list) - self.assertEqual(len(claimable_resources), 2) # storage & bandwidth for estimates[0] - self.assertEqual(unclaimable_resources, [estimates[1]]) + # TODO: remove if uut raising exception is what's expected + # claimable_resources = self.uut.get_is_claimable(estimates, claimable_resources_list) + # self.assertEqual(len(claimable_resources), 2) # storage & bandwidth for estimates[0] if __name__ == '__main__': unittest.main() diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.run b/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.run index 
659fea0dbec3b75b640ff6b5b0fa4c94093e236a..1b01df6857f64af27b5a19eca79d2fea2932b837 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.run +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.run @@ -2,5 +2,5 @@ # Run the unit test source python-coverage.sh -python_coverage_test "ResourceAvailabilityChecker*" t_resource_availability_checker.py +python_coverage_test "resource_availability_checker" t_resource_availability_checker.py diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 4bb8f7b9307f6f691a40be65ab274b303abb7fcd..44f93c2fa7f70d66533484c2411bc23fe4bd7e12 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -23,14 +23,17 @@ import mock import datetime import sys -from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner +from lofar.sas.resourceassignment.resourceassigner.resource_assigner import ResourceAssigner +from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import ResourceAvailabilityChecker from lofar.parameterset import parameterset +from lofar.common.datetimeutils import parseDatetime ra_notification_prefix = "ra_notification_prefix" class TestingResourceAssigner(ResourceAssigner): - def __init__(self, rarpc, rerpc, otdbrpc, momrpc, curpc, sqrpc, ra_notification_bus, ra_checker): + def __init__(self, rarpc, rerpc, otdbrpc, momrpc, curpc, sqrpc, ra_notification_bus, dwell_scheduler, + radb_dbcreds=None): # super gets not done to be able to insert mocks as early as possible otherwise the RPC block unittesting self.radbrpc = rarpc self.rerpc = rerpc @@ -40,7 +43,10 @@ class TestingResourceAssigner(ResourceAssigner): self.sqrpc = sqrpc self.ra_notification_bus = ra_notification_bus self.ra_notification_prefix = ra_notification_prefix - self.resource_availability_checker = ra_checker + # Could mock ResourceAvailabilityChecker, but it is out of play already due to mocked DwellScheduler + self.resource_availability_checker = ResourceAvailabilityChecker(rarpc) + self.dwell_scheduler = dwell_scheduler + self.radb_creds = radb_dbcreds class ResourceAssignerTest(unittest.TestCase): @@ -1555,24 +1561,20 @@ class ResourceAssignerTest(unittest.TestCase): self.addCleanup(ra_notification_bus_patcher.stop) self.ra_notification_bus_mock = ra_notification_bus_patcher.start() - logger_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.logger') + logger_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.logger') self.addCleanup(logger_patcher.stop) self.logger_mock = logger_patcher.start() - ra_checker_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_availability_checker') - self.addCleanup(ra_checker_patcher.stop) - self.ra_checker_mock = ra_checker_patcher.start() - # Most tests use the same doAssignment() input values, hence the return value of get_is_claimable() is the same - # as for all these tests as well. 
Do override when necessary of course - self.ra_checker_mock.get_is_claimable.return_value = ([ - {'status': 'tentative', 'resource_type_id': 3, 'resource_id': 116, 'claim_size': 2, 'starttime': None, - 'used_rcus': None, 'endtime': None, 'properties': []}, - {'status': 'tentative', 'resource_type_id': 5, 'resource_id': 117, 'claim_size': 2, 'starttime': None, - 'used_rcus': None, 'endtime': None, 'properties': [ - {'io_type': 'output', 'type': 15, 'sap_nr': 0, 'value': 0}, - {'io_type': 'output', 'type': 2, 'sap_nr': 0, 'value': 1}, - {'io_type': 'output', 'type': 10, 'sap_nr': 0, 'value': 1073741824}] - }], []) + # ra_checker_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_availability_checker') + # self.addCleanup(ra_checker_patcher.stop) + # self.ra_checker_mock = ra_checker_patcher.start() + + dwell_scheduler_patcher = mock.patch( + 'lofar.sas.resourceassignment.resourceassigner.resource_assigner.DwellScheduler' + ) + self.addCleanup(dwell_scheduler_patcher.stop) + self.dwell_scheduler_mock = dwell_scheduler_patcher.start() + self.dwell_scheduler_mock().allocate_resources.return_value = True # Select logger output to see def myprint(s, *args): @@ -1584,14 +1586,15 @@ class ResourceAssignerTest(unittest.TestCase): self.logger_mock.error.side_effect = myprint move_pipeline_after_its_predecessors_patcher = mock.patch( - 'lofar.sas.resourceassignment.resourceassigner.assignment.movePipelineAfterItsPredecessors') + 'lofar.sas.resourceassignment.resourceassigner.resource_assigner.movePipelineAfterItsPredecessors' + ) self.addCleanup(move_pipeline_after_its_predecessors_patcher.stop) self.movePipelineAfterItsPredecessors_mock = move_pipeline_after_its_predecessors_patcher.start() - self.resourceAssigner = TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, - self.otdbrpc_mock, self.momrpc_mock, - self.curpc_mock, self.sqrpc_mock, - self.ra_notification_bus_mock, self.ra_checker_mock) + self.resource_assigner = TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, + self.otdbrpc_mock, self.momrpc_mock, + self.curpc_mock, self.sqrpc_mock, + self.ra_notification_bus_mock, self.dwell_scheduler_mock) self.reset_specification_tree() @@ -1612,58 +1615,60 @@ class ResourceAssignerTest(unittest.TestCase): self.assertTrue(self.ra_notification_bus_mock.close.called, "ra_notification_bus.close was not called") def test_open_opens_all_services(self): - self.resourceAssigner.open() + self.resource_assigner.open() self.assert_all_services_opened() def test_close_closes_all_services(self): - self.resourceAssigner.close() + self.resource_assigner.close() self.assert_all_services_closed() def test_contextManager_opens_and_closes_all_services(self): - with TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, - self.otdbrpc_mock, self.momrpc_mock, - self.curpc_mock, self.sqrpc_mock, - self.ra_notification_bus_mock, self.ra_checker_mock): + with TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, self.otdbrpc_mock, self.momrpc_mock, + self.curpc_mock, self.sqrpc_mock, self.ra_notification_bus_mock, + self.dwell_scheduler_mock): self.assert_all_services_opened() self.assert_all_services_closed() def test_do_assignment_logs_specification(self): - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.info.assert_any_call('doAssignment: specification_tree=%s' % self.specification_tree) + 
self.logger_mock.info.assert_any_call('do_assignment: otdb_id=%s specification_tree=%s' % ( + self.specification_tree['otdb_id'], + self.specification_tree + )) def test_do_assignment_log_non_approved_or_prescheduled_states(self): - self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'], - self.non_approved_or_prescheduled_specification_tree) + otdb_id = self.non_approved_or_prescheduled_otdb_id + status = self.non_approved_or_prescheduled_status + spec_tree = self.non_approved_or_prescheduled_specification_tree + + with self.assertRaises(Exception): + self.resource_assigner.do_assignment(otdb_id, spec_tree) - assignable_task_states_str = "approved, prescheduled" - self.logger_mock.warn.assert_any_call( - 'Task otdb_id=%s with status \'%s\' is not assignable. Allowed statuses are %s' % - (self.non_approved_or_prescheduled_otdb_id, self.non_approved_or_prescheduled_status, - assignable_task_states_str)) + assignable_task_states_str = "approved, prescheduled" + self.logger_mock.warn.assert_any_call( + 'Task otdb_id=%s with status \'%s\' is not assignable. Allowed statuses are %s' % + (otdb_id, status, assignable_task_states_str)) def test_do_assignment_non_approved_or_prescheduled_states_should_be_skipped(self): - self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'], - self.non_approved_or_prescheduled_specification_tree) - - self.assertEqual(len(self.otdbrpc_mock.method_calls), 0, - "OTDBRPC was called for non approved or scheduled specification tree") - self.assertEqual(len(self.rarpc_mock.method_calls), 0, - "RARPC was called for non approved or scheduled specification tree") - self.assertEqual(len(self.momrpc_mock.method_calls), 0, - "MOMRPC was called for non approved or scheduled specification tree") - self.assertEqual(len(self.rerpc_mock.method_calls), 0, - "RERPC was called for non approved or scheduled specification tree") - self.assertEqual(len(self.curpc_mock.method_calls), 0, - "CURPC was called for non approved or scheduled specification tree") - self.assertEqual(len(self.ra_notification_bus_mock.method_calls), 0, - "RA notification bus was called for non approved or scheduled specification tree") + with self.assertRaises(Exception): + self.resource_assigner.do_assignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'], + self.non_approved_or_prescheduled_specification_tree) + + def test_do_assignment_approved_task_should_not_be_rescheduled(self): + otdb_id = self.specification_tree['otdb_id'] + self.specification_tree['state'] = 'approved' + + self.resource_assigner.do_assignment(otdb_id, self.specification_tree) + + self.logger_mock.info.assert_any_call('Task otdb_id=%s is already approved, no resource assignment needed' % + otdb_id) def test_do_assignment_inserts_specification_and_task_in_radb(self): - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) start_time = datetime.datetime.strptime(self.future_start_time, '%Y-%m-%d %H:%M:%S') stop_time = datetime.datetime.strptime(self.future_stop_time, '%Y-%m-%d %H:%M:%S') @@ -1673,26 +1678,16 @@ class ResourceAssignerTest(unittest.TestCase): self.task_type, start_time, stop_time, str(parset), "CEP4") - # TODO: logging of failures is now done in raservice. How to go about this here? 
- # def test_do_assignment_logs_when_insertion_of_specification_and_task_in_radb_failed(self): - # return_value = {'inserted': False} - # - # self.rarpc_mock.insertSpecificationAndTask.return_value = return_value - # - # self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) - # - # self.logger_mock.error.assert_any_call('could not insert specification and task: result = %s', return_value) - def test_do_assignment_logs_when_no_predecessors_found(self): self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): []} - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('no predecessors for otdb_id=%s mom_id=%s', self.task_otdb_id, self.task_mom_id) def test_do_assignment_logs_when_predecessors_are_found(self): - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', self.predecessor_task_mom_ids, self.task_mom_id, self.task_otdb_id) @@ -1700,14 +1695,14 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_logs_when_predecessors_are_found_but_its_task_is_missing_in_radb(self): self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]} - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', self.non_existing_task_mom_id, self.task_otdb_id) def test_do_assignment_logs_when_predecessors_are_found_that_need_to_be_linked_to_task(self): - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call( 'connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s', @@ -1717,20 +1712,20 @@ class ResourceAssignerTest(unittest.TestCase): self.task_otdb_id) def test_do_assignment_insert_predecessor_into_task_when_not_linked_to_task(self): - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.task_id, self.predecessor_task_id) def test_do_assignment_logs_when_no_successors_found(self): self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): []} - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('no successors for otdb_id=%s mom_id=%s', self.task_otdb_id, self.task_mom_id) def test_do_assignment_logs_when_successors_are_found(self): - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], 
self.specification_tree) self.logger_mock.info.assert_any_call('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', self.successor_task_mom_ids, self.task_mom_id, self.task_otdb_id) @@ -1738,14 +1733,14 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_logs_when_successors_are_found_but_its_task_is_missing_in_radb(self): self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]} - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'could not find successor task with mom_id=%s in radb for task otdb_id=%s', self.non_existing_task_mom_id, self.task_otdb_id) def test_do_assignment_logs_when_successors_are_found_that_need_to_be_linked_to_task(self): - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call( 'connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s', @@ -1755,32 +1750,32 @@ class ResourceAssignerTest(unittest.TestCase): self.task_otdb_id) def test_do_assignment_insert_successor_into_task_when_not_linked_to_task(self): - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.successor_task_id, self.task_id) def test_do_assignment_moves_pipeline_of_successor_after_predecessor(self): - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertTrue(self.movePipelineAfterItsPredecessors_mock.called) def test_do_assignment_logs_mom_bug(self): - self.resourceAssigner.doAssignment(self.mom_bug_specification_tree['otdb_id'], - self.mom_bug_specification_tree) + self.resource_assigner.do_assignment(self.mom_bug_specification_tree['otdb_id'], + self.mom_bug_specification_tree) self.logger_mock.info.assert_any_call( 'overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s', self.mom_bug_processing_cluster_name, 'CEP4', self.mom_bug_otdb_id) def test_do_assignment_resets_ProcessingCluster_clusterName_on_mom_bug(self): - self.resourceAssigner.doAssignment(self.mom_bug_specification_tree['otdb_id'], - self.mom_bug_specification_tree) + self.resource_assigner.do_assignment(self.mom_bug_specification_tree['otdb_id'], + self.mom_bug_specification_tree) self.otdbrpc_mock.taskSetSpecification.assert_any_call( self.mom_bug_otdb_id, {'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': 'CEP4'}) - @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') + @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime') def test_do_assignment_should_reset_observation_period_when_in_past_without_predecessor_and_duration( self, datetime_mock): now = datetime.datetime.utcnow() + datetime.timedelta(days=1) @@ -1789,7 +1784,7 @@ class ResourceAssignerTest(unittest.TestCase): new_starttime = now + datetime.timedelta(minutes=3) new_endtime = new_starttime + 
datetime.timedelta(seconds=self.task_duration) - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', @@ -1806,7 +1801,7 @@ class ResourceAssignerTest(unittest.TestCase): 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') }) - @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') + @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime') def test_do_assignment_should_reset_observation_period_when_in_past_with_predecessor_in_future(self, datetime_mock): now = self.freeze_time_one_day_in_the_future(datetime_mock) @@ -1817,7 +1812,7 @@ class ResourceAssignerTest(unittest.TestCase): new_starttime = future_predecessor_stop_time + datetime.timedelta(minutes=3) new_endtime = new_starttime + datetime.timedelta(seconds=self.task_duration) - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', @@ -1845,8 +1840,8 @@ class ResourceAssignerTest(unittest.TestCase): def _strip_ms(self, now): return datetime.datetime.strptime(now.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S') - @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') - def test_do_assignment_should_reset_observation_period_when_in_past(self, datetime_mock): + @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime') + def test_do_assignment_pushes_back_observation_start_and_end_times_when_in_past(self, datetime_mock): now = datetime.datetime.utcnow() + datetime.timedelta(days=1) datetime_mock.utcnow.return_value = now @@ -1854,7 +1849,7 @@ class ResourceAssignerTest(unittest.TestCase): new_starttime = now + datetime.timedelta(minutes=3) new_endtime = new_starttime + datetime.timedelta(seconds=self.task_duration) - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', @@ -1871,61 +1866,112 @@ class ResourceAssignerTest(unittest.TestCase): 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') }) + def test_get_main_task_start_and_end_times_with_unspecified_start_and_end_times(self): + """ + Verify that get_main_task_start_and_end_times() returns start/end times in the future with the default duration + """ + + self.specification_tree['specification']['Observation.startTime'] = None + self.specification_tree['specification']['Observation.stopTime'] = None + expected_duration = datetime.timedelta(hours=1) + + start_time, end_time = self.resource_assigner._get_main_task_start_and_end_times(self.specification_tree) + + duration = end_time - start_time + self.assertEqual(expected_duration, duration) + self.assertGreater(start_time, datetime.datetime.utcnow()) + + def test_get_main_task_start_and_end_times_with_unspecified_start_and_end_times_and_specified_duration(self): + """ + 
Verify that get_main_task_start_and_end_times() returns start/end times in the future with the specified + duration + """ + + self.specification_tree['specification']['Observation.startTime'] = None + self.specification_tree['specification']['Observation.stopTime'] = None + self.specification_tree['specification']['Observation.Scheduler.taskDuration'] = 300 # seconds + expected_duration = datetime.timedelta(seconds=300) + + start_time, end_time = self.resource_assigner._get_main_task_start_and_end_times(self.specification_tree) + + duration = end_time - start_time + self.assertEqual(expected_duration, duration) + self.assertGreater(start_time, datetime.datetime.utcnow()) + + def test_get_main_task_start_and_end_times_with_start_and_end_times_in_the_past(self): + """ + Verify that get_main_task_start_and_end_times() returns start/end times in the future but retains the original + duration. + """ + + specified_duration = datetime.timedelta(hours=5) + _start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=7) + _end_time = _start_time + specified_duration + self.specification_tree['specification']['Observation.startTime'] = _start_time.strftime('%Y-%m-%d %H:%M:%S') + self.specification_tree['specification']['Observation.stopTime'] = _end_time.strftime('%Y-%m-%d %H:%M:%S') + + start_time, end_time = self.resource_assigner._get_main_task_start_and_end_times(self.specification_tree) + + duration = end_time - start_time + self.assertEqual(specified_duration, duration) + self.assertGreater(start_time, datetime.datetime.utcnow()) + def test_do_assignment_should_log_insertion_of_specification_and_task(self): - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call( - 'doAssignment: insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, end_time=%s' + 'insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, end_time=%s' ' cluster=%s' % (self.mom_id, self.otdb_id, self.state, self.task_type, self.future_start_time, self.future_stop_time, "CEP4")) def test_do_assignment_should_log_when_insertion_of_specification_and_task_is_done(self): - self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.info.assert_any_call('doAssignment: inserted specification (id=%s) and task (id=%s)' % + self.logger_mock.info.assert_any_call('inserted specification (id=%s) and task (id=%s)' % (self.specification_id, self.task_id)) def test_do_assignment_inserts_maintenance_resource_claims_in_radb(self): - self.resourceAssigner.doAssignment(self.maintenance_specification_tree['otdb_id'], - self.maintenance_specification_tree) + self.resource_assigner.do_assignment(self.maintenance_specification_tree['otdb_id'], + self.maintenance_specification_tree) - self.logger_mock.info.assert_any_call('doAssignment: %d claims were inserted in the radb' % 2) + subject = 'TaskScheduled' + content = "{'mom_id': 351543, 'radb_id': 2299, 'otdb_id': 1290472}" + self.logger_mock.info.assert_any_call('Sending notification % s: % s' % (subject, content)) def test_do_assignment_inserts_projectreservation_resource_claims_in_radb(self): - self.resourceAssigner.doAssignment(self.projectreservation_specification_tree['otdb_id'], - 
-                                           self.projectreservation_specification_tree)
+        self.resource_assigner.do_assignment(self.projectreservation_specification_tree['otdb_id'],
+                                             self.projectreservation_specification_tree)

-        self.logger_mock.info.assert_any_call('doAssignment: %d claims were inserted in the radb' % 2)
+        subject = 'TaskScheduled'
+        content = "{'mom_id': 351543, 'radb_id': 2299, 'otdb_id': 1290472}"
+        self.logger_mock.info.assert_any_call('Sending notification % s: % s' % (subject, content))

     def test_do_assignment_should_not_claim_resources_on_CEP2_tasks(self):
         exception_regex = "skipping resource assignment for task with cluster name"

         with self.assertRaisesRegexp(Exception, exception_regex):
-            self.resourceAssigner._do_assignment(self.cep2_specification_tree['otdb_id'], self.cep2_specification_tree)
-
-    def test_do_assignment_should_not_claim_resources_on_non_prescheduled_cep4_tasks(self):
-        self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'],
-                                           self.non_approved_or_prescheduled_specification_tree)
-
-        self.rarpc_mock.insertResourceClaims.assert_not_called()
+            self.resource_assigner.do_assignment(self.cep2_specification_tree['otdb_id'],
+                                                 self.cep2_specification_tree)

     def test_do_assignment_should_request_needed_resources(self):
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.rerpc_mock.assert_any_call({"specification_tree": self.specification_tree}, timeout=10)

     def test_do_assignment_logs_when_otdb_id_not_needed_resources(self):
         self.specification_tree["otdb_id"] = self.otdb_id + 11

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

-        self.logger_mock.error.assert_any_call("no otdb_id %s found in estimator results %s" %
-                                               (self.otdb_id + 11, self.rerpc_replymessage))
+        self.logger_mock.error.assert_any_call(
+            "An exception occurred while obtaining resource estimates. Exception=no otdb_id %s found in estimator results %s" %
+            (self.otdb_id + 11, self.rerpc_replymessage)
+        )

     def test_do_assignment_should_not_claim_resouces_when_otdb_id_not_needed_resources(self):
         self.specification_tree["otdb_id"] = self.otdb_id + 1

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.rarpc_mock.insertResourceClaims.assert_not_called()
@@ -1933,24 +1979,25 @@ class ResourceAssignerTest(unittest.TestCase):
         wrong_task_type = "observation"
         self.specification_tree["task_type"] = wrong_task_type

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

-        self.logger_mock.error.assert_any_call("no task type %s found in estimator results %s" %
-                                               (wrong_task_type,
-                                                self.rerpc_replymessage[str(self.otdb_id)]))
+        self.logger_mock.error.assert_any_call(
+            "An exception occurred while obtaining resource estimates. Exception=no task type %s found in estimator results %s" %
+            (wrong_task_type, self.rerpc_replymessage[str(self.otdb_id)])
+        )

     def test_do_assignment_should_not_claim_resources_when_task_type_not_in_needed_resources(self):
         wrong_task_type = "observation"
         self.specification_tree["task_type"] = wrong_task_type

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.rarpc_mock.insertResourceClaims.assert_not_called()

     def test_do_assignment_should_log_single_errors_in_needed_resources(self):
         self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.logger_mock.error.assert_any_call("Error from Resource Estimator: %s", self.resource_error1)
         self.logger_mock.error.assert_any_call("Error from Resource Estimator: %s", self.resource_error2)
@@ -1958,16 +2005,17 @@ class ResourceAssignerTest(unittest.TestCase):
     def test_do_assignment_should_log_error_in_needed_resources(self):
         self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.logger_mock.error.assert_any_call(
-            "Error(s) in estimator for otdb_id=%s radb_id=%s" %
-            (self.resources_with_errors_otdb_id, self.task_id))
+            "An exception occurred while obtaining resource estimates. Exception=Error(s) in estimator for otdb_id=%s radb_id=%s" %
+            (self.resources_with_errors_otdb_id, self.task_id)
+        )

     def test_do_assignment_should_update_task_with_error_on_errors_in_needed_resources(self):
         self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error')
@@ -1977,7 +2025,7 @@ class ResourceAssignerTest(unittest.TestCase):
         self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.assertBusNotificationAndLogging(content, subject)
@@ -1989,149 +2037,48 @@ class ResourceAssignerTest(unittest.TestCase):
         return found

     def test_do_assignment_should_log_estimator_reply(self):
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
-
-        self.logger_mock.info.assert_any_call('doAssignment: Resource Estimator reply = %s',
-                                              self.rerpc_replymessage)
-
-    def test_do_assignment_logs_amount_of_claims_inserted_in_radb(self):
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
-
-        storage_claim = {'status': 'tentative', 'resource_id': 117, 'resource_type_id': 5, 'claim_size': 2,
-                         'starttime': datetime.datetime(2016, 3, 25, 21, 47, 31),
-                         'used_rcus': None,
-                         'endtime': datetime.datetime(2017, 3, 25, 22, 47, 31),
-                         'properties': [{'type': 15, 'io_type': 'output', 'sap_nr': 0, 'value': 0},
-                                        {'type': 2, 'io_type': 'output', 'sap_nr': 0, 'value': 1},
-                                        {'type': 10, 'io_type': 'output', 'sap_nr': 0, 'value': 1073741824}]}
-        claims = [self.bandwidth_claim, storage_claim]
-
-        self.logger_mock.info.assert_any_call('doAssignment: inserting %d claims in the radb: %s', len(claims), claims)
-
-    def test_do_assignment_inserts_resource_claims_in_radb(self):
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
-
-        self.rarpc_mock.insertResourceClaims.assert_any_call(self.task_id, self.specification_claims, 1, 'anonymous', -1)
-
-    def test_do_assignment_inserts_resource_claims_with_rcus_no_earlier_claims(self):
-        used_rcus = '111100010111100101101010'
-
-        self.rarpc_mock.insertRcuSpecifications.return_value = [1]
-        self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]}
-        self.rarpc_mock.getResourceClaims.return_value = []
-
-        self.ra_checker_mock.get_is_claimable.return_value = ([
-            {'status': 'tentative', 'resource_type_id': 2, 'resource_id': 212, 'claim_size': 14, 'starttime': None,
-             'used_rcus': '111100010111100101101010', 'endtime': None, 'properties': []}], [])
-
-        rcu_claim = {
-            'resource_id': 212,
-            'resource_type_id': 2,
-            'starttime': self.task_start_time,
-            'endtime': self.task_end_time,
-            'status': 'tentative',
-            'used_rcus': used_rcus,
-            'claim_size': used_rcus.count('1'),
-            'properties': []
-        }
-
-        self.specification_tree['otdb_id'] = self.resources_with_rcus_otdb_id
-        self.specification_tree['task_type'] = 'observation'
-        self.task['type'] = 'observation'
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
-
-        self.rarpc_mock.insertResourceClaims.assert_any_call(self.task_id, [rcu_claim], 1, 'anonymous', -1)
-
-    def test_do_assignment_logs_amount_claims_inserted(self):
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
-
-        self.logger_mock.info.assert_any_call('doAssignment: %d claims were inserted in the radb' % 2)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

-    def test_do_assignment_logs_when_it_was_unable_to_claim_all_resources(self):
-        self.rarpc_mock.insertResourceClaims.return_value = {'ids': []}
+        self.logger_mock.info.assert_any_call('Resource Estimator reply = %s', self.rerpc_replymessage)

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+    def test_do_assignment_updates_task_when_it_was_unable_to_claim_some_or_all_resources(self):
+        self.dwell_scheduler_mock().allocate_resources.return_value = False

-        self.logger_mock.error.assert_any_call('doAssignment: too few claims were inserted in the radb')
-
-    def test_do_assignment_updates_task_when_it_was_unable_to_claim_all_resources(self):
-        self.rarpc_mock.insertResourceClaims.return_value = {'ids': []}
-
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='conflict')

-    def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_all_resources(self):
+    def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_some_or_all_resources(self):
         content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id}
         subject = 'Task' + 'Conflict'

-        self.rarpc_mock.insertResourceClaims.return_value = {'ids': []}
+        self.dwell_scheduler_mock().allocate_resources.return_value = False

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.assertBusNotificationAndLogging(content, subject)

-    def test_do_assignment_updates_task_when_it_was_unable_to_claim_some_resources(self):
-        self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]}
-
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
-
-        self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='conflict')
-
-    def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_some_resources(self):
-        content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id}
-        subject = 'Task' + 'Conflict'
-
-        self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]}
-
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
-
-        self.assertBusNotificationAndLogging(content, subject)
-
-    def test_do_assignment_logs_when_there_are_conflicting_claims(self):
-        conflicting_claims = [{}]
-
-        self.rarpc_mock.getResourceClaims.return_value = conflicting_claims
-
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
-
-        self.logger_mock.error.assert_any_call(
-            'doAssignment: Task cannot be scheduled, because of %d conflicting claims: %s' %
-            (len(conflicting_claims), conflicting_claims))
-
     def test_do_assignment_notifies_bus_when_there_are_conflicting_claims(self):
         content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id}
         subject = 'Task' + 'Conflict'

-        conflicting_claims = [{}]
-        self.rarpc_mock.getResourceClaims.return_value = conflicting_claims
+        self.dwell_scheduler_mock().allocate_resources.return_value = False

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.assertBusNotificationAndLogging(content, subject)

-    def test_do_assignment_logs_when_all_resources_were_claimed(self):
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
-
-        self.logger_mock.info.assert_any_call(
-            'doAssignment: all resources for task %s were succesfully claimed. Setting claim statuses to claimed' % self.task_id)
-
-    def test_do_assignment_updates_task_and_resources_as_claimed_in_radb(self):
-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
-
-        self.rarpc_mock.updateTaskAndResourceClaims.assert_any_call(self.task_id, claim_status='claimed')
-
     def test_do_assignment_logs_task_data_removal_if_task_is_pipeline(self):
         self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10}

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.logger_mock.info.assert_any_call("removing data on disk from previous run for otdb_id %s", self.otdb_id)

     def test_do_assignment_removes_task_data_if_task_is_pipeline(self):
         self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10}

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.curpc_mock.removeTaskData.assert_any_call(self.task_otdb_id)
@@ -2140,7 +2087,7 @@ class ResourceAssignerTest(unittest.TestCase):
         self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10}
         self.curpc_mock.removeTaskData.return_value = {'deleted': False, 'message': message}

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.logger_mock.warning.assert_any_call(
             "could not remove all data on disk from previous run for otdb_id %s: %s", self.otdb_id, message)
@@ -2149,7 +2096,7 @@ class ResourceAssignerTest(unittest.TestCase):
         content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id}
         subject = 'Task' + 'Scheduled'

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.assertBusNotificationAndLogging(content, subject)
@@ -2158,40 +2105,42 @@ class ResourceAssignerTest(unittest.TestCase):
         self.logger_mock.info.assert_any_call('Sending notification %s: %s' % (subject, str(content).replace('\n', ' ')))

-    @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
+    @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime')
     def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification(self, datetime_mock):
         self.freeze_time_one_day_in_the_future(datetime_mock)

         exception_str = "Error something went wrong"
         self.otdbrpc_mock.taskSetSpecification.side_effect = Exception(exception_str)

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        with self.assertRaisesRegexp(Exception, exception_str):
+            self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

-        self.logger_mock.error.assert_any_call(exception_str)
+        self.logger_mock.error.assert_any_call(exception_str)

     def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification_with_mom_bug(self):
         exception_str = "Error something went wrong"
         self.otdbrpc_mock.taskSetSpecification.side_effect = Exception(exception_str)

-        # with self.assertRaisesRegexp(Exception, exception_str):
-        self.resourceAssigner.doAssignment(self.mom_bug_specification_tree['otdb_id'],
-                                           self.mom_bug_specification_tree)
+        with self.assertRaisesRegexp(Exception, exception_str):
+            self.resource_assigner.do_assignment(self.mom_bug_specification_tree['otdb_id'],
+                                                 self.mom_bug_specification_tree)

-        self.logger_mock.error.assert_any_call(exception_str)
+        self.logger_mock.error.assert_any_call(exception_str)

     def test_do_assignment_logs_exception_from_rerpc(self):
-        exception = Exception("Error something went wrong")
-        self.rerpc_mock.side_effect = exception
+        exception_msg = "Error something went wrong"
+        self.rerpc_mock.side_effect = Exception(exception_msg)

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        with self.assertRaisesRegexp(Exception, exception_msg):
+            self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

-        self.logger_mock.error.assert_any_call(str(exception))
+        self.logger_mock.error.assert_any_call(exception_msg)

     def test_do_assignment_updates_task_on_exception_from_rerpc(self):
         exception = Exception("Error something went wrong")
         self.rerpc_mock.side_effect = exception

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error')
@@ -2202,44 +2151,48 @@ class ResourceAssignerTest(unittest.TestCase):
         exception = Exception("Error something went wrong")
         self.rerpc_mock.side_effect = exception

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

         self.assertBusNotificationAndLogging(content, subject)

     def test_do_assignment_logs_when_notifies_bus_thows_exception(self):
-        exception = Exception("Error something went wrong")
-        self.ra_notification_bus_mock.send.side_effect = exception
+        exception_msg = "Error something went wrong"
+        self.ra_notification_bus_mock.send.side_effect = Exception(exception_msg)

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        with self.assertRaisesRegexp(Exception, exception_msg):
+            self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

-        self.logger_mock.error.assert_any_call(str(exception))
+        self.logger_mock.error.assert_any_call(exception_msg)

     def test_do_assignment_logs_when_momrpc_getPredecessorIds_throws_exception(self):
-        exception = Exception("Error something went wrong")
-        self.momrpc_mock.getPredecessorIds.side_effect = exception
+        exception_msg = "Error something went wrong"
+        self.momrpc_mock.getPredecessorIds.side_effect = Exception(exception_msg)

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        with self.assertRaisesRegexp(Exception, exception_msg):
+            self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

-        self.logger_mock.error.assert_any_call(str(exception))
+        self.logger_mock.error.assert_any_call(exception_msg)

     def test_do_assignment_logs_when_momrpc_getSuccessorIds_throws_exception(self):
-        exception = Exception("Error something went wrong")
-        self.momrpc_mock.getSuccessorIds.side_effect = exception
+        exception_msg = "Error something went wrong"
+        self.momrpc_mock.getSuccessorIds.side_effect = Exception(exception_msg)

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        with self.assertRaisesRegexp(Exception, exception_msg):
+            self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

-        self.logger_mock.error.assert_any_call(str(exception))
+        self.logger_mock.error.assert_any_call(exception_msg)

-    @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
+    @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime')
     def test_do_assignment_logs_exception_stop_time_parsing_on_predecessor(self, datetime_mock):
         self.freeze_time_one_day_in_the_future(datetime_mock)

         self.specification_tree[u'predecessors'][0]['specification'][u'Observation.stopTime'] = 'non parse'
         exception_str = 'time data \'non parse\' does not match format \'%Y-%m-%d %H:%M:%S\''

-        self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree)
+        with self.assertRaisesRegexp(Exception, exception_str):
+            self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree)

-        self.logger_mock.error.assert_any_call(exception_str)
+        self.logger_mock.error.assert_any_call(exception_str)

 if __name__ == '__main__':
     unittest.main()
diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.run b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.run
index 636e663a0917a66c8581ca569fd658744b662c80..101b24af41f5f5a642b571de6446ee88f8e0d6e8 100755
--- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.run
+++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.run
@@ -2,5 +2,5 @@
 # Run the unit test
 source python-coverage.sh
-python_coverage_test "ResourceAssigner*" t_resourceassigner.py
+python_coverage_test "resource_assigner" t_resourceassigner.py
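Note on the testing pattern used above: several of the updated tests patch the module-level datetime name of the code under test (mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime')) so that utcnow() returns a fixed value and the computed start/end times become deterministic. The following is a minimal, standalone sketch of that pattern, not part of the patch; the sut module and default_start_time helper are hypothetical stand-ins for resource_assigner and its time logic.

#!/usr/bin/env python
# Standalone sketch of the time-freezing pattern: freeze utcnow() as seen by the
# module under test by patching that module's own ``datetime`` attribute.
import datetime
import types
import unittest
from unittest import mock  # on Python 2 this would be a plain ``import mock``

# Hypothetical stand-in for the module under test (resource_assigner-like):
# it imports datetime at module level and computes a start time from utcnow().
sut = types.ModuleType('sut')
sut.datetime = datetime.datetime


def _default_start_time():
    # Looks up ``datetime`` via the sut module, so patching sut.datetime takes effect.
    return sut.datetime.utcnow() + datetime.timedelta(minutes=3)


sut.default_start_time = _default_start_time


class FreezeTimeExample(unittest.TestCase):
    def test_start_time_is_three_minutes_after_frozen_now(self):
        frozen_now = datetime.datetime(2017, 3, 25, 12, 0, 0)
        with mock.patch.object(sut, 'datetime') as datetime_mock:
            datetime_mock.utcnow.return_value = frozen_now
            # The code under test now sees the frozen clock, so the assertion
            # on the computed start time is exact and repeatable.
            self.assertEqual(frozen_now + datetime.timedelta(minutes=3),
                             sut.default_start_time())


if __name__ == '__main__':
    unittest.main()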