Skip to content
Snippets Groups Projects
Commit 918410c2 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #10660: check if reservations in otdb were deleted (since there is no deleted event)

parent 4c097c43
No related branches found
No related tags found
No related merge requests found
...@@ -169,6 +169,8 @@ def main(): ...@@ -169,6 +169,8 @@ def main():
mom_servicename=options.mom_query_servicename, mom_servicename=options.mom_query_servicename,
cleanup_busname=options.cleanup_busname, cleanup_busname=options.cleanup_busname,
cleanup_servicename=options.cleanup_servicename, cleanup_servicename=options.cleanup_servicename,
otdb_busname=options.otdb_busname,
otdb_servicename=options.otdb_servicename,
broker=options.broker) as schedulechecker: broker=options.broker) as schedulechecker:
waitForInterrupt() waitForInterrupt()
......
...@@ -37,6 +37,9 @@ from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC ...@@ -37,6 +37,9 @@ from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_INTERVAL from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_INTERVAL
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -76,7 +79,10 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): ...@@ -76,7 +79,10 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None):
if shift != timedelta(seconds=0): if shift != timedelta(seconds=0):
logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], shift, task['starttime'], newStartTime) logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], shift, task['starttime'], newStartTime)
radbrpc.updateTaskAndResourceClaims(task['id'], starttime=newStartTime, endtime=newEndTime) if not radbrpc.updateTaskAndResourceClaims(task['id'], starttime=newStartTime, endtime=newEndTime):
logger.warn("Could not update start/endtime for pipeline radb_id=%s otdb_id=%s",
updated_task['id'], updated_task['otdb_id'])
updated_task = radbrpc.getTask(task['id']) updated_task = radbrpc.getTask(task['id'])
if updated_task['status'] != task['status']: if updated_task['status'] != task['status']:
logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s", updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status']) logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s", updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status'])
...@@ -92,6 +98,8 @@ class ScheduleChecker(): ...@@ -92,6 +98,8 @@ class ScheduleChecker():
mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
cleanup_busname=DEFAULT_CLEANUP_BUSNAME, cleanup_busname=DEFAULT_CLEANUP_BUSNAME,
cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME, cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME,
otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
otdb_servicename=DEFAULT_OTDB_SERVICENAME,
broker=None): broker=None):
""" """
""" """
...@@ -100,6 +108,7 @@ class ScheduleChecker(): ...@@ -100,6 +108,7 @@ class ScheduleChecker():
self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker) self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker)
self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker) self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker)
self._curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker) self._curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker)
self._otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180)
def __enter__(self): def __enter__(self):
"""Internal use only. (handles scope 'with')""" """Internal use only. (handles scope 'with')"""
...@@ -115,6 +124,7 @@ class ScheduleChecker(): ...@@ -115,6 +124,7 @@ class ScheduleChecker():
self._radbrpc.open() self._radbrpc.open()
self._momrpc.open() self._momrpc.open()
self._curpc.open() self._curpc.open()
self._otdbrpc.open()
self._running = True self._running = True
self._thread = Thread(target=self._check_loop) self._thread = Thread(target=self._check_loop)
self._thread.daemon = True self._thread.daemon = True
...@@ -125,6 +135,7 @@ class ScheduleChecker(): ...@@ -125,6 +135,7 @@ class ScheduleChecker():
self._radbrpc.close() self._radbrpc.close()
self._momrpc.close() self._momrpc.close()
self._curpc.close() self._curpc.close()
self._otdbrpc.close()
self._running = False self._running = False
self._thread.join(60) self._thread.join(60)
...@@ -220,11 +231,39 @@ class ScheduleChecker(): ...@@ -220,11 +231,39 @@ class ScheduleChecker():
except Exception as e: except Exception as e:
logger.error("Error while checking unrun tasks for MoM opened/described/suspended status: %s", e) logger.error("Error while checking unrun tasks for MoM opened/described/suspended status: %s", e)
def checkUnRunReservations(self):
"""All non-mon tasks in otdb can be deleted in otdb without us knowing about it (there is no otdb-task-deleted event)
Here, we check for all non-mom tasks that are yet to run if they still exist in otdb. If not, such jobs are subsequently deleted from the RADB.
"""
try:
logger.info('checking reservation in otdb')
reservations = self._radbrpc.getTasks(task_type=['reservation'], lower_bound=datetime.utcnow())
for reservation in reservations:
try:
otdb_id = int(reservation['otdb_id'])
status = self._otdbrpc.taskGetStatus(otdb_id)
logger.info('reservation %s status = %s', otdb_id, status)
if status == 'unknown':
logger.info('deleting reservation otdb_id=%s radb_id=%s otdb_status=%s from rabd',
otdb_id, reservation['id'], status)
# delete the spec (and task/claims etc via cascading delete) from radb to get it in sync again with mom
self._radbrpc.deleteSpecification(reservation['specification_id'])
except Exception as e:
logger.error("Error while checking reservation otdb_id=%s radb_id=%s in otdb: %s",
reservation['otdb_id'],
reservation['id'],
e)
except Exception as e:
logger.error("Error while checking unrun tasks for MoM opened/described/suspended status: %s", e)
def _check_loop(self): def _check_loop(self):
while self._running: while self._running:
self.checkRunningPipelines() self.checkRunningPipelines()
self.checkScheduledAndQueuedPipelines() self.checkScheduledAndQueuedPipelines()
self.checkUnRunTasksForMoMOpenedStatus() self.checkUnRunTasksForMoMOpenedStatus()
self.checkUnRunReservations()
for i in range(PIPELINE_CHECK_INTERVAL): for i in range(PIPELINE_CHECK_INTERVAL):
sleep(1) sleep(1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment