Skip to content
Snippets Groups Projects
Commit 7636f3f3 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #9192: Saner error messages if parsets are incomplete, or got deleted from OTDB

parent ac4c1856
No related branches found
No related tags found
No related merge requests found
......@@ -66,7 +66,7 @@ from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_SERVICE_BUSNAME
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.common.util import waitForInterrupt
from lofar.messaging.RPC import RPCTimeoutException
from lofar.messaging.RPC import RPCTimeoutException, RPCException
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RAS_SERVICE_BUSNAME
......@@ -296,7 +296,12 @@ class PipelineControl(OTDBBusListener):
self.otdbrpc.taskSetStatus(otdb_id=otdb_id, new_status=status)
def _getParset(self, otdbId):
return Parset(self.otdbrpc.taskGetSpecification(otdb_id=otdbId)["specification"])
try:
return Parset(self.otdbrpc.taskGetSpecification(otdb_id=otdbId)["specification"])
except RPCException, e:
# Parset not in OTDB, probably got deleted
logger.error("Cannot retrieve parset of task %s: %s", otdbId, e)
return None
def start_listening(self, **kwargs):
self.otdbrpc.open()
......@@ -322,8 +327,8 @@ class PipelineControl(OTDBBusListener):
try:
otdbId = pipeline['otdb_id']
parset = self._getParset(otdbId)
if not self._shouldHandle(parset):
return
if not parset or not self._shouldHandle(parset):
continue
# Maybe the pipeline can start already
if self.dependencies.canStart(otdbId):
......@@ -337,12 +342,17 @@ class PipelineControl(OTDBBusListener):
@staticmethod
def _shouldHandle(parset):
if not parset.isPipeline():
logger.info("Not processing tree: is not a pipeline")
return False
if parset.processingCluster() == "CEP2":
logger.info("Not processing tree: is a CEP2 pipeline")
try:
if not parset.isPipeline():
logger.info("Not processing tree: is not a pipeline")
return False
if parset.processingCluster() == "CEP2":
logger.info("Not processing tree: is a CEP2 pipeline")
return False
except KeyError as e:
# Parset not complete
logger.error("Parset incomplete, ignoring: %s", e)
return False
return True
......@@ -501,7 +511,7 @@ class PipelineControl(OTDBBusListener):
for s in successor_ids:
parset = self._getParset(s)
if not self._shouldHandle(parset):
if not parset or not self._shouldHandle(parset):
continue
if self.dependencies.canStart(s):
......@@ -511,7 +521,7 @@ class PipelineControl(OTDBBusListener):
def onObservationScheduled(self, otdbId, modificationTime):
parset = self._getParset(otdbId)
if not self._shouldHandle(parset):
if not parset or not self._shouldHandle(parset):
return
# Maybe the pipeline can start already
......@@ -529,7 +539,7 @@ class PipelineControl(OTDBBusListener):
def onObservationAborted(self, otdbId, modificationTime):
parset = self._getParset(otdbId)
if not self._shouldHandle(parset):
if parset and not self._shouldHandle(parset): # stop jobs even if there's no parset
return
logger.info("***** STOP Otdb ID %s *****", otdbId)
......
......@@ -108,6 +108,10 @@ class MockRAService(MessageHandlerInterface):
def GetTasks(self, lower_bound, upper_bound, task_ids, task_status, task_type):
print "***** GetTasks(%s) *****" % (task_ids,)
if task_ids is None:
# Used on startup to check which tasks are at scheduled
return []
return [{
'otdb_id': t - 1000,
'status': self.status[t - 1000],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment