diff --git a/LCS/Common/CMakeLists.txt b/LCS/Common/CMakeLists.txt index 8d998fa18d0b5b10ed70186ca27f05a3f5d24e24..44ee1fe2c7dfb79d83991a57c679e60b6aca28bf 100644 --- a/LCS/Common/CMakeLists.txt +++ b/LCS/Common/CMakeLists.txt @@ -3,7 +3,7 @@ lofar_package(Common 3.3) include(LofarFindPackage) -lofar_find_package(Casacore COMPONENTS casa) +# lofar_find_package(Casacore COMPONENTS casa) lofar_find_package(Boost REQUIRED) lofar_find_package(Readline) diff --git a/LCS/Common/src/CasaLogSink.cc b/LCS/Common/src/CasaLogSink.cc index d6de31d85938a538df6a64f1ce8e67c6160bfe32..b28fb674db1d29fe6b58216861ff73ce799c8a97 100644 --- a/LCS/Common/src/CasaLogSink.cc +++ b/LCS/Common/src/CasaLogSink.cc @@ -108,7 +108,9 @@ namespace LOFAR { #else void CasaLogSink::attach() - {} + { + cerr << "WARNING: no casa logging available." << endl; + } #endif } // end namespaces diff --git a/SAS/ResourceAssignment/Common/lib/specification.py b/SAS/ResourceAssignment/Common/lib/specification.py index f674547379642cee263a6270be966a008910278f..c6a988a6ec322dec06b1fbeb944f0e28e8425895 100644 --- a/SAS/ResourceAssignment/Common/lib/specification.py +++ b/SAS/ResourceAssignment/Common/lib/specification.py @@ -983,7 +983,7 @@ class Specification: self.status = task["status"] self.type = task["type"] self.duration = timedelta(seconds = task["duration"]) - self.cluster = task["cluster"] + self.cluster = task.get("cluster", "CEP4") #we don't seem to need specification_id? logger.info("Read task from RADB: %s", task) return task @@ -1071,7 +1071,7 @@ class Specification: specification_id = result['specification_id'] # We never seem to need this again task_id = result['task_id'] - logger.info('inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id)) + logger.info('inserted/updated specification (id=%s) and task (id=%s)' % (specification_id, task_id)) return task_id def _link_predecessors_to_task_in_radb(self): @@ -1092,24 +1092,30 @@ class Specification: # check if the predecessor needs to be linked to this task predecessor_task = self.radb.getTask(mom_id=predecessor_mom_id) if predecessor_task: + # do Specification-class bookkeeping (stupid, because all info is in the radb already) predecessor_keys = [p.radb_id for p in self.predecessors] if predecessor_task['id'] not in predecessor_keys: logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s ' 'otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], self.mom_id, self.otdb_id) - spec = Specification(self.otdbrpc, self.momquery, self.radb) - spec.read_from_radb(predecessor_task['id']) - self.predecessors.append(spec) # TODO this needs a try/except somewhere? - try: - self.radb.insertTaskPredecessor(self.radb_id, spec.radb_id) - except PostgresDBQueryExecutionError as e: - # task was already connected to predecessor. Log and continue. - if 'task_predecessor_unique' in str(e): - logger.info('predecessor task with mom_id=%s otdb_id=%s was already connected to its successor with mom_id=%s otdb_id=%s', - predecessor_task['mom_id'], predecessor_task['otdb_id'], - self.mom_id, self.otdb_id) - else: - raise + pred_spec = Specification(self.otdbrpc, self.momquery, self.radb) + pred_spec.read_from_radb(predecessor_task['id']) + self.predecessors.append(pred_spec) # TODO this needs a try/except somewhere? + + + # do radb task-predecessor bookkeeping if needed + try: + task = self.radb.getTask(self.radb_id) + if predecessor_task['id'] not in task['predecessor_ids']: + self.radb.insertTaskPredecessor(self.radb_id, predecessor_task['id']) + except PostgresDBQueryExecutionError as e: + # task was already connected to predecessor. Log and continue. + if 'task_predecessor_unique' in str(e): + logger.info('predecessor task with mom_id=%s otdb_id=%s was already connected to its successor with mom_id=%s otdb_id=%s', + predecessor_task['mom_id'], predecessor_task['otdb_id'], + self.mom_id, self.otdb_id) + else: + raise else: # Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond # approved, which is in principle valid. The link in the radb will be made later via processSuccessors() @@ -1142,16 +1148,20 @@ class Specification: ' otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], self.mom_id, self.otdb_id ) self.successor_ids.append(successor_task['id']) - try: + + # do radb task-successor bookkeeping if needed + try: + task = self.radb.getTask(self.radb_id) + if successor_task['id'] not in task['successor_ids']: self.radb.insertTaskPredecessor(successor_task['id'], self.radb_id) - except PostgresDBQueryExecutionError as e: - # task was already connected to predecessor. Log and continue. - if 'task_predecessor_unique' in str(e): - logger.info('successor task with mom_id=%s otdb_id=%s was already connected to its predecessor with mom_id=%s otdb_id=%s', - successor_task['mom_id'], successor_task['otdb_id'], - self.mom_id, self.otdb_id) - else: - raise + except PostgresDBQueryExecutionError as e: + # task was already connected to predecessor. Log and continue. + if 'task_predecessor_unique' in str(e): + logger.info('successor task with mom_id=%s otdb_id=%s was already connected to its predecessor with mom_id=%s otdb_id=%s', + successor_task['mom_id'], successor_task['otdb_id'], + self.mom_id, self.otdb_id) + else: + raise movePipelineAfterItsPredecessors(successor_task, self.radb) else: diff --git a/SAS/ResourceAssignment/Common/test/test_specification.py b/SAS/ResourceAssignment/Common/test/test_specification.py index d78da20042ee9f83122582f36204556f1959321b..d0da3cd82ce02e60a4b3c8c8ab9f75053aeaac2b 100755 --- a/SAS/ResourceAssignment/Common/test/test_specification.py +++ b/SAS/ResourceAssignment/Common/test/test_specification.py @@ -1373,5 +1373,44 @@ class ReadFromRadb(unittest.TestCase): self.assertEqual(self.specification.duration, timedelta(seconds = task["duration"])) self.assertEqual(self.specification.cluster, task["cluster"]) + def test_insert_into_radb_and_check_predecessors(self): + # Arrange + def mock_getTask(id=None, mom_id=None, otdb_id=None, specification_id=None): + if id is None and mom_id is not None: + id = mom_id + return {"id": id, "mom_id": id, "otdb_id": id, "status": "approved", "type": "observation", "duration": 100, "predecessor_ids": []} + + self.radbrpc_mock.getTask.side_effect = mock_getTask + self.radbrpc_mock.insertOrUpdateSpecificationAndTask.return_value = {'specification_id': 1, 'task_id': 1} + self.momrpc_mock.getPredecessorIds.return_value = {'1': [42]} + self.specification.read_from_radb(1) + + # Act + self.specification.insert_into_radb() + + # Assert + self.radbrpc_mock.insertTaskPredecessor.assert_called_with(1, 42) + + + # Arrange + # now adapt the mock_getTask, and let it return the inserted predecessor_ids as well + def mock_getTask(id=None, mom_id=None, otdb_id=None, specification_id=None): + if id is None and mom_id is not None: + id = mom_id + task = {"id": id, "mom_id": id, "otdb_id": id, "status": "approved", "type": "observation", "duration": 100, "predecessor_ids": []} + if id == 1: + task['predecessor_ids'] = [42] + return task + + self.radbrpc_mock.getTask.side_effect = mock_getTask + self.radbrpc_mock.insertTaskPredecessor.reset_mock() + + # Act + self.specification.insert_into_radb() + + # Assert + self.radbrpc_mock.insertTaskPredecessor.assert_not_called() + + if __name__ == "__main__": unittest.main() diff --git a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler old mode 100644 new mode 100755 diff --git a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py index 760096dd32daec4b34648fd2aa4bd8825686b38e..e3cf4e6ccc1730279de43c26cb2617b56709e09a 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py @@ -28,7 +28,7 @@ selecting the right timeslot and updating start/end time. import pprint from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler +from lofar.sas.resourceassignment.resourceassigner.rabuslistener import RABusListener, RAEventMessageHandler from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize @@ -83,7 +83,7 @@ def cobaltOTDBsettings(cobalt_values): otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.integrationTime"] = cobalt_values["integrationTime"] return otdb_info -class TaskPrescheduler(OTDBEventMessageHandler): +class TaskPreschedulerEventHandler(RAEventMessageHandler): def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): super().__init__() self.otdbrpc = OTDBRPC.create(exchange=exchange, broker=broker) @@ -102,7 +102,7 @@ class TaskPrescheduler(OTDBEventMessageHandler): self.radb.disconnect() super().stop_handling() - def onObservationApproved(self, treeId, modificationTime): + def onTaskApproved(self, task_ids): """ Updates task specification and puts task on prescheduled if it was generated by a trigger """ # TODO might work for all tasks in the future @@ -115,58 +115,35 @@ class TaskPrescheduler(OTDBEventMessageHandler): # Maybe these checks need to go into the RATaskSpecified instead. # NOTE: The MoM predecessor Ids to OTDB predecessor Ids conversion is done in RATaskSpecified on the fly - # otdb_id = treeId - # - # Note: Race condition when asking MoM as the mom-otdb-adapter might not have heard that the - # task is on approved and might still be on approved pending in MoM. - # so don't ask the MomQuery: mom_ids = self.momquery.getMoMIdsForOTDBIds([otdb_id]) - # We get the mom_id from the parset - # # We get the parset for all tasks we receive instead of just for the ones with # a trigger. - status = "approved" + otdb_id = task_ids['otdb_id'] spec = Specification(self.otdbrpc, self.momquery, self.radb) - spec.set_status(status) - spec.read_from_OTDB_with_predecessors(treeId, "otdb", {}) #Now checks predecessors, which theoretically could cause race contitions + spec.read_from_OTDB_with_predecessors(otdb_id, "otdb", {}) #Now checks predecessors, which theoretically could cause race contitions spec.read_from_mom() - if spec.status == "error": - return - - spec.update_start_end_times() - spec.insert_into_radb() - # if spec.validate()? - if spec.status != status: - return if not spec.mom_id: return - if spec.isTriggered(): + + if spec.isTriggered() and spec.isObservation(): logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', spec.otdb_id, spec.trigger_id) - otdb_info = {} - if spec.isObservation(): - cobalt_values = calculateCobaltSettings(spec) - otdb_info.update(cobaltOTDBsettings(cobalt_values)) - self.setOTDBinfo(spec.otdb_id, otdb_info, 'prescheduled') + + cobalt_values = calculateCobaltSettings(spec) + extended_otdb_specficition = cobaltOTDBsettings(cobalt_values) + + logger.info('Extending otdb specification for otdb_id %s with cobalt settings:\n%s', + otdb_id, pprint.pformat(extended_otdb_specficition)) + self.otdbrpc.taskSetSpecification(otdb_id, extended_otdb_specficition) + + # make sure the task is starting/stopping at the correct times meeting all constraints. + # this call uploads the correct start/stop times to otdb. + spec.update_start_end_times() + + logger.info('Setting status to prescheduled for otdb_id %s so the resourceassigner can schedule the observation', otdb_id) + self.otdbrpc.taskSetStatus(otdb_id, 'prescheduled') else: logger.info('Did not find a trigger for task mom_id=%s', spec.mom_id) - def setOTDBinfo(self, otdb_id, otdb_info, otdb_status): - """This function sets the values in otdb_info in OTDB, almost a copy from the RAtoOTDBPropagator""" - try: - if otdb_info: - logger.info('Setting specification for otdb_id %s:\n', otdb_id) - logger.info(pprint.pformat(otdb_info)) - self.otdbrpc.taskSetSpecification(otdb_id, otdb_info) - #We probably will need this as well - #self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"], - # otdb_info["LOFAR.ObsSW.Observation.stopTime"]) - logger.info('Setting status (%s) for otdb_id %s', otdb_status, otdb_id) - self.otdbrpc.taskSetStatus(otdb_id, otdb_status) - except Exception as e: - logger.exception(e) - logger.error("Problem setting specification or status in OTDB for otdb_id=%s", otdb_id) - self.radb.updateTaskStatusForOtdbId(otdb_id, 'error') # We don't catch an exception if this fails. - def main(): from optparse import OptionParser from lofar.common.util import waitForInterrupt @@ -178,7 +155,7 @@ def main(): # Check the invocation arguments parser = OptionParser("%prog [options]", description='runs the task prescheduler service') parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, - help='Address of the qpid broker, default: localhost') + help='Address of the qpid broker, default: %default') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') parser.add_option('-e', '--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME, @@ -188,10 +165,10 @@ def main(): logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG if options.verbose else logging.INFO) - with OTDBBusListener(handler_type=TaskPrescheduler, - exchange=options.exchange, - broker=options.broker, - num_threads=1): + with RABusListener(handler_type=TaskPreschedulerEventHandler, + exchange=options.exchange, + broker=options.broker, + num_threads=1): waitForInterrupt() if __name__ == '__main__': diff --git a/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py b/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py index 92ec806e6c796158bf49121c2e827c1f53c8ea08..b5f38382ccfa972491ddca3a6fabcd20cb64da23 100755 --- a/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py @@ -25,13 +25,13 @@ from unittest import mock # lxml, xmljson, django, djangorestframework, djangorestframework_xml, python3-ldap, six, qpid, mllib # using things like sudo pip install <package> -from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import TaskPrescheduler +from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import TaskPreschedulerEventHandler from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import calculateCobaltSettings from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import cobaltOTDBsettings from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import main as PreschedulerMain from lofar.sas.resourceassignment.common.specification import Specification -class TestingTaskPrescheduler(TaskPrescheduler): +class TestingTaskPreschedulerEventHandler(TaskPreschedulerEventHandler): def __init__(self, otdbrpc, momrpc, radb): # super gets not done to be able to insert mocks as early as possible otherwise the RPC block unittesting self.otdbrpc = otdbrpc @@ -151,7 +151,6 @@ class PreschedulerTest(unittest.TestCase): self.trigger_id = 2323 future_start_time = (datetime.datetime.utcnow() + datetime.timedelta(hours = 1)).strftime('%Y-%m-%d %H:%M:%S') future_stop_time = (datetime.datetime.utcnow() + datetime.timedelta(hours = 2)).strftime('%Y-%m-%d %H:%M:%S') - self.modification_time = datetime.datetime.utcnow() self.reset_specification_tree(self.otdb_id, self.mom_id, future_start_time, future_stop_time) otdbrpc_patcher = mock.patch('lofar.sas.otdb.otdbrpc') @@ -174,7 +173,7 @@ class PreschedulerTest(unittest.TestCase): self.radb_mock.getTask.return_value = task self.radb_mock.getResourceGroupNames.return_value = [{'name':'CEP4'}] - self.taskprescheduler = TestingTaskPrescheduler(self.otdbrpc_mock, self.momrpc_mock, self.radb_mock) + self.taskprescheduler = TestingTaskPreschedulerEventHandler(self.otdbrpc_mock, self.momrpc_mock, self.radb_mock) def assert_all_services_opened(self): self.assertTrue(self.otdbrpc_mock.open.called, "OTDBRPC.open was not called") @@ -198,9 +197,10 @@ class PreschedulerTest(unittest.TestCase): self.assert_all_services_closed() self.assertTrue(mock_super.called) - def test_onObservationApproved_GetSpecification(self): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + def test_onTaskApproved_GetSpecification(self): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) self.otdbrpc_mock.taskGetSpecification.assert_any_call(otdb_id = self.otdb_id) + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() # def test_resourceIndicatorsFromParset(self): # specification = resourceIndicatorsFromParset(self.observation_specification_tree) @@ -218,82 +218,84 @@ class PreschedulerTest(unittest.TestCase): self.assertEqual(int, type(otdb_info['LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.blockSize'])) @mock.patch('lofar.sas.resourceassignment.common.specification.logger') - def test_onObservationApproved_log_mom_id_found(self, logger_mock): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + def test_onTaskApproved_log_mom_id_found(self, logger_mock): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) logger_mock.info.assert_any_call('Found mom_id %s for otdb_id %s', self.mom_id, self.otdb_id) + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() @mock.patch('lofar.sas.resourceassignment.common.specification.logger') - def test_onObservationApproved_log_mom_id_not_found(self, logger_mock): + def test_onTaskApproved_log_mom_id_not_found(self, logger_mock): observation_specification_tree_no_momid = self.observation_specification_tree observation_specification_tree_no_momid['ObsSW.Observation.momID'] = '' self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id, 'specification': observation_specification_tree_no_momid} - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) logger_mock.info.assert_any_call('Did not find a mom_id for task otdb_id=%s', self.otdb_id) + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() @mock.patch('lofar.sas.resourceassignment.common.specification.logger') - def test_onObservationApproved_otdb_specification_problem(self, logger_mock): + def test_onTaskApproved_otdb_specification_problem(self, logger_mock): self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': 0, 'specification': ''} - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) logger_mock.exception.assert_called() logger_mock.error.assert_called() + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() # TODO not sure how to fix self.radb_mock.updateTaskStatusForOtdbId.assert_any_call(self.otdb_id, 'error') @mock.patch('lofar.sas.resourceassignment.common.specification.logger') - def test_onObservationApproved_log_trigger_found_0(self, logger_mock): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + def test_onTaskApproved_log_trigger_found_0(self, logger_mock): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) logger_mock.info.assert_any_call('Found a task mom_id=%s with a trigger_id=%s', self.mom_id, self.trigger_id) + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() @mock.patch('lofar.sas.resourceassignment.taskprescheduler.taskprescheduler.logger') - def test_onObservationApproved_log_no_trigger(self, logger_mock): + def test_onTaskApproved_log_no_trigger(self, logger_mock): self.momrpc_mock.get_trigger_time_restrictions.return_value = {"trigger_id": None} - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) logger_mock.info.assert_any_call('Did not find a trigger for task mom_id=%s', self.mom_id) + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() - def test_onObservationApproved_no_trigger(self): + def test_onTaskApproved_no_trigger(self): self.momrpc_mock.get_trigger_time_restrictions.return_value = {"trigger_id": None} - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) self.otdbrpc_mock.taskSetStatus.assert_not_called() + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() @mock.patch('lofar.sas.resourceassignment.taskprescheduler.taskprescheduler.logger') - def test_onObservationApproved_log_trigger_found_1(self, logger_mock): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) - logger_mock.info.assert_any_call('Setting status (%s) for otdb_id %s', 'prescheduled', self.otdb_id) + def test_onTaskApproved_log_trigger_found_1(self, logger_mock): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) + logger_mock.info.assert_any_call('Setting status to prescheduled for otdb_id %s so the resourceassigner can schedule the observation', self.otdb_id) + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() - def test_onObservationApproved_SetSpecification(self): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + def test_onTaskApproved_SetSpecification(self): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) self.otdbrpc_mock.taskSetSpecification.assert_any_call(self.otdb_id, self.otdb_info) + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() - def test_onObservationApproved_pipeline_SetSpecification(self): + def test_onTaskApproved_pipeline_not_SetSpecification(self): self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id, 'specification': self.pipeline_specification_tree} task = {"id": 1, "mom_id": self.mom_id, "otdb_id": self.otdb_id, "status": "approved", "type": "pipeline", "duration": 3600, "cluster": "CEP4"} self.radb_mock.getTask.return_value = task - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) - self.otdbrpc_mock.taskSetSpecification.assert_called() + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) + self.otdbrpc_mock.taskSetSpecification.assert_not_called() + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() - def test_onObservationApproved_taskSetStatus(self): - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + def test_onTaskApproved_taskSetStatus(self): + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) self.otdbrpc_mock.taskSetStatus.assert_any_call(self.otdb_id, 'prescheduled') + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() - def test_setOTDBinfo_taskSetStatus(self): - self.taskprescheduler.setOTDBinfo(self.otdb_id, self.otdb_info, 'prescheduled') - self.otdbrpc_mock.taskSetStatus.assert_any_call(self.otdb_id, 'prescheduled') - - def test_setOTDBinfo_otdb_problem(self): - self.otdbrpc_mock.taskSetSpecification.side_effect = Exception() - self.taskprescheduler.setOTDBinfo(self.otdb_id, self.otdb_info, 'prescheduled') - self.radb_mock.updateTaskStatusForOtdbId.assert_any_call(self.otdb_id, 'error') - - def test_onObservationApproved_pipeline_taskSetStatus(self): + def test_onTaskApproved_pipeline_not_taskSetStatus(self): self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id, 'specification': self.pipeline_specification_tree} task = {"id": 1, "mom_id": self.mom_id, "otdb_id": self.otdb_id, "status": "approved", "type": "pipeline", "duration": 3600, "cluster": "CEP4"} self.radb_mock.getTask.return_value = task - self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) - self.otdbrpc_mock.taskSetStatus.assert_any_call(self.otdb_id, 'prescheduled') + self.taskprescheduler.onTaskApproved({'otdb_id': self.otdb_id}) + self.otdbrpc_mock.taskSetStatus.assert_not_called() + self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called() def test_calculateCobaltSettings(self): spec = Specification(self.otdbrpc_mock, self.momrpc_mock, self.radb_mock)