Skip to content
Snippets Groups Projects
Commit 02b14cf3 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #9192: Guard against tasks not being present in RADB

parent d526b533
No related branches found
No related tags found
No related merge requests found
......@@ -191,6 +191,10 @@ class Slurm(object):
return stdout != ""
class PipelineDependencies(object):
class TaskNotFoundException(Exception):
""" Raised when a task cannot be found in the RADB. """
pass
def __init__(self, ra_service_busname=DEFAULT_RAS_SERVICE_BUSNAME):
self.rarpc = RARPC(busname=ra_service_busname)
......@@ -221,6 +225,9 @@ class PipelineDependencies(object):
"""
radb_task = self.rarpc.getTask(otdb_id=otdb_id)
if radb_task is None:
raise TaskNotFoundException("otdb_id %s not found in RADB" % (otdb_id,))
predecessor_radb_ids = radb_task['predecessor_ids']
predecessor_tasks = self.rarpc.getTasks(task_ids=predecessor_radb_ids)
predecessor_states = {t["otdb_id"]: t["status"] for t in predecessor_tasks}
......@@ -235,6 +242,9 @@ class PipelineDependencies(object):
"""
radb_task = self.rarpc.getTask(otdb_id=otdb_id)
if radb_task is None:
raise TaskNotFoundException("otdb_id %s not found in RADB" % (otdb_id,))
successor_radb_ids = radb_task['successor_ids']
successor_tasks = self.rarpc.getTasks(task_ids=successor_ids) if successor_radb_ids else []
successor_otdb_ids = [t["otdb_id"] for t in successor_tasks]
......@@ -248,8 +258,13 @@ class PipelineDependencies(object):
Return whether `otdbId' can start, according to the status of the predecessors
and its own status.
"""
myState = self.getState(otdbId)
predecessorStates = self.getPredecessorStates(otdbId)
try:
myState = self.getState(otdbId)
predecessorStates = self.getPredecessorStates(otdbId)
except TaskNotFoundException, e:
logger.error("canStart(%s): Error obtaining task states, not starting pipeline: %s", otdbId, e)
return False
logger.debug("canStart(%s)? state = %s, predecessors = %s", otdbId, myState, predecessorStates)
......@@ -441,7 +456,13 @@ class PipelineControl(OTDBBusListener):
cancel(jobName)
def _startSuccessors(self, otdbId):
for s in self.dependencies.getSuccessorIds(otdbId):
try:
successor_ids = self.dependencies.getSuccessorIds(otdbId)
except TaskNotFoundException, e:
logger.error("_startSuccessors(%s): Error obtaining task successors, not starting them: %s", otdbId, e)
return
for s in successor_ids:
parset = self._getParset(s)
if not self._shouldHandle(parset):
continue
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment