Skip to content
Snippets Groups Projects
Commit 7dd73802 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-826: only insert predecessor/successor links into the radb when needed,...

SW-826: only insert predecessor/successor links into the radb when needed, thus preventing unneeded error log lines about duplicate keys
parent 1b447717
No related branches found
Tags LOFAR-Release-4_4_59
2 merge requests!70Lofar release 4 0,!69Resolve SW-826
...@@ -983,7 +983,7 @@ class Specification: ...@@ -983,7 +983,7 @@ class Specification:
self.status = task["status"] self.status = task["status"]
self.type = task["type"] self.type = task["type"]
self.duration = timedelta(seconds = task["duration"]) self.duration = timedelta(seconds = task["duration"])
self.cluster = task["cluster"] self.cluster = task.get("cluster", "CEP4")
#we don't seem to need specification_id? #we don't seem to need specification_id?
logger.info("Read task from RADB: %s", task) logger.info("Read task from RADB: %s", task)
return task return task
...@@ -1071,7 +1071,7 @@ class Specification: ...@@ -1071,7 +1071,7 @@ class Specification:
specification_id = result['specification_id'] # We never seem to need this again specification_id = result['specification_id'] # We never seem to need this again
task_id = result['task_id'] task_id = result['task_id']
logger.info('inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id)) logger.info('inserted/updated specification (id=%s) and task (id=%s)' % (specification_id, task_id))
return task_id return task_id
def _link_predecessors_to_task_in_radb(self): def _link_predecessors_to_task_in_radb(self):
...@@ -1092,24 +1092,30 @@ class Specification: ...@@ -1092,24 +1092,30 @@ class Specification:
# check if the predecessor needs to be linked to this task # check if the predecessor needs to be linked to this task
predecessor_task = self.radb.getTask(mom_id=predecessor_mom_id) predecessor_task = self.radb.getTask(mom_id=predecessor_mom_id)
if predecessor_task: if predecessor_task:
# do Specification-class bookkeeping (stupid, because all info is in the radb already)
predecessor_keys = [p.radb_id for p in self.predecessors] predecessor_keys = [p.radb_id for p in self.predecessors]
if predecessor_task['id'] not in predecessor_keys: if predecessor_task['id'] not in predecessor_keys:
logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s ' logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s '
'otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], self.mom_id, 'otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], self.mom_id,
self.otdb_id) self.otdb_id)
spec = Specification(self.otdbrpc, self.momquery, self.radb) pred_spec = Specification(self.otdbrpc, self.momquery, self.radb)
spec.read_from_radb(predecessor_task['id']) pred_spec.read_from_radb(predecessor_task['id'])
self.predecessors.append(spec) # TODO this needs a try/except somewhere? self.predecessors.append(pred_spec) # TODO this needs a try/except somewhere?
try:
self.radb.insertTaskPredecessor(self.radb_id, spec.radb_id)
except PostgresDBQueryExecutionError as e: # do radb task-predecessor bookkeeping if needed
# task was already connected to predecessor. Log and continue. try:
if 'task_predecessor_unique' in str(e): task = self.radb.getTask(self.radb_id)
logger.info('predecessor task with mom_id=%s otdb_id=%s was already connected to its successor with mom_id=%s otdb_id=%s', if predecessor_task['id'] not in task['predecessor_ids']:
predecessor_task['mom_id'], predecessor_task['otdb_id'], self.radb.insertTaskPredecessor(self.radb_id, predecessor_task['id'])
self.mom_id, self.otdb_id) except PostgresDBQueryExecutionError as e:
else: # task was already connected to predecessor. Log and continue.
raise if 'task_predecessor_unique' in str(e):
logger.info('predecessor task with mom_id=%s otdb_id=%s was already connected to its successor with mom_id=%s otdb_id=%s',
predecessor_task['mom_id'], predecessor_task['otdb_id'],
self.mom_id, self.otdb_id)
else:
raise
else: else:
# Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond # Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond
# approved, which is in principle valid. The link in the radb will be made later via processSuccessors() # approved, which is in principle valid. The link in the radb will be made later via processSuccessors()
...@@ -1142,16 +1148,20 @@ class Specification: ...@@ -1142,16 +1148,20 @@ class Specification:
' otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], self.mom_id, self.otdb_id ' otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], self.mom_id, self.otdb_id
) )
self.successor_ids.append(successor_task['id']) self.successor_ids.append(successor_task['id'])
try:
# do radb task-successor bookkeeping if needed
try:
task = self.radb.getTask(self.radb_id)
if successor_task['id'] not in task['successor_ids']:
self.radb.insertTaskPredecessor(successor_task['id'], self.radb_id) self.radb.insertTaskPredecessor(successor_task['id'], self.radb_id)
except PostgresDBQueryExecutionError as e: except PostgresDBQueryExecutionError as e:
# task was already connected to predecessor. Log and continue. # task was already connected to predecessor. Log and continue.
if 'task_predecessor_unique' in str(e): if 'task_predecessor_unique' in str(e):
logger.info('successor task with mom_id=%s otdb_id=%s was already connected to its predecessor with mom_id=%s otdb_id=%s', logger.info('successor task with mom_id=%s otdb_id=%s was already connected to its predecessor with mom_id=%s otdb_id=%s',
successor_task['mom_id'], successor_task['otdb_id'], successor_task['mom_id'], successor_task['otdb_id'],
self.mom_id, self.otdb_id) self.mom_id, self.otdb_id)
else: else:
raise raise
movePipelineAfterItsPredecessors(successor_task, self.radb) movePipelineAfterItsPredecessors(successor_task, self.radb)
else: else:
......
...@@ -1373,5 +1373,44 @@ class ReadFromRadb(unittest.TestCase): ...@@ -1373,5 +1373,44 @@ class ReadFromRadb(unittest.TestCase):
self.assertEqual(self.specification.duration, timedelta(seconds = task["duration"])) self.assertEqual(self.specification.duration, timedelta(seconds = task["duration"]))
self.assertEqual(self.specification.cluster, task["cluster"]) self.assertEqual(self.specification.cluster, task["cluster"])
def test_insert_into_radb_and_check_predecessors(self):
# Arrange
def mock_getTask(id=None, mom_id=None, otdb_id=None, specification_id=None):
if id is None and mom_id is not None:
id = mom_id
return {"id": id, "mom_id": id, "otdb_id": id, "status": "approved", "type": "observation", "duration": 100, "predecessor_ids": []}
self.radbrpc_mock.getTask.side_effect = mock_getTask
self.radbrpc_mock.insertOrUpdateSpecificationAndTask.return_value = {'specification_id': 1, 'task_id': 1}
self.momrpc_mock.getPredecessorIds.return_value = {'1': [42]}
self.specification.read_from_radb(1)
# Act
self.specification.insert_into_radb()
# Assert
self.radbrpc_mock.insertTaskPredecessor.assert_called_with(1, 42)
# Arrange
# now adapt the mock_getTask, and let it return the inserted predecessor_ids as well
def mock_getTask(id=None, mom_id=None, otdb_id=None, specification_id=None):
if id is None and mom_id is not None:
id = mom_id
task = {"id": id, "mom_id": id, "otdb_id": id, "status": "approved", "type": "observation", "duration": 100, "predecessor_ids": []}
if id == 1:
task['predecessor_ids'] = [42]
return task
self.radbrpc_mock.getTask.side_effect = mock_getTask
self.radbrpc_mock.insertTaskPredecessor.reset_mock()
# Act
self.specification.insert_into_radb()
# Assert
self.radbrpc_mock.insertTaskPredecessor.assert_not_called()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment