diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index cb5f64eb58a88c3aec019362c89da38068de4c4e..ca0ea6534c576cb5707962510e3d777eea7e72ae 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -98,6 +98,8 @@ class ResourceAssigner(): self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker) self.ra_notification_prefix = ra_notification_prefix + self.task = None + def __enter__(self): """Internal use only. (handles scope 'with')""" self.open() @@ -152,7 +154,7 @@ class ResourceAssigner(): return {rt['name']:rt['id'] for rt in self.radbrpc.getResourceTypes()} - + @property @cache def resource_claim_property_types(self): @@ -165,159 +167,60 @@ class ResourceAssigner(): dtypes = {'uv', 'cs', 'is', 'im', 'img', 'pulp'} return {self.resource_claim_property_types['nr_of_' + dt + '_files'] for dt in dtypes} - def doAssignment(self, specification_tree): - logger.info('doAssignment: specification_tree=%s' % (specification_tree)) + def doAssignment(self, otdb_id, specification_tree): + """ Attempts to assign the specified resources - otdb_id = specification_tree['otdb_id'] - - assignable_task_states = ['approved', 'prescheduled'] - status = specification_tree.get('state', '').lower() - if status not in assignable_task_states: - assignable_task_states_str = ', '.join(assignable_task_states) - logger.warn('skipping specification for task otdb_id=%s, because status=%s (not one of %s)', otdb_id, - status, assignable_task_states_str) - return - - try: - mainParset = parameterset(specification_tree['specification']) - except Exception as e: - logger.error(str(e)) - return - - taskType = specification_tree['task_type'] # is required item - if 'task_subtype' in specification_tree: # is optional item - taskSubtype = specification_tree['task_subtype'] - else: - taskSubtype = '' - - if self.taskTypeProducesOutput( taskType , taskSubtype): - - # Only assign resources for task output to known clusters - try: - clusterNameSet = self.getClusterNames(mainParset) - if str() in clusterNameSet or len(clusterNameSet) != 1: - # Empty set or name is always an error. - # TODO: To support >1 cluster per obs, - # self.radbrpc.insertSpecificationAndTask() as called below and the radb would need to take >1 cluster name - # Also, there is only 1 processingClusterName in the parset, but we do not always want to pipeline process all obs outputs, or not on 1 cluster - logger.error('clusterNameSet must have a single non-empty name for all enabled DataProducts, but is: %s' % clusterNameSet) - return - clusterName = clusterNameSet.pop() - - # Retrieve known cluster names (not all may be a valid storage target, but we cannot know...) - knownClusterSet = {clus['name'] for clus in self.radbrpc.getResourceGroupNames('cluster')} - logger.info('known clusters: %s', knownClusterSet) - if clusterName not in knownClusterSet: - logger.error('skipping resource assignment for task with cluster name \'%s\' not in known clusters %s' % (clusterName, knownClusterSet)) - return - except Exception as e: - logger.error(str(e)) - return - - try: - # fix for MoM bug introduced before NV's holiday - # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4 - # so, override it here if needed, and update to otdb - processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '') - if processingClusterName != clusterName: - logger.info('overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s', - processingClusterName, clusterName, otdb_id) - self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName }) - except Exception as e: - logger.error(str(e)) - return - - else: - # task has no output - clusterName = '' - - def applySaneStartEndTime(origStartTime, origEndTime): - startTime = datetime.utcnow() + timedelta(minutes=3) - - maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree) - if maxPredecessorEndTime and maxPredecessorEndTime > startTime: - startTime = maxPredecessorEndTime + timedelta(minutes=3) - - taskDuration = timedelta(seconds=totalSeconds(origEndTime - origStartTime)) if origEndTime > origStartTime else timedelta(hours=1) - - endTime = startTime + taskDuration - - logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', - startTime, endTime, otdb_id) - - logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', startTime, endTime, otdb_id) - self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.startTime': startTime.strftime('%Y-%m-%d %H:%M:%S'), - 'LOFAR.ObsSW.Observation.stopTime': endTime.strftime('%Y-%m-%d %H:%M:%S')}) - return startTime, endTime + :param otdb_id: OTDB ID of the main task which resources need to be assigned + :param specification_tree: the specification tree containing the main task and its resources + """ + logger.info('doAssignment: specification_tree=%s' % specification_tree) - # TODO: don't fix this crap here. Bad start/stop time has to go to error, like any other bad spec part. Fix the cause! Idem for MoM fix up below. try: - startTime = parseDatetime(mainParset.getString('Observation.startTime')) - endTime = parseDatetime(mainParset.getString('Observation.stopTime')) + self._do_assignment(otdb_id, specification_tree) - if startTime < datetime.utcnow(): - startTime, endTime = applySaneStartEndTime(startTime, endTime) + if self.task is not None: + # send notification that the task was scheduled + self._announceStateChange(self.task, 'scheduled') except Exception as e: logger.error(str(e)) - return + if self.task is not None: + self._announceStateChange(self.task, 'error') + + def _do_assignment(self, otdb_id, specification_tree): + """ Performs the actual attempt to assign resources of the desired task """ - logger.info('doAssignment: Accepted specification') + self.task = None + if self._get_is_assignable(specification_tree): + main_parset = self._get_main_parset(specification_tree) + task_type, task_subtype = self._get_task_type(specification_tree) + cluster_name = self._get_clustername(otdb_id, main_parset, task_type, task_subtype) + start_time, end_time = self._get_main_task_start_and_end_times(specification_tree) - # insert new task and specification in the radb - # any existing specification and task with same otdb_id will be deleted automatically - momId = mainParset.getInt('Observation.momID', -1) + logger.info('doAssignment: Accepted specification') - logger.info('doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s cluster=%s' % - (momId, otdb_id, status, taskType, startTime, endTime, clusterName)) - try: - result = self.radbrpc.insertSpecificationAndTask(momId, otdb_id, status, taskType, - startTime, endTime, str(mainParset), clusterName) - except Exception as e: - logger.error(str(e)) - result = {'inserted': False} # handled next - if not result['inserted']: - logger.error('could not insert specification and task: result = %s', result) - return + task_id, self.task = self._insert_main_task(specification_tree, start_time, end_time, cluster_name) - specificationId = result['specification_id'] - taskId = result['task_id'] - logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specificationId, taskId)) + logger.info('doAssignment: task=%s', self.task) - task = None - try: - task = self.radbrpc.getTask(taskId) - except Exception as e: - logger.error(str(e)) - return - logger.info('doAssignment: task=%s', task) + self.processPredecessors(self.task) + self.processSuccessors(self.task) - try: - self.processPredecessors(task) - self.processSuccessors(task) - except Exception as e: - logger.error(str(e)) - self._announceStateChange(task, 'error') - return + status = specification_tree.get('state', '').lower() + if status == 'approved': + # Do this check after insertion of specification, task and predecessor/successor relations, + # so approved tasks appear correctly in the web scheduler. + logger.info('skipping resource assignment for task otdb_id=%s, because status=%s' % (otdb_id, status)) + else: + estimates = self.getResourceEstimates(specification_tree, otdb_id, task_type, task_id) - if status != 'prescheduled': # should only happen for approved - # Do this check after insertion of specif, task and predecessor/successor relations, - # so approved tasks appear correctly in the web scheduler. - logger.info('skipping resource assignment for task otdb_id=%s, because status=%s' % (otdb_id, status)) - return + db_resource_list = self.radbrpc.getResources(include_availability=True) + db_resource_max_fill_ratios = self.radbrpc.getResourceAllocationConfig(sql_like_name_pattern='max_fill_ratio_%') - try: - estimates = self.getResourceEstimates(specification_tree, otdb_id, taskType, taskId) - - db_resource_list = self.radbrpc.getResources(include_availability=True) - db_resource_max_fill_ratios = self.radbrpc.getResourceAllocationConfig(sql_like_name_pattern='max_fill_ratio_%') - except Exception as e: - logger.error(str(e)) - self._announceStateChange(task, 'error') + self.applyMaxFillRatios(db_resource_list, db_resource_max_fill_ratios) + else: return - self.applyMaxFillRatios(db_resource_list, db_resource_max_fill_ratios) - # Assume estimates are (close enough to) accurate to determine resource claims for this task. # Try to get a set of non-conflicting claims from availability info in the RA DB. # If that fails, insert no claims and set the task straight to conflict. @@ -326,50 +229,206 @@ class ResourceAssigner(): # If any claim is in conflict state, then the task is put to conflict status as well. all_fit, claims, _ = self.findClaims(estimates, db_resource_list) if not all_fit: - self._announceStateChange(task, 'error') + self._announceStateChange(self.task, 'error') return if claims: # Complete the claims by annotating them with task information - self.tieClaimsToTask(claims, task) + self.tieClaimsToTask(claims, self.task) logger.info('doAssignment: inserting %d claims in the radb: %s', len(claims), claims) try: - claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids'] + claim_ids = self.radbrpc.insertResourceClaims(self.task['id'], claims, 1, 'anonymous', -1)['ids'] logger.info('doAssignment: %d claims were inserted in the radb' % len(claim_ids)) if len(claim_ids) != len(claims): logger.error('doAssignment: too few claims were inserted in the radb') - self._announceStateChange(task, 'conflict') + self._announceStateChange(self.task, 'conflict') return - conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict') + conflictingClaims = self.radbrpc.getResourceClaims(task_ids=task_id, status='conflict') if conflictingClaims: - # Will do _announceStateChange(task, 'conflict'), and radb sets task status to conflict automatically + # Will do _announceStateChange(self.task, 'conflict'), and radb sets task status to conflict automatically logger.error('doAssignment: Task cannot be scheduled, because of %d conflicting claims: %s' % (len(conflictingClaims), conflictingClaims)) - self._announceStateChange(task, 'conflict') + self._announceStateChange(self.task, 'conflict') return - logger.info('doAssignment: all resources for task %s were succesfully claimed. Setting claim statuses to claimed' % (taskId,)) - self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='claimed') + logger.info('doAssignment: all resources for task %s were succesfully claimed. Setting claim statuses to claimed' % (task_id,)) + self.radbrpc.updateTaskAndResourceClaims(task_id, claim_status='claimed') except Exception as e: logger.error(str(e)) - self._announceStateChange(task, 'conflict') + self._announceStateChange(self.task, 'conflict') return # remove any output and/or intermediate data for restarting pipelines - if task['type'] == 'pipeline': + if self.task['type'] == 'pipeline': try: - du_result = self.sqrpc.getDiskUsageForOTDBId(task['otdb_id'], include_scratch_paths=True, force_update=True) + du_result = self.sqrpc.getDiskUsageForOTDBId(self.task['otdb_id'], include_scratch_paths=True, force_update=True) if du_result['found'] and du_result.get('disk_usage', 0) > 0: logger.info("removing data on disk from previous run for otdb_id %s", otdb_id) - result = self.curpc.removeTaskData(task['otdb_id']) + result = self.curpc.removeTaskData(self.task['otdb_id']) if not result['deleted']: logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", otdb_id, result['message']) except Exception as e: logger.error(str(e)) # in line with failure as warning just above: allow going to scheduled state here too - # send notification that the task was scheduled - self._announceStateChange(task, 'scheduled') + def _get_is_assignable(self, specification_tree): + """ Verifies if a task can actually be assigned + + :param specification_tree: the specification tree of the task + + :returns True if the task can be assigned or False if not + """ + + is_assignable = True + otdb_id = specification_tree['otdb_id'] + + assignable_task_states = ['approved', 'prescheduled'] + status = specification_tree.get('state', '').lower() + if not status in assignable_task_states: + is_assignable = False + + assignable_task_states_str = ', '.join(assignable_task_states) + logger.warn('skipping specification for task otdb_id=%s, because status=%s (not one of %s)', otdb_id, + status, assignable_task_states_str) + + return is_assignable + + def _get_main_parset(self, specification_tree): + """ Extracts the main parset from a specification tree + + :param specification_tree the specification tree of the task + + :returns the main parset + """ + + return parameterset(specification_tree['specification']) + + def _get_task_type(self, specification_tree): + taskType = specification_tree['task_type'] # is required item + if 'task_subtype' in specification_tree: # is optional item + taskSubtype = specification_tree['task_subtype'] + else: + taskSubtype = '' + + return taskType, taskSubtype + + def _get_clustername(self, otdb_id, mainParset, taskType, taskSubtype): + """ Determines the name of the cluster to which to store the task's output - if it produces output at all. + + :param otdb_id: the ORDB ID of the (main) task + :param mainParset: the parset of the main task + :param taskType: the task's type + :param taskSubtype: the task's subtype + + :returns The name of the output cluster, or an empty string if none is applicable + """ + + clusterName = '' + if self.taskTypeProducesOutput(taskType, taskSubtype): + # Only assign resources for task output to known clusters + clusterNameSet = self.getClusterNames(mainParset) + + if str() in clusterNameSet or len(clusterNameSet) != 1: + # Empty set or name is always an error. + # TODO: To support >1 cluster per obs, + # self.radbrpc.insertSpecificationAndTask() as called below and the radb would need to take >1 cluster name + # Also, there is only 1 processingClusterName in the parset, but we do not always want to pipeline process all obs outputs, or not on 1 cluster + logger.error( + 'clusterNameSet must have a single non-empty name for all enabled DataProducts, but is: %s' % clusterNameSet) + else: + clusterName = clusterNameSet.pop() + + # Retrieve known cluster names (not all may be a valid storage target, but we cannot know...) + knownClusterSet = {clus['name'] for clus in self.radbrpc.getResourceGroupNames('cluster')} + logger.info('known clusters: %s', knownClusterSet) + if clusterName not in knownClusterSet: + raise Exception("skipping resource assignment for task with cluster name '" + clusterName + + "' not in known clusters " + str(knownClusterSet)) + else: + # fix for MoM bug introduced before NV's holiday + # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4 + # so, override it here if needed, and update to otdb + processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '') + if processingClusterName != clusterName: + logger.info('overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s', + processingClusterName, clusterName, otdb_id) + self.otdbrpc.taskSetSpecification(otdb_id, { + 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName}) + + return clusterName + + def _get_main_task_start_and_end_times(self, specification_tree): + """ Get the start time and end time of the main task adapted such that (a) there's a period of 3 minutes between + tasks and (b) the start time and end time are actually in the future. + + :param specification_tree: specification tree for the main task + :returns 2-tuple (start_time, end_time) + """ + + def apply_sane_start_and_end_time(orig_start_time, orig_end_time, otdb_id): + start_time = datetime.utcnow() + timedelta(minutes=3) + + max_predecessor_end_time = self.getMaxPredecessorEndTime(specification_tree) + if max_predecessor_end_time and max_predecessor_end_time > start_time: + start_time = max_predecessor_end_time + timedelta(minutes=3) + + if orig_end_time > orig_start_time: + task_duration = timedelta(seconds=totalSeconds(orig_end_time - orig_start_time)) + else: + timedelta(hours=1) + + end_time = start_time + task_duration + + logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', + start_time, end_time, otdb_id) + + logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', start_time, + end_time, otdb_id) + self.otdbrpc.taskSetSpecification(otdb_id, + {'LOFAR.ObsSW.Observation.startTime': start_time.strftime('%Y-%m-%d %H:%M:%S'), + 'LOFAR.ObsSW.Observation.stopTime': end_time.strftime('%Y-%m-%d %H:%M:%S')}) + return start_time, end_time + + # TODO: don't fix this crap here. Bad start/stop time has to go to error, like any other bad spec part. + # TODO: Fix the cause! Idem for MoM fix up below. + + main_parset = self._get_main_parset(specification_tree) + start_time = parseDatetime(main_parset.getString('Observation.startTime')) + end_time = parseDatetime(main_parset.getString('Observation.stopTime')) + + if start_time < datetime.utcnow(): + otdb_id = specification_tree['otdb_id'] + start_time, end_time = apply_sane_start_and_end_time(start_time, end_time, otdb_id) + + return start_time, end_time + + def _insert_main_task(self, specification_tree, start_time, end_time, cluster_name): + """ Inserts the main task and its specification into the RADB. Any existing specification and task with same + otdb_id will be deleted automatically. + + :param specification_tree: specification tree for the main task + :return: 2-tuple (task_id, task) + """ + + task_type, _ = self._get_task_type(specification_tree) + main_parset = self._get_main_parset(specification_tree) + mom_id = main_parset.getInt('Observation.momID', -1) + status = specification_tree.get('state', '').lower() + otdb_id = specification_tree['otdb_id'] + logger.info( + 'doAssignment: insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, end_time=%s cluster=%s' % + (mom_id, otdb_id, status, task_type, start_time, end_time, cluster_name)) + + result = self.radbrpc.insertSpecificationAndTask(mom_id, otdb_id, status, task_type, start_time, end_time, + str(main_parset), cluster_name) + + specification_id = result['specification_id'] + task_id = result['task_id'] + logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id)) + + task = self.radbrpc.getTask(task_id) + + return task_id, task def getResourceEstimates(self, specification_tree, otdb_id, taskType, taskId): """ Request and return checked estimates of needed resources from Resource Estimator. """ @@ -416,7 +475,7 @@ class ResourceAssigner(): def _announceStateChange(self, task, status): if status == 'scheduled' or status == 'conflict' or status == 'error': - content={'radb_id': task['id'], 'otdb_id':task['otdb_id'], 'mom_id': task['mom_id']} + content={'radb_id': task['id'], 'otdb_id': task['otdb_id'], 'mom_id': task['mom_id']} subject= 'Task' + status[0].upper() + status[1:] else: # should not end up here (bug) logger.error('_announceStateChange(): bug: Not sending notification as status is %s' % status) diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 86cc56a273749ab34a0cabdb8f879d73f89ad10d..cd24c5657029b22436310c655659925dfa93f170 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -1614,12 +1614,13 @@ class ResourceAssignerTest(unittest.TestCase): self.assert_all_services_closed() def test_do_assignment_logs_specification(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('doAssignment: specification_tree=%s' % self.specification_tree) def test_do_assignment_log_non_approved_or_prescheduled_states(self): - self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) + self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'], + self.non_approved_or_prescheduled_specification_tree) assignable_task_states_str = "approved, prescheduled" self.logger_mock.warn.assert_any_call( @@ -1628,7 +1629,8 @@ class ResourceAssignerTest(unittest.TestCase): assignable_task_states_str) def test_do_assignment_non_approved_or_prescheduled_states_should_be_skipped(self): - self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) + self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'], + self.non_approved_or_prescheduled_specification_tree) self.assertEqual(len(self.otdbrpc_mock.method_calls), 0, "OTDBRPC was called for non approved or scheduled specification tree") @@ -1644,7 +1646,7 @@ class ResourceAssignerTest(unittest.TestCase): "RA notification bus was called for non approved or scheduled specification tree") def test_do_assignment_inserts_specification_and_task_in_radb(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) start_time = datetime.datetime.strptime(self.future_start_time, '%Y-%m-%d %H:%M:%S') stop_time = datetime.datetime.strptime(self.future_stop_time, '%Y-%m-%d %H:%M:%S') @@ -1654,25 +1656,26 @@ class ResourceAssignerTest(unittest.TestCase): self.task_type, start_time, stop_time, str(parset), "CEP4") - def test_do_assignment_logs_when_insertion_of_specification_and_task_in_radb_failed(self): - return_value = {'inserted': False} - - self.rarpc_mock.insertSpecificationAndTask.return_value = return_value - - self.resourceAssigner.doAssignment(self.specification_tree) - - self.logger_mock.error.assert_any_call('could not insert specification and task: result = %s', return_value) + # TODO: logging of failures is now done in raservice. How to go about this here? + # def test_do_assignment_logs_when_insertion_of_specification_and_task_in_radb_failed(self): + # return_value = {'inserted': False} + # + # self.rarpc_mock.insertSpecificationAndTask.return_value = return_value + # + # self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + # + # self.logger_mock.error.assert_any_call('could not insert specification and task: result = %s', return_value) def test_do_assignment_logs_when_no_predecessors_found(self): self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): []} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('no predecessors for otdb_id=%s mom_id=%s', self.task_otdb_id, self.task_mom_id) def test_do_assignment_logs_when_predecessors_are_found(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', self.predecessor_task_mom_ids, self.task_mom_id, self.task_otdb_id) @@ -1680,14 +1683,14 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_logs_when_predecessors_are_found_but_its_task_is_missing_in_radb(self): self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', self.non_existing_task_mom_id, self.task_otdb_id) def test_do_assignment_logs_when_predecessors_are_found_that_need_to_be_linked_to_task(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call( 'connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s', @@ -1697,20 +1700,20 @@ class ResourceAssignerTest(unittest.TestCase): self.task_otdb_id) def test_do_assignment_insert_predecessor_into_task_when_not_linked_to_task(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.task_id, self.predecessor_task_id) def test_do_assignment_logs_when_no_successors_found(self): self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): []} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('no successors for otdb_id=%s mom_id=%s', self.task_otdb_id, self.task_mom_id) def test_do_assignment_logs_when_successors_are_found(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', self.successor_task_mom_ids, self.task_mom_id, self.task_otdb_id) @@ -1718,14 +1721,14 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_logs_when_successors_are_found_but_its_task_is_missing_in_radb(self): self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'could not find successor task with mom_id=%s in radb for task otdb_id=%s', self.non_existing_task_mom_id, self.task_otdb_id) def test_do_assignment_logs_when_successors_are_found_that_need_to_be_linked_to_task(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call( 'connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s', @@ -1735,24 +1738,26 @@ class ResourceAssignerTest(unittest.TestCase): self.task_otdb_id) def test_do_assignment_insert_successor_into_task_when_not_linked_to_task(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.successor_task_id, self.task_id) def test_do_assignment_moves_pipeline_of_successor_after_predecessor(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertTrue(self.movePipelineAfterItsPredecessors_mock.called) def test_do_assignment_logs_mom_bug(self): - self.resourceAssigner.doAssignment(self.mom_bug_specification_tree) + self.resourceAssigner.doAssignment(self.mom_bug_specification_tree['otdb_id'], + self.mom_bug_specification_tree) self.logger_mock.info.assert_any_call( 'overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s', self.mom_bug_processing_cluster_name, 'CEP4', self.mom_bug_otdb_id) def test_do_assignment_resets_ProcessingCluster_clusterName_on_mom_bug(self): - self.resourceAssigner.doAssignment(self.mom_bug_specification_tree) + self.resourceAssigner.doAssignment(self.mom_bug_specification_tree['otdb_id'], + self.mom_bug_specification_tree) self.otdbrpc_mock.taskSetSpecification.assert_any_call( self.mom_bug_otdb_id, @@ -1767,7 +1772,7 @@ class ResourceAssignerTest(unittest.TestCase): new_starttime = now + datetime.timedelta(minutes=3) new_endtime = new_starttime + datetime.timedelta(seconds=self.task_duration) - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', @@ -1795,7 +1800,7 @@ class ResourceAssignerTest(unittest.TestCase): new_starttime = future_predecessor_stop_time + datetime.timedelta(minutes=3) new_endtime = new_starttime + datetime.timedelta(seconds=self.task_duration) - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', @@ -1832,7 +1837,7 @@ class ResourceAssignerTest(unittest.TestCase): new_starttime = now + datetime.timedelta(minutes=3) new_endtime = new_starttime + datetime.timedelta(seconds=self.task_duration) - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', @@ -1850,40 +1855,40 @@ class ResourceAssignerTest(unittest.TestCase): }) def test_do_assignment_should_log_insertion_of_specification_and_task(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call( - 'doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s' + 'doAssignment: insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, end_time=%s' ' cluster=%s' % (self.mom_id, self.otdb_id, self.state, self.task_type, self.future_start_time, self.future_stop_time, "CEP4")) def test_do_assignment_should_log_when_insertion_of_specification_and_task_is_done(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('doAssignment: inserted specification (id=%s) and task (id=%s)' % (self.specification_id, self.task_id)) def test_do_assignment_should_not_claim_resources_on_CEP2_tasks(self): - self.resourceAssigner.doAssignment(self.cep2_specification_tree) + exception_regex = "skipping resource assignment for task with cluster name" + with self.assertRaisesRegexp(Exception, exception_regex): + self.resourceAssigner._do_assignment(self.cep2_specification_tree['otdb_id'], self.cep2_specification_tree) - self.rarpc_mock.insertResourceClaims.assert_not_called() - - def test_do_assignment_should_not_claim_resources_on_non_presceduled_cep4_tasks(self): - self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) + def test_do_assignment_should_not_claim_resources_on_non_prescheduled_cep4_tasks(self): + self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'], + self.non_approved_or_prescheduled_specification_tree) self.rarpc_mock.insertResourceClaims.assert_not_called() - def test_do_assginement_should_request_needed_resources(self): - self.resourceAssigner.doAssignment(self.specification_tree) + def test_do_assignment_should_request_needed_resources(self): + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) - self.rerpc_mock.assert_any_call( - {"specification_tree": self.specification_tree}, timeout=10) + self.rerpc_mock.assert_any_call({"specification_tree": self.specification_tree}, timeout=10) def test_do_assignment_logs_when_otdb_id_not_needed_resources(self): self.specification_tree["otdb_id"] = self.otdb_id + 11 - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call("no otdb_id %s found in estimator results %s" % (self.otdb_id + 11, self.rerpc_replymessage)) @@ -1891,7 +1896,7 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_should_not_claim_resouces_when_otdb_id_not_needed_resources(self): self.specification_tree["otdb_id"] = self.otdb_id + 1 - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertResourceClaims.assert_not_called() @@ -1899,7 +1904,7 @@ class ResourceAssignerTest(unittest.TestCase): wrong_task_type = "observation" self.specification_tree["task_type"] = wrong_task_type - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call("no task type %s found in estimator results %s" % (wrong_task_type, @@ -1909,14 +1914,14 @@ class ResourceAssignerTest(unittest.TestCase): wrong_task_type = "observation" self.specification_tree["task_type"] = wrong_task_type - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertResourceClaims.assert_not_called() def test_do_assignment_should_log_single_errors_in_needed_resources(self): self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call("Error from Resource Estimator: %s", self.resource_error1) self.logger_mock.error.assert_any_call("Error from Resource Estimator: %s", self.resource_error2) @@ -1924,7 +1929,7 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_should_log_error_in_needed_resources(self): self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call( "Error(s) in estimator for otdb_id=%s radb_id=%s" % @@ -1933,7 +1938,7 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_should_update_task_with_error_on_errors_in_needed_resources(self): self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error') @@ -1943,7 +1948,7 @@ class ResourceAssignerTest(unittest.TestCase): self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) @@ -1955,20 +1960,20 @@ class ResourceAssignerTest(unittest.TestCase): return found def test_do_assignment_should_log_estimator_reply(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('doAssignment: Resource Estimator reply = %s', self.rerpc_replymessage) def test_do_assignment_logs_created_claim_per_needed_resource_type(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) claim = [self.bandwidth_claim, self.storage_claim] self.logger_mock.debug.assert_any_call('fitMultipleResources: created claim: %s', claim) def test_do_assignment_logs_amount_of_claims_inserted_in_radb(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) storage_claim = {'status': 'tentative', 'resource_id': 117, 'resource_type_id': 5, 'claim_size': 2, 'starttime': datetime.datetime(2016, 3, 25, 21, 47, 31), @@ -1982,7 +1987,7 @@ class ResourceAssignerTest(unittest.TestCase): self.logger_mock.info.assert_any_call('doAssignment: inserting %d claims in the radb: %s', len(claims), claims) def test_do_assignment_inserts_resource_claims_in_radb(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertResourceClaims.assert_any_call(self.task_id, self.specification_claims, 1, 'anonymous', -1) @@ -2007,26 +2012,26 @@ class ResourceAssignerTest(unittest.TestCase): self.specification_tree['otdb_id'] = self.resources_with_rcus_otdb_id self.specification_tree['task_type'] = 'observation' self.task['type'] = 'observation' - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.insertResourceClaims.assert_any_call(self.task_id, [rcu_claim], 1, 'anonymous', -1) def test_do_assignment_logs_amount_claims_inserted(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call('doAssignment: %d claims were inserted in the radb' % 2) def test_do_assignment_logs_when_it_was_unable_to_claim_all_resources(self): self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call('doAssignment: too few claims were inserted in the radb') def test_do_assignment_updates_task_when_it_was_unable_to_claim_all_resources(self): self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='conflict') @@ -2036,14 +2041,14 @@ class ResourceAssignerTest(unittest.TestCase): self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) def test_do_assignment_updates_task_when_it_was_unable_to_claim_some_resources(self): self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='conflict') @@ -2053,7 +2058,7 @@ class ResourceAssignerTest(unittest.TestCase): self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) @@ -2062,7 +2067,7 @@ class ResourceAssignerTest(unittest.TestCase): self.rarpc_mock.getResourceClaims.return_value = conflicting_claims - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call( 'doAssignment: Task cannot be scheduled, because of %d conflicting claims: %s' % @@ -2075,33 +2080,33 @@ class ResourceAssignerTest(unittest.TestCase): conflicting_claims = [{}] self.rarpc_mock.getResourceClaims.return_value = conflicting_claims - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) def test_do_assignment_logs_when_all_resources_were_claimed(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call( 'doAssignment: all resources for task %s were succesfully claimed. Setting claim statuses to claimed' % self.task_id) def test_do_assignment_updates_task_and_resources_as_claimed_in_radb(self): - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.updateTaskAndResourceClaims.assert_any_call(self.task_id, claim_status='claimed') def test_do_assignment_logs_task_data_removal_if_task_is_pipeline(self): self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.info.assert_any_call("removing data on disk from previous run for otdb_id %s", self.otdb_id) def test_do_assignment_removes_task_data_if_task_is_pipeline(self): self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.curpc_mock.removeTaskData.assert_any_call(self.task_otdb_id) @@ -2110,7 +2115,7 @@ class ResourceAssignerTest(unittest.TestCase): self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10} self.curpc_mock.removeTaskData.return_value = {'deleted': False, 'message': message} - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.warning.assert_any_call( "could not remove all data on disk from previous run for otdb_id %s: %s", self.otdb_id, message) @@ -2119,7 +2124,7 @@ class ResourceAssignerTest(unittest.TestCase): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} subject = 'Task' + 'Scheduled' - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) @@ -2132,26 +2137,28 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification(self, datetime_mock): self.freeze_time_one_day_in_the_future(datetime_mock) - exception = Exception("Error something went wrong") - self.otdbrpc_mock.taskSetSpecification.side_effect = exception + exception_str = "Error something went wrong" + self.otdbrpc_mock.taskSetSpecification.side_effect = Exception(exception_str) - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.error.assert_any_call(str(exception)) + self.logger_mock.error.assert_any_call(exception_str) def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification_with_mom_bug(self): - exception = Exception("Error something went wrong") - self.otdbrpc_mock.taskSetSpecification.side_effect = exception + exception_str = "Error something went wrong" + self.otdbrpc_mock.taskSetSpecification.side_effect = Exception(exception_str) - self.resourceAssigner.doAssignment(self.mom_bug_specification_tree) + # with self.assertRaisesRegexp(Exception, exception_str): + self.resourceAssigner.doAssignment(self.mom_bug_specification_tree['otdb_id'], + self.mom_bug_specification_tree) - self.logger_mock.error.assert_any_call(str(exception)) + self.logger_mock.error.assert_any_call(exception_str) def test_do_assignment_logs_exception_from_rerpc(self): exception = Exception("Error something went wrong") self.rerpc_mock.side_effect = exception - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call(str(exception)) @@ -2159,7 +2166,7 @@ class ResourceAssignerTest(unittest.TestCase): exception = Exception("Error something went wrong") self.rerpc_mock.side_effect = exception - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error') @@ -2170,7 +2177,7 @@ class ResourceAssignerTest(unittest.TestCase): exception = Exception("Error something went wrong") self.rerpc_mock.side_effect = exception - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) @@ -2178,7 +2185,7 @@ class ResourceAssignerTest(unittest.TestCase): exception = Exception("Error something went wrong") self.ra_notification_bus_mock.send.side_effect = exception - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call(str(exception)) @@ -2186,7 +2193,7 @@ class ResourceAssignerTest(unittest.TestCase): exception = Exception("Error something went wrong") self.momrpc_mock.getPredecessorIds.side_effect = exception - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call(str(exception)) @@ -2194,7 +2201,7 @@ class ResourceAssignerTest(unittest.TestCase): exception = Exception("Error something went wrong") self.momrpc_mock.getSuccessorIds.side_effect = exception - self.resourceAssigner.doAssignment(self.specification_tree) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call(str(exception)) @@ -2203,19 +2210,22 @@ class ResourceAssignerTest(unittest.TestCase): self.freeze_time_one_day_in_the_future(datetime_mock) self.specification_tree[u'predecessors'][0]['specification'][u'Observation.stopTime'] = 'non parse' - exception = ValueError('time data \'non parse\' does not match format \'%Y-%m-%d %H:%M:%S\'') - self.resourceAssigner.doAssignment(self.specification_tree) + exception_str = 'time data \'non parse\' does not match format \'%Y-%m-%d %H:%M:%S\'' - self.logger_mock.error.assert_any_call(str(exception)) + self.resourceAssigner.doAssignment(self.specification_tree['otdb_id'], self.specification_tree) + + self.logger_mock.error.assert_any_call(exception_str) def test_do_assignment_inserts_maintenance_resource_claims_in_radb(self): - self.resourceAssigner.doAssignment(self.maintenance_specification_tree) + self.resourceAssigner.doAssignment(self.maintenance_specification_tree['otdb_id'], + self.maintenance_specification_tree) self.logger_mock.info.assert_any_call('findClaims: needed_resources_by_type_id: %s', {2: '111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111'}) self.logger_mock.info.assert_any_call('doAssignment: %d claims were inserted in the radb' % 2) def test_do_assignment_inserts_projectreservation_resource_claims_in_radb(self): - self.resourceAssigner.doAssignment(self.projectreservation_specification_tree) + self.resourceAssigner.doAssignment(self.projectreservation_specification_tree['otdb_id'], + self.projectreservation_specification_tree) self.logger_mock.info.assert_any_call('findClaims: needed_resources_by_type_id: %s', {2: '111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111'}) self.logger_mock.info.assert_any_call('doAssignment: %d claims were inserted in the radb' % 2)