From 7d60e78ef007cbbe53e804ad5e9714fd64d9f797 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Fri, 22 Jul 2016 10:32:29 +0000 Subject: [PATCH] Task #9607: logging and fix --- .../ResourceAssigner/lib/schedulechecker.py | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py index 3a26c6edcfb..8975c0838fd 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py @@ -83,6 +83,9 @@ class ScheduleChecker(): active_pipelines = self._radbrpc.getTasks(task_status='active', task_type='pipeline') + if active_pipelines: + logger.info('checking endtime of running pipelines') + for task in active_pipelines: if task['endtime'] <= now: new_endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL) @@ -99,36 +102,39 @@ class ScheduleChecker(): queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline') sq_pipelines = scheduled_pipelines + queued_pipelines - for task in sq_pipelines: - predecessor_tasks = self._radbrpc.getTasks(task_ids=task['predecessor_ids']) - predecessor_endtimes = [t['endtime'] for t in predecessor_tasks] - predecessor_endtimes.append(now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)) - - max_pred_endtime = max(predecessor_endtimes) - - if task['starttime'] < max_pred_endtime: - shift = max_pred_endtime - task['starttime'] - logger.info("Moving ahead %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift) - self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift) - updated_task = self._radbrpc.getTask(task['id']) - if updated_task['status'] != u'scheduled' or updated_task['status'] != u'queued': - logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status']) - #TODO: automatically resolve conflict status by moved pipeline in first free time slot. + if sq_pipelines: + logger.info('checking starttime of scheduled/queued cep4 pipelines') + + for task in sq_pipelines: + #only reschedule pipelines which have resourceclaims, and hence run on cep4 + if self._radbrpc.getResourceClaims(task_ids=task['id']): + predecessor_tasks = self._radbrpc.getTasks(task_ids=task['predecessor_ids']) + predecessor_endtimes = [t['endtime'] for t in predecessor_tasks] + predecessor_endtimes.append(now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)) + + max_pred_endtime = max(predecessor_endtimes) + + if task['starttime'] < max_pred_endtime: + shift = max_pred_endtime - task['starttime'] + logger.info("Moving ahead %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift) + self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift) + updated_task = self._radbrpc.getTask(task['id']) + if updated_task['status'] not in [u'scheduled', u'queued']: + logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status']) + #TODO: automatically resolve conflict status by moved pipeline in first free time slot. except Exception as e: logger.error("Error while checking scheduled pipelines: %s", e) def checkApprovedTasks(self): try: - now = datetime.utcnow() - + logger.info('checking approved tasks for status in mom') approved_tasks = self._radbrpc.getTasks(task_status='approved') + mom_ids = [t['mom_id'] for t in approved_tasks] + mom_details = self._momrpc.getProjectDetails(mom_ids) for task in approved_tasks: - mom_task_details = self._momrpc.getProjectDetails(task['mom_id']) - - if (not mom_task_details or - str(task['mom_id']) not in mom_task_details or - mom_task_details[str(task['mom_id'])]['object_status'] == 'opened'): + if (str(task['mom_id']) not in mom_details or + mom_details[str(task['mom_id'])]['object_status'] == 'opened'): logger.info('task %s mom_id=%s otdb_id=%s was removed or set to status opened. removing task from rabd', task['id'], task['mom_id'], task['otdb_id']) self._radbrpc.deleteSpecification(task['specification_id']) @@ -139,6 +145,7 @@ class ScheduleChecker(): while self._running: self.checkRunningPipelines() self.checkScheduledAndQueuedPipelines() + self.checkApprovedTasks() for i in range(PIPELINE_CHECK_INTERVAL): sleep(1) -- GitLab