diff --git a/SAS/ResourceAssignment/Common/lib/specification.py b/SAS/ResourceAssignment/Common/lib/specification.py index f674547379642cee263a6270be966a008910278f..c6a988a6ec322dec06b1fbeb944f0e28e8425895 100644 --- a/SAS/ResourceAssignment/Common/lib/specification.py +++ b/SAS/ResourceAssignment/Common/lib/specification.py @@ -983,7 +983,7 @@ class Specification: self.status = task["status"] self.type = task["type"] self.duration = timedelta(seconds = task["duration"]) - self.cluster = task["cluster"] + self.cluster = task.get("cluster", "CEP4") #we don't seem to need specification_id? logger.info("Read task from RADB: %s", task) return task @@ -1071,7 +1071,7 @@ class Specification: specification_id = result['specification_id'] # We never seem to need this again task_id = result['task_id'] - logger.info('inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id)) + logger.info('inserted/updated specification (id=%s) and task (id=%s)' % (specification_id, task_id)) return task_id def _link_predecessors_to_task_in_radb(self): @@ -1092,24 +1092,30 @@ class Specification: # check if the predecessor needs to be linked to this task predecessor_task = self.radb.getTask(mom_id=predecessor_mom_id) if predecessor_task: + # do Specification-class bookkeeping (stupid, because all info is in the radb already) predecessor_keys = [p.radb_id for p in self.predecessors] if predecessor_task['id'] not in predecessor_keys: logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s ' 'otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], self.mom_id, self.otdb_id) - spec = Specification(self.otdbrpc, self.momquery, self.radb) - spec.read_from_radb(predecessor_task['id']) - self.predecessors.append(spec) # TODO this needs a try/except somewhere? - try: - self.radb.insertTaskPredecessor(self.radb_id, spec.radb_id) - except PostgresDBQueryExecutionError as e: - # task was already connected to predecessor. Log and continue. - if 'task_predecessor_unique' in str(e): - logger.info('predecessor task with mom_id=%s otdb_id=%s was already connected to its successor with mom_id=%s otdb_id=%s', - predecessor_task['mom_id'], predecessor_task['otdb_id'], - self.mom_id, self.otdb_id) - else: - raise + pred_spec = Specification(self.otdbrpc, self.momquery, self.radb) + pred_spec.read_from_radb(predecessor_task['id']) + self.predecessors.append(pred_spec) # TODO this needs a try/except somewhere? + + + # do radb task-predecessor bookkeeping if needed + try: + task = self.radb.getTask(self.radb_id) + if predecessor_task['id'] not in task['predecessor_ids']: + self.radb.insertTaskPredecessor(self.radb_id, predecessor_task['id']) + except PostgresDBQueryExecutionError as e: + # task was already connected to predecessor. Log and continue. + if 'task_predecessor_unique' in str(e): + logger.info('predecessor task with mom_id=%s otdb_id=%s was already connected to its successor with mom_id=%s otdb_id=%s', + predecessor_task['mom_id'], predecessor_task['otdb_id'], + self.mom_id, self.otdb_id) + else: + raise else: # Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond # approved, which is in principle valid. The link in the radb will be made later via processSuccessors() @@ -1142,16 +1148,20 @@ class Specification: ' otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], self.mom_id, self.otdb_id ) self.successor_ids.append(successor_task['id']) - try: + + # do radb task-successor bookkeeping if needed + try: + task = self.radb.getTask(self.radb_id) + if successor_task['id'] not in task['successor_ids']: self.radb.insertTaskPredecessor(successor_task['id'], self.radb_id) - except PostgresDBQueryExecutionError as e: - # task was already connected to predecessor. Log and continue. - if 'task_predecessor_unique' in str(e): - logger.info('successor task with mom_id=%s otdb_id=%s was already connected to its predecessor with mom_id=%s otdb_id=%s', - successor_task['mom_id'], successor_task['otdb_id'], - self.mom_id, self.otdb_id) - else: - raise + except PostgresDBQueryExecutionError as e: + # task was already connected to predecessor. Log and continue. + if 'task_predecessor_unique' in str(e): + logger.info('successor task with mom_id=%s otdb_id=%s was already connected to its predecessor with mom_id=%s otdb_id=%s', + successor_task['mom_id'], successor_task['otdb_id'], + self.mom_id, self.otdb_id) + else: + raise movePipelineAfterItsPredecessors(successor_task, self.radb) else: diff --git a/SAS/ResourceAssignment/Common/test/test_specification.py b/SAS/ResourceAssignment/Common/test/test_specification.py index d78da20042ee9f83122582f36204556f1959321b..d0da3cd82ce02e60a4b3c8c8ab9f75053aeaac2b 100755 --- a/SAS/ResourceAssignment/Common/test/test_specification.py +++ b/SAS/ResourceAssignment/Common/test/test_specification.py @@ -1373,5 +1373,44 @@ class ReadFromRadb(unittest.TestCase): self.assertEqual(self.specification.duration, timedelta(seconds = task["duration"])) self.assertEqual(self.specification.cluster, task["cluster"]) + def test_insert_into_radb_and_check_predecessors(self): + # Arrange + def mock_getTask(id=None, mom_id=None, otdb_id=None, specification_id=None): + if id is None and mom_id is not None: + id = mom_id + return {"id": id, "mom_id": id, "otdb_id": id, "status": "approved", "type": "observation", "duration": 100, "predecessor_ids": []} + + self.radbrpc_mock.getTask.side_effect = mock_getTask + self.radbrpc_mock.insertOrUpdateSpecificationAndTask.return_value = {'specification_id': 1, 'task_id': 1} + self.momrpc_mock.getPredecessorIds.return_value = {'1': [42]} + self.specification.read_from_radb(1) + + # Act + self.specification.insert_into_radb() + + # Assert + self.radbrpc_mock.insertTaskPredecessor.assert_called_with(1, 42) + + + # Arrange + # now adapt the mock_getTask, and let it return the inserted predecessor_ids as well + def mock_getTask(id=None, mom_id=None, otdb_id=None, specification_id=None): + if id is None and mom_id is not None: + id = mom_id + task = {"id": id, "mom_id": id, "otdb_id": id, "status": "approved", "type": "observation", "duration": 100, "predecessor_ids": []} + if id == 1: + task['predecessor_ids'] = [42] + return task + + self.radbrpc_mock.getTask.side_effect = mock_getTask + self.radbrpc_mock.insertTaskPredecessor.reset_mock() + + # Act + self.specification.insert_into_radb() + + # Assert + self.radbrpc_mock.insertTaskPredecessor.assert_not_called() + + if __name__ == "__main__": unittest.main()