Skip to content
Snippets Groups Projects
Commit 0c78f4b2 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #9607: move scheduled/queued cep4 pipelines to now

parent b73672be
No related branches found
No related tags found
No related merge requests found
...@@ -274,7 +274,7 @@ class ResourceAssigner(): ...@@ -274,7 +274,7 @@ class ResourceAssigner():
task['mom_id'], task['mom_id'],
task['otdb_id']) task['otdb_id'])
self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id']) self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id'])
movePipelineAfterItsPredecessors(successor_task, self.radbrpc) movePipelineAfterItsPredecessors(successor_task, self.radbrpc, datetime.utcnow())
else: else:
logger.warning('could not find predecessor task with otdb_id=%s in radb for task otdb_id=%s', successor_task['otdb_id'], task['otdb_id']) logger.warning('could not find predecessor task with otdb_id=%s in radb for task otdb_id=%s', successor_task['otdb_id'], task['otdb_id'])
else: else:
......
...@@ -44,6 +44,7 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): ...@@ -44,6 +44,7 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None):
logger.info("checking pipeline starttime radb_id=%s otdb_id=%s", task['id'], task['otdb_id']) logger.info("checking pipeline starttime radb_id=%s otdb_id=%s", task['id'], task['otdb_id'])
predecessor_tasks = radbrpc.getTasks(task_ids=task['predecessor_ids']) predecessor_tasks = radbrpc.getTasks(task_ids=task['predecessor_ids'])
predecessor_endtimes = [t['endtime'] for t in predecessor_tasks] predecessor_endtimes = [t['endtime'] for t in predecessor_tasks]
if min_start_timestamp: if min_start_timestamp:
predecessor_endtimes.append(min_start_timestamp) predecessor_endtimes.append(min_start_timestamp)
...@@ -52,11 +53,11 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): ...@@ -52,11 +53,11 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None):
if (task['starttime'] < max_pred_endtime) or (min_start_timestamp and task['starttime'] > min_start_timestamp): if (task['starttime'] < max_pred_endtime) or (min_start_timestamp and task['starttime'] > min_start_timestamp):
shift = max_pred_endtime - task['starttime'] shift = max_pred_endtime - task['starttime']
logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift) logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], shift, task['starttime'], max_pred_endtime)
radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift) radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift)
updated_task = radbrpc.getTask(task['id']) updated_task = radbrpc.getTask(task['id'])
if updated_task['status'] not in [u'scheduled', u'queued']: if updated_task['status'] != task['status']:
logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status']) logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s", updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status'])
#TODO: automatically resolve conflict status by moved pipeline in first free time slot. #TODO: automatically resolve conflict status by moved pipeline in first free time slot.
except Exception as e: except Exception as e:
logger.error("Error while checking pipeline starttime: %s", e) logger.error("Error while checking pipeline starttime: %s", e)
...@@ -125,12 +126,13 @@ class ScheduleChecker(): ...@@ -125,12 +126,13 @@ class ScheduleChecker():
scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline', cluster='CEP4') scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline', cluster='CEP4')
queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline', cluster='CEP4') queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline', cluster='CEP4')
sq_pipelines = scheduled_pipelines + queued_pipelines pipelines = scheduled_pipelines + queued_pipelines
if sq_pipelines: if pipelines:
logger.info('checking starttime of %s scheduled/queued cep4 pipelines', len(sq_pipelines)) logger.info('checking starttime of %s scheduled/queued cep4 pipelines min_start_timestamp=%s', len(pipelines), min_start_timestamp)
pipelines.sort(key=lambda pl: pl['starttime'], reverse=True)
for task in sq_pipelines: for task in pipelines:
movePipelineAfterItsPredecessors(task, self._radbrpc, min_start_timestamp) movePipelineAfterItsPredecessors(task, self._radbrpc, min_start_timestamp)
except Exception as e: except Exception as e:
logger.error("Error while checking scheduled pipelines: %s", e) logger.error("Error while checking scheduled pipelines: %s", e)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment