Skip to content
Snippets Groups Projects
Commit 90de1dd0 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #9607: move CEP4 pipelines way in the future to now

parent 28ac303e
No related branches found
No related tags found
No related merge requests found
......@@ -39,8 +39,8 @@ logger = logging.getLogger(__name__)
def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None):
try:
#only reschedule pipelines which have resourceclaims and run on cep4
if task and task['type'] == 'pipeline' and task.get('cluster') == 'CEP4' and radbrpc.getResourceClaims(task_ids=task['id']):
#only reschedule pipelines which run on cep4
if task and task['type'] == 'pipeline' and task.get('cluster') == 'CEP4':
logger.info("checking pipeline starttime radb_id=%s otdb_id=%s", task['id'], task['otdb_id'])
predecessor_tasks = radbrpc.getTasks(task_ids=task['predecessor_ids'])
......@@ -50,9 +50,9 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None):
max_pred_endtime = max(predecessor_endtimes)
if task['starttime'] < max_pred_endtime:
if (task['starttime'] < max_pred_endtime) or (min_start_timestamp and task['starttime'] > min_start_timestamp):
shift = max_pred_endtime - task['starttime']
logger.info("Moving ahead %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift)
logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift)
radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift)
updated_task = radbrpc.getTask(task['id'])
if updated_task['status'] not in [u'scheduled', u'queued']:
......@@ -123,8 +123,8 @@ class ScheduleChecker():
now = datetime.utcnow()
min_start_timestamp = now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)
scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline')
queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline')
scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline', cluster='CEP4')
queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline', cluster='CEP4')
sq_pipelines = scheduled_pipelines + queued_pipelines
if sq_pipelines:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment