-
Thomas Jürges authored
Reminder for myself: Mocking is odd and does not follow common sense. When I want to mock out a function joe that is imported into foo.bar like this: from there.here.willy import joe then I have to mock it like so: @mock.patch("foo.bar.joe") Common sense would tell to mock it like so: @mock.patch("there.here.willy.joe")
Thomas Jürges authoredReminder for myself: Mocking is odd and does not follow common sense. When I want to mock out a function joe that is imported into foo.bar like this: from there.here.willy import joe then I have to mock it like so: @mock.patch("foo.bar.joe") Common sense would tell to mock it like so: @mock.patch("there.here.willy.joe")
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
t_schedulechecker.py 8.75 KiB
#!/usr/bin/env python
# Copyright (C) 2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import unittest
import mock
import datetime
from lofar.sas.resourceassignment.resourceassigner.schedulechecker import ScheduleChecker
from lofar.parameterset import parameterset
ra_notification_prefix = "ra_notification_prefix"
class TestingScheduleChecker(ScheduleChecker):
def __init__(self, rarpc, momrpc, curpc, otdbrpc):
# super gets not done to be able to insert mocks as early as possible otherwise the RPC block unittesting
self._radbrpc = rarpc
self._momrpc = momrpc
self._curpc = curpc
self._otdbrpc = otdbrpc
class ScheduleCheckerTest(unittest.TestCase):
def setUp(self):
thread_patcher = mock.patch('threading.Thread')
thread_patcher.start()
self.addCleanup(thread_patcher.stop)
self.rarpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC')
self.addCleanup(self.rarpc_patcher.stop)
self.rarpc_mock = self.rarpc_patcher.start()
momrpc_patcher = mock.patch('lofar.mom.momqueryservice.momqueryrpc')
self.addCleanup(momrpc_patcher.stop)
self.momrpc_mock = momrpc_patcher.start()
curpc_patcher = mock.patch('lofar.sas.datamanagement.cleanup.rpc')
self.addCleanup(curpc_patcher.stop)
self.curpc_mock = curpc_patcher.start()
otdbrpc_patcher = mock.patch('lofar.sas.otdb.otdbrpc.OTDBRPC')
self.addCleanup(otdbrpc_patcher.stop)
self.otdbrpc_mock = otdbrpc_patcher.start()
# Default return values
self.rarpc_mock.getTasks.return_value = [ { 'id': 'id', 'mom_id': '1', 'otdb_id': 'otdb_id', 'status': 'approved', 'type': 'observation', 'specification_id': 'specification_id' } ]
self.momrpc_mock.getObjectDetails.return_value = { 1: { 'object_status': 'approved' } }
self.rarpc_mock.deleteSpecification.return_value = True
self.curpc_mock.getPathForOTDBId.return_value = { 'found': True }
self.curpc_mock.removeTaskData.return_value = { 'deleted': True }
def assert_all_services_opened(self):
self.assertTrue(self.rarpc_mock.open.called, "RARPC.open was not called")
self.assertTrue(self.momrpc_mock.open.called, "MOMRPC.open was not called")
self.assertTrue(self.curpc_mock.open.called, "CURPC.open was not called")
def assert_all_services_closed(self):
self.assertTrue(self.rarpc_mock.close.called, "RARPC.close was not called")
self.assertTrue(self.momrpc_mock.close.called, "MOMRPC.close was not called")
self.assertTrue(self.curpc_mock.close.called, "CURPC.close was not called")
def test_contextManager_opens_and_closes_all_services(self):
with TestingScheduleChecker(self.rarpc_mock, self.momrpc_mock, self.curpc_mock, self.otdbrpc_mock):
self.assert_all_services_opened()
self.assert_all_services_closed()
@mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulechecker.movePipelineAfterItsPredecessors')
def test_checkScheduledAndQueuedPipelines(self, movePipeline_mock):
""" Test whether all scheduled/queued pipelines get a move request. """
# Store the getTasks default return value that gets set in the beginning.
old_getTasks_return_value = self.rarpc_mock.getTasks.return_value
# Replace it.
self.rarpc_mock.getTasks.return_value = [ { 'id': 'id', 'status': 'scheduled', 'type': 'pippo', 'starttime': datetime.datetime.utcnow() } ]
# Same for the getTask function.
old_getTask_return_value = self.rarpc_mock.getTask.return_value
self.rarpc_mock.getTask.return_value = { 'id': 'id', 'status': 'scheduled', 'type': 'pipeline', 'starttime': datetime.datetime.utcnow() }
with TestingScheduleChecker(self.rarpc_mock, self.momrpc_mock, self.curpc_mock, self.otdbrpc_mock) as schedulechecker:
schedulechecker.checkScheduledAndQueuedPipelines()
self.assertTrue(movePipeline_mock.called, "Pipeline was not moved.")
# Restore the old default values.
self.rarpc_mock.getTasks.return_value = old_getTasks_return_value
self.rarpc_mock.getTask.return_value = old_getTask_return_value
def test_checkRunningPipelines(self):
""" Test whether the end time of running pipelines is extended if they run beyond their end time. """
self.rarpc_mock.getTasks.return_value = [ { 'id': 'id', 'mom_id': '1', 'otdb_id': 'otdb_id', 'status': 'active', 'type': 'pipeline', 'specification_id': 'specification_id', 'endtime': datetime.datetime.utcnow() } ]
with TestingScheduleChecker(self.rarpc_mock, self.momrpc_mock, self.curpc_mock, self.otdbrpc_mock) as schedulechecker:
schedulechecker.checkRunningPipelines()
self.assertTrue(self.rarpc_mock.updateTaskAndResourceClaims.called, "Task was not updated.")
self.assertTrue("endtime" in self.rarpc_mock.updateTaskAndResourceClaims.call_args[1], "Task end-time was not updated.")
def test_checkUnRunTasksForMoMOpenedStatus_mom_opened_otdb_approved(self):
""" Test if a MoM task on 'opened' and in OTDB on 'approved' causes the task to be deleted. """
self.momrpc_mock.getObjectDetails.return_value = { 1: { 'object_status': 'opened' } }
with TestingScheduleChecker(self.rarpc_mock, self.momrpc_mock, self.curpc_mock, self.otdbrpc_mock) as schedulechecker:
schedulechecker.checkUnRunTasksForMoMOpenedStatus()
self.assertTrue(self.curpc_mock.removeTaskData.called, "Task output was not deleted from disk")
self.assertTrue(self.rarpc_mock.deleteSpecification.called, "Object was not removed from RADB")
def test_checkUnRunTasksForMoMOpenedStatus_mom_approved_otdb_approved(self):
""" Test if a MoM task on 'approved' and in OTDB on 'approved' causes the task NOT to be deleted. """
self.momrpc_mock.getObjectDetails.return_value = { 1: { 'object_status': 'approved' } }
with TestingScheduleChecker(self.rarpc_mock, self.momrpc_mock, self.curpc_mock, self.otdbrpc_mock) as schedulechecker:
schedulechecker.checkUnRunTasksForMoMOpenedStatus()
self.assertFalse(self.curpc_mock.removeTaskData.called, "Task output was deleted from disk")
self.assertFalse(self.rarpc_mock.deleteSpecification.called, "Object was removed from RADB")
def test_checkUnRunTasksForMoMOpenedStatus_mom_notexisting_otdb_approved(self):
""" Test if a task is not in MoM, and 'approved' in OTDB. This causes the task to be deleted. """
self.momrpc_mock.getObjectDetails.return_value = {}
with TestingScheduleChecker(self.rarpc_mock, self.momrpc_mock, self.curpc_mock, self.otdbrpc_mock) as schedulechecker:
schedulechecker.checkUnRunTasksForMoMOpenedStatus()
self.assertTrue(self.rarpc_mock.deleteSpecification.called, "Object was not removed from RADB")
def test_checkUnRunTasksForMoMOpenedStatus_only_observations_pipelines(self):
""" Test if only observations and pipelines are compared against the MoMDB (and not maintenance or reservation tasks, for example). """
with TestingScheduleChecker(self.rarpc_mock, self.momrpc_mock, self.curpc_mock, self.otdbrpc_mock) as schedulechecker:
schedulechecker.checkUnRunTasksForMoMOpenedStatus()
self.assertEquals(self.rarpc_mock.getTasks.call_args[1]["task_type"], ['observation', 'pipeline'], "Only observations and pipelines must be matched against MoMDB")
def test_checkUnRunReservations_does_not_delete_unscheduled_reservations(self):
self.rarpc_mock.getTasks.return_value = [{'otdb_id': 123, 'id': 22, 'specification_id': 44}]
self.otdbrpc_mock.taskGetStatus.return_value = 'unscheduled'
with TestingScheduleChecker(self.rarpc_mock, self.momrpc_mock, self.curpc_mock,
self.otdbrpc_mock) as schedulechecker:
schedulechecker.checkUnRunReservations()
self.rarpc_mock.deleteSpecification.assert_not_called()
if __name__ == '__main__':
unittest.main()