Skip to content
Snippets Groups Projects
Unverified Commit 132cea3e authored by SKAJohanVenter's avatar SKAJohanVenter
Browse files

SAR-276 Removed callbacks for push_change_event

parent c0f1cec3
Branches
No related tags found
No related merge requests found
...@@ -760,19 +760,6 @@ class SKABaseDevice(Device): ...@@ -760,19 +760,6 @@ class SKABaseDevice(Device):
self.push_change_event("adminMode", admin_mode) self.push_change_event("adminMode", admin_mode)
self.push_archive_event("adminMode", admin_mode) self.push_archive_event("adminMode", admin_mode)
def _push_change_event_callback(
self, attribute_name: str, attribute_value: typing.Any
):
"""Push the change event on an attribute.
:param attribute_name: The attribute name to push a change event.
:type attribute_name: str
:param attribute_name: The attribute value to push.
:type attribute_name: Any
"""
# TODO: Fix this
self.push_change_event(attribute_name, attribute_value)
def _update_state(self, state): def _update_state(self, state):
""" """
Perform Tango operations in response to a change in op state. Perform Tango operations in response to a change in op state.
...@@ -849,9 +836,7 @@ class SKABaseDevice(Device): ...@@ -849,9 +836,7 @@ class SKABaseDevice(Device):
def create_component_manager(self): def create_component_manager(self):
"""Create and return a component manager for this device.""" """Create and return a component manager for this device."""
queue_manager = QueueManager( queue_manager = QueueManager()
on_property_update_callback=self._push_change_event_callback
)
return BaseComponentManager(self.op_state_model, queue_manager) return BaseComponentManager(self.op_state_model, queue_manager)
def register_command_object(self, command_name, command_object): def register_command_object(self, command_name, command_object):
......
...@@ -23,6 +23,8 @@ The basic model is: ...@@ -23,6 +23,8 @@ The basic model is:
the component to change behaviour and/or state; and it *monitors* its the component to change behaviour and/or state; and it *monitors* its
component by keeping track of its state. component by keeping track of its state.
""" """
from typing import Optional
from ska_tango_base.control_model import PowerMode from ska_tango_base.control_model import PowerMode
from ska_tango_base.base.task_queue_manager import QueueManager, QueueTask from ska_tango_base.base.task_queue_manager import QueueManager, QueueTask
...@@ -42,7 +44,13 @@ class BaseComponentManager: ...@@ -42,7 +44,13 @@ class BaseComponentManager:
or on or on
""" """
def __init__(self, op_state_model, queue_manager: QueueManager, *args, **kwargs): def __init__(
self,
op_state_model,
*args,
queue_manager: Optional[QueueManager] = None,
**kwargs
):
""" """
Initialise a new ComponentManager instance. Initialise a new ComponentManager instance.
...@@ -52,7 +60,7 @@ class BaseComponentManager: ...@@ -52,7 +60,7 @@ class BaseComponentManager:
In this case any tasks enqueued to it will block. In this case any tasks enqueued to it will block.
""" """
self.op_state_model = op_state_model self.op_state_model = op_state_model
self.queue_manager = queue_manager self.queue_manager = queue_manager if queue_manager else QueueManager()
def start_communicating(self): def start_communicating(self):
""" """
......
...@@ -236,13 +236,6 @@ class AsyncBaseDevice(BaseTestDevice): ...@@ -236,13 +236,6 @@ class AsyncBaseDevice(BaseTestDevice):
def create_component_manager(self): def create_component_manager(self):
"""Create the component manager with a queue manager that has workers.""" """Create the component manager with a queue manager that has workers."""
queue_manager = QueueManager( queue_manager = QueueManager(
max_queue_size=10, max_queue_size=10, num_workers=3, logger=self.logger
num_workers=3,
logger=self.logger,
on_property_update_callback=self._push_change_event_callback,
)
return BaseComponentManager(
op_state_model=None,
queue_manager=queue_manager,
push_change_event_callback=self._push_change_event_callback,
) )
return BaseComponentManager(op_state_model=None, queue_manager=queue_manager)
...@@ -455,7 +455,6 @@ class QueueManager: ...@@ -455,7 +455,6 @@ class QueueManager:
max_queue_size: int = 0, max_queue_size: int = 0,
queue_fetch_timeout: float = 0.1, queue_fetch_timeout: float = 0.1,
num_workers: int = 0, num_workers: int = 0,
on_property_update_callback: Optional[Callable] = None,
logger: Optional[logging.Logger] = None, logger: Optional[logging.Logger] = None,
): ):
"""Init QueryManager. """Init QueryManager.
...@@ -475,7 +474,6 @@ class QueueManager: ...@@ -475,7 +474,6 @@ class QueueManager:
self._max_queue_size = max_queue_size self._max_queue_size = max_queue_size
self._work_queue = Queue(self._max_queue_size) self._work_queue = Queue(self._max_queue_size)
self._queue_fetch_timeout = queue_fetch_timeout self._queue_fetch_timeout = queue_fetch_timeout
self._on_property_update_callback = on_property_update_callback
self.stopping_event = threading.Event() self.stopping_event = threading.Event()
self.aborting_event = threading.Event() self.aborting_event = threading.Event()
self._property_update_lock = threading.Lock() self._property_update_lock = threading.Lock()
...@@ -657,8 +655,7 @@ class QueueManager: ...@@ -657,8 +655,7 @@ class QueueManager:
:type property_name: Any :type property_name: Any
""" """
# with self._property_update_lock: # with self._property_update_lock:
if self._on_property_update_callback: pass
self._on_property_update_callback(property_name, property_value)
def abort_tasks(self): def abort_tasks(self):
"""Start aborting tasks.""" """Start aborting tasks."""
......
"""Tests for the reference base device that uses queue manager.""" """Tests for the reference base device that uses queue manager."""
from io import StringIO
import time
from unittest import mock
import pytest import pytest
from tango.test_context import DeviceTestContext from tango.test_context import DeviceTestContext
from tango.utils import EventCallback
from tango import EventType
from ska_tango_base.base.reference_base_device import ( from ska_tango_base.base.reference_base_device import (
BlockingBaseDevice, BlockingBaseDevice,
AsyncBaseDevice, AsyncBaseDevice,
...@@ -78,117 +73,3 @@ class TestCommands: ...@@ -78,117 +73,3 @@ class TestCommands:
"An error occurred Traceback (most recent call last)" "An error occurred Traceback (most recent call last)"
in result.task_result in result.task_result
) )
def test_callbacks():
"""Check that the callback is firing that sends the push change event."""
with mock.patch.object(AsyncBaseDevice, "_push_change_event_callback") as my_cb:
with DeviceTestContext(AsyncBaseDevice, process=False) as proxy:
# Execute some commands
proxy.TestProgress(0.5)
while not proxy.longRunningCommandResult:
time.sleep(0.1)
assert my_cb.called
called_args = [(_call[0][0], _call[0][1]) for _call in my_cb.call_args_list]
attribute_names = [arg[0] for arg in called_args]
assert attribute_names == [
"longRunningCommandsInQueue",
"longRunningCommandIDsInQueue",
"longRunningCommandsInQueue",
"longRunningCommandIDsInQueue",
"longRunningCommandStatus",
"longRunningCommandProgress",
"longRunningCommandProgress",
"longRunningCommandProgress",
"longRunningCommandProgress",
"longRunningCommandProgress",
"longRunningCommandResult",
]
# longRunningCommandsInQueue
attribute_values = [arg[1] for arg in called_args]
assert len(attribute_values[0]) == 1
assert attribute_values[0] == ["ProgressTask"]
# longRunningCommandIDsInQueue
assert len(attribute_values[1]) == 1
assert attribute_values[1][0].endswith("ProgressTask")
# longRunningCommandsInQueue
assert not attribute_values[2]
# longRunningCommandIDsInQueue
assert not attribute_values[3]
# longRunningCommandStatus
assert len(attribute_values[4]) == 2
assert attribute_values[4][0].endswith("ProgressTask")
assert attribute_values[4][1] == "IN_PROGRESS"
# longRunningCommandProgress
for (index, progress) in zip(range(5, 9), ["1", "25", "50", "74", "100"]):
assert len(attribute_values[index]) == 2
assert attribute_values[index][0].endswith("ProgressTask")
assert attribute_values[index][1] == progress
# longRunningCommandResult
assert len(attribute_values[10]) == 3
tr = TaskResult.from_task_result(attribute_values[10])
tr.unique_id.endswith("ProgressTask")
tr.result_code == ResultCode.OK
tr.task_result == "None"
@pytest.mark.skip(
"Run this test alone and it will pass. Run in this suite and it will Segfault"
)
def test_events():
"""Testing the events.
NOTE: Adding more than 2 event subscriptions leads to inconsistent results.
Sometimes misses events.
Full callback tests (where the push events are triggered) are covered
in `test_callbacks`
"""
with DeviceTestContext(AsyncBaseDevice, process=True) as proxy:
progress_events = EventCallback(fd=StringIO())
ids_in_queue_events = EventCallback(fd=StringIO())
progress_id = proxy.subscribe_event(
"longRunningCommandProgress",
EventType.CHANGE_EVENT,
progress_events,
wait=True,
)
ids_id = proxy.subscribe_event(
"longRunningCommandIDsInQueue",
EventType.CHANGE_EVENT,
ids_in_queue_events,
wait=True,
)
proxy.TestProgress(0.5)
# Wait for task to finish
while not proxy.longRunningCommandResult:
time.sleep(0.1)
progress_event_values = [
event.attr_value.value
for event in progress_events.get_events()
if event.attr_value and event.attr_value.value
]
for index, progress in enumerate(["1", "25", "50", "74", "100"]):
assert progress_event_values[index][1] == progress
ids_in_queue_events_values = [
event.attr_value.value
for event in ids_in_queue_events.get_events()
if event.attr_value and event.attr_value.value
]
assert len(ids_in_queue_events_values) == 1
assert ids_in_queue_events_values[0][0].endswith("ProgressTask")
proxy.unsubscribe_event(progress_id)
proxy.unsubscribe_event(ids_id)
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
import logging import logging
import time import time
import pytest import pytest
from unittest.mock import MagicMock, patch from unittest.mock import patch
from ska_tango_base.commands import ResultCode from ska_tango_base.commands import ResultCode
from ska_tango_base.base.task_queue_manager import ( from ska_tango_base.base.task_queue_manager import (
...@@ -273,11 +273,11 @@ class TestQueueManagerTasks: ...@@ -273,11 +273,11 @@ class TestQueueManagerTasks:
"""Test that multiple threads are working. Test that attribute updates fires.""" """Test that multiple threads are working. Test that attribute updates fires."""
num_of_workers = 3 num_of_workers = 3
call_back_func = MagicMock() with patch.object(QueueManager, "_on_property_change") as call_back_func:
qm = QueueManager( qm = QueueManager(
max_queue_size=5, max_queue_size=5,
num_workers=num_of_workers, num_workers=num_of_workers,
on_property_update_callback=call_back_func,
logger=logger, logger=logger,
) )
unique_ids = [] unique_ids = []
...@@ -293,7 +293,9 @@ class TestQueueManagerTasks: ...@@ -293,7 +293,9 @@ class TestQueueManagerTasks:
pass pass
# Wait for last task to finish # Wait for last task to finish
while unique_ids[-1] != TaskResult.from_task_result(qm.task_result).unique_id: while (
unique_ids[-1] != TaskResult.from_task_result(qm.task_result).unique_id
):
pass pass
all_passed_params = [a_call[0] for a_call in call_back_func.call_args_list] all_passed_params = [a_call[0] for a_call in call_back_func.call_args_list]
...@@ -370,11 +372,9 @@ class TestQueueManagerExit: ...@@ -370,11 +372,9 @@ class TestQueueManagerExit:
@pytest.mark.timeout(15) @pytest.mark.timeout(15)
def test_exit_abort(self, abort_task, slow_task): def test_exit_abort(self, abort_task, slow_task):
"""Test aborting exit.""" """Test aborting exit."""
call_back_func = MagicMock()
qm = QueueManager( qm = QueueManager(
max_queue_size=10, max_queue_size=10,
num_workers=2, num_workers=2,
on_property_update_callback=call_back_func,
logger=logger, logger=logger,
) )
cm = BaseComponentManager(op_state_model=None, queue_manager=qm, logger=None) cm = BaseComponentManager(op_state_model=None, queue_manager=qm, logger=None)
...@@ -425,11 +425,9 @@ class TestQueueManagerExit: ...@@ -425,11 +425,9 @@ class TestQueueManagerExit:
@pytest.mark.timeout(20) @pytest.mark.timeout(20)
def test_exit_stop(self, stop_task): def test_exit_stop(self, stop_task):
"""Test stopping exit.""" """Test stopping exit."""
call_back_func = MagicMock()
qm = QueueManager( qm = QueueManager(
max_queue_size=5, max_queue_size=5,
num_workers=2, num_workers=2,
on_property_update_callback=call_back_func,
logger=logger, logger=logger,
) )
cm = BaseComponentManager(op_state_model=None, queue_manager=qm, logger=None) cm = BaseComponentManager(op_state_model=None, queue_manager=qm, logger=None)
...@@ -450,11 +448,9 @@ class TestQueueManagerExit: ...@@ -450,11 +448,9 @@ class TestQueueManagerExit:
@pytest.mark.timeout(5) @pytest.mark.timeout(5)
def test_delete_queue(self, slow_task, stop_task, abort_task): def test_delete_queue(self, slow_task, stop_task, abort_task):
"""Test deleting the queue.""" """Test deleting the queue."""
call_back_func = MagicMock()
qm = QueueManager( qm = QueueManager(
max_queue_size=8, max_queue_size=8,
num_workers=2, num_workers=2,
on_property_update_callback=call_back_func,
logger=logger, logger=logger,
) )
cm = BaseComponentManager(op_state_model=None, queue_manager=qm, logger=None) cm = BaseComponentManager(op_state_model=None, queue_manager=qm, logger=None)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment