diff --git a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler old mode 100644 new mode 100755 diff --git a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py index 760096dd32daec4b34648fd2aa4bd8825686b38e..e3cf4e6ccc1730279de43c26cb2617b56709e09a 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py @@ -28,7 +28,7 @@ selecting the right timeslot and updating start/end time. import pprint from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler +from lofar.sas.resourceassignment.resourceassigner.rabuslistener import RABusListener, RAEventMessageHandler from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize @@ -83,7 +83,7 @@ def cobaltOTDBsettings(cobalt_values): otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.integrationTime"] = cobalt_values["integrationTime"] return otdb_info -class TaskPrescheduler(OTDBEventMessageHandler): +class TaskPreschedulerEventHandler(RAEventMessageHandler): def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): super().__init__() self.otdbrpc = OTDBRPC.create(exchange=exchange, broker=broker) @@ -102,7 +102,7 @@ class TaskPrescheduler(OTDBEventMessageHandler): self.radb.disconnect() super().stop_handling() - def onObservationApproved(self, treeId, modificationTime): + def onTaskApproved(self, task_ids): """ Updates task specification and puts task on prescheduled if it was generated by a trigger """ # TODO might work for all tasks in the future @@ -115,58 +115,35 @@ class TaskPrescheduler(OTDBEventMessageHandler): # Maybe these checks need to go into the RATaskSpecified instead. # NOTE: The MoM predecessor Ids to OTDB predecessor Ids conversion is done in RATaskSpecified on the fly - # otdb_id = treeId - # - # Note: Race condition when asking MoM as the mom-otdb-adapter might not have heard that the - # task is on approved and might still be on approved pending in MoM. - # so don't ask the MomQuery: mom_ids = self.momquery.getMoMIdsForOTDBIds([otdb_id]) - # We get the mom_id from the parset - # # We get the parset for all tasks we receive instead of just for the ones with # a trigger. - status = "approved" + otdb_id = task_ids['otdb_id'] spec = Specification(self.otdbrpc, self.momquery, self.radb) - spec.set_status(status) - spec.read_from_OTDB_with_predecessors(treeId, "otdb", {}) #Now checks predecessors, which theoretically could cause race contitions + spec.read_from_OTDB_with_predecessors(otdb_id, "otdb", {}) #Now checks predecessors, which theoretically could cause race contitions spec.read_from_mom() - if spec.status == "error": - return - - spec.update_start_end_times() - spec.insert_into_radb() - # if spec.validate()? - if spec.status != status: - return if not spec.mom_id: return - if spec.isTriggered(): + + if spec.isTriggered() and spec.isObservation(): logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', spec.otdb_id, spec.trigger_id) - otdb_info = {} - if spec.isObservation(): - cobalt_values = calculateCobaltSettings(spec) - otdb_info.update(cobaltOTDBsettings(cobalt_values)) - self.setOTDBinfo(spec.otdb_id, otdb_info, 'prescheduled') + + cobalt_values = calculateCobaltSettings(spec) + extended_otdb_specficition = cobaltOTDBsettings(cobalt_values) + + logger.info('Extending otdb specification for otdb_id %s with cobalt settings:\n%s', + otdb_id, pprint.pformat(extended_otdb_specficition)) + self.otdbrpc.taskSetSpecification(otdb_id, extended_otdb_specficition) + + # make sure the task is starting/stopping at the correct times meeting all constraints. + # this call uploads the correct start/stop times to otdb. + spec.update_start_end_times() + + logger.info('Setting status to prescheduled for otdb_id %s so the resourceassigner can schedule the observation', otdb_id) + self.otdbrpc.taskSetStatus(otdb_id, 'prescheduled') else: logger.info('Did not find a trigger for task mom_id=%s', spec.mom_id) - def setOTDBinfo(self, otdb_id, otdb_info, otdb_status): - """This function sets the values in otdb_info in OTDB, almost a copy from the RAtoOTDBPropagator""" - try: - if otdb_info: - logger.info('Setting specification for otdb_id %s:\n', otdb_id) - logger.info(pprint.pformat(otdb_info)) - self.otdbrpc.taskSetSpecification(otdb_id, otdb_info) - #We probably will need this as well - #self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"], - # otdb_info["LOFAR.ObsSW.Observation.stopTime"]) - logger.info('Setting status (%s) for otdb_id %s', otdb_status, otdb_id) - self.otdbrpc.taskSetStatus(otdb_id, otdb_status) - except Exception as e: - logger.exception(e) - logger.error("Problem setting specification or status in OTDB for otdb_id=%s", otdb_id) - self.radb.updateTaskStatusForOtdbId(otdb_id, 'error') # We don't catch an exception if this fails. - def main(): from optparse import OptionParser from lofar.common.util import waitForInterrupt @@ -178,7 +155,7 @@ def main(): # Check the invocation arguments parser = OptionParser("%prog [options]", description='runs the task prescheduler service') parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, - help='Address of the qpid broker, default: localhost') + help='Address of the qpid broker, default: %default') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') parser.add_option('-e', '--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME, @@ -188,10 +165,10 @@ def main(): logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG if options.verbose else logging.INFO) - with OTDBBusListener(handler_type=TaskPrescheduler, - exchange=options.exchange, - broker=options.broker, - num_threads=1): + with RABusListener(handler_type=TaskPreschedulerEventHandler, + exchange=options.exchange, + broker=options.broker, + num_threads=1): waitForInterrupt() if __name__ == '__main__': diff --git a/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py b/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py index 321b95a4e60931e6609986a0dc0819a8d1f76ea3..b5f38382ccfa972491ddca3a6fabcd20cb64da23 100755 --- a/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py @@ -25,13 +25,13 @@ from unittest import mock # lxml, xmljson, django, djangorestframework, djangorestframework_xml, python3-ldap, six, qpid, mllib # using things like sudo pip install <package> -from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import TaskPrescheduler +from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import TaskPreschedulerEventHandler from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import calculateCobaltSettings from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import cobaltOTDBsettings from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import main as PreschedulerMain from lofar.sas.resourceassignment.common.specification import Specification -class TestingTaskPrescheduler(TaskPrescheduler): +class TestingTaskPreschedulerEventHandler(TaskPreschedulerEventHandler): def __init__(self, otdbrpc, momrpc, radb): # super gets not done to be able to insert mocks as early as possible otherwise the RPC block unittesting self.otdbrpc = otdbrpc @@ -151,7 +151,6 @@ class PreschedulerTest(unittest.TestCase): self.trigger_id = 2323 future_start_time = (datetime.datetime.utcnow() + datetime.timedelta(hours = 1)).strftime('%Y-%m-%d %H:%M:%S') future_stop_time = (datetime.datetime.utcnow() + datetime.timedelta(hours = 2)).strftime('%Y-%m-%d %H:%M:%S') - self.modification_time = datetime.datetime.utcnow() self.reset_specification_tree(self.otdb_id, self.mom_id, future_start_time, future_stop_time) otdbrpc_patcher = mock.patch('lofar.sas.otdb.otdbrpc') @@ -174,7 +173,7 @@ class PreschedulerTest(unittest.TestCase): self.radb_mock.getTask.return_value = task self.radb_mock.getResourceGroupNames.return_value = [{'name':'CEP4'}] - self.taskprescheduler = TestingTaskPrescheduler(self.otdbrpc_mock, self.momrpc_mock, self.radb_mock) + self.taskprescheduler = TestingTaskPreschedulerEventHandler(self.otdbrpc_mock, self.momrpc_mock, self.radb_mock) def assert_all_services_opened(self): self.assertTrue(self.otdbrpc_mock.open.called, "OTDBRPC.open was not called") @@ -198,8 +197,8 @@ class PreschedulerTest(unittest.TestCase): self.assert_all_services_closed() self.assertTrue(mock_super.called) - def test_onObservationApproved_GetSpecification(self): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + def test_onTaskApproved_GetSpecification(self): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) self.otdbrpc_mock.taskGetSpecification.assert_any_call(otdb_id = self.otdb_id) self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() @@ -219,92 +218,83 @@ class PreschedulerTest(unittest.TestCase): self.assertEqual(int, type(otdb_info['LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.blockSize'])) @mock.patch('lofar.sas.resourceassignment.common.specification.logger') - def test_onObservationApproved_log_mom_id_found(self, logger_mock): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + def test_onTaskApproved_log_mom_id_found(self, logger_mock): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) logger_mock.info.assert_any_call('Found mom_id %s for otdb_id %s', self.mom_id, self.otdb_id) self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() @mock.patch('lofar.sas.resourceassignment.common.specification.logger') - def test_onObservationApproved_log_mom_id_not_found(self, logger_mock): + def test_onTaskApproved_log_mom_id_not_found(self, logger_mock): observation_specification_tree_no_momid = self.observation_specification_tree observation_specification_tree_no_momid['ObsSW.Observation.momID'] = '' self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id, 'specification': observation_specification_tree_no_momid} - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) logger_mock.info.assert_any_call('Did not find a mom_id for task otdb_id=%s', self.otdb_id) self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() @mock.patch('lofar.sas.resourceassignment.common.specification.logger') - def test_onObservationApproved_otdb_specification_problem(self, logger_mock): + def test_onTaskApproved_otdb_specification_problem(self, logger_mock): self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': 0, 'specification': ''} - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) logger_mock.exception.assert_called() logger_mock.error.assert_called() self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() # TODO not sure how to fix self.radb_mock.updateTaskStatusForOtdbId.assert_any_call(self.otdb_id, 'error') @mock.patch('lofar.sas.resourceassignment.common.specification.logger') - def test_onObservationApproved_log_trigger_found_0(self, logger_mock): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + def test_onTaskApproved_log_trigger_found_0(self, logger_mock): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) logger_mock.info.assert_any_call('Found a task mom_id=%s with a trigger_id=%s', self.mom_id, self.trigger_id) self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() @mock.patch('lofar.sas.resourceassignment.taskprescheduler.taskprescheduler.logger') - def test_onObservationApproved_log_no_trigger(self, logger_mock): + def test_onTaskApproved_log_no_trigger(self, logger_mock): self.momrpc_mock.get_trigger_time_restrictions.return_value = {"trigger_id": None} - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) logger_mock.info.assert_any_call('Did not find a trigger for task mom_id=%s', self.mom_id) self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() - def test_onObservationApproved_no_trigger(self): + def test_onTaskApproved_no_trigger(self): self.momrpc_mock.get_trigger_time_restrictions.return_value = {"trigger_id": None} - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) self.otdbrpc_mock.taskSetStatus.assert_not_called() self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() @mock.patch('lofar.sas.resourceassignment.taskprescheduler.taskprescheduler.logger') - def test_onObservationApproved_log_trigger_found_1(self, logger_mock): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) - logger_mock.info.assert_any_call('Setting status (%s) for otdb_id %s', 'prescheduled', self.otdb_id) + def test_onTaskApproved_log_trigger_found_1(self, logger_mock): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) + logger_mock.info.assert_any_call('Setting status to prescheduled for otdb_id %s so the resourceassigner can schedule the observation', self.otdb_id) self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() - def test_onObservationApproved_SetSpecification(self): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + def test_onTaskApproved_SetSpecification(self): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) self.otdbrpc_mock.taskSetSpecification.assert_any_call(self.otdb_id, self.otdb_info) self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() - def test_onObservationApproved_pipeline_SetSpecification(self): + def test_onTaskApproved_pipeline_not_SetSpecification(self): self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id, 'specification': self.pipeline_specification_tree} task = {"id": 1, "mom_id": self.mom_id, "otdb_id": self.otdb_id, "status": "approved", "type": "pipeline", "duration": 3600, "cluster": "CEP4"} self.radb_mock.getTask.return_value = task - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) - self.otdbrpc_mock.taskSetSpecification.assert_called() + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) + self.otdbrpc_mock.taskSetSpecification.assert_not_called() self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() - def test_onObservationApproved_taskSetStatus(self): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + def test_onTaskApproved_taskSetStatus(self): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) self.otdbrpc_mock.taskSetStatus.assert_any_call(self.otdb_id, 'prescheduled') self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() - def test_setOTDBinfo_taskSetStatus(self): - self.taskprescheduler.setOTDBinfo(self.otdb_id, self.otdb_info, 'prescheduled') - self.otdbrpc_mock.taskSetStatus.assert_any_call(self.otdb_id, 'prescheduled') - - def test_setOTDBinfo_otdb_problem(self): - self.otdbrpc_mock.taskSetSpecification.side_effect = Exception() - self.taskprescheduler.setOTDBinfo(self.otdb_id, self.otdb_info, 'prescheduled') - self.radb_mock.updateTaskStatusForOtdbId.assert_any_call(self.otdb_id, 'error') - - def test_onObservationApproved_pipeline_taskSetStatus(self): + def test_onTaskApproved_pipeline_not_taskSetStatus(self): self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id, 'specification': self.pipeline_specification_tree} task = {"id": 1, "mom_id": self.mom_id, "otdb_id": self.otdb_id, "status": "approved", "type": "pipeline", "duration": 3600, "cluster": "CEP4"} self.radb_mock.getTask.return_value = task - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) - self.otdbrpc_mock.taskSetStatus.assert_any_call(self.otdb_id, 'prescheduled') + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) + self.otdbrpc_mock.taskSetStatus.assert_not_called() self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() def test_calculateCobaltSettings(self):