diff --git a/src/ska_tango_base/base/base_device.py b/src/ska_tango_base/base/base_device.py index 002bd6617dfc2ef8279176556b62fd42cff0ee21..cd8b719c85cef5abfed3b173d450640e5c8776e5 100644 --- a/src/ska_tango_base/base/base_device.py +++ b/src/ska_tango_base/base/base_device.py @@ -760,19 +760,6 @@ class SKABaseDevice(Device): self.push_change_event("adminMode", admin_mode) self.push_archive_event("adminMode", admin_mode) - def _push_change_event_callback( - self, attribute_name: str, attribute_value: typing.Any - ): - """Push the change event on an attribute. - - :param attribute_name: The attribute name to push a change event. - :type attribute_name: str - :param attribute_name: The attribute value to push. - :type attribute_name: Any - """ - # TODO: Fix this - self.push_change_event(attribute_name, attribute_value) - def _update_state(self, state): """ Perform Tango operations in response to a change in op state. @@ -849,9 +836,7 @@ class SKABaseDevice(Device): def create_component_manager(self): """Create and return a component manager for this device.""" - queue_manager = QueueManager( - on_property_update_callback=self._push_change_event_callback - ) + queue_manager = QueueManager() return BaseComponentManager(self.op_state_model, queue_manager) def register_command_object(self, command_name, command_object): diff --git a/src/ska_tango_base/base/component_manager.py b/src/ska_tango_base/base/component_manager.py index 4a02475cb14e192bf3c15527e663b4affd79e92a..929f06985de05647e86e8bde94623b9142501656 100644 --- a/src/ska_tango_base/base/component_manager.py +++ b/src/ska_tango_base/base/component_manager.py @@ -23,6 +23,8 @@ The basic model is: the component to change behaviour and/or state; and it *monitors* its component by keeping track of its state. """ +from typing import Optional + from ska_tango_base.control_model import PowerMode from ska_tango_base.base.task_queue_manager import QueueManager, QueueTask @@ -42,7 +44,13 @@ class BaseComponentManager: or on """ - def __init__(self, op_state_model, queue_manager: QueueManager, *args, **kwargs): + def __init__( + self, + op_state_model, + *args, + queue_manager: Optional[QueueManager] = None, + **kwargs + ): """ Initialise a new ComponentManager instance. @@ -52,7 +60,7 @@ class BaseComponentManager: In this case any tasks enqueued to it will block. """ self.op_state_model = op_state_model - self.queue_manager = queue_manager + self.queue_manager = queue_manager if queue_manager else QueueManager() def start_communicating(self): """ diff --git a/src/ska_tango_base/base/reference_base_device.py b/src/ska_tango_base/base/reference_base_device.py index d57c36b11fde82ceeafba43efbc9f478da6163b1..79a900c15c48d66464c0826f42b7d2038ef8d94f 100644 --- a/src/ska_tango_base/base/reference_base_device.py +++ b/src/ska_tango_base/base/reference_base_device.py @@ -236,13 +236,6 @@ class AsyncBaseDevice(BaseTestDevice): def create_component_manager(self): """Create the component manager with a queue manager that has workers.""" queue_manager = QueueManager( - max_queue_size=10, - num_workers=3, - logger=self.logger, - on_property_update_callback=self._push_change_event_callback, - ) - return BaseComponentManager( - op_state_model=None, - queue_manager=queue_manager, - push_change_event_callback=self._push_change_event_callback, + max_queue_size=10, num_workers=3, logger=self.logger ) + return BaseComponentManager(op_state_model=None, queue_manager=queue_manager) diff --git a/src/ska_tango_base/base/task_queue_manager.py b/src/ska_tango_base/base/task_queue_manager.py index 96f30222ddad51eece1058b6708ca18e2fd8e544..4c912f5e16556b80d47c136742443f8ae2a6a44f 100644 --- a/src/ska_tango_base/base/task_queue_manager.py +++ b/src/ska_tango_base/base/task_queue_manager.py @@ -455,7 +455,6 @@ class QueueManager: max_queue_size: int = 0, queue_fetch_timeout: float = 0.1, num_workers: int = 0, - on_property_update_callback: Optional[Callable] = None, logger: Optional[logging.Logger] = None, ): """Init QueryManager. @@ -475,7 +474,6 @@ class QueueManager: self._max_queue_size = max_queue_size self._work_queue = Queue(self._max_queue_size) self._queue_fetch_timeout = queue_fetch_timeout - self._on_property_update_callback = on_property_update_callback self.stopping_event = threading.Event() self.aborting_event = threading.Event() self._property_update_lock = threading.Lock() @@ -657,8 +655,7 @@ class QueueManager: :type property_name: Any """ # with self._property_update_lock: - if self._on_property_update_callback: - self._on_property_update_callback(property_name, property_value) + pass def abort_tasks(self): """Start aborting tasks.""" diff --git a/tests/test_reference_base_device.py b/tests/test_reference_base_device.py index 5b682483206ef715f0518c1f4e63ea4df851e946..70377482bdcf9a78d72643431c17cb248c542ff5 100644 --- a/tests/test_reference_base_device.py +++ b/tests/test_reference_base_device.py @@ -1,13 +1,8 @@ """Tests for the reference base device that uses queue manager.""" -from io import StringIO -import time -from unittest import mock import pytest from tango.test_context import DeviceTestContext -from tango.utils import EventCallback -from tango import EventType from ska_tango_base.base.reference_base_device import ( BlockingBaseDevice, AsyncBaseDevice, @@ -78,117 +73,3 @@ class TestCommands: "An error occurred Traceback (most recent call last)" in result.task_result ) - - -def test_callbacks(): - """Check that the callback is firing that sends the push change event.""" - with mock.patch.object(AsyncBaseDevice, "_push_change_event_callback") as my_cb: - with DeviceTestContext(AsyncBaseDevice, process=False) as proxy: - # Execute some commands - proxy.TestProgress(0.5) - while not proxy.longRunningCommandResult: - time.sleep(0.1) - assert my_cb.called - called_args = [(_call[0][0], _call[0][1]) for _call in my_cb.call_args_list] - - attribute_names = [arg[0] for arg in called_args] - assert attribute_names == [ - "longRunningCommandsInQueue", - "longRunningCommandIDsInQueue", - "longRunningCommandsInQueue", - "longRunningCommandIDsInQueue", - "longRunningCommandStatus", - "longRunningCommandProgress", - "longRunningCommandProgress", - "longRunningCommandProgress", - "longRunningCommandProgress", - "longRunningCommandProgress", - "longRunningCommandResult", - ] - - # longRunningCommandsInQueue - attribute_values = [arg[1] for arg in called_args] - assert len(attribute_values[0]) == 1 - assert attribute_values[0] == ["ProgressTask"] - - # longRunningCommandIDsInQueue - assert len(attribute_values[1]) == 1 - assert attribute_values[1][0].endswith("ProgressTask") - - # longRunningCommandsInQueue - assert not attribute_values[2] - - # longRunningCommandIDsInQueue - assert not attribute_values[3] - - # longRunningCommandStatus - assert len(attribute_values[4]) == 2 - assert attribute_values[4][0].endswith("ProgressTask") - assert attribute_values[4][1] == "IN_PROGRESS" - - # longRunningCommandProgress - for (index, progress) in zip(range(5, 9), ["1", "25", "50", "74", "100"]): - assert len(attribute_values[index]) == 2 - assert attribute_values[index][0].endswith("ProgressTask") - assert attribute_values[index][1] == progress - - # longRunningCommandResult - assert len(attribute_values[10]) == 3 - tr = TaskResult.from_task_result(attribute_values[10]) - tr.unique_id.endswith("ProgressTask") - tr.result_code == ResultCode.OK - tr.task_result == "None" - - -@pytest.mark.skip( - "Run this test alone and it will pass. Run in this suite and it will Segfault" -) -def test_events(): - """Testing the events. - - NOTE: Adding more than 2 event subscriptions leads to inconsistent results. - Sometimes misses events. - - Full callback tests (where the push events are triggered) are covered - in `test_callbacks` - """ - with DeviceTestContext(AsyncBaseDevice, process=True) as proxy: - progress_events = EventCallback(fd=StringIO()) - ids_in_queue_events = EventCallback(fd=StringIO()) - - progress_id = proxy.subscribe_event( - "longRunningCommandProgress", - EventType.CHANGE_EVENT, - progress_events, - wait=True, - ) - ids_id = proxy.subscribe_event( - "longRunningCommandIDsInQueue", - EventType.CHANGE_EVENT, - ids_in_queue_events, - wait=True, - ) - - proxy.TestProgress(0.5) - - # Wait for task to finish - while not proxy.longRunningCommandResult: - time.sleep(0.1) - - progress_event_values = [ - event.attr_value.value - for event in progress_events.get_events() - if event.attr_value and event.attr_value.value - ] - for index, progress in enumerate(["1", "25", "50", "74", "100"]): - assert progress_event_values[index][1] == progress - - ids_in_queue_events_values = [ - event.attr_value.value - for event in ids_in_queue_events.get_events() - if event.attr_value and event.attr_value.value - ] - assert len(ids_in_queue_events_values) == 1 - assert ids_in_queue_events_values[0][0].endswith("ProgressTask") - proxy.unsubscribe_event(progress_id) - proxy.unsubscribe_event(ids_id) diff --git a/tests/test_task_queue_manager.py b/tests/test_task_queue_manager.py index 2c6a751e0febaf4b9ff671f57fc5ba83b87589b0..974e4d2411c4e10746a7d93efc3fbe5f9373cc53 100644 --- a/tests/test_task_queue_manager.py +++ b/tests/test_task_queue_manager.py @@ -2,7 +2,7 @@ import logging import time import pytest -from unittest.mock import MagicMock, patch +from unittest.mock import patch from ska_tango_base.commands import ResultCode from ska_tango_base.base.task_queue_manager import ( @@ -273,64 +273,66 @@ class TestQueueManagerTasks: """Test that multiple threads are working. Test that attribute updates fires.""" num_of_workers = 3 - call_back_func = MagicMock() - qm = QueueManager( - max_queue_size=5, - num_workers=num_of_workers, - on_property_update_callback=call_back_func, - logger=logger, - ) - unique_ids = [] - for _ in range(4): - unique_id = qm.enqueue_task(slow_task()) - unique_ids.append(unique_id) - - # Wait for a item on the queue - while not qm.task_ids_in_queue: - pass - - while not qm.task_result: - pass - - # Wait for last task to finish - while unique_ids[-1] != TaskResult.from_task_result(qm.task_result).unique_id: - pass - - all_passed_params = [a_call[0] for a_call in call_back_func.call_args_list] - tasks_in_queue = [ - a_call[1] - for a_call in all_passed_params - if a_call[0] == "longRunningCommandsInQueue" - ] - task_ids_in_queue = [ - a_call[1] - for a_call in all_passed_params - if a_call[0] == "longRunningCommandIDsInQueue" - ] - task_status = [ - a_call[1] - for a_call in all_passed_params - if a_call[0] == "longRunningCommandStatus" - ] - task_result = [ - a_call[1] - for a_call in all_passed_params - if a_call[0] == "longRunningCommandResult" - ] - task_result_ids = [res[0] for res in task_result] - - check_matching_pattern(tuple(tasks_in_queue)) - check_matching_pattern(tuple(task_ids_in_queue)) - - # Since there's 3 workers there should at least once be 3 in progress - for status in task_status: - if len(status) == 2 * num_of_workers: - break - else: - assert 0, f"Length of {num_of_workers} in task_status not found" - assert len(task_result) == 4 - for unique_id in unique_ids: - assert unique_id in task_result_ids + with patch.object(QueueManager, "_on_property_change") as call_back_func: + + qm = QueueManager( + max_queue_size=5, + num_workers=num_of_workers, + logger=logger, + ) + unique_ids = [] + for _ in range(4): + unique_id = qm.enqueue_task(slow_task()) + unique_ids.append(unique_id) + + # Wait for a item on the queue + while not qm.task_ids_in_queue: + pass + + while not qm.task_result: + pass + + # Wait for last task to finish + while ( + unique_ids[-1] != TaskResult.from_task_result(qm.task_result).unique_id + ): + pass + + all_passed_params = [a_call[0] for a_call in call_back_func.call_args_list] + tasks_in_queue = [ + a_call[1] + for a_call in all_passed_params + if a_call[0] == "longRunningCommandsInQueue" + ] + task_ids_in_queue = [ + a_call[1] + for a_call in all_passed_params + if a_call[0] == "longRunningCommandIDsInQueue" + ] + task_status = [ + a_call[1] + for a_call in all_passed_params + if a_call[0] == "longRunningCommandStatus" + ] + task_result = [ + a_call[1] + for a_call in all_passed_params + if a_call[0] == "longRunningCommandResult" + ] + task_result_ids = [res[0] for res in task_result] + + check_matching_pattern(tuple(tasks_in_queue)) + check_matching_pattern(tuple(task_ids_in_queue)) + + # Since there's 3 workers there should at least once be 3 in progress + for status in task_status: + if len(status) == 2 * num_of_workers: + break + else: + assert 0, f"Length of {num_of_workers} in task_status not found" + assert len(task_result) == 4 + for unique_id in unique_ids: + assert unique_id in task_result_ids def test_task_get_state_completed(self, simple_task): """Test the QueueTask get state is completed.""" @@ -370,11 +372,9 @@ class TestQueueManagerExit: @pytest.mark.timeout(15) def test_exit_abort(self, abort_task, slow_task): """Test aborting exit.""" - call_back_func = MagicMock() qm = QueueManager( max_queue_size=10, num_workers=2, - on_property_update_callback=call_back_func, logger=logger, ) cm = BaseComponentManager(op_state_model=None, queue_manager=qm, logger=None) @@ -425,11 +425,9 @@ class TestQueueManagerExit: @pytest.mark.timeout(20) def test_exit_stop(self, stop_task): """Test stopping exit.""" - call_back_func = MagicMock() qm = QueueManager( max_queue_size=5, num_workers=2, - on_property_update_callback=call_back_func, logger=logger, ) cm = BaseComponentManager(op_state_model=None, queue_manager=qm, logger=None) @@ -450,11 +448,9 @@ class TestQueueManagerExit: @pytest.mark.timeout(5) def test_delete_queue(self, slow_task, stop_task, abort_task): """Test deleting the queue.""" - call_back_func = MagicMock() qm = QueueManager( max_queue_size=8, num_workers=2, - on_property_update_callback=call_back_func, logger=logger, ) cm = BaseComponentManager(op_state_model=None, queue_manager=qm, logger=None)