diff --git a/src/ska_tango_base/base/base_device.py b/src/ska_tango_base/base/base_device.py index e19c99a778a11ea987e09820e4cba488de044ef6..e367519b1eec9a26c9cc5869a80a705740a91abd 100644 --- a/src/ska_tango_base/base/base_device.py +++ b/src/ska_tango_base/base/base_device.py @@ -737,7 +737,7 @@ class SKABaseDevice(Device): dtype=("str",), max_dim_x=3, # Always the last result (unique_id, result_code, task_result) access=AttrWriteType.READ, - doc="ID, result pair. \n" + doc="unique_id, result_code, task_result. \n" "Clients can subscribe to on_change event and wait for the ID they are interested in.", ) """Device attribute for long running commands.""" @@ -1120,7 +1120,7 @@ class SKABaseDevice(Device): """ Read the long running commands in the queue. - :return: commands in the device queue + :return: tasks in the device queue """ return self.component_manager.queue_manager.tasks_in_queue @@ -1136,9 +1136,9 @@ class SKABaseDevice(Device): def read_longRunningCommandStatus(self): # PROTECTED REGION ID(SKABaseDevice.longRunningCommandStatus_read) ENABLED START # """ - Read the status of the currently executing long running command. + Read the status of the currently executing long running commands. - :return: ID, status pair of the currently executing commands + :return: ID, status pairs of the currently executing commands """ return self.component_manager.queue_manager.task_status diff --git a/src/ska_tango_base/base/task_queue_manager.py b/src/ska_tango_base/base/task_queue_manager.py index 82490ec15bca8dfb52ded4c2a137a9a4c25f73b8..6531eb4a7c95ee3cdda560c91ef3f369ea67fcad 100644 --- a/src/ska_tango_base/base/task_queue_manager.py +++ b/src/ska_tango_base/base/task_queue_manager.py @@ -1,6 +1,9 @@ """ This module provides a QueueManager, TaskResult and QueueTask classes. +* **TaskUniqueId**: is a convenience `dataclass` for parsing and generating the IDs used + to identify the tasks. + * **TaskResult**: is a convenience `dataclass` for parsing and formatting the results of a task. @@ -9,6 +12,12 @@ This module provides a QueueManager, TaskResult and QueueTask classes. * **QueueManager**: that implements the queue and thread worker functionality. +************ +TaskUniqueId +************ + +This is a simple convenience class for generating and parsing the IDs that identify tasks. + ********** TaskResult ********** @@ -224,14 +233,14 @@ class TaskUniqueId: @classmethod def generate_unique_id(cls, task_name: str) -> str: """Return a new unique ID.""" - return f"{uuid4()}_{time.time()}_{task_name}" + return f"{time.time()}_{uuid4().fields[-1]}_{task_name}" @classmethod def from_unique_id(cls, unique_id: str): """Parse a unique ID.""" parts = unique_id.split("_") - id_uuid = parts[0] - id_datetime = datetime.fromtimestamp(float(parts[1])) + id_uuid = parts[1] + id_datetime = datetime.fromtimestamp(float(parts[0])) id_task_name = "_".join(parts[2:]) return TaskUniqueId( id_uuid=id_uuid, id_datetime=id_datetime, id_task_name=id_task_name @@ -503,6 +512,10 @@ class QueueManager: :param logger: Python logger :type logger: logging.Logger """ + if max_queue_size > 100: + raise ValueError("A maximum queue size of 100 is supported") + if num_workers > 50: + raise ValueError("A maximum number of 50 workers is supported") self._max_queue_size = max_queue_size self._work_queue = Queue(self._max_queue_size) self._queue_fetch_timeout = queue_fetch_timeout diff --git a/tests/long_running_tasks/test_reference_base_device.py b/tests/long_running_tasks/test_reference_base_device.py index b9da1ccc3016247e430e06bee6514fa4b119b8f3..5893fe92eb621bb595eb5d2bfa0eaaa8badc07bc 100644 --- a/tests/long_running_tasks/test_reference_base_device.py +++ b/tests/long_running_tasks/test_reference_base_device.py @@ -18,7 +18,6 @@ from ska_tango_base.commands import ResultCode from ska_tango_base.control_model import AdminMode -@pytest.mark.forked class TestCommands: """Check that blocking and async commands behave the same way. @@ -26,6 +25,7 @@ class TestCommands: AsyncBaseDevice - QueueManager has multiple threads, tasks run from queue """ + @pytest.mark.forked @pytest.mark.timeout(5) def test_short_command(self): """Test a simple command.""" @@ -44,6 +44,7 @@ class TestCommands: assert result.result_code == ResultCode.OK assert result.get_task_unique_id().id_task_name == "SimpleTask" + @pytest.mark.forked @pytest.mark.timeout(5) def test_non_aborting_command(self): """Test tasks that does not abort.""" @@ -57,6 +58,7 @@ class TestCommands: assert result.result_code == ResultCode.OK assert result.get_task_unique_id().id_task_name == "NonAbortingTask" + @pytest.mark.forked @pytest.mark.timeout(5) def test_aborting_command(self): """Test Abort. @@ -79,6 +81,7 @@ class TestCommands: assert result.result_code == ResultCode.ABORTED assert "Aborted" in result.task_result + @pytest.mark.forked @pytest.mark.timeout(5) def test_exception_command(self): """Test the task that throws an error.""" @@ -161,6 +164,7 @@ def test_callbacks(): @pytest.mark.forked +@pytest.mark.timeout(10) def test_events(): """Testing the events. @@ -193,6 +197,10 @@ def test_events(): while not proxy.longRunningCommandResult: time.sleep(0.1) + # Wait for events + while not progress_events.get_events(): + time.sleep(0.1) + progress_event_values = [ event.attr_value.value for event in progress_events.get_events() diff --git a/tests/long_running_tasks/test_task_queue_manager.py b/tests/long_running_tasks/test_task_queue_manager.py index 1d4d8790fc7673398f245b6d52e58a707046fe5d..78384d6a380e2da7d011eb50abb283fcdf7d03af 100644 --- a/tests/long_running_tasks/test_task_queue_manager.py +++ b/tests/long_running_tasks/test_task_queue_manager.py @@ -487,14 +487,14 @@ class TestStress: @pytest.mark.timeout(20) def test_stress(self, slow_task): """Stress test the queue mananger.""" - qm = QueueManager(max_queue_size=600, num_workers=100, logger=logger) - assert len(qm._threads) == 100 + qm = QueueManager(max_queue_size=100, num_workers=50, logger=logger) + assert len(qm._threads) == 50 for worker in qm._threads: assert worker.is_alive() for _ in range(500): qm.enqueue_task(slow_task()) - assert qm._work_queue.qsize() > 100 + assert qm._work_queue.qsize() > 90 # Wait for the queue to drain while qm._work_queue.qsize():