diff --git a/SAS/ResourceAssignment/Common/lib/specification.py b/SAS/ResourceAssignment/Common/lib/specification.py index da8ed2655143532eca4b3475b48ddea6afe902c2..eadfcfe62841136280c7a1e45b3896c5f9065394 100644 --- a/SAS/ResourceAssignment/Common/lib/specification.py +++ b/SAS/ResourceAssignment/Common/lib/specification.py @@ -28,10 +28,10 @@ The current class is a mix of backward compatible dicts in internal_dict, specfi properties and methods. """ +import logging from lofar.parameterset import parameterset from datetime import datetime, timedelta -from lofar.common.datetimeutils import parseDatetime -from lofar.sas.resourceassignment.resourceassigner.schedulechecker import movePipelineAfterItsPredecessors +from lofar.common.datetimeutils import parseDatetime, format_timedelta import pprint from types import IntType, FloatType, StringTypes @@ -41,6 +41,61 @@ INPUT_PREFIX = "ObsSW." """ Prefix that is common to all parset keys, when we need to write a parset to OTDB """ OUTPUT_PREFIX = "LOFAR.ObsSW." + +logger = logging.getLogger(__name__) + + +# TODO +# This function does not really belong here but we do not have a better place for it at the moment. +def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): + try: + #only reschedule pipelines which run on cep4 + if task and task['type'] == 'pipeline' and task.get('cluster') == 'CEP4': + logger.info("checking pipeline starttime radb_id=%s otdb_id=%s starttime=%s", + task['id'], task['otdb_id'], task['starttime']) + + predecessor_tasks = radbrpc.getTasks(task_ids=task['predecessor_ids']) + + predecessor_endtimes = [t['endtime'] for t in predecessor_tasks] + if min_start_timestamp: + predecessor_endtimes.append(min_start_timestamp) + + max_pred_endtime = max(predecessor_endtimes) + + if (task['starttime'] < max_pred_endtime) or (min_start_timestamp and task['starttime'] > min_start_timestamp): + shift = max_pred_endtime - task['starttime'] + newStartTime = task['starttime']+shift + newEndTime = task['endtime']+shift + + # move pipeline even further ahead in case there are more than 2 overlapping scheduled/queued pipelines + while True: + overlapping_pipelines = radbrpc.getTasks(lower_bound=newStartTime, upper_bound=newEndTime, task_type='pipeline', task_status=['scheduled', 'queued', 'active', 'completing'], cluster='CEP4') + #exclude self + overlapping_pipelines = [pl for pl in overlapping_pipelines if pl['id'] != task['id']] + + if len(overlapping_pipelines) >= 1: + max_overlapping_pipeline_endtime = max([t['endtime'] for t in overlapping_pipelines]) + shift = max_overlapping_pipeline_endtime + timedelta(minutes=1) - task['starttime'] + newStartTime = task['starttime']+shift + newEndTime = task['endtime']+shift + else: + break + + if shift != timedelta(seconds=0): + logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], format_timedelta(shift), task['starttime'], newStartTime) + if not radbrpc.updateTaskAndResourceClaims(task['id'], starttime=newStartTime, endtime=newEndTime): + logger.warn("Could not update start/endtime for pipeline radb_id=%s otdb_id=%s", + updated_task['id'], updated_task['otdb_id']) + + updated_task = radbrpc.getTask(task['id']) + + if updated_task['status'] != task['status']: + logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s", updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status']) + #TODO: automatically resolve conflict status by moved pipeline in first free time slot. + except Exception as e: + logger.error("Error while checking pipeline starttime: %s", e) + + # TODO This class can use a more OO approach, it currenly exposes quite a bit of its internals and depends a bit # on the user of the class to enforce consistency. Given the time available, this will need to be done in a further # refactoring. diff --git a/SAS/ResourceAssignment/Common/test/test_specification.py b/SAS/ResourceAssignment/Common/test/test_specification.py index 719e3b7516de22e950353048384973355e3b76b9..cbb24584659cf3766c8e03bb073faa4f51a6492a 100755 --- a/SAS/ResourceAssignment/Common/test/test_specification.py +++ b/SAS/ResourceAssignment/Common/test/test_specification.py @@ -19,6 +19,7 @@ # $Id: $ import unittest, mock, os +import logging from lofar.parameterset import parameterset from datetime import datetime, timedelta @@ -27,9 +28,10 @@ from datetime import datetime, timedelta # lxml, xmljson, django, djangorestframework, djangorestframework_xml, python-ldap, six, qpid, mllib # using things like sudo pip install <package> -from lofar.sas.resourceassignment.common.specification import Specification +from lofar.sas.resourceassignment.common.specification import Specification, movePipelineAfterItsPredecessors from lofar.sas.resourceassignment.common.specification import INPUT_PREFIX, OUTPUT_PREFIX +logger = logging.getLogger(__name__) class General(unittest.TestCase): def setUp(self): @@ -45,6 +47,48 @@ class General(unittest.TestCase): self.specification = Specification(self.logger_mock, self.otdbrpc_mock, self.momrpc_mock, self.radbrpc_mock) + self.rarpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC') + self.addCleanup(self.rarpc_patcher.stop) + self.rarpc_mock = self.rarpc_patcher.start() + self.rarpc_mock.getTasks.return_value = [ { 'id': 'id', 'mom_id': '1', 'otdb_id': 'otdb_id', 'status': 'approved', 'type': 'observation', 'specification_id': 'specification_id' } ] + self.rarpc_mock.deleteSpecification.return_value = True + + # TODO + # This test does not really belong here but we do not have a better place for it at the moment. + def test_movePipelineAfterItsPredecessors(self): + """ Test if a pipeline is really moved beyond its predecessor. """ + + self.rarpc_mock.getTasks.return_value = mock.DEFAULT + + def tasks(*args, **kwargs): + if 'task_ids' in kwargs: + return [ { 'id': '2', 'endtime': datetime(2017, 01, 01) } ] + elif 'lower_bound' in kwargs: + return [] + return mock.DEFAULT + + self.rarpc_mock.getTasks.side_effect = tasks + + task = { + 'id': 1, + 'status': 'scheduled', + 'otdb_id': 'otdb_id', + 'type': 'pipeline', + 'cluster': 'CEP4', + 'predecessor_ids': '1', + 'starttime': datetime(2016, 12, 31), + 'endtime': datetime(2017, 12, 31) + } + + movePipelineAfterItsPredecessors(task, self.rarpc_mock) + + self.assertTrue(self.rarpc_mock.updateTaskAndResourceClaims.called, "Pipeline properties not updated.") + self.assertTrue(self.rarpc_mock.updateTaskAndResourceClaims.call_args[1]["starttime"] >= datetime(2017, 01, 01), "Pipeline not moved after predecessor") + self.assertEqual( + self.rarpc_mock.updateTaskAndResourceClaims.call_args[1]["endtime"] - self.rarpc_mock.updateTaskAndResourceClaims.call_args[1]["starttime"], + task["endtime"] - task["starttime"], + "Pipeline duration changed after move") + # ---------------------------------------------------------------------------------------------- # Tests of functions to read values from MoM diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py index 83824fe2338d2b868a839f457e835002cd3889bd..b76e140065227a3978973b94d705fa60dbb31269 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py @@ -43,55 +43,10 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SER from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_INTERVAL from lofar.common.datetimeutils import * -logger = logging.getLogger(__name__) +from lofar.sas.resourceassignment.common.specification import movePipelineAfterItsPredecessors + -def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): - try: - #only reschedule pipelines which run on cep4 - if task and task['type'] == 'pipeline' and task.get('cluster') == 'CEP4': - logger.info("checking pipeline starttime radb_id=%s otdb_id=%s starttime=%s", - task['id'], task['otdb_id'], task['starttime']) - - predecessor_tasks = radbrpc.getTasks(task_ids=task['predecessor_ids']) - - predecessor_endtimes = [t['endtime'] for t in predecessor_tasks] - if min_start_timestamp: - predecessor_endtimes.append(min_start_timestamp) - - max_pred_endtime = max(predecessor_endtimes) - - if (task['starttime'] < max_pred_endtime) or (min_start_timestamp and task['starttime'] > min_start_timestamp): - shift = max_pred_endtime - task['starttime'] - newStartTime = task['starttime']+shift - newEndTime = task['endtime']+shift - - # move pipeline even further ahead in case there are more than 2 overlapping scheduled/queued pipelines - while True: - overlapping_pipelines = radbrpc.getTasks(lower_bound=newStartTime, upper_bound=newEndTime, task_type='pipeline', task_status=['scheduled', 'queued', 'active', 'completing'], cluster='CEP4') - #exclude self - overlapping_pipelines = [pl for pl in overlapping_pipelines if pl['id'] != task['id']] - - if len(overlapping_pipelines) >= 1: - max_overlapping_pipeline_endtime = max([t['endtime'] for t in overlapping_pipelines]) - shift = max_overlapping_pipeline_endtime + timedelta(minutes=1) - task['starttime'] - newStartTime = task['starttime']+shift - newEndTime = task['endtime']+shift - else: - break - - if shift != timedelta(seconds=0): - logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], format_timedelta(shift), task['starttime'], newStartTime) - if not radbrpc.updateTaskAndResourceClaims(task['id'], starttime=newStartTime, endtime=newEndTime): - logger.warn("Could not update start/endtime for pipeline radb_id=%s otdb_id=%s", - updated_task['id'], updated_task['otdb_id']) - - updated_task = radbrpc.getTask(task['id']) - - if updated_task['status'] != task['status']: - logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s", updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status']) - #TODO: automatically resolve conflict status by moved pipeline in first free time slot. - except Exception as e: - logger.error("Error while checking pipeline starttime: %s", e) +logger = logging.getLogger(__name__) class ScheduleChecker(): def __init__(self, @@ -175,7 +130,6 @@ class ScheduleChecker(): min_start_timestamp.day, min_start_timestamp.hour, min_start_timestamp.minute+1) - pipelines = self._radbrpc.getTasks(task_status=['scheduled', 'queued'], task_type='pipeline', cluster='CEP4') if pipelines: diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulechecker.py index 7892f76eeee62340c140de87e2a15a3eaaee9963..4a78a047a4229ca517fdc2f0dc3f19d33914d253 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulechecker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulechecker.py @@ -23,7 +23,7 @@ import unittest import mock import datetime -from lofar.sas.resourceassignment.resourceassigner.schedulechecker import ScheduleChecker, movePipelineAfterItsPredecessors +from lofar.sas.resourceassignment.resourceassigner.schedulechecker import ScheduleChecker from lofar.parameterset import parameterset ra_notification_prefix = "ra_notification_prefix" @@ -84,52 +84,27 @@ class ScheduleCheckerTest(unittest.TestCase): self.assert_all_services_closed() - def test_movePipelineAfterItsPredecessors(self): - """ Test if a pipeline is really moved beyond its predecessor. """ - - self.rarpc_mock.getTasks.return_value = mock.DEFAULT - - def tasks(*args, **kwargs): - if 'task_ids' in kwargs: - return [ { 'id': '2', 'endtime': datetime.datetime(2017, 01, 01) } ] - elif 'lower_bound' in kwargs: - return [] - return mock.DEFAULT - - self.rarpc_mock.getTasks.side_effect = tasks - - task = { - 'id': 1, - 'status': 'scheduled', - 'otdb_id': 'otdb_id', - 'type': 'pipeline', - 'cluster': 'CEP4', - 'predecessor_ids': '1', - 'starttime': datetime.datetime(2016, 12, 31), - 'endtime': datetime.datetime(2017, 12, 31) - } - - movePipelineAfterItsPredecessors(task, self.rarpc_mock) - - self.assertTrue(self.rarpc_mock.updateTaskAndResourceClaims.called, "Pipeline properties not updated.") - self.assertTrue(self.rarpc_mock.updateTaskAndResourceClaims.call_args[1]["starttime"] >= datetime.datetime(2017, 01, 01), "Pipeline not moved after predecessor") - self.assertEqual( - self.rarpc_mock.updateTaskAndResourceClaims.call_args[1]["endtime"] - self.rarpc_mock.updateTaskAndResourceClaims.call_args[1]["starttime"], - task["endtime"] - task["starttime"], - "Pipeline duration changed after move") - - @mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulechecker.movePipelineAfterItsPredecessors') + @mock.patch('lofar.sas.resourceassignment.common.specification.movePipelineAfterItsPredecessors') def test_checkScheduledAndQueuedPipelines(self, movePipeline_mock): """ Test whether all scheduled/queued pipelines get a move request. """ - self.rarpc_mock.getTasks.return_value = [ { 'id': 'id', 'status': 'scheduled', 'type': 'pipeline', 'starttime': datetime.datetime.utcnow() } ] - self.rarpc_mock.getTask.return_value = { 'id': 'id', 'status': 'scheduled', 'type': 'pipeline', 'starttime': datetime.datetime.utcnow() } + # Store the getTasks default return value that gets set in the beginning. + old_getTasks_return_value = self.rarpc_mock.getTasks.return_value + # Replace it. + self.rarpc_mock.getTasks.return_value = [ { 'id': 'id', 'status': 'scheduled', 'type': 'pippo', 'starttime': datetime.datetime.utcnow() } ] + # Same for the getTask function. + old_getTask_return_value = self.rarpc_mock.getTask.return_value + self.rarpc_mock.getTask.return_value = { 'id': 'id', 'status': 'scheduled', 'type': 'pipeline', 'starttime': datetime.datetime.utcnow() } with TestingScheduleChecker(self.rarpc_mock, self.momrpc_mock, self.curpc_mock, self.otdbrpc_mock) as schedulechecker: schedulechecker.checkScheduledAndQueuedPipelines() self.assertTrue(movePipeline_mock.called, "Pipeline was not moved.") + # Restore the old default values. + self.rarpc_mock.getTasks.return_value = old_getTasks_return_value + self.rarpc_mock.getTask.return_value = old_getTask_return_value + def test_checkRunningPipelines(self): """ Test whether the end time of running pipelines is extended if they run beyond their end time. """