Skip to content
Snippets Groups Projects
Unverified Commit 83d947db authored by SKAJohanVenter's avatar SKAJohanVenter
Browse files

SAR-276 Updates from comments. Made a test more robust

parent 114716d9
No related branches found
No related tags found
No related merge requests found
...@@ -737,7 +737,7 @@ class SKABaseDevice(Device): ...@@ -737,7 +737,7 @@ class SKABaseDevice(Device):
dtype=("str",), dtype=("str",),
max_dim_x=3, # Always the last result (unique_id, result_code, task_result) max_dim_x=3, # Always the last result (unique_id, result_code, task_result)
access=AttrWriteType.READ, access=AttrWriteType.READ,
doc="ID, result pair. \n" doc="unique_id, result_code, task_result. \n"
"Clients can subscribe to on_change event and wait for the ID they are interested in.", "Clients can subscribe to on_change event and wait for the ID they are interested in.",
) )
"""Device attribute for long running commands.""" """Device attribute for long running commands."""
...@@ -1120,7 +1120,7 @@ class SKABaseDevice(Device): ...@@ -1120,7 +1120,7 @@ class SKABaseDevice(Device):
""" """
Read the long running commands in the queue. Read the long running commands in the queue.
:return: commands in the device queue :return: tasks in the device queue
""" """
return self.component_manager.queue_manager.tasks_in_queue return self.component_manager.queue_manager.tasks_in_queue
...@@ -1136,9 +1136,9 @@ class SKABaseDevice(Device): ...@@ -1136,9 +1136,9 @@ class SKABaseDevice(Device):
def read_longRunningCommandStatus(self): def read_longRunningCommandStatus(self):
# PROTECTED REGION ID(SKABaseDevice.longRunningCommandStatus_read) ENABLED START # # PROTECTED REGION ID(SKABaseDevice.longRunningCommandStatus_read) ENABLED START #
""" """
Read the status of the currently executing long running command. Read the status of the currently executing long running commands.
:return: ID, status pair of the currently executing commands :return: ID, status pairs of the currently executing commands
""" """
return self.component_manager.queue_manager.task_status return self.component_manager.queue_manager.task_status
......
""" """
This module provides a QueueManager, TaskResult and QueueTask classes. This module provides a QueueManager, TaskResult and QueueTask classes.
* **TaskUniqueId**: is a convenience `dataclass` for parsing and generating the IDs used
to identify the tasks.
* **TaskResult**: is a convenience `dataclass` for parsing and formatting the * **TaskResult**: is a convenience `dataclass` for parsing and formatting the
results of a task. results of a task.
...@@ -9,6 +12,12 @@ This module provides a QueueManager, TaskResult and QueueTask classes. ...@@ -9,6 +12,12 @@ This module provides a QueueManager, TaskResult and QueueTask classes.
* **QueueManager**: that implements the queue and thread worker functionality. * **QueueManager**: that implements the queue and thread worker functionality.
************
TaskUniqueId
************
This is a simple convenience class for generating and parsing the IDs that identify tasks.
********** **********
TaskResult TaskResult
********** **********
...@@ -224,14 +233,14 @@ class TaskUniqueId: ...@@ -224,14 +233,14 @@ class TaskUniqueId:
@classmethod @classmethod
def generate_unique_id(cls, task_name: str) -> str: def generate_unique_id(cls, task_name: str) -> str:
"""Return a new unique ID.""" """Return a new unique ID."""
return f"{uuid4()}_{time.time()}_{task_name}" return f"{time.time()}_{uuid4().fields[-1]}_{task_name}"
@classmethod @classmethod
def from_unique_id(cls, unique_id: str): def from_unique_id(cls, unique_id: str):
"""Parse a unique ID.""" """Parse a unique ID."""
parts = unique_id.split("_") parts = unique_id.split("_")
id_uuid = parts[0] id_uuid = parts[1]
id_datetime = datetime.fromtimestamp(float(parts[1])) id_datetime = datetime.fromtimestamp(float(parts[0]))
id_task_name = "_".join(parts[2:]) id_task_name = "_".join(parts[2:])
return TaskUniqueId( return TaskUniqueId(
id_uuid=id_uuid, id_datetime=id_datetime, id_task_name=id_task_name id_uuid=id_uuid, id_datetime=id_datetime, id_task_name=id_task_name
...@@ -503,6 +512,10 @@ class QueueManager: ...@@ -503,6 +512,10 @@ class QueueManager:
:param logger: Python logger :param logger: Python logger
:type logger: logging.Logger :type logger: logging.Logger
""" """
if max_queue_size > 100:
raise ValueError("A maximum queue size of 100 is supported")
if num_workers > 50:
raise ValueError("A maximum number of 50 workers is supported")
self._max_queue_size = max_queue_size self._max_queue_size = max_queue_size
self._work_queue = Queue(self._max_queue_size) self._work_queue = Queue(self._max_queue_size)
self._queue_fetch_timeout = queue_fetch_timeout self._queue_fetch_timeout = queue_fetch_timeout
......
...@@ -18,7 +18,6 @@ from ska_tango_base.commands import ResultCode ...@@ -18,7 +18,6 @@ from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import AdminMode from ska_tango_base.control_model import AdminMode
@pytest.mark.forked
class TestCommands: class TestCommands:
"""Check that blocking and async commands behave the same way. """Check that blocking and async commands behave the same way.
...@@ -26,6 +25,7 @@ class TestCommands: ...@@ -26,6 +25,7 @@ class TestCommands:
AsyncBaseDevice - QueueManager has multiple threads, tasks run from queue AsyncBaseDevice - QueueManager has multiple threads, tasks run from queue
""" """
@pytest.mark.forked
@pytest.mark.timeout(5) @pytest.mark.timeout(5)
def test_short_command(self): def test_short_command(self):
"""Test a simple command.""" """Test a simple command."""
...@@ -44,6 +44,7 @@ class TestCommands: ...@@ -44,6 +44,7 @@ class TestCommands:
assert result.result_code == ResultCode.OK assert result.result_code == ResultCode.OK
assert result.get_task_unique_id().id_task_name == "SimpleTask" assert result.get_task_unique_id().id_task_name == "SimpleTask"
@pytest.mark.forked
@pytest.mark.timeout(5) @pytest.mark.timeout(5)
def test_non_aborting_command(self): def test_non_aborting_command(self):
"""Test tasks that does not abort.""" """Test tasks that does not abort."""
...@@ -57,6 +58,7 @@ class TestCommands: ...@@ -57,6 +58,7 @@ class TestCommands:
assert result.result_code == ResultCode.OK assert result.result_code == ResultCode.OK
assert result.get_task_unique_id().id_task_name == "NonAbortingTask" assert result.get_task_unique_id().id_task_name == "NonAbortingTask"
@pytest.mark.forked
@pytest.mark.timeout(5) @pytest.mark.timeout(5)
def test_aborting_command(self): def test_aborting_command(self):
"""Test Abort. """Test Abort.
...@@ -79,6 +81,7 @@ class TestCommands: ...@@ -79,6 +81,7 @@ class TestCommands:
assert result.result_code == ResultCode.ABORTED assert result.result_code == ResultCode.ABORTED
assert "Aborted" in result.task_result assert "Aborted" in result.task_result
@pytest.mark.forked
@pytest.mark.timeout(5) @pytest.mark.timeout(5)
def test_exception_command(self): def test_exception_command(self):
"""Test the task that throws an error.""" """Test the task that throws an error."""
...@@ -161,6 +164,7 @@ def test_callbacks(): ...@@ -161,6 +164,7 @@ def test_callbacks():
@pytest.mark.forked @pytest.mark.forked
@pytest.mark.timeout(10)
def test_events(): def test_events():
"""Testing the events. """Testing the events.
...@@ -193,6 +197,10 @@ def test_events(): ...@@ -193,6 +197,10 @@ def test_events():
while not proxy.longRunningCommandResult: while not proxy.longRunningCommandResult:
time.sleep(0.1) time.sleep(0.1)
# Wait for events
while not progress_events.get_events():
time.sleep(0.1)
progress_event_values = [ progress_event_values = [
event.attr_value.value event.attr_value.value
for event in progress_events.get_events() for event in progress_events.get_events()
......
...@@ -487,14 +487,14 @@ class TestStress: ...@@ -487,14 +487,14 @@ class TestStress:
@pytest.mark.timeout(20) @pytest.mark.timeout(20)
def test_stress(self, slow_task): def test_stress(self, slow_task):
"""Stress test the queue mananger.""" """Stress test the queue mananger."""
qm = QueueManager(max_queue_size=600, num_workers=100, logger=logger) qm = QueueManager(max_queue_size=100, num_workers=50, logger=logger)
assert len(qm._threads) == 100 assert len(qm._threads) == 50
for worker in qm._threads: for worker in qm._threads:
assert worker.is_alive() assert worker.is_alive()
for _ in range(500): for _ in range(500):
qm.enqueue_task(slow_task()) qm.enqueue_task(slow_task())
assert qm._work_queue.qsize() > 100 assert qm._work_queue.qsize() > 90
# Wait for the queue to drain # Wait for the queue to drain
while qm._work_queue.qsize(): while qm._work_queue.qsize():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment