From d56346a4c60af68f8bad9bd3c39d153d140592ba Mon Sep 17 00:00:00 2001 From: Ruud Beukema <beukema@astron.nl> Date: Thu, 13 Jul 2017 12:57:43 +0000 Subject: [PATCH] Task #10780: Finished refactoring, added docstrings, and got all unit tests working again. --- .../ResourceAssigner/lib/resource_assigner.py | 472 +++++++++++------- .../test/t_resourceassigner.py | 363 +++++--------- 2 files changed, 423 insertions(+), 412 deletions(-) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py index 47ee8b95cec..640321d1206 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py @@ -28,7 +28,7 @@ import logging from datetime import datetime, timedelta from lofar.common.cache import cache -from lofar.common.datetimeutils import totalSeconds, parseDatetime +from lofar.common.datetimeutils import parseDatetime from lofar.messaging.messages import EventMessage from lofar.messaging.messagebus import ToBus from lofar.messaging.RPC import RPC @@ -67,6 +67,11 @@ logger = logging.getLogger(__name__) class ResourceAssigner(object): + """ + The ResourceAssigner inserts new tasks or updates existing tasks in the RADB and assigns resources to it based on + a task's parset. + """ + def __init__(self, radb_busname=RADB_BUSNAME, radb_servicename=RADB_SERVICENAME, @@ -85,20 +90,33 @@ class ResourceAssigner(object): broker=None, radb_dbcreds=None): """ - ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset. - - :param radb_busname: busname on which the radb service listens (default: lofar.ra.command) - :param radb_servicename: servicename of the radb service (default: RADBService) - :param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command) - :param re_servicename: servicename of the resource estimator service (default: ResourceEstimation) + Creates a ResourceAssigner instance + + :param radb_busname: name of the bus on which the radb service listens (default: lofar.ra.command) + :param radb_servicename: name of the radb service (default: RADBService) + :param re_busname: name of the bus on which the resource estimator service listens (default: lofar.ra.command) + :param re_servicename: name of the resource estimator service (default: ResourceEstimation) + :param otdb_busname: name of the bus on which OTDB listens (default: lofar.otdb.command) + :param otdb_servicename: name of the OTDB service (default: OTDBService) + :param storagequery_busname: name of the bus on which the StorageQueryService listens + (default: lofar.dm.command) + :param storagequery_servicename: name of the StorageQueryService (default: StorageQueryService) + :param cleanup_busname: name of the bus on which the cleanup service listens (default: lofar.dm.command) + :param cleanup_servicename: name of the CleanupService (default: CleanupService) + :param ra_notification_busname: name of the bus on which the ResourceAssigner notifies registered parties + (default: lofar.ra.notification) + :param ra_notification_prefix: prefix used in notification message subject (default: ResourceAssigner.) + :param mom_busname: name of the bus on which MOM listens for queries (default: lofar.ra.command) + :param mom_servicename: name of the MOMQueryService (default: momqueryservice) :param broker: Valid Qpid broker host (default: None, which means localhost) + :param radb_dbcreds: the credentials to be used for accessing the RADB (default: None, which means default) """ self.radb_creds = radb_dbcreds self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker, timeout=180) self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True, timeout=180) - self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180) ## , ForwardExceptions=True hardcoded in RPCWrapper right now + self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180) self.momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180) self.sqrpc = StorageQueryRPC(busname=storagequery_busname, servicename=storagequery_servicename, broker=broker) self.curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker) @@ -144,39 +162,57 @@ class ResourceAssigner(object): return {rt['name']: rt['id'] for rt in self.radbrpc.getResourceTypes()} def do_assignment(self, otdb_id, specification_tree): - """ Attempts to assign the specified resources + """ + Makes the given task known to RADB and attempts to assign (schedule) the its requested resources. + + If no list of requested resources could be determined for the task, its status will be set to "error" in RADB. + If such list can be obtained but it is impossible to assign the requested resources, the task is in conflict + with other tasks, hence its status will be set to "conflict" in RADB. If all requested resources are + successfully assigned, its status will be put to "scheduled" in RADB. :param otdb_id: OTDB ID of the main task which resources need to be assigned :param specification_tree: the specification tree containing the main task and its resources + + :raises an Exception if something unforeseen happened while scheduling """ + logger.info(('do_assignment: otdb_id=%s specification_tree=%s' % (otdb_id, specification_tree))) + + # Make the task known to RADB task_id, task_type, task_status, task = self._insert_specification_into_radb(otdb_id, specification_tree) + # Don't perform any scheduling for tasks that are already approved. Do this check after insertion of + # specification, task and predecessor/successor relations, so approved tasks appear correctly in the web + # scheduler. if task_status == 'approved': - # Do this check after insertion of specification, task and predecessor/successor relations, so approved - # tasks appear correctly in the web scheduler. logger.info('Task otdb_id=%s is already approved, no resource assignment needed' % otdb_id) - return - - requested_resources = self._get_resource_estimates(specification_tree, otdb_id, task_type, task_id, task) - if requested_resources is None: - self._finish_resource_assignment(task, 'error') - return - - if not self._schedule_resources(task_id, specification_tree, requested_resources): - self._finish_resource_assignment(task, 'conflict') - return + else: + requested_resources = self._get_resource_estimates(specification_tree, otdb_id, task_type, task_id) + if requested_resources is None: + # No resource requests available, so change task status to "error" + self._finish_resource_assignment(task, 'error') + else: + if self._schedule_resources(task_id, specification_tree, requested_resources): + # Cleanup the data of any previous run of the task + self._cleanup_earlier_generated_data(otdb_id, task) - self._cleanup_generated_pipeline_data(otdb_id, task) - self._finish_resource_assignment(task, 'scheduled') + # Scheduling of resources for this task succeeded, so change task status to "scheduled" + self._finish_resource_assignment(task, 'scheduled') + else: + # Scheduling of resources for this task failed, so change task status to "conflict" + self._finish_resource_assignment(task, 'conflict') def _insert_specification_into_radb(self, otdb_id, specification_tree): """ - Inserts the main task's specification into RADB along with any predecessors and successors it has. + Tries to inserts the task's specification into RADB along with any of its predecessors and successors. :param otdb_id: the main task's OTDB ID :param specification_tree: the main task's specification - :return: True if specification is successfully inserted into RADB, or False if not + + :return: A 4-tuple (task_id, task_type, task_status, task) if the task's specification is successfully inserted + into RADB. + + :raises Exception if a task can't be inserted into RADB """ task_status = self._get_is_assignable(otdb_id, specification_tree) @@ -185,8 +221,8 @@ class ResourceAssigner(object): task_id, task = self._insert_main_task(specification_tree, start_time, end_time, cluster_name) - self._process_task_predecessors(task) - self._process_task_successors(task) + self._link_predecessors_to_task(task) + self._link_successors_to_task(task) logger.info('Successfully inserted main task and its predecessors and successors into RADB: task=%s', task) @@ -200,7 +236,8 @@ class ResourceAssigner(object): :param otdb_id: ORDB ID of the task :param specification_tree: the specification tree of the task - :returns the task's status if it is assignable (and raises an exception if it is not) + :returns the task's status if it is assignable + :raises Exception if it can't be assigned """ assignable_task_states = ['approved', 'prescheduled'] @@ -212,7 +249,7 @@ class ResourceAssigner(object): logger.warn('Task otdb_id=%s with status \'%s\' is not assignable. Allowed statuses are %s' % (otdb_id, status, assignable_task_states_str)) - message = "doAssignment: Unsupported status %s of task with OTDB ID: %s" % (status, otdb_id) + message = "doAssignment: Unsupported status '%s' of task with OTDB ID: %s" % (status, otdb_id) raise Exception(message) return status @@ -224,12 +261,13 @@ class ResourceAssigner(object): :param otdb_id: the main task's OTDB ID :param specification_tree: the main task's specification - :return: 3-tuple containing the main task's start_time, end_time, and cluster_name respectively + + :return: 4-tuple (task_type, start_time, end_time, cluster_name) of the task prepared for RADB insertion """ main_parset = self._get_main_parset(specification_tree) - task_type, task_subtype = self._get_task_type(specification_tree) - cluster_name = self._get_clustername(otdb_id, main_parset, task_type, task_subtype) + task_type, _ = self._get_task_type(specification_tree) + cluster_name = self._get_clustername(otdb_id, main_parset, task_type) start_time, end_time = self._get_main_task_start_and_end_times(specification_tree) logger.info('preparations for inserting main task into RADB successful') @@ -238,7 +276,13 @@ class ResourceAssigner(object): def _finish_resource_assignment(self, task, new_task_status): """ - Takes care of the needed RADB task administration and status change notification before generating an exception + Finishes the resource assignment by updating a task's status in RADB and sending out a corresponding + notification to registered parties on the Resource Assigner notification bus. + + :param task: the task at hand + :param new_task_status: the new status to set the task to in RADB + + :raises Exception if updating RADB fails, or if sending the notification fails """ if task is not None and new_task_status in ('conflict', 'error', 'scheduled'): @@ -260,9 +304,10 @@ class ResourceAssigner(object): self.ra_notification_bus.send(event_message) def _get_main_parset(self, specification_tree): - """ Extracts the main parset from a specification tree + """ + Extracts the main task's parset from a specification tree - :param specification_tree the specification tree of the task + :param specification_tree: the task's specification tree :returns the main parset """ @@ -270,6 +315,14 @@ class ResourceAssigner(object): return parameterset(specification_tree['specification']) def _get_task_type(self, specification_tree): + """ + Extracts the task's type and subtype (if applicable) from a specification tree + + :param specification_tree: specification_tree: the task's specification tree + + :return: 2-tuple (task_type, task_subtype) + """ + task_type = specification_tree['task_type'] # is required item if 'task_subtype' in specification_tree: # is optional item task_subtype = specification_tree['task_subtype'] @@ -278,21 +331,22 @@ class ResourceAssigner(object): return task_type, task_subtype - def _get_clustername(self, otdb_id, mainParset, taskType, taskSubtype): - """ Determines the name of the cluster to which to store the task's output - if it produces output at all. + def _get_clustername(self, otdb_id, parset, task_type): + """ + Determines the name of the cluster to which to store the task's output - if it produces output at all that is. - :param otdb_id: the ORDB ID of the (main) task - :param mainParset: the parset of the main task - :param taskType: the task's type - :param taskSubtype: the task's subtype + :param otdb_id: the ORDB ID of the task + :param parset: the parset of the task + :param task_type: the task's type :returns The name of the output cluster, or an empty string if none is applicable + :raises Exception if the storage cluster required by the task is unknown to the system """ cluster_name = '' - if taskType not in ('reservation',): + if task_type not in ('reservation',): # Only assign resources for task output to known clusters - cluster_name_set = self._get_cluster_names(mainParset) + cluster_name_set = self._get_cluster_names(parset) if str() in cluster_name_set or len(cluster_name_set) != 1: # Empty set or name is always an error. @@ -317,11 +371,11 @@ class ResourceAssigner(object): # fix for MoM bug introduced before NV's holiday # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4 # so, override it here if needed, and update to otdb - processing_cluster_name = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', - '') + processing_cluster_name = parset.getString('Observation.Cluster.ProcessingCluster.clusterName', + '') if processing_cluster_name != cluster_name: - logger.info('overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s', - processing_cluster_name, cluster_name, otdb_id) + logger.info('overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' ' + 'for otdb_id=%s', processing_cluster_name, cluster_name, otdb_id) self.otdbrpc.taskSetSpecification( otdb_id, {'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': cluster_name} @@ -330,10 +384,14 @@ class ResourceAssigner(object): return cluster_name def _get_cluster_names(self, parset): - """ Return set of storage cluster names for all enabled output data product types in parset, - or raise for an enabled output data product type without storage cluster name. """ - clusterNames = set() + Get the storage cluster names for all enabled output data product types in parset + + :param parset: the task's parset + + :raises Exception if an enabled output data product type has no storage cluster name specified. + """ + cluster_names = set() keys = ['Output_Correlated', 'Output_IncoherentStokes', @@ -343,101 +401,126 @@ class ResourceAssigner(object): 'Output_Pulsar'] for key in keys: if parset.getBool('Observation.DataProducts.%s.enabled' % key, False): - name = parset.getString('Observation.DataProducts.%s.storageClusterName' % key) # may raise; don't pass default arg - clusterNames.add(name) - - return clusterNames - - # TODO: This fragment is from the trunk - # def applySaneStartEndTime(): - # try: - # startTime = parseDatetime(mainParset.getString('Observation.startTime')) - # except Exception: - # startTime = datetime.utcnow() + timedelta(minutes=3) - # - # maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree) - # if maxPredecessorEndTime and maxPredecessorEndTime > startTime: - # startTime = maxPredecessorEndTime + timedelta(minutes=3) - # - # taskDuration = mainParset.getInt('ObsSW.Observation.Scheduler.taskDuration', -1) - # taskDuration = timedelta(seconds=taskDuration) if taskDuration > 0 else timedelta(hours=1) - # - # endTime = startTime + taskDuration - # - # logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', - # startTime, endTime, otdb_id) - # - # try: - # logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', startTime, endTime, - # otdb_id) - # self.otdbrpc.taskSetSpecification(otdb_id, { - # 'LOFAR.ObsSW.Observation.startTime': startTime.strftime('%Y-%m-%d %H:%M:%S'), - # 'LOFAR.ObsSW.Observation.stopTime': endTime.strftime('%Y-%m-%d %H:%M:%S')}) - # except Exception as e: - # logger.error(e) - # - # return startTime, endTime - # - # try: - # startTime = parseDatetime(mainParset.getString('Observation.startTime')) - # endTime = parseDatetime(mainParset.getString('Observation.stopTime')) - # - # if startTime < datetime.utcnow() and taskType not in ['reservation', 'maintenance']: - # startTime, endTime = applySaneStartEndTime() - # except Exception: - # logger.warning( - # 'cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...', - # otdb_id) - # startTime, endTime = applySaneStartEndTime() - # - # logger.info('doAssignment: Accepted specification') + # may raise; don't pass default arg + name = parset.getString('Observation.DataProducts.%s.storageClusterName' % key) + cluster_names.add(name) + + return cluster_names + def _get_main_task_start_and_end_times(self, specification_tree): """ - Get the start time and end time of the main task adapted such that (a) there's a period of 3 minutes between + Get the start time and end time of the main task modified such that (a) there's a period of 3 minutes between tasks and (b) the start time and end time are actually in the future. - When either one or all of start time, end time, and duration are missing from the specification_tree, the start - time will be set to 3 minutes in the future and the end time to 1 hour after that. + If the start time lies in the past or is not specified it is set to 3 minutes from the current time. The new end + time in that case is calculated using the specified duration or, if that is not specified, from the original + difference between start and end time. When a duration can't be determined the end time will be set to 1 hour + after the start time. :param specification_tree: specification tree for the main task + :returns 2-tuple (start_time, end_time) """ def _get_start_and_end_times_from_parset(_parset): + """ + Extract the start and end times from a parset + + :param _parset: the parset + :return: A 2-tuple (start_time, end_time). start_time and end_time are returned as None when they were not + specified, or where specified in a wrong format. + """ + try: parset_start_time = parseDatetime(_parset.getString('Observation.startTime')) - except Exception: + except ValueError or KeyError: # Too bad no valid start time is specified! parset_start_time = None try: parset_end_time = parseDatetime(_parset.getString('Observation.stopTime')) - except Exception: + except ValueError or KeyError: # Too bad no valid end time is specified! parset_end_time = None return parset_start_time, parset_end_time + # TODO: add unit tests for this functionality def _get_duration_from_parset(_parset): - parset_duration = _parset.getInt('Observation.Scheduler.taskDuration', -1) - return timedelta(seconds=parset_duration) if parset_duration > 0 else timedelta(hours=1) + """ + Preferably use the duration specified by the parset. If that's not available, calculate the duration from + the difference between start/end times. If that's also impossible, fall back to a default duration + + :param _parset: the task's parset containing start/end times and durations (usually) + + :returns the obtained, calculated, or default duration + """ + + try: + duration = timedelta(seconds=_parset.getInt('Observation.Scheduler.taskDuration')) + except Exception: + _start_time, _end_time = _get_start_and_end_times_from_parset(_parset) + if _start_time is not None and _end_time is not None and _start_time < _end_time: + duration = _end_time - _start_time + else: + duration = timedelta(hours=1) + + return duration + + # TODO: add unit tests that verify the task_types logic def _get_need_to_push_back_start_and_end_times(_start_time, _end_time): - return _start_time is None or \ - _end_time is None or \ - _start_time < datetime.utcnow() + """ + Determines whether or not a task's start/end times need to be pushed back in time + + :param _start_time: the task's start time + :param _end_time: the task's end time + + :return: True if start/end times need to be pushed back, False otherwise + """ + + task_type, _ = self._get_task_type(specification_tree) + + # The start time of reservations and maintenance tasks are allowed to lie in the past + if task_type in ['reservation', 'maintenance']: + do_push_back = False + else: + do_push_back = _start_time is None or \ + _end_time is None or \ + _start_time < datetime.utcnow() + + return do_push_back def _push_back_start_time_to_not_overlap_predecessors(_start_time, _specification_tree): + """ + Determines a new start time for a task when the current start time of that task overlaps with its + predecessors. + + :param _start_time: the task's start time + :param _specification_tree: the specification tree holding both the task's information and information about + its predecessors/successors etcetera. + + :return: The updated start time + """ + pushed_back_start_time = _start_time # Make sure the start time lies past the end time of the task's predecessors - max_predecessor_end_time = self._get_max_predecessor_end_time(_specification_tree) + max_predecessor_end_time = self._get_maximum_predecessor_end_time(_specification_tree) if max_predecessor_end_time and max_predecessor_end_time > _start_time: pushed_back_start_time = max_predecessor_end_time + timedelta(minutes=3) return pushed_back_start_time def _store_changed_start_and_end_times_to_otdb(_start_time, _end_time, _otdb_id): + """ + Stores the modified start/end times to the OTDB + + :param _start_time: the task's start time + :param _end_time: the task's end time + :param _otdb_id: the task's OTDB ID + """ + logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', _start_time, _end_time, _otdb_id) @@ -449,16 +532,15 @@ class ResourceAssigner(object): ) main_parset = self._get_main_parset(specification_tree) - original_start_time, end_time = _get_start_and_end_times_from_parset(main_parset) + start_time, end_time = _get_start_and_end_times_from_parset(main_parset) # TODO: don't fix this crap here. Bad start/stop time has to go to error, like any other bad spec part. - if _get_need_to_push_back_start_and_end_times(original_start_time, end_time): + if _get_need_to_push_back_start_and_end_times(start_time, end_time): # Make sure the start time lies in the future and doesn't overlap with any predecessors - if original_start_time is None or original_start_time < datetime.utcnow(): + if start_time is None or start_time < datetime.utcnow(): start_time = datetime.utcnow() + timedelta(minutes=3) start_time = _push_back_start_time_to_not_overlap_predecessors(start_time, specification_tree) - # The end time is calculated either from the specified duration or end_time = start_time + _get_duration_from_parset(main_parset) otdb_id = specification_tree['otdb_id'] @@ -470,11 +552,17 @@ class ResourceAssigner(object): return start_time, end_time def _insert_main_task(self, specification_tree, start_time, end_time, cluster_name): - """ Inserts the main task and its specification into the RADB. Any existing specification and task with same + """ + Inserts the main task and its specification into the RADB. Any existing specification and task with same otdb_id will be deleted automatically. - :param specification_tree: specification tree for the main task - :return: 2-tuple (task_id, task) + :param specification_tree: the task's specification tree + :param start_time: the task's start time + :param end_time: the task's end time + :param cluster_name: the task's cluster name + + :return: 2-tuple (task_id, task) of the inserted task + :raises Exception if there's an unforeseen problem while inserting the task and its specifications into RADB """ task_type, _ = self._get_task_type(specification_tree) @@ -483,9 +571,8 @@ class ResourceAssigner(object): status = specification_tree.get('state', '').lower() otdb_id = specification_tree['otdb_id'] logger.info( - 'doAssignment: insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, \ - end_time=%s cluster=%s' % - (mom_id, otdb_id, status, task_type, start_time, end_time, cluster_name) + 'insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, end_time=%s ' + 'cluster=%s' % (mom_id, otdb_id, status, task_type, start_time, end_time, cluster_name) ) result = self.radbrpc.insertSpecificationAndTask(mom_id, otdb_id, status, task_type, start_time, end_time, @@ -493,32 +580,42 @@ class ResourceAssigner(object): specification_id = result['specification_id'] task_id = result['task_id'] - logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id)) + logger.info('inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id)) task = self.radbrpc.getTask(task_id) # if task_id is not None else None return task_id, task - def _get_resource_estimates(self, specification_tree, otdb_id, taskType, taskId, task): - """ Request and return checked estimates of needed resources from Resource Estimator. """ + def _get_resource_estimates(self, specification_tree, otdb_id, task_type, task_id): + """ + Obtains the resource estimates from the Resource Estimator for the main task in the specification tree and + validates them. + + :param specification_tree: the task's specification tree + :param otdb_id: the task's OTDB ID + :param task_type: the task's type + :param task_id: the task's ID + + :return A list of resource estimates for the given task or None in case none could be obtained or if the + validation failed. + """ - estimates = None try: - reReply, rerpcStatus = self.rerpc({"specification_tree" : specification_tree}, timeout=10) - logger.info('doAssignment: Resource Estimator reply = %s', reReply) + re_reply, rerpc_status = self.rerpc({"specification_tree": specification_tree}, timeout=10) + logger.info('Resource Estimator reply = %s', re_reply) - if str(otdb_id) not in reReply: - raise ValueError("no otdb_id %s found in estimator results %s" % (otdb_id, reReply)) - estimates = reReply[str(otdb_id)] + if str(otdb_id) not in re_reply: + raise ValueError("no otdb_id %s found in estimator results %s" % (otdb_id, re_reply)) + estimates = re_reply[str(otdb_id)] - if taskType not in estimates: - raise ValueError("no task type %s found in estimator results %s" % (taskType, estimates)) - estimates = estimates[taskType] + if task_type not in estimates: + raise ValueError("no task type %s found in estimator results %s" % (task_type, estimates)) + estimates = estimates[task_type] if 'errors' in estimates and estimates['errors']: for error in estimates['errors']: logger.error("Error from Resource Estimator: %s", error) - raise ValueError("Error(s) in estimator for otdb_id=%s radb_id=%s" % (otdb_id, taskId)) + raise ValueError("Error(s) in estimator for otdb_id=%s radb_id=%s" % (otdb_id, task_id)) if 'estimates' not in estimates or any('resource_types' not in est for est in estimates['estimates']): raise ValueError("missing 'resource_types' in 'estimates' in estimator results: %s" % estimates) @@ -535,14 +632,23 @@ class ResourceAssigner(object): return estimates def _schedule_resources(self, task_id, specification_tree, requested_resources): + """ + Schedule the requested resources for a task + + :param task_id: the task's ID + :param specification_tree: the task's specification tree + :param requested_resources: the resources requested by the task + + :returns: True if successful, or False otherwise + """ start_time, end_time = self._get_main_task_start_and_end_times(specification_tree) + # For now dwell-behavior is disabled by setting min_starttime/max_starttime to + # start_time, because the specification doesn't yet support this. + # TODO: enable dwell-scheduling once min_starttime/max_starttime are propagated scheduler = DwellScheduler(task_id=task_id, resource_availability_checker=self.resource_availability_checker, radbcreds=self.radb_creds, - # For now dwell-behavior is disabled by setting min_starttime/max_starttime to - # start_time, because the specification doesn't yet support this. - # TODO: enable dwell-scheduling once min_starttime/max_starttime are propagated min_starttime=start_time, max_starttime=start_time, duration=end_time - start_time) @@ -556,16 +662,21 @@ class ResourceAssigner(object): return result - def _cleanup_generated_pipeline_data(self, otdb_id, task): + def _cleanup_earlier_generated_data(self, otdb_id, task): """ - Remove any output and/or intermediate data for restarting pipelines + Remove any output and/or intermediate data from any previous run of the task - :return: + :param otdb_id: the task's OTDB ID + :param task: the task object """ + # Only needed for pipeline tasks if task['type'] == 'pipeline': try: - du_result = self.sqrpc.getDiskUsageForOTDBId(task['otdb_id'], include_scratch_paths=True, force_update=True) + du_result = self.sqrpc.getDiskUsageForOTDBId(task['otdb_id'], + include_scratch_paths=True, + force_update=True) + if du_result['found'] and du_result.get('disk_usage', 0) > 0: logger.info("removing data on disk from previous run for otdb_id %s", otdb_id) result = self.curpc.removeTaskData(task['otdb_id']) @@ -576,26 +687,13 @@ class ResourceAssigner(object): # in line with failure as warning just above: allow going to scheduled state here too logger.error(str(e)) - def _announceStateChange(self, task, status): - if status == 'scheduled' or status == 'conflict' or status == 'error': - content={'radb_id': task['id'], 'otdb_id': task['otdb_id'], 'mom_id': task['mom_id']} - subject= 'Task' + status[0].upper() + status[1:] - else: # should not end up here (bug) - logger.error('_announceStateChange(): bug: Not sending notification as status is %s' % status) - return - - try: - if status != 'scheduled': - # another service sets the parset spec in otdb, and updated otdb task status to scheduled, which is then synced to radb - self.radbrpc.updateTask(task['id'], task_status=status) + def _link_predecessors_to_task(self, task): + """ + Links a task to its predecessors in RADB - msg = EventMessage(context=self.ra_notification_prefix + subject, content=content) - logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) - self.ra_notification_bus.send(msg) - except Exception as e: - logger.error(str(e)) + :param task: the task at hand + """ - def _process_task_predecessors(self, task): mom_id = task['mom_id'] predecessor_ids = self.momrpc.getPredecessorIds(mom_id) @@ -604,24 +702,31 @@ class ResourceAssigner(object): return predecessor_mom_ids = predecessor_ids[str(mom_id)] - logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], task['otdb_id']) + logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], + task['otdb_id']) for predecessor_mom_id in predecessor_mom_ids: # check if the predecessor needs to be linked to this task predecessor_task = self.radbrpc.getTask(mom_id=predecessor_mom_id) if predecessor_task: if predecessor_task['id'] not in task['predecessor_ids']: - logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s', - predecessor_task['mom_id'], predecessor_task['otdb_id'], task['mom_id'], task['otdb_id']) + logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s ' + 'otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], task['mom_id'], + task['otdb_id']) self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id']) else: - # Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond approved, - # which is in principle valid. The link in the radb will be made later via processSuccessors() below. - # Alternatively, a predecessor could have been deleted. - logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', predecessor_mom_id, task['otdb_id']) + # Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond + # approved, which is in principle valid. The link in the radb will be made later via processSuccessors() + # below. Alternatively, a predecessor could have been deleted. + logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', + predecessor_mom_id, task['otdb_id']) + def _link_successors_to_task(self, task): + """ + Links a task to its successors in RADB - def _process_task_successors(self, task): + :param task: the task at hand + """ mom_id = task['mom_id'] successor_ids = self.momrpc.getSuccessorIds(mom_id) @@ -630,26 +735,41 @@ class ResourceAssigner(object): return successor_mom_ids = successor_ids[str(mom_id)] - logger.info('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], task['otdb_id']) + logger.info('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], + task['otdb_id']) for successor_mom_id in successor_mom_ids: # check if the successor needs to be linked to this task successor_task = self.radbrpc.getTask(mom_id=successor_mom_id) if successor_task: if successor_task['id'] not in task['successor_ids']: - logger.info('connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s', - successor_task['mom_id'], successor_task['otdb_id'], task['mom_id'], task['otdb_id']) + logger.info( + 'connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s' + ' otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], task['mom_id'], + task['otdb_id'] + ) + self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id']) + movePipelineAfterItsPredecessors(successor_task, self.radbrpc) else: - # Occurs when settings a obs or task to prescheduled while a successor has e.g. not yet been beyond approved, - # which is quite normal. The link in the radb will be made later via processPredecessors() above. - # Alternatively, a successor could have been deleted. - logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s', successor_mom_id, task['otdb_id']) + # Occurs when settings a obs or task to prescheduled while a successor has e.g. not yet been beyond + # approved, which is quite normal. The link in the radb will be made later via processPredecessors() + # above. Alternatively, a successor could have been deleted. + logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s', + successor_mom_id, task['otdb_id']) + + def _get_maximum_predecessor_end_time(self, specification_tree): + """ + Determine the highest end time of all predecessors of a task + + :param specification_tree: the task's specification tree + + :return: the maximum predecessor end time found, or None in case no predecessors are specified + """ - def _get_max_predecessor_end_time(self, specification_tree): predecessor_specs = [parameterset(tree['specification']) for tree in specification_tree['predecessors']] - predecessor_endTimes = [parseDatetime(spec.getString('Observation.stopTime')) for spec in predecessor_specs] - if predecessor_endTimes: - return max(predecessor_endTimes) + predecessor_end_times = [parseDatetime(spec.getString('Observation.stopTime')) for spec in predecessor_specs] + if predecessor_end_times: + return max(predecessor_end_times) return None diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 33d964d5a2c..3b19819b59c 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -1568,10 +1568,12 @@ class ResourceAssignerTest(unittest.TestCase): # self.addCleanup(ra_checker_patcher.stop) # self.ra_checker_mock = ra_checker_patcher.start() - dwell_scheduler_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.DwellScheduler') + dwell_scheduler_patcher = mock.patch( + 'lofar.sas.resourceassignment.resourceassigner.resource_assigner.DwellScheduler' + ) self.addCleanup(dwell_scheduler_patcher.stop) self.dwell_scheduler_mock = dwell_scheduler_patcher.start() - self.dwell_scheduler_mock.allocate_resources.return_value = True + self.dwell_scheduler_mock().allocate_resources.return_value = True # Select logger output to see def myprint(s, *args): @@ -1583,14 +1585,15 @@ class ResourceAssignerTest(unittest.TestCase): self.logger_mock.error.side_effect = myprint move_pipeline_after_its_predecessors_patcher = mock.patch( - 'lofar.sas.resourceassignment.resourceassigner.resource_assigner.movePipelineAfterItsPredecessors') + 'lofar.sas.resourceassignment.resourceassigner.resource_assigner.movePipelineAfterItsPredecessors' + ) self.addCleanup(move_pipeline_after_its_predecessors_patcher.stop) self.movePipelineAfterItsPredecessors_mock = move_pipeline_after_its_predecessors_patcher.start() - self.resourceAssigner = TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, - self.otdbrpc_mock, self.momrpc_mock, - self.curpc_mock, self.sqrpc_mock, - self.ra_notification_bus_mock, self.dwell_scheduler_mock) + self.resource_assigner = TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, + self.otdbrpc_mock, self.momrpc_mock, + self.curpc_mock, self.sqrpc_mock, + self.ra_notification_bus_mock, self.dwell_scheduler_mock) self.reset_specification_tree() @@ -1611,12 +1614,12 @@ class ResourceAssignerTest(unittest.TestCase): self.assertTrue(self.ra_notification_bus_mock.close.called, "ra_notification_bus.close was not called") def test_open_opens_all_services(self): - self.resourceAssigner.open() + self.resource_assigner.open() self.assert_all_services_opened() def test_close_closes_all_services(self): - self.resourceAssigner.close() + self.resource_assigner.close() self.assert_all_services_closed() @@ -1629,39 +1632,33 @@ class ResourceAssignerTest(unittest.TestCase): self.assert_all_services_closed() def test_do_assignment_logs_specification(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.info.assert_any_call('do_assignment: specification_tree=%s' % self.specification_tree) + self.logger_mock.info.assert_any_call('do_assignment: otdb_id=%s specification_tree=%s' % ( + self.specification_tree['otdb_id'], + self.specification_tree + )) def test_do_assignment_log_non_approved_or_prescheduled_states(self): - self.resourceAssigner.do_assignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'], - self.non_approved_or_prescheduled_specification_tree) + otdb_id = self.non_approved_or_prescheduled_otdb_id + status = self.non_approved_or_prescheduled_status + spec_tree = self.non_approved_or_prescheduled_specification_tree - assignable_task_states_str = "approved, prescheduled" - self.logger_mock.warn.assert_any_call( - 'Task otdb_id=%s with status \'%s\' is not assignable. Allowed statuses are %s' % - (self.non_approved_or_prescheduled_otdb_id, self.non_approved_or_prescheduled_status, - assignable_task_states_str)) + with self.assertRaises(Exception): + self.resource_assigner.do_assignment(otdb_id, spec_tree) + + assignable_task_states_str = "approved, prescheduled" + self.logger_mock.warn.assert_any_call( + 'Task otdb_id=%s with status \'%s\' is not assignable. Allowed statuses are %s' % + (otdb_id, status, assignable_task_states_str)) def test_do_assignment_non_approved_or_prescheduled_states_should_be_skipped(self): - self.resourceAssigner.do_assignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'], - self.non_approved_or_prescheduled_specification_tree) - - self.assertEqual(len(self.otdbrpc_mock.method_calls), 0, - "OTDBRPC was called for non approved or scheduled specification tree") - self.assertEqual(len(self.rarpc_mock.method_calls), 0, - "RARPC was called for non approved or scheduled specification tree") - self.assertEqual(len(self.momrpc_mock.method_calls), 0, - "MOMRPC was called for non approved or scheduled specification tree") - self.assertEqual(len(self.rerpc_mock.method_calls), 0, - "RERPC was called for non approved or scheduled specification tree") - self.assertEqual(len(self.curpc_mock.method_calls), 0, - "CURPC was called for non approved or scheduled specification tree") - self.assertEqual(len(self.ra_notification_bus_mock.method_calls), 0, - "RA notification bus was called for non approved or scheduled specification tree") + with self.assertRaises(Exception): + self.resource_assigner.do_assignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'], + self.non_approved_or_prescheduled_specification_tree) def test_do_assignment_inserts_specification_and_task_in_radb(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) start_time = datetime.datetime.strptime(self.future_start_time, '%Y-%m-%d %H:%M:%S') stop_time = datetime.datetime.strptime(self.future_stop_time, '%Y-%m-%d %H:%M:%S') @@ -1671,26 +1668,16 @@ class ResourceAssignerTest(unittest.TestCase): self.task_type, start_time, stop_time, str(parset), "CEP4") - # TODO: logging of failures is now done in raservice. How to go about this here? - # def test_do_assignment_logs_when_insertion_of_specification_and_task_in_radb_failed(self): - # return_value = {'inserted': False} - # - # self.rarpc_mock.insertSpecificationAndTask.return_value = return_value - # - # self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - # - # self.logger_mock.error.assert_any_call('could not insert specification and task: result = %s', return_value) - def test_do_assignment_logs_when_no_predecessors_found(self): self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): []} - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('no predecessors for otdb_id=%s mom_id=%s', self.task_otdb_id, self.task_mom_id) def test_do_assignment_logs_when_predecessors_are_found(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', self.predecessor_task_mom_ids, self.task_mom_id, self.task_otdb_id) @@ -1698,14 +1685,14 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_logs_when_predecessors_are_found_but_its_task_is_missing_in_radb(self): self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]} - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', self.non_existing_task_mom_id, self.task_otdb_id) def test_do_assignment_logs_when_predecessors_are_found_that_need_to_be_linked_to_task(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call( 'connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s', @@ -1715,20 +1702,20 @@ class ResourceAssignerTest(unittest.TestCase): self.task_otdb_id) def test_do_assignment_insert_predecessor_into_task_when_not_linked_to_task(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.task_id, self.predecessor_task_id) def test_do_assignment_logs_when_no_successors_found(self): self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): []} - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('no successors for otdb_id=%s mom_id=%s', self.task_otdb_id, self.task_mom_id) def test_do_assignment_logs_when_successors_are_found(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', self.successor_task_mom_ids, self.task_mom_id, self.task_otdb_id) @@ -1736,14 +1723,14 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_logs_when_successors_are_found_but_its_task_is_missing_in_radb(self): self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]} - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'could not find successor task with mom_id=%s in radb for task otdb_id=%s', self.non_existing_task_mom_id, self.task_otdb_id) def test_do_assignment_logs_when_successors_are_found_that_need_to_be_linked_to_task(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call( 'connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s', @@ -1753,26 +1740,26 @@ class ResourceAssignerTest(unittest.TestCase): self.task_otdb_id) def test_do_assignment_insert_successor_into_task_when_not_linked_to_task(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.successor_task_id, self.task_id) def test_do_assignment_moves_pipeline_of_successor_after_predecessor(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertTrue(self.movePipelineAfterItsPredecessors_mock.called) def test_do_assignment_logs_mom_bug(self): - self.resourceAssigner.do_assignment(self.mom_bug_specification_tree['otdb_id'], - self.mom_bug_specification_tree) + self.resource_assigner.do_assignment(self.mom_bug_specification_tree['otdb_id'], + self.mom_bug_specification_tree) self.logger_mock.info.assert_any_call( 'overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s', self.mom_bug_processing_cluster_name, 'CEP4', self.mom_bug_otdb_id) def test_do_assignment_resets_ProcessingCluster_clusterName_on_mom_bug(self): - self.resourceAssigner.do_assignment(self.mom_bug_specification_tree['otdb_id'], - self.mom_bug_specification_tree) + self.resource_assigner.do_assignment(self.mom_bug_specification_tree['otdb_id'], + self.mom_bug_specification_tree) self.otdbrpc_mock.taskSetSpecification.assert_any_call( self.mom_bug_otdb_id, @@ -1787,7 +1774,7 @@ class ResourceAssignerTest(unittest.TestCase): new_starttime = now + datetime.timedelta(minutes=3) new_endtime = new_starttime + datetime.timedelta(seconds=self.task_duration) - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', @@ -1815,7 +1802,7 @@ class ResourceAssignerTest(unittest.TestCase): new_starttime = future_predecessor_stop_time + datetime.timedelta(minutes=3) new_endtime = new_starttime + datetime.timedelta(seconds=self.task_duration) - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', @@ -1852,7 +1839,7 @@ class ResourceAssignerTest(unittest.TestCase): new_starttime = now + datetime.timedelta(minutes=3) new_endtime = new_starttime + datetime.timedelta(seconds=self.task_duration) - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', @@ -1870,31 +1857,31 @@ class ResourceAssignerTest(unittest.TestCase): }) def test_do_assignment_should_log_insertion_of_specification_and_task(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call( - 'do_assignment: insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, end_time=%s' + 'insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, end_time=%s' ' cluster=%s' % (self.mom_id, self.otdb_id, self.state, self.task_type, self.future_start_time, self.future_stop_time, "CEP4")) def test_do_assignment_should_log_when_insertion_of_specification_and_task_is_done(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.info.assert_any_call('do_assignment: inserted specification (id=%s) and task (id=%s)' % + self.logger_mock.info.assert_any_call('inserted specification (id=%s) and task (id=%s)' % (self.specification_id, self.task_id)) def test_do_assignment_inserts_maintenance_resource_claims_in_radb(self): - self.resourceAssigner.do_assignment(self.maintenance_specification_tree['otdb_id'], - self.maintenance_specification_tree) + self.resource_assigner.do_assignment(self.maintenance_specification_tree['otdb_id'], + self.maintenance_specification_tree) subject = 'TaskScheduled' content = "{'mom_id': 351543, 'radb_id': 2299, 'otdb_id': 1290472}" self.logger_mock.info.assert_any_call('Sending notification % s: % s' % (subject, content)) def test_do_assignment_inserts_projectreservation_resource_claims_in_radb(self): - self.resourceAssigner.do_assignment(self.projectreservation_specification_tree['otdb_id'], - self.projectreservation_specification_tree) + self.resource_assigner.do_assignment(self.projectreservation_specification_tree['otdb_id'], + self.projectreservation_specification_tree) subject = 'TaskScheduled' content = "{'mom_id': 351543, 'radb_id': 2299, 'otdb_id': 1290472}" @@ -1903,31 +1890,28 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_should_not_claim_resources_on_CEP2_tasks(self): exception_regex = "skipping resource assignment for task with cluster name" with self.assertRaisesRegexp(Exception, exception_regex): - self.resourceAssigner._do_assignment(self.cep2_specification_tree['otdb_id'], self.cep2_specification_tree) - - def test_do_assignment_should_not_claim_resources_on_non_prescheduled_cep4_tasks(self): - self.resourceAssigner.do_assignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'], - self.non_approved_or_prescheduled_specification_tree) - - self.rarpc_mock.insertResourceClaims.assert_not_called() + self.resource_assigner.do_assignment(self.cep2_specification_tree['otdb_id'], + self.cep2_specification_tree) def test_do_assignment_should_request_needed_resources(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.rerpc_mock.assert_any_call({"specification_tree": self.specification_tree}, timeout=10) def test_do_assignment_logs_when_otdb_id_not_needed_resources(self): self.specification_tree["otdb_id"] = self.otdb_id + 11 - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.error.assert_any_call("no otdb_id %s found in estimator results %s" % - (self.otdb_id + 11, self.rerpc_replymessage)) + self.logger_mock.error.assert_any_call( + "An exception occurred while obtaining resource estimates. Exception=no otdb_id %s found in estimator results %s" % + (self.otdb_id + 11, self.rerpc_replymessage) + ) def test_do_assignment_should_not_claim_resouces_when_otdb_id_not_needed_resources(self): self.specification_tree["otdb_id"] = self.otdb_id + 1 - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertResourceClaims.assert_not_called() @@ -1935,24 +1919,25 @@ class ResourceAssignerTest(unittest.TestCase): wrong_task_type = "observation" self.specification_tree["task_type"] = wrong_task_type - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.error.assert_any_call("no task type %s found in estimator results %s" % - (wrong_task_type, - self.rerpc_replymessage[str(self.otdb_id)])) + self.logger_mock.error.assert_any_call( + "An exception occurred while obtaining resource estimates. Exception=no task type %s found in estimator results %s" % + (wrong_task_type, self.rerpc_replymessage[str(self.otdb_id)]) + ) def test_do_assignment_should_not_claim_resources_when_task_type_not_in_needed_resources(self): wrong_task_type = "observation" self.specification_tree["task_type"] = wrong_task_type - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertResourceClaims.assert_not_called() def test_do_assignment_should_log_single_errors_in_needed_resources(self): self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call("Error from Resource Estimator: %s", self.resource_error1) self.logger_mock.error.assert_any_call("Error from Resource Estimator: %s", self.resource_error2) @@ -1960,16 +1945,17 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_should_log_error_in_needed_resources(self): self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call( - "Error(s) in estimator for otdb_id=%s radb_id=%s" % - (self.resources_with_errors_otdb_id, self.task_id)) + "An exception occurred while obtaining resource estimates. Exception=Error(s) in estimator for otdb_id=%s radb_id=%s" % + (self.resources_with_errors_otdb_id, self.task_id) + ) def test_do_assignment_should_update_task_with_error_on_errors_in_needed_resources(self): self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error') @@ -1979,7 +1965,7 @@ class ResourceAssignerTest(unittest.TestCase): self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) @@ -1991,149 +1977,48 @@ class ResourceAssignerTest(unittest.TestCase): return found def test_do_assignment_should_log_estimator_reply(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.info.assert_any_call('do_assignment: Resource Estimator reply = %s', - self.rerpc_replymessage) - - def test_do_assignment_logs_amount_of_claims_inserted_in_radb(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - storage_claim = {'status': 'tentative', 'resource_id': 117, 'resource_type_id': 5, 'claim_size': 2, - 'starttime': datetime.datetime(2016, 3, 25, 21, 47, 31), - 'used_rcus': None, - 'endtime': datetime.datetime(2017, 3, 25, 22, 47, 31), - 'properties': [{'type': 15, 'io_type': 'output', 'sap_nr': 0, 'value': 0}, - {'type': 2, 'io_type': 'output', 'sap_nr': 0, 'value': 1}, - {'type': 10, 'io_type': 'output', 'sap_nr': 0, 'value': 1073741824}]} - claims = [self.bandwidth_claim, storage_claim] - - self.logger_mock.info.assert_any_call('do_assignment: inserting %d claims in the radb: %s', len(claims), claims) - - # TODO: move this to t_schedulers.py - # def test_do_assignment_inserts_resource_claims_in_radb(self): - # self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - # - # self.rarpc_mock.insertResourceClaims.assert_any_call(self.task_id, self.specification_claims, 1, 'anonymous', -1) - - # TODO: move this to t_schedulers.py - # def test_do_assignment_inserts_resource_claims_with_rcus_no_earlier_claims(self): - # used_rcus = '111100010111100101101010' - # - # self.rarpc_mock.insertRcuSpecifications.return_value = [1] - # self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]} - # self.rarpc_mock.getResourceClaims.return_value = [] - # - # rcu_claim = { - # 'resource_id': 212, - # 'resource_type_id': 2, - # 'starttime': self.task_start_time, - # 'endtime': self.task_end_time, - # 'status': 'tentative', - # 'used_rcus': used_rcus, - # 'claim_size': used_rcus.count('1'), - # 'properties': [] - # } - # - # self.specification_tree['otdb_id'] = self.resources_with_rcus_otdb_id - # self.specification_tree['task_type'] = 'observation' - # self.task['type'] = 'observation' - # self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - # - # self.rarpc_mock.insertResourceClaims.assert_any_call(self.task_id, [rcu_claim], 1, 'anonymous', -1) - - # TODO: move this to t_schedulers.py - # def test_do_assignment_logs_amount_claims_inserted(self): - # self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - # - # self.logger_mock.info.assert_any_call('do_assignment: %d claims were inserted in the radb' % 2) - - # TODO: move this to t_schedulers.py - # def test_do_assignment_logs_when_it_was_unable_to_claim_all_resources(self): - # self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} - # - # self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - # - # self.logger_mock.error.assert_any_call('do_assignment: too few claims were inserted in the radb') - - def test_do_assignment_updates_task_when_it_was_unable_to_claim_all_resources(self): - self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} - - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='conflict') - - def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_all_resources(self): - content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} - subject = 'Task' + 'Conflict' + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} + self.logger_mock.info.assert_any_call('Resource Estimator reply = %s', self.rerpc_replymessage) - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + def test_do_assignment_updates_task_when_it_was_unable_to_claim_some_or_all_resources(self): + self.dwell_scheduler_mock().allocate_resources.return_value = False - self.assertBusNotificationAndLogging(content, subject) - - def test_do_assignment_updates_task_when_it_was_unable_to_claim_some_resources(self): - self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]} - - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='conflict') - def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_some_resources(self): + def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_some_or_all_resources(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} subject = 'Task' + 'Conflict' - self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]} + self.dwell_scheduler_mock().allocate_resources.return_value = False - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) - def test_do_assignment_logs_when_there_are_conflicting_claims(self): - conflicting_claims = [{}] - - self.rarpc_mock.getResourceClaims.return_value = conflicting_claims - - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.error.assert_any_call( - 'do_assignment: Task cannot be scheduled, because of %d conflicting claims: %s' % - (len(conflicting_claims), conflicting_claims)) - def test_do_assignment_notifies_bus_when_there_are_conflicting_claims(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} subject = 'Task' + 'Conflict' - conflicting_claims = [{}] - self.rarpc_mock.getResourceClaims.return_value = conflicting_claims + self.dwell_scheduler_mock().allocate_resources.return_value = False - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) - def test_do_assignment_logs_when_all_resources_were_claimed(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.info.assert_any_call( - 'do_assignment: all resources for task %s were succesfully claimed. Setting claim statuses to claimed' % self.task_id) - - def test_do_assignment_updates_task_and_resources_as_claimed_in_radb(self): - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.rarpc_mock.updateTaskAndResourceClaims.assert_any_call(self.task_id, claim_status='claimed') - def test_do_assignment_logs_task_data_removal_if_task_is_pipeline(self): self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10} - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call("removing data on disk from previous run for otdb_id %s", self.otdb_id) def test_do_assignment_removes_task_data_if_task_is_pipeline(self): self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10} - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.curpc_mock.removeTaskData.assert_any_call(self.task_otdb_id) @@ -2142,7 +2027,7 @@ class ResourceAssignerTest(unittest.TestCase): self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10} self.curpc_mock.removeTaskData.return_value = {'deleted': False, 'message': message} - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( "could not remove all data on disk from previous run for otdb_id %s: %s", self.otdb_id, message) @@ -2151,7 +2036,7 @@ class ResourceAssignerTest(unittest.TestCase): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} subject = 'Task' + 'Scheduled' - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) @@ -2160,40 +2045,42 @@ class ResourceAssignerTest(unittest.TestCase): self.logger_mock.info.assert_any_call('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) - @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') + @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime') def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification(self, datetime_mock): self.freeze_time_one_day_in_the_future(datetime_mock) exception_str = "Error something went wrong" self.otdbrpc_mock.taskSetSpecification.side_effect = Exception(exception_str) - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + with self.assertRaisesRegexp(Exception, exception_str): + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.error.assert_any_call(exception_str) + self.logger_mock.error.assert_any_call(exception_str) def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification_with_mom_bug(self): exception_str = "Error something went wrong" self.otdbrpc_mock.taskSetSpecification.side_effect = Exception(exception_str) - # with self.assertRaisesRegexp(Exception, exception_str): - self.resourceAssigner.do_assignment(self.mom_bug_specification_tree['otdb_id'], - self.mom_bug_specification_tree) + with self.assertRaisesRegexp(Exception, exception_str): + self.resource_assigner.do_assignment(self.mom_bug_specification_tree['otdb_id'], + self.mom_bug_specification_tree) - self.logger_mock.error.assert_any_call(exception_str) + self.logger_mock.error.assert_any_call(exception_str) def test_do_assignment_logs_exception_from_rerpc(self): - exception = Exception("Error something went wrong") - self.rerpc_mock.side_effect = exception + exception_msg = "Error something went wrong" + self.rerpc_mock.side_effect = Exception(exception_msg) - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + with self.assertRaisesRegexp(Exception, exception_msg): + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.error.assert_any_call(str(exception)) + self.logger_mock.error.assert_any_call(exception_msg) def test_do_assignment_updates_task_on_exception_from_rerpc(self): exception = Exception("Error something went wrong") self.rerpc_mock.side_effect = exception - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error') @@ -2204,44 +2091,48 @@ class ResourceAssignerTest(unittest.TestCase): exception = Exception("Error something went wrong") self.rerpc_mock.side_effect = exception - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) def test_do_assignment_logs_when_notifies_bus_thows_exception(self): - exception = Exception("Error something went wrong") - self.ra_notification_bus_mock.send.side_effect = exception + exception_msg = "Error something went wrong" + self.ra_notification_bus_mock.send.side_effect = Exception(exception_msg) - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + with self.assertRaisesRegexp(Exception, exception_msg): + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.error.assert_any_call(str(exception)) + self.logger_mock.error.assert_any_call(exception_msg) def test_do_assignment_logs_when_momrpc_getPredecessorIds_throws_exception(self): - exception = Exception("Error something went wrong") - self.momrpc_mock.getPredecessorIds.side_effect = exception + exception_msg = "Error something went wrong" + self.momrpc_mock.getPredecessorIds.side_effect = Exception(exception_msg) - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + with self.assertRaisesRegexp(Exception, exception_msg): + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.error.assert_any_call(str(exception)) + self.logger_mock.error.assert_any_call(exception_msg) def test_do_assignment_logs_when_momrpc_getSuccessorIds_throws_exception(self): - exception = Exception("Error something went wrong") - self.momrpc_mock.getSuccessorIds.side_effect = exception + exception_msg = "Error something went wrong" + self.momrpc_mock.getSuccessorIds.side_effect = Exception(exception_msg) - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + with self.assertRaisesRegexp(Exception, exception_msg): + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.error.assert_any_call(str(exception)) + self.logger_mock.error.assert_any_call(exception_msg) - @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') + @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime') def test_do_assignment_logs_exception_stop_time_parsing_on_predecessor(self, datetime_mock): self.freeze_time_one_day_in_the_future(datetime_mock) self.specification_tree[u'predecessors'][0]['specification'][u'Observation.stopTime'] = 'non parse' exception_str = 'time data \'non parse\' does not match format \'%Y-%m-%d %H:%M:%S\'' - self.resourceAssigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + with self.assertRaisesRegexp(Exception, exception_str): + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.error.assert_any_call(exception_str) + self.logger_mock.error.assert_any_call(exception_str) if __name__ == '__main__': unittest.main() -- GitLab