diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index 13e912d31b8df5755139ecb293f882081cf90eb8..844324560135caa6f9e1ce73d714dbcea5ca52f5 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -274,7 +274,7 @@ class ResourceAssigner(): task['mom_id'], task['otdb_id']) self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id']) - movePipelineAfterItsPredecessors(successor_task, self.radbrpc) + movePipelineAfterItsPredecessors(successor_task, self.radbrpc, datetime.utcnow()) else: logger.warning('could not find predecessor task with otdb_id=%s in radb for task otdb_id=%s', successor_task['otdb_id'], task['otdb_id']) else: diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py index 26512725637f1711195ec52a0fea2ef681ad3a45..d309b2a229cc213b9e588c557e2a458890bacafa 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py @@ -44,6 +44,7 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): logger.info("checking pipeline starttime radb_id=%s otdb_id=%s", task['id'], task['otdb_id']) predecessor_tasks = radbrpc.getTasks(task_ids=task['predecessor_ids']) + predecessor_endtimes = [t['endtime'] for t in predecessor_tasks] if min_start_timestamp: predecessor_endtimes.append(min_start_timestamp) @@ -52,11 +53,11 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): if (task['starttime'] < max_pred_endtime) or (min_start_timestamp and task['starttime'] > min_start_timestamp): shift = max_pred_endtime - task['starttime'] - logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift) + logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], shift, task['starttime'], max_pred_endtime) radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift) updated_task = radbrpc.getTask(task['id']) - if updated_task['status'] not in [u'scheduled', u'queued']: - logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status']) + if updated_task['status'] != task['status']: + logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s", updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status']) #TODO: automatically resolve conflict status by moved pipeline in first free time slot. except Exception as e: logger.error("Error while checking pipeline starttime: %s", e) @@ -125,12 +126,13 @@ class ScheduleChecker(): scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline', cluster='CEP4') queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline', cluster='CEP4') - sq_pipelines = scheduled_pipelines + queued_pipelines + pipelines = scheduled_pipelines + queued_pipelines - if sq_pipelines: - logger.info('checking starttime of %s scheduled/queued cep4 pipelines', len(sq_pipelines)) + if pipelines: + logger.info('checking starttime of %s scheduled/queued cep4 pipelines min_start_timestamp=%s', len(pipelines), min_start_timestamp) + pipelines.sort(key=lambda pl: pl['starttime'], reverse=True) - for task in sq_pipelines: + for task in pipelines: movePipelineAfterItsPredecessors(task, self._radbrpc, min_start_timestamp) except Exception as e: logger.error("Error while checking scheduled pipelines: %s", e)