Skip to content
Snippets Groups Projects
Unverified Commit b3cc2486 authored by SKAJohanVenter's avatar SKAJohanVenter
Browse files

SAR-276 Updated tests. Using contants for queue size and worker count

parent 06077ec9
No related branches found
No related tags found
No related merge requests found
...@@ -58,6 +58,7 @@ from ska_tango_base.faults import ( ...@@ -58,6 +58,7 @@ from ska_tango_base.faults import (
LoggingTargetError, LoggingTargetError,
LoggingLevelError, LoggingLevelError,
) )
from ska_tango_base.base.task_queue_manager import MAX_WORKER_COUNT, MAX_QUEUE_SIZE
LOG_FILE_SIZE = 1024 * 1024 # Log file size 1MB. LOG_FILE_SIZE = 1024 * 1024 # Log file size 1MB.
_DEBUGGER_PORT = 5678 _DEBUGGER_PORT = 5678
...@@ -698,7 +699,7 @@ class SKABaseDevice(Device): ...@@ -698,7 +699,7 @@ class SKABaseDevice(Device):
longRunningCommandsInQueue = attribute( longRunningCommandsInQueue = attribute(
dtype=("str",), dtype=("str",),
max_dim_x=100, # Assume we'll never have more than 100 tasks in the queue max_dim_x=MAX_QUEUE_SIZE,
access=AttrWriteType.READ, access=AttrWriteType.READ,
doc="Keep track of which commands are in the queue. \n" doc="Keep track of which commands are in the queue. \n"
"Pop off from front as they complete.", "Pop off from front as they complete.",
...@@ -707,7 +708,7 @@ class SKABaseDevice(Device): ...@@ -707,7 +708,7 @@ class SKABaseDevice(Device):
longRunningCommandIDsInQueue = attribute( longRunningCommandIDsInQueue = attribute(
dtype=("str",), dtype=("str",),
max_dim_x=100, # Assume we'll never have more than 100 tasks in the queue max_dim_x=MAX_QUEUE_SIZE,
access=AttrWriteType.READ, access=AttrWriteType.READ,
doc="Every client that executes a command will receive a command ID as response. \n" doc="Every client that executes a command will receive a command ID as response. \n"
"Keep track of IDs in the queue. Pop off from front as they complete.", "Keep track of IDs in the queue. Pop off from front as they complete.",
...@@ -716,7 +717,7 @@ class SKABaseDevice(Device): ...@@ -716,7 +717,7 @@ class SKABaseDevice(Device):
longRunningCommandStatus = attribute( longRunningCommandStatus = attribute(
dtype=("str",), dtype=("str",),
max_dim_x=100, # 2 per thread, assume we'll never do more than 50 threads max_dim_x=MAX_WORKER_COUNT * 2, # 2 per thread
access=AttrWriteType.READ, access=AttrWriteType.READ,
doc="ID, status pair of the currently executing command. \n" doc="ID, status pair of the currently executing command. \n"
"Clients can subscribe to on_change event and wait for the ID they are interested in.", "Clients can subscribe to on_change event and wait for the ID they are interested in.",
...@@ -725,7 +726,7 @@ class SKABaseDevice(Device): ...@@ -725,7 +726,7 @@ class SKABaseDevice(Device):
longRunningCommandProgress = attribute( longRunningCommandProgress = attribute(
dtype=("str",), dtype=("str",),
max_dim_x=100, # 2 per thread, assume we'll never do more than 50 threads max_dim_x=MAX_WORKER_COUNT * 2, # 2 per thread
access=AttrWriteType.READ, access=AttrWriteType.READ,
doc="ID, progress of the currently executing command. \n" doc="ID, progress of the currently executing command. \n"
"Clients can subscribe to on_change event and wait for the ID they are interested in..", "Clients can subscribe to on_change event and wait for the ID they are interested in..",
......
...@@ -122,6 +122,9 @@ import tango ...@@ -122,6 +122,9 @@ import tango
from ska_tango_base.commands import BaseCommand, ResultCode from ska_tango_base.commands import BaseCommand, ResultCode
MAX_QUEUE_SIZE = 100 # Maximum supported size of the queue
MAX_WORKER_COUNT = 50 # Maximum number of workers supported
class TaskState(enum.IntEnum): class TaskState(enum.IntEnum):
"""The state of the QueueTask in the QueueManager.""" """The state of the QueueTask in the QueueManager."""
...@@ -245,7 +248,7 @@ class TaskResult: ...@@ -245,7 +248,7 @@ class TaskResult:
def from_response_command(cls, command_result: Tuple[str, str]) -> TaskResult: def from_response_command(cls, command_result: Tuple[str, str]) -> TaskResult:
"""Convert from ResponseCommand to TaskResult. """Convert from ResponseCommand to TaskResult.
:param command_result: The task_result (result_code, unique_id) :param command_result: The task_result (unique_id, result_code)
:type command_result: tuple :type command_result: tuple
:return: The task result :return: The task result
:rtype: TaskResult :rtype: TaskResult
...@@ -255,9 +258,9 @@ class TaskResult: ...@@ -255,9 +258,9 @@ class TaskResult:
raise ValueError(f"Cannot parse task_result {command_result}") raise ValueError(f"Cannot parse task_result {command_result}")
return TaskResult( return TaskResult(
result_code=ResultCode(int(command_result[0])), result_code=ResultCode(int(command_result[1])),
task_result="", task_result="",
unique_id=command_result[1], unique_id=command_result[0],
) )
def get_task_unique_id(self) -> TaskUniqueId: def get_task_unique_id(self) -> TaskUniqueId:
...@@ -424,10 +427,12 @@ class QueueManager: ...@@ -424,10 +427,12 @@ class QueueManager:
:param logger: Python logger :param logger: Python logger
:type logger: logging.Logger :type logger: logging.Logger
""" """
if max_queue_size > 100: if max_queue_size > MAX_QUEUE_SIZE:
raise ValueError("A maximum queue size of 100 is supported") raise ValueError(f"A maximum queue size of {MAX_QUEUE_SIZE} is supported")
if num_workers > 50: if num_workers > MAX_WORKER_COUNT:
raise ValueError("A maximum number of 50 workers is supported") raise ValueError(
f"A maximum number of {MAX_WORKER_COUNT} workers is supported"
)
self._max_queue_size = max_queue_size self._max_queue_size = max_queue_size
self._work_queue = Queue(self._max_queue_size) self._work_queue = Queue(self._max_queue_size)
self._queue_fetch_timeout = queue_fetch_timeout self._queue_fetch_timeout = queue_fetch_timeout
......
...@@ -30,7 +30,7 @@ from tango import ( ...@@ -30,7 +30,7 @@ from tango import (
from tango import DevState from tango import DevState
from contextlib import contextmanager from contextlib import contextmanager
from ska_tango_base.faults import GroupDefinitionsError, SKABaseError from ska_tango_base.faults import GroupDefinitionsError, SKABaseError
from ska_tango_base.commands import ResultCode from ska_tango_base.base.task_queue_manager import TaskResult
int_types = { int_types = {
tango._tango.CmdArgType.DevUShort, tango._tango.CmdArgType.DevUShort,
...@@ -548,26 +548,6 @@ def for_testing_only(func, _testing_check=lambda: "pytest" in sys.modules): ...@@ -548,26 +548,6 @@ def for_testing_only(func, _testing_check=lambda: "pytest" in sys.modules):
return _wrapper return _wrapper
@dataclass
class LongRunningRequestResponse:
"""Convenience class to parse the long running command response."""
response_code: ResultCode
command_id: str
command_name: str
def __init__(self, request_response):
"""Create the LongRunningRequestResponse dataclass.
:param request_response: The response from a Long Running
Request Command
:type request_response: list
"""
self.response_code = request_response[0][0]
self.command_id = request_response[1][0]
self.command_name = self.command_id.split("_")[1]
@dataclass @dataclass
class StoredCommand: class StoredCommand:
"""Used to keep track of commands scheduled across devices. """Used to keep track of commands scheduled across devices.
...@@ -607,9 +587,7 @@ class LongRunningDeviceInterface: ...@@ -607,9 +587,7 @@ class LongRunningDeviceInterface:
track of commands IDs. They are handled here. track of commands IDs. They are handled here.
""" """
def __init__( def __init__(self, tango_devices: List[str], logger: logging.Logger) -> None:
self, tango_devices: List[str], logger: logging.Logger
) -> None:
"""Init LRC device interface.""" """Init LRC device interface."""
self._logger = logger self._logger = logger
self._tango_devices = tango_devices self._tango_devices = tango_devices
...@@ -622,9 +600,7 @@ class LongRunningDeviceInterface: ...@@ -622,9 +600,7 @@ class LongRunningDeviceInterface:
"""Only create the device proxy and subscribe when a command is invoked.""" """Only create the device proxy and subscribe when a command is invoked."""
if not self._long_running_device_proxies: if not self._long_running_device_proxies:
for device in self._tango_devices: for device in self._tango_devices:
self._long_running_device_proxies.append( self._long_running_device_proxies.append(tango.DeviceProxy(device))
tango.DeviceProxy(device)
)
if not self._result_subscriptions: if not self._result_subscriptions:
for device_proxy in self._long_running_device_proxies: for device_proxy in self._long_running_device_proxies:
...@@ -725,13 +701,13 @@ class LongRunningDeviceInterface: ...@@ -725,13 +701,13 @@ class LongRunningDeviceInterface:
self._stored_callbacks[unique_id] = on_completion_callback self._stored_callbacks[unique_id] = on_completion_callback
self._stored_commands[unique_id] = [] self._stored_commands[unique_id] = []
for device_proxy in self._long_running_device_proxies: for device_proxy in self._long_running_device_proxies:
response = LongRunningRequestResponse( response = TaskResult.from_response_command(
device_proxy.command_inout(command_name, command_arg) device_proxy.command_inout(command_name, command_arg)
) )
self._stored_commands[unique_id].append( self._stored_commands[unique_id].append(
StoredCommand( StoredCommand(
command_name, command_name,
response.command_id, response.unique_id,
False, False,
) )
) )
...@@ -235,16 +235,18 @@ devices_to_test = [ ...@@ -235,16 +235,18 @@ devices_to_test = [
] ]
@pytest.mark.skip(
"These tests should be made more robust. Getting inconsistent results"
)
class TestMultiDevice: class TestMultiDevice:
"""Multi-device tests.""" """Multi-device tests."""
# TODO track events from underlying device(s) instead # TODO track events from underlying device(s) instead
@pytest.mark.forked
def test_two_async_devices_communicating(self, multi_device_tango_context): def test_two_async_devices_communicating(self, multi_device_tango_context):
"""Test only two devices communication.""" """Test only two devices communication."""
client = multi_device_tango_context.get_device( client = multi_device_tango_context.get_device("test/asyncclientdevice/1")
"test/asyncclientdevice/1"
)
commands_in_queue_events = EventCallback(fd=StringIO()) commands_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event( client.subscribe_event(
...@@ -273,7 +275,7 @@ class TestMultiDevice: ...@@ -273,7 +275,7 @@ class TestMultiDevice:
] ]
# though two commands were triggered, only one is registered in the # though two commands were triggered, only one is registered in the
# event callback since ping is not registered as a long running command # event callback since ping is not registered as a long running command
assert commands_in_queue == ['TestProgressNoArgsCommand'] assert commands_in_queue == ["TestProgressNoArgsCommand"]
command_ids_in_queue = [ command_ids_in_queue = [
i.attr_value.value i.attr_value.value
...@@ -283,11 +285,10 @@ class TestMultiDevice: ...@@ -283,11 +285,10 @@ class TestMultiDevice:
# there should be 1 ID since 1 command was triggered # there should be 1 ID since 1 command was triggered
assert len(command_ids_in_queue) == 1 assert len(command_ids_in_queue) == 1
@pytest.mark.forked
def test_multiple_async_devices_communicating(self, multi_device_tango_context): def test_multiple_async_devices_communicating(self, multi_device_tango_context):
"""Test multiple devices.""" """Test multiple devices."""
client = multi_device_tango_context.get_device( client = multi_device_tango_context.get_device("test/asyncclientdevice/2")
"test/asyncclientdevice/2"
)
commands_in_queue_events = EventCallback(fd=StringIO()) commands_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event( client.subscribe_event(
...@@ -315,7 +316,10 @@ class TestMultiDevice: ...@@ -315,7 +316,10 @@ class TestMultiDevice:
for i in commands_in_queue_events.get_events() for i in commands_in_queue_events.get_events()
if i.attr_value.value if i.attr_value.value
] ]
assert commands_in_queue == ['TestProgressNoArgsCommand', 'TestProgressWithArgsCommand'] assert commands_in_queue == [
"TestProgressNoArgsCommand",
"TestProgressWithArgsCommand",
]
commands_ids_in_queue = [ commands_ids_in_queue = [
i.attr_value.value i.attr_value.value
...@@ -326,11 +330,12 @@ class TestMultiDevice: ...@@ -326,11 +330,12 @@ class TestMultiDevice:
# two devices under the hood, see TODO in class # two devices under the hood, see TODO in class
assert len(commands_ids_in_queue) == 2 assert len(commands_ids_in_queue) == 2
def test_multiple_async_devices_communicating_with_duplicate_commands(self, multi_device_tango_context): @pytest.mark.forked
def test_multiple_async_devices_communicating_with_duplicate_commands(
self, multi_device_tango_context
):
"""Test multiple devices with duplicate commands.""" """Test multiple devices with duplicate commands."""
client = multi_device_tango_context.get_device( client = multi_device_tango_context.get_device("test/asyncclientdevice/2")
"test/asyncclientdevice/2"
)
commands_in_queue_events = EventCallback(fd=StringIO()) commands_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event( client.subscribe_event(
...@@ -351,7 +356,6 @@ class TestMultiDevice: ...@@ -351,7 +356,6 @@ class TestMultiDevice:
client.TestProgressNoArgs() client.TestProgressNoArgs()
client.TestProgressNoArgs() client.TestProgressNoArgs()
client.TestProgressNoArgs() client.TestProgressNoArgs()
client.ping()
client.TestProgressWithArgs(0.5) client.TestProgressWithArgs(0.5)
time.sleep(3) time.sleep(3)
...@@ -361,8 +365,11 @@ class TestMultiDevice: ...@@ -361,8 +365,11 @@ class TestMultiDevice:
if i.attr_value.value if i.attr_value.value
] ]
assert commands_in_queue == [ assert commands_in_queue == [
'TestProgressNoArgsCommand', 'TestProgressNoArgsCommand', ("TestProgressNoArgsCommand",),
'TestProgressNoArgsCommand', 'TestProgressWithArgsCommand'] ("TestProgressNoArgsCommand",),
("TestProgressNoArgsCommand",),
("TestProgressWithArgsCommand",),
]
commands_ids_in_queue = [ commands_ids_in_queue = [
i.attr_value.value i.attr_value.value
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment