-
Jorrit Schaap authored
SW-826: the taskprescheduler should NOT insert the task into the RADB. Added that requirement to the test, which now fails. Next commit has the fix for this bug.
Jorrit Schaap authoredSW-826: the taskprescheduler should NOT insert the task into the RADB. Added that requirement to the test, which now fails. Next commit has the fix for this bug.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_taskprescheduler.py 21.68 KiB
#!/usr/bin/env python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id: $
import unittest, datetime
from unittest import mock
# you might need to install mock, mysql.connector(from Oracle), testing.mysqld, mock, coverage,
# lxml, xmljson, django, djangorestframework, djangorestframework_xml, python3-ldap, six, qpid, mllib
# using things like sudo pip install <package>
from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import TaskPrescheduler
from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import calculateCobaltSettings
from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import cobaltOTDBsettings
from lofar.sas.resourceassignment.taskprescheduler.taskprescheduler import main as PreschedulerMain
from lofar.sas.resourceassignment.common.specification import Specification
class TestingTaskPrescheduler(TaskPrescheduler):
def __init__(self, otdbrpc, momrpc, radb):
# super gets not done to be able to insert mocks as early as possible otherwise the RPC block unittesting
self.otdbrpc = otdbrpc
self.momquery = momrpc
self.radb = radb
class PreschedulerTest(unittest.TestCase):
# No __init__ because that confuses unittest.main()
def reset_specification_tree(self, otdb_id, mom_id, future_start_time, future_stop_time):
self.pipeline_specification_tree = {
'ObsSW.Observation.DataProducts.Output_InstrumentModel.enabled': False,
'ObsSW.Observation.stopTime': future_stop_time,
'ObsSW.Observation.VirtualInstrument.stationList': [],
'ObsSW.Observation.DataProducts.Input_CoherentStokes.enabled': False,
'ObsSW.Observation.DataProducts.Output_CoherentStokes.enabled': False,
'ObsSW.Observation.DataProducts.Output_SkyImage.enabled': False,
'ObsSW.Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0],
'ObsSW.Observation.antennaSet': 'LBA_INNER',
'ObsSW.Observation.nrBitsPerSample': 16,
'ObsSW.Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': 1,
'ObsSW.Observation.DataProducts.Output_IncoherentStokes.enabled': False,
'ObsSW.Observation.DataProducts.Input_IncoherentStokes.enabled': False,
'ObsSW.Observation.DataProducts.Input_Correlated.enabled': True,
'ObsSW.Observation.DataProducts.Output_Pulsar.enabled': False,
'ObsSW.Observation.DataProducts.Input_CoherentStokes.skip': [],
'ObsSW.Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': 10,
'Version.number': 33774,
'ObsSW.Observation.momID': mom_id,
'ObsSW.Observation.startTime': future_start_time,
'ObsSW.Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': 1,
'ObsSW.Observation.nrBeams': 0,
'ObsSW.Observation.DataProducts.Input_IncoherentStokes.skip': [],
'ObsSW.Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': 64,
'ObsSW.Observation.DataProducts.Output_Correlated.enabled': True,
'ObsSW.Observation.DataProducts.Output_Correlated.storageClusterName': 'CEP4',
'ObsSW.Observation.sampleClock': 200,
'ObsSW.Observation.processType': 'Pipeline',
'ObsSW.Observation.processSubtype': 'Averaging Pipeline',
'ObsSW.Observation.Scheduler.predecessors': '[]',
}
self.observation_specification_tree = {
'ObsSW.Observation.DataProducts.Output_InstrumentModel.enabled': False,
'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.nrChannelsPerSubband': 64,
'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': 1,
'ObsSW.Observation.stopTime': future_stop_time,
'ObsSW.Observation.VirtualInstrument.stationList': ['RS205', 'RS503', 'CS013', 'RS508', 'RS106'],
'ObsSW.Observation.DataProducts.Input_CoherentStokes.enabled': False,
'ObsSW.Observation.DataProducts.Output_CoherentStokes.enabled': True,
'ObsSW.Observation.DataProducts.Output_CoherentStokes.storageClusterName': 'CEP4',
'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': 64,
'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which': 'I',
'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which': 'I',
'ObsSW.Observation.Beam[0].subbandList': '[100, 101, 102, 103]',
'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile': 512,
'ObsSW.Observation.DataProducts.Input_Correlated.skip': [],
'ObsSW.Observation.antennaSet': 'HBA_DUAL',
'ObsSW.Observation.nrBitsPerSample': 8,
'ObsSW.Observation.Beam[0].nrTabRings': 0,
'ObsSW.Observation.Beam[0].nrTiedArrayBeams': 0,
'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye': False,
'ObsSW.Observation.nrBeams': 1,
'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': 1.0,
'ObsSW.Observation.DataProducts.Output_IncoherentStokes.enabled': True,
'ObsSW.Observation.DataProducts.Output_IncoherentStokes.storageClusterName': 'CEP4',
'ObsSW.Observation.DataProducts.Input_IncoherentStokes.enabled': False,
'ObsSW.Observation.DataProducts.Input_Correlated.enabled': False,
'ObsSW.Observation.DataProducts.Output_Pulsar.enabled': False,
'ObsSW.Observation.DataProducts.Input_CoherentStokes.skip': [],
'ObsSW.Observation.DataProducts.Output_SkyImage.enabled': False,
'Version.number': 33774,
'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.nrChannelsPerSubband': 64,
'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': 1,
'ObsSW.Observation.momID': mom_id,
'ObsSW.Observation.startTime': future_start_time,
'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': 512,
'ObsSW.Observation.DataProducts.Input_IncoherentStokes.skip': [],
'ObsSW.Observation.DataProducts.Output_Correlated.enabled': True,
'ObsSW.Observation.DataProducts.Output_Correlated.storageClusterName': 'CEP4',
'ObsSW.Observation.sampleClock': 200,
'ObsSW.Observation.processType': 'Observation',
'ObsSW.Observation.processSubtype': 'Beam Observation',
'ObsSW.Observation.Scheduler.predecessors': '[]',
}
self.test_specification = {
'Version.number': 33774,
'Observation.momID': mom_id,
'Observation.sampleClock': 200,
'Observation.DataProducts.Output_Correlated.enabled': True,
'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': 64,
'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': 1.0,
'Observation.DataProducts.Output_CoherentStokes.enabled': True,
'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.nrChannelsPerSubband': 4,
'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': 1,
'Observation.DataProducts.Output_IncoherentStokes.enabled': True,
'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.nrChannelsPerSubband': 64,
'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': 1,
}
self.test_cobalt_settings = {
'blockSize': 196608, 'nrBlocks': 1, 'integrationTime': 1.00663296, 'nrSubblocks': 1
}
self.otdb_info = {
'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrBlocksPerIntegration': 1,
'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.blockSize': 196608,
'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': 1.00663296,
'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrIntegrationsPerBlock': 1
}
def setUp(self):
# init
self.mom_id = 351557
self.otdb_id = 1290494
self.trigger_id = 2323
future_start_time = (datetime.datetime.utcnow() + datetime.timedelta(hours = 1)).strftime('%Y-%m-%d %H:%M:%S')
future_stop_time = (datetime.datetime.utcnow() + datetime.timedelta(hours = 2)).strftime('%Y-%m-%d %H:%M:%S')
self.modification_time = datetime.datetime.utcnow()
self.reset_specification_tree(self.otdb_id, self.mom_id, future_start_time, future_stop_time)
otdbrpc_patcher = mock.patch('lofar.sas.otdb.otdbrpc')
self.addCleanup(otdbrpc_patcher.stop)
self.otdbrpc_mock = otdbrpc_patcher.start()
self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id, 'specification': self.observation_specification_tree}
self.otdbrpc_mock.setOTDBinfo.return_value = {}
momrpc_patcher = mock.patch('lofar.mom.momqueryservice.momqueryrpc')
self.addCleanup(momrpc_patcher.stop)
self.momrpc_mock = momrpc_patcher.start()
self.momrpc_mock.getMoMIdsForOTDBIds.return_value = {self.otdb_id: self.mom_id}
# self.momrpc_mock.get_trigger_id.return_value = {'status': 'OK', 'trigger_id': self.trigger_id}
self.momrpc_mock.get_trigger_time_restrictions.return_value = {"trigger_id": self.trigger_id}
radb_patcher = mock.patch('lofar.sas.resourceassignment.database.radb.RADatabase')
self.addCleanup(radb_patcher.stop)
self.radb_mock = radb_patcher.start()
task = {"id": 1, "mom_id": self.mom_id, "otdb_id": self.otdb_id, "status": "approved",
"type": "observation", "duration": 3600, "cluster": "CEP4"}
self.radb_mock.getTask.return_value = task
self.radb_mock.getResourceGroupNames.return_value = [{'name':'CEP4'}]
self.taskprescheduler = TestingTaskPrescheduler(self.otdbrpc_mock, self.momrpc_mock, self.radb_mock)
def assert_all_services_opened(self):
self.assertTrue(self.otdbrpc_mock.open.called, "OTDBRPC.open was not called")
self.assertTrue(self.momrpc_mock.open.called, "MOMRPC.open was not called")
self.assertTrue(self.radb_mock.connect.called, "radb.connect was not called")
def assert_all_services_closed(self):
self.assertTrue(self.otdbrpc_mock.close.called, "OTDBRPC.close was not called")
self.assertTrue(self.momrpc_mock.close.called, "MOMRPC.close was not called")
self.assertTrue(self.radb_mock.disconnect.called, "radb.disconnect was not called")
@mock.patch("lofar.messaging.messagebus.AbstractMessageHandler.start_handling")
def test_open_opens_all_services(self, mock_super):
self.taskprescheduler.start_handling()
self.assert_all_services_opened()
self.assertTrue(mock_super.called)
@mock.patch("lofar.messaging.messagebus.AbstractMessageHandler.stop_handling")
def test_close_closes_all_services(self, mock_super):
self.taskprescheduler.stop_handling()
self.assert_all_services_closed()
self.assertTrue(mock_super.called)
def test_onObservationApproved_GetSpecification(self):
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
self.otdbrpc_mock.taskGetSpecification.assert_any_call(otdb_id = self.otdb_id)
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
# def test_resourceIndicatorsFromParset(self):
# specification = resourceIndicatorsFromParset(self.observation_specification_tree)
# self.assertEqual(specification, self.test_specification)
def test_CobaltOTDBsettings(self):
otdb_info = cobaltOTDBsettings(self.test_cobalt_settings)
# beware! assertEqual succeeds for a comparison between 42.0 and 42
# but for lofar specs it is essential that some values are int
# so, check specifically for those!
self.assertEqual(otdb_info, self.otdb_info)
self.assertEqual(int, type(otdb_info['LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrBlocksPerIntegration']))
self.assertEqual(int, type(otdb_info['LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrIntegrationsPerBlock']))
self.assertEqual(int, type(otdb_info['LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.blockSize']))
@mock.patch('lofar.sas.resourceassignment.common.specification.logger')
def test_onObservationApproved_log_mom_id_found(self, logger_mock):
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
logger_mock.info.assert_any_call('Found mom_id %s for otdb_id %s', self.mom_id, self.otdb_id)
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
@mock.patch('lofar.sas.resourceassignment.common.specification.logger')
def test_onObservationApproved_log_mom_id_not_found(self, logger_mock):
observation_specification_tree_no_momid = self.observation_specification_tree
observation_specification_tree_no_momid['ObsSW.Observation.momID'] = ''
self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id,
'specification': observation_specification_tree_no_momid}
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
logger_mock.info.assert_any_call('Did not find a mom_id for task otdb_id=%s', self.otdb_id)
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
@mock.patch('lofar.sas.resourceassignment.common.specification.logger')
def test_onObservationApproved_otdb_specification_problem(self, logger_mock):
self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': 0, 'specification': ''}
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
logger_mock.exception.assert_called()
logger_mock.error.assert_called()
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
# TODO not sure how to fix self.radb_mock.updateTaskStatusForOtdbId.assert_any_call(self.otdb_id, 'error')
@mock.patch('lofar.sas.resourceassignment.common.specification.logger')
def test_onObservationApproved_log_trigger_found_0(self, logger_mock):
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
logger_mock.info.assert_any_call('Found a task mom_id=%s with a trigger_id=%s', self.mom_id, self.trigger_id)
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
@mock.patch('lofar.sas.resourceassignment.taskprescheduler.taskprescheduler.logger')
def test_onObservationApproved_log_no_trigger(self, logger_mock):
self.momrpc_mock.get_trigger_time_restrictions.return_value = {"trigger_id": None}
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
logger_mock.info.assert_any_call('Did not find a trigger for task mom_id=%s', self.mom_id)
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
def test_onObservationApproved_no_trigger(self):
self.momrpc_mock.get_trigger_time_restrictions.return_value = {"trigger_id": None}
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
self.otdbrpc_mock.taskSetStatus.assert_not_called()
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
@mock.patch('lofar.sas.resourceassignment.taskprescheduler.taskprescheduler.logger')
def test_onObservationApproved_log_trigger_found_1(self, logger_mock):
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
logger_mock.info.assert_any_call('Setting status (%s) for otdb_id %s', 'prescheduled', self.otdb_id)
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
def test_onObservationApproved_SetSpecification(self):
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
self.otdbrpc_mock.taskSetSpecification.assert_any_call(self.otdb_id, self.otdb_info)
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
def test_onObservationApproved_pipeline_SetSpecification(self):
self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id,
'specification': self.pipeline_specification_tree}
task = {"id": 1, "mom_id": self.mom_id, "otdb_id": self.otdb_id, "status": "approved",
"type": "pipeline", "duration": 3600, "cluster": "CEP4"}
self.radb_mock.getTask.return_value = task
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
self.otdbrpc_mock.taskSetSpecification.assert_called()
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
def test_onObservationApproved_taskSetStatus(self):
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
self.otdbrpc_mock.taskSetStatus.assert_any_call(self.otdb_id, 'prescheduled')
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
def test_setOTDBinfo_taskSetStatus(self):
self.taskprescheduler.setOTDBinfo(self.otdb_id, self.otdb_info, 'prescheduled')
self.otdbrpc_mock.taskSetStatus.assert_any_call(self.otdb_id, 'prescheduled')
def test_setOTDBinfo_otdb_problem(self):
self.otdbrpc_mock.taskSetSpecification.side_effect = Exception()
self.taskprescheduler.setOTDBinfo(self.otdb_id, self.otdb_info, 'prescheduled')
self.radb_mock.updateTaskStatusForOtdbId.assert_any_call(self.otdb_id, 'error')
def test_onObservationApproved_pipeline_taskSetStatus(self):
self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id,
'specification': self.pipeline_specification_tree}
task = {"id": 1, "mom_id": self.mom_id, "otdb_id": self.otdb_id, "status": "approved",
"type": "pipeline", "duration": 3600, "cluster": "CEP4"}
self.radb_mock.getTask.return_value = task
self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time)
self.otdbrpc_mock.taskSetStatus.assert_any_call(self.otdb_id, 'prescheduled')
self.radb_mock.insertOrUpdateSpecificationAndTask.assert_not_called()
def test_calculateCobaltSettings(self):
spec = Specification(self.otdbrpc_mock, self.momrpc_mock, self.radb_mock)
spec.internal_dict = self.test_specification
cobalt_settings = calculateCobaltSettings(spec)
# beware! assertEqual succeeds for a comparison between 42.0 and 42
# but for lofar specs it is essential that some values are int
# so, check specifically for those!
self.assertEqual(cobalt_settings, self.test_cobalt_settings)
self.assertEqual(int, type(cobalt_settings['blockSize']))
self.assertEqual(int, type(cobalt_settings['nrBlocks']))
self.assertEqual(int, type(cobalt_settings['nrSubblocks']))
def test_calculateCobaltSettingsAndConvertToOTDBsettings(self):
'''combination of test_CobaltOTDBsettings and test_calculateCobaltSettings
Make sure that the values are calculated and converted correctly'''
spec = Specification(self.otdbrpc_mock, self.momrpc_mock, self.radb_mock)
spec.internal_dict = self.test_specification
otdb_info = cobaltOTDBsettings(calculateCobaltSettings(spec))
# beware! assertEqual succeeds for a comparison between 42.0 and 42
# but for lofar specs it is essential that some values are int
# so, check specifically for those!
self.assertEqual(otdb_info, self.otdb_info)
self.assertEqual(int, type(otdb_info['LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrBlocksPerIntegration']))
self.assertEqual(int, type(otdb_info['LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrIntegrationsPerBlock']))
self.assertEqual(int, type(otdb_info['LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.blockSize']))
@mock.patch("lofar.common.util.waitForInterrupt")
@mock.patch("lofar.messaging.messagebus.BusListener.start_listening")
def test_main(self, mock_wait, mock_otdbbuslistener):
PreschedulerMain()
mock_wait.assert_called()
if __name__ == "__main__":
unittest.main()