Skip to content
Snippets Groups Projects
Commit 7d60e78e authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #9607: logging and fix

parent ae48887a
No related branches found
No related tags found
No related merge requests found
...@@ -83,6 +83,9 @@ class ScheduleChecker(): ...@@ -83,6 +83,9 @@ class ScheduleChecker():
active_pipelines = self._radbrpc.getTasks(task_status='active', task_type='pipeline') active_pipelines = self._radbrpc.getTasks(task_status='active', task_type='pipeline')
if active_pipelines:
logger.info('checking endtime of running pipelines')
for task in active_pipelines: for task in active_pipelines:
if task['endtime'] <= now: if task['endtime'] <= now:
new_endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL) new_endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL)
...@@ -99,36 +102,39 @@ class ScheduleChecker(): ...@@ -99,36 +102,39 @@ class ScheduleChecker():
queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline') queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline')
sq_pipelines = scheduled_pipelines + queued_pipelines sq_pipelines = scheduled_pipelines + queued_pipelines
for task in sq_pipelines: if sq_pipelines:
predecessor_tasks = self._radbrpc.getTasks(task_ids=task['predecessor_ids']) logger.info('checking starttime of scheduled/queued cep4 pipelines')
predecessor_endtimes = [t['endtime'] for t in predecessor_tasks]
predecessor_endtimes.append(now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)) for task in sq_pipelines:
#only reschedule pipelines which have resourceclaims, and hence run on cep4
max_pred_endtime = max(predecessor_endtimes) if self._radbrpc.getResourceClaims(task_ids=task['id']):
predecessor_tasks = self._radbrpc.getTasks(task_ids=task['predecessor_ids'])
if task['starttime'] < max_pred_endtime: predecessor_endtimes = [t['endtime'] for t in predecessor_tasks]
shift = max_pred_endtime - task['starttime'] predecessor_endtimes.append(now + timedelta(seconds=PIPELINE_CHECK_INTERVAL))
logger.info("Moving ahead %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift)
self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift) max_pred_endtime = max(predecessor_endtimes)
updated_task = self._radbrpc.getTask(task['id'])
if updated_task['status'] != u'scheduled' or updated_task['status'] != u'queued': if task['starttime'] < max_pred_endtime:
logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status']) shift = max_pred_endtime - task['starttime']
#TODO: automatically resolve conflict status by moved pipeline in first free time slot. logger.info("Moving ahead %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift)
self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift)
updated_task = self._radbrpc.getTask(task['id'])
if updated_task['status'] not in [u'scheduled', u'queued']:
logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status'])
#TODO: automatically resolve conflict status by moved pipeline in first free time slot.
except Exception as e: except Exception as e:
logger.error("Error while checking scheduled pipelines: %s", e) logger.error("Error while checking scheduled pipelines: %s", e)
def checkApprovedTasks(self): def checkApprovedTasks(self):
try: try:
now = datetime.utcnow() logger.info('checking approved tasks for status in mom')
approved_tasks = self._radbrpc.getTasks(task_status='approved') approved_tasks = self._radbrpc.getTasks(task_status='approved')
mom_ids = [t['mom_id'] for t in approved_tasks]
mom_details = self._momrpc.getProjectDetails(mom_ids)
for task in approved_tasks: for task in approved_tasks:
mom_task_details = self._momrpc.getProjectDetails(task['mom_id']) if (str(task['mom_id']) not in mom_details or
mom_details[str(task['mom_id'])]['object_status'] == 'opened'):
if (not mom_task_details or
str(task['mom_id']) not in mom_task_details or
mom_task_details[str(task['mom_id'])]['object_status'] == 'opened'):
logger.info('task %s mom_id=%s otdb_id=%s was removed or set to status opened. removing task from rabd', task['id'], task['mom_id'], task['otdb_id']) logger.info('task %s mom_id=%s otdb_id=%s was removed or set to status opened. removing task from rabd', task['id'], task['mom_id'], task['otdb_id'])
self._radbrpc.deleteSpecification(task['specification_id']) self._radbrpc.deleteSpecification(task['specification_id'])
...@@ -139,6 +145,7 @@ class ScheduleChecker(): ...@@ -139,6 +145,7 @@ class ScheduleChecker():
while self._running: while self._running:
self.checkRunningPipelines() self.checkRunningPipelines()
self.checkScheduledAndQueuedPipelines() self.checkScheduledAndQueuedPipelines()
self.checkApprovedTasks()
for i in range(PIPELINE_CHECK_INTERVAL): for i in range(PIPELINE_CHECK_INTERVAL):
sleep(1) sleep(1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment