From 02b14cf3a29520dea4cca923ada9cbed1b937183 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Wed, 29 Jun 2016 08:42:15 +0000
Subject: [PATCH] Task #9192: Guard against tasks not being present in RADB

---
 MAC/Services/src/PipelineControl.py | 27 ++++++++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py
index b0b5996ad93..5a3325d3bc9 100755
--- a/MAC/Services/src/PipelineControl.py
+++ b/MAC/Services/src/PipelineControl.py
@@ -191,6 +191,10 @@ class Slurm(object):
     return stdout != ""
 
 class PipelineDependencies(object):
+  class TaskNotFoundException(Exception):
+    """ Raised when a task cannot be found in the RADB. """
+    pass
+
   def __init__(self, ra_service_busname=DEFAULT_RAS_SERVICE_BUSNAME):
     self.rarpc = RARPC(busname=ra_service_busname)
 
@@ -221,6 +225,9 @@ class PipelineDependencies(object):
     """
     radb_task = self.rarpc.getTask(otdb_id=otdb_id)
 
+    if radb_task is None:
+      raise TaskNotFoundException("otdb_id %s not found in RADB" % (otdb_id,))
+
     predecessor_radb_ids = radb_task['predecessor_ids']
     predecessor_tasks = self.rarpc.getTasks(task_ids=predecessor_radb_ids)
     predecessor_states = {t["otdb_id"]: t["status"] for t in predecessor_tasks}
@@ -235,6 +242,9 @@ class PipelineDependencies(object):
     """
     radb_task = self.rarpc.getTask(otdb_id=otdb_id)
 
+    if radb_task is None:
+      raise TaskNotFoundException("otdb_id %s not found in RADB" % (otdb_id,))
+
     successor_radb_ids = radb_task['successor_ids']
     successor_tasks = self.rarpc.getTasks(task_ids=successor_ids) if successor_radb_ids else []
     successor_otdb_ids = [t["otdb_id"] for t in successor_tasks]
@@ -248,8 +258,13 @@ class PipelineDependencies(object):
       Return whether `otdbId' can start, according to the status of the predecessors
       and its own status.
     """
-    myState = self.getState(otdbId)
-    predecessorStates = self.getPredecessorStates(otdbId)
+
+    try:
+      myState = self.getState(otdbId)
+      predecessorStates = self.getPredecessorStates(otdbId)
+    except TaskNotFoundException, e:
+      logger.error("canStart(%s): Error obtaining task states, not starting pipeline: %s", otdbId, e)
+      return False
 
     logger.debug("canStart(%s)? state = %s, predecessors = %s", otdbId, myState, predecessorStates)
 
@@ -441,7 +456,13 @@ class PipelineControl(OTDBBusListener):
     cancel(jobName)
 
   def _startSuccessors(self, otdbId):
-    for s in self.dependencies.getSuccessorIds(otdbId):
+    try:
+      successor_ids = self.dependencies.getSuccessorIds(otdbId)
+    except TaskNotFoundException, e:
+      logger.error("_startSuccessors(%s): Error obtaining task successors, not starting them: %s", otdbId, e)
+      return
+
+    for s in successor_ids:
       parset = self._getParset(s)
       if not self._shouldHandle(parset):
         continue
-- 
GitLab