NOTE(review): this patch was re-flowed from a whitespace-collapsed copy. The leading
indentation of the Python lines below is reconstructed, not original - verify it against
the target files, or apply with `git apply --ignore-whitespace`. The `index` blob hashes
describe the originally generated patch, not the corrected `+` lines.

diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py
index 1b2afa52229b3c75ba0c533ca115d6015bbe1988..344779efc20cd095c7a7824edadf688f2b03f03e 100755
--- a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py
+++ b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py
@@ -169,6 +169,8 @@ def main():
                                  mom_servicename=options.mom_query_servicename,
                                  cleanup_busname=options.cleanup_busname,
                                  cleanup_servicename=options.cleanup_servicename,
+                                 otdb_busname=options.otdb_busname,
+                                 otdb_servicename=options.otdb_servicename,
                                  broker=options.broker) as schedulechecker:
                 waitForInterrupt()
 
diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py
index 095c92a689cc5b4b5a3f1bbc025d64c2813df6be..1ab4dd4d3750c4450f4772e8a1aa325a5651b475 100644
--- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py
+++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py
@@ -37,6 +37,9 @@ from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
 from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
 from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME
 
+from lofar.sas.otdb.otdbrpc import OTDBRPC
+from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
+
 from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_INTERVAL
 
 logger = logging.getLogger(__name__)
@@ -76,7 +79,10 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None):
 
                 if shift != timedelta(seconds=0):
                     logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], shift, task['starttime'], newStartTime)
-                    radbrpc.updateTaskAndResourceClaims(task['id'], starttime=newStartTime, endtime=newEndTime)
+                    if not radbrpc.updateTaskAndResourceClaims(task['id'], starttime=newStartTime, endtime=newEndTime):
+                        # updated_task is only fetched further down, so report the ids of the task we tried to move
+                        logger.warn("Could not update start/endtime for pipeline radb_id=%s otdb_id=%s", task['id'], task['otdb_id'])
+
                     updated_task = radbrpc.getTask(task['id'])
                     if updated_task['status'] != task['status']:
                         logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s", updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status'])
@@ -92,6 +98,8 @@ class ScheduleChecker():
                  mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                  cleanup_busname=DEFAULT_CLEANUP_BUSNAME,
                  cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME,
+                 otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
+                 otdb_servicename=DEFAULT_OTDB_SERVICENAME,
                  broker=None):
         """
         """
@@ -100,6 +108,7 @@ class ScheduleChecker():
         self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker)
         self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker)
         self._curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker)
+        self._otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180)
 
     def __enter__(self):
         """Internal use only. (handles scope 'with')"""
@@ -115,6 +124,7 @@ class ScheduleChecker():
         self._radbrpc.open()
         self._momrpc.open()
         self._curpc.open()
+        self._otdbrpc.open()
         self._running = True
         self._thread = Thread(target=self._check_loop)
         self._thread.daemon = True
@@ -125,6 +135,7 @@ class ScheduleChecker():
         self._radbrpc.close()
         self._momrpc.close()
         self._curpc.close()
+        self._otdbrpc.close()
         self._running = False
         self._thread.join(60)
 
@@ -220,11 +231,39 @@ class ScheduleChecker():
         except Exception as e:
             logger.error("Error while checking unrun tasks for MoM opened/described/suspended status: %s", e)
 
+    def checkUnRunReservations(self):
+        """All non-mom tasks in otdb can be deleted in otdb without us knowing about it (there is no otdb-task-deleted event).
+        Here, we ask otdb for the status of each reservation the RADB returns for lower_bound=utcnow; if that status is 'unknown', its specification is deleted from the RADB.
+        """
+        try:
+            logger.info('checking reservation in otdb')
+            reservations = self._radbrpc.getTasks(task_type=['reservation'], lower_bound=datetime.utcnow())
+
+            for reservation in reservations:
+                try:
+                    otdb_id = int(reservation['otdb_id'])
+                    status = self._otdbrpc.taskGetStatus(otdb_id)
+                    logger.info('reservation %s status = %s', otdb_id, status)
+
+                    if status == 'unknown':
+                        logger.info('deleting reservation otdb_id=%s radb_id=%s otdb_status=%s from radb',
+                                    otdb_id, reservation['id'], status)
+                        # delete the spec (and task/claims etc via cascading delete) from radb to get it in sync again with otdb
+                        self._radbrpc.deleteSpecification(reservation['specification_id'])
+                except Exception as e:
+                    logger.error("Error while checking reservation otdb_id=%s radb_id=%s in otdb: %s",
+                                 reservation['otdb_id'],
+                                 reservation['id'],
+                                 e)
+        except Exception as e:
+            logger.error("Error while checking unrun reservations in otdb: %s", e)
+
     def _check_loop(self):
         while self._running:
             self.checkRunningPipelines()
             self.checkScheduledAndQueuedPipelines()
             self.checkUnRunTasksForMoMOpenedStatus()
+            self.checkUnRunReservations()
             for i in range(PIPELINE_CHECK_INTERVAL):
                 sleep(1)
 