diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index 651c0b37a156ae5adba4ea39e2cdd75578ceb27d..ce5f20e942d4541549a1dc0df40f2376702863c0 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -35,6 +35,8 @@ from lofar.messaging.messagebus import ToBus from lofar.messaging.RPC import RPC, RPCException from lofar.parameterset import parameterset +from lofar.sas.resourceassignment.resourceassigner.schedulechecker import movePipelineAfterItsPredecessors + from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME @@ -263,6 +265,7 @@ class ResourceAssigner(): if successor_task['id'] not in task['successor_ids']: logger.info('connecting successor task with otdb_id=%s to it\'s predecessor with otdb_id=%s', successor_task['otdb_id'], task['otdb_id']) self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id']) + movePipelineAfterItsPredecessors(successor_task, self.radbrpc) else: logger.warning('could not find predecessor task with otdb_id=%s in radb for task otdb_id=%s', successor_task['otdb_id'], task['otdb_id']) else: diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py index 8975c0838fdadf726a4063db06f897004bb63e99..007210019bd18b0e7ac3212e536fb131530f07b5 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py @@ -37,6 +37,30 @@ from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_ logger = logging.getLogger(__name__) +def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): + try: + #only reschedule pipelines which have resourceclaims and run on cep4 + if task and task['type'] == 'pipeline' and task.get('cluster') == 'CEP4' and radbrpc.getResourceClaims(task_ids=task['id']): + logger.info("checking pipeline starttime radb_id=%s otdb_id=%s", task['id'], task['otdb_id']) + + predecessor_tasks = radbrpc.getTasks(task_ids=task['predecessor_ids']) + predecessor_endtimes = [t['endtime'] for t in predecessor_tasks] + if min_start_timestamp: + predecessor_endtimes.append(min_start_timestamp) + + max_pred_endtime = max(predecessor_endtimes) + + if task['starttime'] < max_pred_endtime: + shift = max_pred_endtime - task['starttime'] + logger.info("Moving ahead %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift) + radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift) + updated_task = radbrpc.getTask(task['id']) + if updated_task['status'] not in [u'scheduled', u'queued']: + logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status']) + #TODO: automatically resolve conflict status by moved pipeline in first free time slot. + except Exception as e: + logger.error("Error while checking pipeline starttime: %s", e) + class ScheduleChecker(): def __init__(self, radb_busname=DEFAULT_RADB_BUSNAME, @@ -97,31 +121,17 @@ class ScheduleChecker(): def checkScheduledAndQueuedPipelines(self): try: now = datetime.utcnow() + min_start_timestamp = now + timedelta(seconds=PIPELINE_CHECK_INTERVAL) scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline') queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline') sq_pipelines = scheduled_pipelines + queued_pipelines if sq_pipelines: - logger.info('checking starttime of scheduled/queued cep4 pipelines') + logger.info('checking starttime of %s scheduled/queued cep4 pipelines', len(sq_pipelines)) for task in sq_pipelines: - #only reschedule pipelines which have resourceclaims, and hence run on cep4 - if self._radbrpc.getResourceClaims(task_ids=task['id']): - predecessor_tasks = self._radbrpc.getTasks(task_ids=task['predecessor_ids']) - predecessor_endtimes = [t['endtime'] for t in predecessor_tasks] - predecessor_endtimes.append(now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)) - - max_pred_endtime = max(predecessor_endtimes) - - if task['starttime'] < max_pred_endtime: - shift = max_pred_endtime - task['starttime'] - logger.info("Moving ahead %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift) - self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift) - updated_task = self._radbrpc.getTask(task['id']) - if updated_task['status'] not in [u'scheduled', u'queued']: - logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status']) - #TODO: automatically resolve conflict status by moved pipeline in first free time slot. + movePipelineAfterItsPredecessors(task, self._radbrpc, min_start_timestamp) except Exception as e: logger.error("Error while checking scheduled pipelines: %s", e) diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py index 8b80f2d00accf07d921e10125e3088515909a6ae..f80bb2da9067edae3ce8135e8b9a9809e3ca49bf 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py @@ -510,7 +510,7 @@ def getTasksHtml(): if not tasks: abort(404) - updateTaskMomDetails(tasks, momrpc) + updateTaskMomDetails(tasks, momqueryrpc) html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="width:100%">\n' @@ -540,7 +540,7 @@ def getTaskHtml(task_id): abort(404, 'No such task %s' % task_id) task['name'] = 'Task %d' % task['id'] - updateTaskMomDetails(task, momrpc) + updateTaskMomDetails(task, momqueryrpc) html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="">\n'