Skip to content
Snippets Groups Projects
Commit e23a65e5 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #9607: minor fix, momrpc -> momqueryrpc

parent 801dda6e
No related branches found
No related tags found
No related merge requests found
...@@ -35,6 +35,8 @@ from lofar.messaging.messagebus import ToBus ...@@ -35,6 +35,8 @@ from lofar.messaging.messagebus import ToBus
from lofar.messaging.RPC import RPC, RPCException from lofar.messaging.RPC import RPC, RPCException
from lofar.parameterset import parameterset from lofar.parameterset import parameterset
from lofar.sas.resourceassignment.resourceassigner.schedulechecker import movePipelineAfterItsPredecessors
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
...@@ -263,6 +265,7 @@ class ResourceAssigner(): ...@@ -263,6 +265,7 @@ class ResourceAssigner():
if successor_task['id'] not in task['successor_ids']: if successor_task['id'] not in task['successor_ids']:
logger.info('connecting successor task with otdb_id=%s to it\'s predecessor with otdb_id=%s', successor_task['otdb_id'], task['otdb_id']) logger.info('connecting successor task with otdb_id=%s to it\'s predecessor with otdb_id=%s', successor_task['otdb_id'], task['otdb_id'])
self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id']) self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id'])
movePipelineAfterItsPredecessors(successor_task, self.radbrpc)
else: else:
logger.warning('could not find predecessor task with otdb_id=%s in radb for task otdb_id=%s', successor_task['otdb_id'], task['otdb_id']) logger.warning('could not find predecessor task with otdb_id=%s in radb for task otdb_id=%s', successor_task['otdb_id'], task['otdb_id'])
else: else:
......
...@@ -37,6 +37,30 @@ from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_ ...@@ -37,6 +37,30 @@ from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None):
try:
#only reschedule pipelines which have resourceclaims and run on cep4
if task and task['type'] == 'pipeline' and task.get('cluster') == 'CEP4' and radbrpc.getResourceClaims(task_ids=task['id']):
logger.info("checking pipeline starttime radb_id=%s otdb_id=%s", task['id'], task['otdb_id'])
predecessor_tasks = radbrpc.getTasks(task_ids=task['predecessor_ids'])
predecessor_endtimes = [t['endtime'] for t in predecessor_tasks]
if min_start_timestamp:
predecessor_endtimes.append(min_start_timestamp)
max_pred_endtime = max(predecessor_endtimes)
if task['starttime'] < max_pred_endtime:
shift = max_pred_endtime - task['starttime']
logger.info("Moving ahead %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift)
radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift)
updated_task = radbrpc.getTask(task['id'])
if updated_task['status'] not in [u'scheduled', u'queued']:
logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status'])
#TODO: automatically resolve conflict status by moved pipeline in first free time slot.
except Exception as e:
logger.error("Error while checking pipeline starttime: %s", e)
class ScheduleChecker(): class ScheduleChecker():
def __init__(self, def __init__(self,
radb_busname=DEFAULT_RADB_BUSNAME, radb_busname=DEFAULT_RADB_BUSNAME,
...@@ -97,31 +121,17 @@ class ScheduleChecker(): ...@@ -97,31 +121,17 @@ class ScheduleChecker():
def checkScheduledAndQueuedPipelines(self): def checkScheduledAndQueuedPipelines(self):
try: try:
now = datetime.utcnow() now = datetime.utcnow()
min_start_timestamp = now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)
scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline') scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline')
queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline') queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline')
sq_pipelines = scheduled_pipelines + queued_pipelines sq_pipelines = scheduled_pipelines + queued_pipelines
if sq_pipelines: if sq_pipelines:
logger.info('checking starttime of scheduled/queued cep4 pipelines') logger.info('checking starttime of %s scheduled/queued cep4 pipelines', len(sq_pipelines))
for task in sq_pipelines: for task in sq_pipelines:
#only reschedule pipelines which have resourceclaims, and hence run on cep4 movePipelineAfterItsPredecessors(task, self._radbrpc, min_start_timestamp)
if self._radbrpc.getResourceClaims(task_ids=task['id']):
predecessor_tasks = self._radbrpc.getTasks(task_ids=task['predecessor_ids'])
predecessor_endtimes = [t['endtime'] for t in predecessor_tasks]
predecessor_endtimes.append(now + timedelta(seconds=PIPELINE_CHECK_INTERVAL))
max_pred_endtime = max(predecessor_endtimes)
if task['starttime'] < max_pred_endtime:
shift = max_pred_endtime - task['starttime']
logger.info("Moving ahead %s pipeline radb_id=%s otdb_id=%s by %s", task['status'], task['id'], task['otdb_id'], shift)
self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift)
updated_task = self._radbrpc.getTask(task['id'])
if updated_task['status'] not in [u'scheduled', u'queued']:
logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status'])
#TODO: automatically resolve conflict status by moved pipeline in first free time slot.
except Exception as e: except Exception as e:
logger.error("Error while checking scheduled pipelines: %s", e) logger.error("Error while checking scheduled pipelines: %s", e)
......
...@@ -510,7 +510,7 @@ def getTasksHtml(): ...@@ -510,7 +510,7 @@ def getTasksHtml():
if not tasks: if not tasks:
abort(404) abort(404)
updateTaskMomDetails(tasks, momrpc) updateTaskMomDetails(tasks, momqueryrpc)
html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="width:100%">\n' html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="width:100%">\n'
...@@ -540,7 +540,7 @@ def getTaskHtml(task_id): ...@@ -540,7 +540,7 @@ def getTaskHtml(task_id):
abort(404, 'No such task %s' % task_id) abort(404, 'No such task %s' % task_id)
task['name'] = 'Task %d' % task['id'] task['name'] = 'Task %d' % task['id']
updateTaskMomDetails(task, momrpc) updateTaskMomDetails(task, momqueryrpc)
html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="">\n' html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="">\n'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment