diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index 147fa1c86232fc8b791d22cdbc7c1cc1fa08416d..0138bab79850072d36d9a5ab1ed897e5ef257d52 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -66,7 +66,7 @@ from lofar.sas.otdb.OTDBBusListener import OTDBBusListener from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_SERVICE_BUSNAME from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.common.util import waitForInterrupt -from lofar.messaging.RPC import RPCTimeoutException +from lofar.messaging.RPC import RPCTimeoutException, RPCException from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RAS_SERVICE_BUSNAME @@ -296,7 +296,12 @@ class PipelineControl(OTDBBusListener): self.otdbrpc.taskSetStatus(otdb_id=otdb_id, new_status=status) def _getParset(self, otdbId): - return Parset(self.otdbrpc.taskGetSpecification(otdb_id=otdbId)["specification"]) + try: + return Parset(self.otdbrpc.taskGetSpecification(otdb_id=otdbId)["specification"]) + except RPCException, e: + # Parset not in OTDB, probably got deleted + logger.error("Cannot retrieve parset of task %s: %s", otdbId, e) + return None def start_listening(self, **kwargs): self.otdbrpc.open() @@ -322,8 +327,8 @@ class PipelineControl(OTDBBusListener): try: otdbId = pipeline['otdb_id'] parset = self._getParset(otdbId) - if not self._shouldHandle(parset): - return + if not parset or not self._shouldHandle(parset): + continue # Maybe the pipeline can start already if self.dependencies.canStart(otdbId): @@ -337,12 +342,17 @@ class PipelineControl(OTDBBusListener): @staticmethod def _shouldHandle(parset): - if not parset.isPipeline(): - logger.info("Not processing tree: is not a pipeline") - return False - - if parset.processingCluster() == "CEP2": - logger.info("Not processing tree: is a CEP2 pipeline") + try: + if not parset.isPipeline(): + logger.info("Not processing tree: is not a pipeline") + return False + + if parset.processingCluster() == "CEP2": + logger.info("Not processing tree: is a CEP2 pipeline") + return False + except KeyError as e: + # Parset not complete + logger.error("Parset incomplete, ignoring: %s", e) return False return True @@ -501,7 +511,7 @@ class PipelineControl(OTDBBusListener): for s in successor_ids: parset = self._getParset(s) - if not self._shouldHandle(parset): + if not parset or not self._shouldHandle(parset): continue if self.dependencies.canStart(s): @@ -511,7 +521,7 @@ class PipelineControl(OTDBBusListener): def onObservationScheduled(self, otdbId, modificationTime): parset = self._getParset(otdbId) - if not self._shouldHandle(parset): + if not parset or not self._shouldHandle(parset): return # Maybe the pipeline can start already @@ -529,7 +539,7 @@ class PipelineControl(OTDBBusListener): def onObservationAborted(self, otdbId, modificationTime): parset = self._getParset(otdbId) - if not self._shouldHandle(parset): + if parset and not self._shouldHandle(parset): # stop jobs even if there's no parset return logger.info("***** STOP Otdb ID %s *****", otdbId) diff --git a/MAC/Services/test/tPipelineControl.py b/MAC/Services/test/tPipelineControl.py index 07e23240de40b53b4ab313532bbe1366a1560633..f4994db0295480540d37e0dcfe51f918e2926708 100644 --- a/MAC/Services/test/tPipelineControl.py +++ b/MAC/Services/test/tPipelineControl.py @@ -108,6 +108,10 @@ class MockRAService(MessageHandlerInterface): def GetTasks(self, lower_bound, upper_bound, task_ids, task_status, task_type): print "***** GetTasks(%s) *****" % (task_ids,) + if task_ids is None: + # Used on startup to check which tasks are at scheduled + return [] + return [{ 'otdb_id': t - 1000, 'status': self.status[t - 1000],