Skip to content
Snippets Groups Projects
Select Git revision
  • 9fdd214bffb8102bc17a3b270012de5c45a0fca1
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

FindPythonModule.cmake

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_task_queue_manager.py 16.23 KiB
    """Tests for QueueManager and its component manager."""
    import logging
    import time
    import pytest
    from unittest.mock import patch
    
    from ska_tango_base.commands import ResultCode
    from ska_tango_base.base.task_queue_manager import (
        QueueManager,
        TaskResult,
        TaskState,
    )
    from ska_tango_base.base.reference_component_manager import QueueWorkerComponentManager
    from ska_tango_base.commands import BaseCommand
    
    logger = logging.getLogger(__name__)
    
    
    @pytest.fixture
    def progress_task():
        """Fixture for a test that throws an exception."""
    
        def get_task():
            class ProgressTask(BaseCommand):
                def do(self):
                    for i in range(100):
                        self.update_progress(str(i))
                        time.sleep(0.5)
    
            return ProgressTask(target=None)
    
        return get_task
    
    
    @pytest.fixture
    def exc_task():
        """Fixture for a test that throws an exception."""
    
        def get_task():
            class ExcTask(BaseCommand):
                def do(self):
                    raise Exception("An error occurred")
    
            return ExcTask(target=None)
    
        return get_task
    
    
    @pytest.fixture
    def slow_task():
        """Fixture for a test that takes long."""
    
        def get_task():
            class SlowTask(BaseCommand):
                def do(self):
                    time.sleep(1)
    
            return SlowTask(target=None)
    
        return get_task
    
    
    @pytest.fixture
    def simple_task():
        """Fixture for a very simple task."""
    
        def get_task():
            class SimpleTask(BaseCommand):
                def do(self, argin):
                    return argin + 2
    
            return SimpleTask(2)
    
        return get_task
    
    
    @pytest.fixture
    def abort_task():
        """Fixture for a task that aborts."""
    
        def get_task():
            class AbortTask(BaseCommand):
                def do(self, argin):
                    sleep_time = argin
                    while not self.aborting_event.is_set():
                        time.sleep(sleep_time)
    
            return AbortTask(target=None)
    
        return get_task
    
    
    @pytest.fixture
    def stop_task():
        """Fixture for a task that stops."""
    
        def get_task():
            class StopTask(BaseCommand):
                def do(self):
                    assert not self.stopping_event.is_set()
                    while not self.stopping_event.is_set():
                        time.sleep(0.1)
    
            return StopTask(target=None)
    
        return get_task
    
    
    @pytest.mark.forked
    class TestQueueManager:
        """General QueueManager checks."""
    
        def test_threads_start(self):
            """Test that threads start up. Set stop and exit."""
            qm = QueueManager(max_queue_size=2, num_workers=2, logger=logger)
            assert len(qm._threads) == 2
            for worker in qm._threads:
                assert worker.is_alive()
                assert worker.daemon
    
            for worker in qm._threads:
                worker.stopping_event.set()
    
    
    @pytest.mark.forked
    class TestQueueManagerTasks:
        """QueueManager checks for tasks executed."""
    
        @pytest.mark.timeout(5)
        def test_task_ids(self, simple_task):
            """Check ids."""
            qm = QueueManager(max_queue_size=5, num_workers=2, logger=logger)
            unique_id_one, result_code = qm.enqueue_task(simple_task(), 2)
            unique_id_two, _ = qm.enqueue_task(simple_task(), 2)
            assert unique_id_one.endswith("SimpleTask")
            assert unique_id_one != unique_id_two
            assert result_code == ResultCode.QUEUED
    
        @pytest.mark.timeout(5)
        def test_task_is_executed(self, simple_task):
            """Check that tasks are executed."""
            with patch.object(QueueManager, "result_callback") as my_cb:
                qm = QueueManager(max_queue_size=5, num_workers=2, logger=logger)
                unique_id_one, _ = qm.enqueue_task(simple_task(), 3)
                unique_id_two, _ = qm.enqueue_task(simple_task(), 3)
    
                while my_cb.call_count != 2:
                    time.sleep(0.5)
                result_one = my_cb.call_args_list[0][0][0]
                result_two = my_cb.call_args_list[1][0][0]
    
                assert result_one.unique_id.endswith("SimpleTask")
                assert result_one.unique_id == unique_id_one
                assert result_two.unique_id.endswith("SimpleTask")
                assert result_two.unique_id == unique_id_two
    
                assert result_one.result_code == ResultCode.OK
                assert result_two.result_code == ResultCode.OK
    
                assert result_one.task_result == "5"
                assert result_two.task_result == "5"
    
        @pytest.mark.timeout(5)
        def test_task_result(self, simple_task, exc_task):
            """Check task results are what's expected."""
            qm = QueueManager(max_queue_size=5, num_workers=2, logger=logger)
            add_task_one = simple_task()
            exc_task = exc_task()
    
            qm.enqueue_task(add_task_one, 3)
            while not qm.task_result:
                time.sleep(0.5)
            task_result = TaskResult.from_task_result(qm.task_result)
            assert task_result.unique_id.endswith("SimpleTask")
            assert task_result.result_code == ResultCode.OK
            assert task_result.task_result == "5"
    
            qm.enqueue_task(exc_task)
            while qm.task_result[0].endswith("SimpleTask"):
                time.sleep(0.5)
            task_result = TaskResult.from_task_result(qm.task_result)
            assert task_result.unique_id.endswith("ExcTask")
            assert task_result.result_code == ResultCode.FAILED
            assert task_result.task_result.startswith(
                "Error: An error occurred Traceback ("
            )
    
        @pytest.mark.timeout(10)
        def test_full_queue(self, slow_task):
            """Check full queues rejects new commands."""
            with patch.object(QueueManager, "result_callback") as my_cb:
                qm = QueueManager(max_queue_size=1, num_workers=1, logger=logger)
                for i in range(10):
                    qm.enqueue_task(slow_task())
    
                while len(my_cb.call_args_list) != 10:
                    time.sleep(0.5)
    
                results = [i[0][0].result_code for i in my_cb.call_args_list]
                # 9/10 should be rejected since the first is busy and the queue length is 1
                # Give a buffer of 2 just in case a task finishes up quicker than expected
                assert results[-1] == ResultCode.OK
                for res in results[:-3]:
                    assert res == ResultCode.REJECTED
    
            with patch.object(QueueManager, "result_callback") as my_cb:
                qm = QueueManager(max_queue_size=2, num_workers=2, logger=logger)
                for i in range(10):
                    qm.enqueue_task(slow_task())
    
                while len(my_cb.call_args_list) != 10:
                    time.sleep(0.5)
                results = [i[0][0].result_code for i in my_cb.call_args_list]
                # 8/10 should be rejected since two are taken to be processed.
                # Give a buffer of 2 just in case a task finishes up quicker than expected
                assert results[-1] == ResultCode.OK
                assert results[-2] == ResultCode.OK
                for res in results[:-4]:
                    assert res == ResultCode.REJECTED
    
        @pytest.mark.timeout(5)
        def test_zero_queue(self, simple_task):
            """Check task_result is the same between queue and non queue."""
            expected_name = "SimpleTask"
            expected_result_code = ResultCode.OK
            expected_result = "5"
    
            # No Queue
            qm = QueueManager(max_queue_size=0, num_workers=1, logger=logger)
            assert len(qm._threads) == 0
            res, _ = qm.enqueue_task(simple_task(), 3)
            assert res.endswith(expected_name)
            assert qm.task_result[0].endswith(expected_name)
            assert int(qm.task_result[1]) == expected_result_code
            assert qm.task_result[2] == expected_result
    
            # Queue
            qm = QueueManager(max_queue_size=2, num_workers=1, logger=logger)
            res, _ = qm.enqueue_task(simple_task(), 3)
            assert res.endswith(expected_name)
    
            # Wait for the task to be picked up
            while not qm.task_result:
                time.sleep(0.5)
            assert qm.task_result[0].endswith(expected_name)
            assert int(qm.task_result[1]) == expected_result_code
            assert qm.task_result[2] == expected_result
    
        @pytest.mark.timeout(5)
        def test_multi_jobs(self, slow_task):
            """Test that multiple threads are working. Test that attribute updates fires."""
            num_of_workers = 3
    
            with patch.object(QueueManager, "_on_property_change") as call_back_func:
    
                qm = QueueManager(
                    max_queue_size=5,
                    num_workers=num_of_workers,
                    logger=logger,
                )
                unique_ids = []
                for _ in range(4):
                    unique_id, _ = qm.enqueue_task(slow_task())
                    unique_ids.append(unique_id)
    
                # Wait for a item on the queue
                while not qm.task_ids_in_queue:
                    time.sleep(0.1)
    
                # Wait for the queue to empty
                while not qm.task_status:
                    time.sleep(0.1)
    
                # Wait for all the callbacks to fire
                while len(call_back_func.call_args_list) < 24:
                    time.sleep(0.1)
    
                all_passed_params = [a_call[0] for a_call in call_back_func.call_args_list]
                tasks_in_queue = [
                    a_call[1]
                    for a_call in all_passed_params
                    if a_call[0] == "longRunningCommandsInQueue"
                ]
                task_ids_in_queue = [
                    a_call[1]
                    for a_call in all_passed_params
                    if a_call[0] == "longRunningCommandIDsInQueue"
                ]
                task_status = [
                    a_call[1]
                    for a_call in all_passed_params
                    if a_call[0] == "longRunningCommandStatus"
                ]
                task_result = [
                    a_call[1]
                    for a_call in all_passed_params
                    if a_call[0] == "longRunningCommandResult"
                ]
                task_result_ids = [res[0] for res in task_result]
    
                assert len(tasks_in_queue) == 8
                assert len(task_ids_in_queue) == 8
    
                # Since there's 3 workers there should at least once be 3 in progress
                for status in task_status:
                    if len(status) == 2 * num_of_workers:
                        break
                else:
                    assert 0, f"Length of {num_of_workers} in task_status not found"
                assert len(task_result) == 4
                for unique_id in unique_ids:
                    assert unique_id in task_result_ids
    
        def test_task_get_state_completed(self, simple_task):
            """Test the QueueTask get state is completed."""
            qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger)
            unique_id_one, _ = qm.enqueue_task(simple_task(), 3)
            while not qm.task_result:
                time.sleep(0.1)
            assert qm.get_task_state(unique_id=unique_id_one) == TaskState.COMPLETED
    
        def test_task_get_state_in_queued(self, slow_task):
            """Test the QueueTask get state is queued."""
            qm = QueueManager(max_queue_size=8, num_workers=1, logger=logger)
            qm.enqueue_task(slow_task(), 2)
            qm.enqueue_task(slow_task(), 2)
            unique_id_last, _ = qm.enqueue_task(slow_task())
    
            assert qm.get_task_state(unique_id=unique_id_last) == TaskState.QUEUED
    
        def test_task_get_state_in_progress(self, progress_task):
            """Test the QueueTask get state is in progress."""
            qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger)
            unique_id_one, _ = qm.enqueue_task(progress_task())
            while not qm.task_progress:
                time.sleep(0.1)
    
            assert qm.get_task_state(unique_id=unique_id_one) == TaskState.IN_PROGRESS
    
        def test_task_get_state_in_not_found(self):
            """Test the QueueTask get state not found."""
            qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger)
            assert qm.get_task_state(unique_id="non_existing_id") == TaskState.NOT_FOUND
    
    
    class TestQueueManagerExit:
        """Test the stopping and aborting."""
    
        @pytest.mark.forked
        @pytest.mark.timeout(5)
        def test_exit_abort(self, abort_task, slow_task):
            """Test aborting exit."""
    
            results = []
    
            def catch_updates(name, result):
                if name == "longRunningCommandResult":
                    tr = TaskResult.from_task_result(result)
                    results.append(
                        (
                            tr.unique_id,
                            tr.result_code,
                        )
                    )
    
            cm = QueueWorkerComponentManager(
                op_state_model=None,
                logger=logger,
                max_queue_size=10,
                num_workers=2,
                push_change_event=catch_updates,
            )
    
            cm.enqueue(abort_task(), 0.1)
    
            # Wait for the command to start
            while not cm.task_status:
                time.sleep(0.1)
            # Start aborting
            cm.queue_manager.abort_tasks()
    
            # Wait for the exit
            while not cm.task_result:
                time.sleep(0.1)
            # aborting state should be cleaned up since the queue is empty and
            # nothing is in progress
            while cm.queue_manager.is_aborting:
                time.sleep(0.1)
    
            # When aborting this should be rejected
            # Fill up the workers
            cm.enqueue(slow_task())
            cm.enqueue(slow_task())
    
            assert not cm.queue_manager.is_aborting
            # Abort tasks
            cm.queue_manager.abort_tasks()
    
            assert cm.queue_manager.is_aborting
    
            # Load up some tasks that should be aborted
            cm.enqueue(slow_task())
            cm.enqueue(slow_task())
            unique_id, _ = cm.enqueue(slow_task())
    
            while True:
                if (unique_id, ResultCode.ABORTED) in results:
                    break
                time.sleep(0.1)
    
            # Resume the commands
            cm.queue_manager.resume_tasks()
            assert not cm.queue_manager.is_aborting
    
            # Wait for my slow command to finish
            unique_id, _ = cm.enqueue(slow_task())
    
            while True:
                if (unique_id, ResultCode.OK) in results:
                    break
                time.sleep(0.1)
    
        @pytest.mark.forked
        @pytest.mark.timeout(5)
        def test_exit_stop(self, stop_task):
            """Test stopping exit."""
            cm = QueueWorkerComponentManager(
                op_state_model=None,
                logger=logger,
                max_queue_size=5,
                num_workers=2,
                push_change_event=None,
            )
            cm.enqueue(stop_task())
    
            # Wait for the command to start
            while not cm.task_status:
                time.sleep(0.1)
    
            # Stop all threads
            cm.queue_manager.stop_tasks()
            # Wait for the exit
            while not cm.task_result:
                time.sleep(0.5)
    
            # Wait for all the workers to stop
            while any([worker.is_alive() for worker in cm.queue_manager._threads]):
                time.sleep(0.1)
    
        @pytest.mark.forked
        @pytest.mark.timeout(5)
        def test_delete_queue(self, slow_task, stop_task, abort_task):
            """Test deleting the queue."""
            cm = QueueWorkerComponentManager(
                op_state_model=None,
                logger=logger,
                max_queue_size=8,
                num_workers=2,
                push_change_event=None,
            )
            cm.enqueue(slow_task())
            cm.enqueue(stop_task())
            cm.enqueue(abort_task())
            cm.enqueue(stop_task())
            cm.enqueue(abort_task())
            cm.enqueue(stop_task())
            cm.enqueue(abort_task())
            cm.enqueue(stop_task())
            cm.enqueue(abort_task())
    
            del cm.queue_manager
            del cm
    
    
    @pytest.mark.forked
    class TestComponentManager:
        """Tests for the component manager."""
    
        def test_init(self):
            """Test that we can init the component manager."""
            cm = QueueWorkerComponentManager(
                op_state_model=None,
                logger=logger,
                max_queue_size=0,
                num_workers=1,
                push_change_event=None,
            )
            assert cm.task_ids_in_queue == ()
    
    
    @pytest.mark.forked
    class TestStress:
        """Stress test the queue manager."""
    
        @pytest.mark.timeout(30)
        def test_stress(self, slow_task):
            """Stress test the queue manager."""
            qm = QueueManager(max_queue_size=100, num_workers=50, logger=logger)
            assert len(qm._threads) == 50
            for worker in qm._threads:
                assert worker.is_alive()
            for _ in range(500):
                qm.enqueue_task(slow_task())
    
            assert qm._work_queue.qsize() > 90
    
            # Wait for the queue to drain
            while qm._work_queue.qsize():
                time.sleep(0.1)
            del qm