diff --git a/README.md b/README.md index 5277b1313baacdca664a279e7ddd98d5d6ff363d..25e9c4e6143e302ae7a63bc2c771727029d91566 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,7 @@ Next change the version in the following places: # Release Notes +* 0.31.0 Poll attributes independently from Tango * 0.30.5 Log and count event subscription errors * 0.30.4 Fix Tango attribute parameter types * 0.30.3 Configure FPGA_beamlet_output_nof_destinations in SDP before enabling FPGA_processing diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION index 41e1d505b484dd3123e1c0e56258bc56a33a3682..26bea73e811981fe7a2a09a00c54f23943d309aa 100644 --- a/tangostationcontrol/VERSION +++ b/tangostationcontrol/VERSION @@ -1 +1 @@ -0.30.5 +0.31.0 diff --git a/tangostationcontrol/tangostationcontrol/clients/comms_client.py b/tangostationcontrol/tangostationcontrol/clients/comms_client.py index 246721272f9b650d26c148a9c60fb19475c5892a..2d914df5425290967b7bdce521bdd3e1b95f0d6b 100644 --- a/tangostationcontrol/tangostationcontrol/clients/comms_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/comms_client.py @@ -7,6 +7,8 @@ import time from abc import ABC, abstractmethod from threading import Thread +from tangostationcontrol.common.asyncio import EventLoopThread + logger = logging.getLogger() @@ -164,30 +166,11 @@ class AsyncCommClient(object): # # All co-routines need to be called through this event loop, # for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop). - - def run_loop(loop: asyncio.AbstractEventLoop) -> None: - asyncio.set_event_loop(loop) - loop.run_forever() - - self.event_loop = asyncio.new_event_loop() - self.event_loop_thread = Thread( - target=run_loop, - args=(self.event_loop,), - name=f"AsyncCommClient {self.name()} event loop", - daemon=True, - ) - self.event_loop_thread.start() + self.event_loop_thread = EventLoopThread(f"AsyncCommClient {self.name()}") + self.event_loop = self.event_loop_thread.event_loop else: - self.event_loop = event_loop self.event_loop_thread = None - - def __del__(self): - if self.event_loop_thread is not None: - # signal our event loop thread to stop - self.event_loop.call_soon_threadsafe(self.event_loop.stop) - - # reap our event loop thread once it is done processing tasks - self.event_loop_thread.join() + self.event_loop = event_loop def name(self): """The name of this CommClient, for use in logs.""" diff --git a/tangostationcontrol/tangostationcontrol/common/asyncio.py b/tangostationcontrol/tangostationcontrol/common/asyncio.py new file mode 100644 index 0000000000000000000000000000000000000000..9e8ce6daa09e86a8c4df2cc7ce632265029dc886 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/common/asyncio.py @@ -0,0 +1,104 @@ +# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from contextlib import suppress +from concurrent.futures import Future, CancelledError +from threading import Thread +from typing import Callable + + +class EventLoopThread: + """A forever running thread that keeps executing the given event_loop.""" + + def __init__(self, name: str = "anonymous", event_loop=None): + self.name = name + self.event_loop = event_loop or asyncio.new_event_loop() + + self.event_loop_thread = Thread( + target=self._run_loop, + args=(self.event_loop,), + name=f"{self.name} event loop", + daemon=True, + ) + self.event_loop_thread.start() + + def call_soon_threadsafe(self, coro, *args) -> asyncio.Handle: + """Schedules the coroutine to run (fire & forget). Return a Handle to allow cancelling.""" + return self.event_loop.call_soon_threadsafe(coro, *args) + + def run_coroutine_threadsafe(self, coro) -> Future: + """Schedules the coroutine to run and returns a Future for the result.""" + return asyncio.run_coroutine_threadsafe(coro, self.event_loop) + + def stop(self): + if self.event_loop_thread is not None: + # signal our event loop thread to stop + self.event_loop.call_soon_threadsafe(self.event_loop.stop) + + # reap our event loop thread once it is done processing tasks + self.event_loop_thread.join() + self.event_loop_thread = None + + def __del__(self): + self.stop() + + @staticmethod + def _run_loop(loop: asyncio.AbstractEventLoop) -> None: + asyncio.set_event_loop(loop) + + try: + loop.run_forever() + finally: + # stop any tasks still running + for task in asyncio.all_tasks(loop): + task.cancel() + + # properly clean up, see https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.shutdown_asyncgens + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() + + +class PeriodicTask: + """Provide a periodic call to a coroutine.""" + + def __init__(self, event_loop, func: Callable[[], None], interval: float = 1.0): + self.event_loop = event_loop + self.func = func + self.interval = interval + + self.done = False + self.task = None + + future = asyncio.run_coroutine_threadsafe( + self._schedule_call_periodically(), self.event_loop + ) + _ = future.result() + + async def _schedule_call_periodically(self): + self.task = asyncio.create_task(self._call_periodically()) + + async def _call_periodically(self): + while not self.done: + await self.func() + + # TODO(JDM): Calculate how long to sleep to have the runtime of + # func be subtracted. + await asyncio.sleep(self.interval) + + async def join(self): + """Wait for the periodic task to stop or throw an exception, and reap that result.""" + + with suppress(CancelledError): + await self.task + + def stop(self): + """Stop gracefully, to avoid cancelling self.func(), breaking their state.""" + + self.done = True + + def cancel(self): + """Stop non-gracefully.""" + + self.event_loop.call_soon_threadsafe(self.task.cancel) + self.stop() diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/async_device.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/async_device.py index 1685c0951e09be323a58f42e6acb29e3a9feecc5..eb3415558184b7e368f9028ae79a197e29f6624d 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/async_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/async_device.py @@ -6,11 +6,6 @@ from tango import GreenMode from tango.server import Device, command -from tangostationcontrol.common.device_decorators import ( - DurationMetric, -) -from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD -from tangostationcontrol.common.lofar_logging import log_exceptions from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice __all__ = ["AsyncDevice"] @@ -49,9 +44,6 @@ class AsyncDevice(LOFARDevice): def read_attribute(self, attr_name: str): raise NotImplementedError("Use async_read_attribute instead!") - @command(polling_period=DEFAULT_POLLING_PERIOD) - @log_exceptions() - @DurationMetric() + @command() async def poll_attributes(self): - if self.attribute_poller: - await self.attribute_poller.poll() + await self.attribute_poller.poll() diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py index 06d27a0e67016417ccf6fce4338cc31418ac3dff..c0e267eef2aa5fb26fcb68940b9d9b861c7b99f8 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py @@ -33,6 +33,7 @@ from tango.server import attribute, command, Device, device_property # Additional import from tangostationcontrol import __version__ as version +from tangostationcontrol.common.asyncio import EventLoopThread, PeriodicTask from tangostationcontrol.common.device_decorators import ( only_in_states, fault_on_error, @@ -103,11 +104,8 @@ class AttributePoller: async def _read_attribute(self, attr_name: str): return await self.device.async_read_attribute(attr_name) - async def poll(self): - if not self.device.is_attribute_access_allowed(AttReqType.READ_REQ): - # TODO(JDM): Poll attributes based on their individual is_allowed states - return - + @DurationMetric() + async def _poll(self): first_exception = None for attr_name, attr_data in self._poll_list.items(): @@ -140,6 +138,13 @@ class AttributePoller: if first_exception: raise first_exception + async def poll(self): + if not self.device.is_attribute_access_allowed(AttReqType.READ_REQ): + # TODO(JDM): Poll attributes based on their individual is_allowed states + return + + return await self._poll() + @device_logging_to_python() @device_metrics( @@ -341,18 +346,23 @@ class LOFARDevice(Device): elif not mask[idx]: merge_values[idx] = current_values[idx] - @command(polling_period=DEFAULT_METRICS_POLLING_PERIOD) - @log_exceptions(suppress=True) - @DurationMetric() + @command() def poll_attributes(self): - if self.attribute_poller: - asyncio.run(self.attribute_poller.poll()) + future = self.event_loop_thread.run_coroutine_threadsafe( + self.attribute_poller.poll() + ) + _ = future.result() def __init__(self, cl, name): # a proxy to ourself. can only be constructed in or after init_device # is called, during super().__init__(). self.proxy = None + self.poll_task = None + self.event_loop_thread = None + + # @device_metrics() will register attributes here + # after init_device. self.attribute_poller = AttributePoller(self) super().__init__(cl, name) @@ -409,6 +419,7 @@ class LOFARDevice(Device): logger.info("Shutting down...") self._Off() + logger.info("Shut down. Good bye.") # -------- @@ -432,6 +443,10 @@ class LOFARDevice(Device): self.get_device_properties() self.properties_changed() + # a thread running to schedule work in. This thread is active + # from configure_for_initialise up to and including configure_for_off. + self.event_loop_thread = EventLoopThread(f"LOFARDevice {self.get_name()}") + # Constructing the control hierarchy can not be part of init_device due to # use of `.off(); .put_property(); .boot()` pattern in integration tests! self.control = ControlHierarchyDevice() @@ -439,6 +454,13 @@ class LOFARDevice(Device): self.configure_for_initialise() + # start polling + self.poll_task = PeriodicTask( + self.event_loop_thread.event_loop, + self.attribute_poller.poll, + DEFAULT_METRICS_POLLING_PERIOD, + ) + # WARNING: any values read so far are stale. # Proxies either need to wait for the next poll round, or # use proxy.set_source(DevSource.DEV) to avoid the cache @@ -493,8 +515,19 @@ class LOFARDevice(Device): self.events.unsubscribe_all() + # stop polling (gracefully) + if self.poll_task: + self.poll_task.stop() + self.configure_for_off() + # stop event thread + try: + if self.event_loop_thread: + self.event_loop_thread.stop() + finally: + self.event_loop_thread = None + # Turn off again, in case of race conditions through reconnecting self.set_state(DevState.OFF) self.set_status("Device is in the OFF state.") @@ -633,6 +666,7 @@ class LOFARDevice(Device): return {k: v[2] for k, v in self.device_property_list.items()} @command() + @only_in_states(DEFAULT_COMMAND_STATES) @debugit() @log_exceptions() def set_defaults(self): diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/opcua_device.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/opcua_device.py index c51e1b9917ed9e77dbaeeac9fdcdc789dce6e013..30469301a513d710e8270a01df0b2f7a00864e3b 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/opcua_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/opcua_device.py @@ -173,6 +173,7 @@ class OPCUADevice(LOFARDevice): self.OPC_Time_Out, self.Fault, self.opcua_connection_status, + event_loop=self.event_loop_thread.event_loop, device=self, ) self.opcua_connection.node_path_prefix = self.OPC_Node_Path_Prefix diff --git a/tangostationcontrol/test/common/test_asyncio.py b/tangostationcontrol/test/common/test_asyncio.py new file mode 100644 index 0000000000000000000000000000000000000000..2f290d4fb4450ef8f48eadc887807dc43a462811 --- /dev/null +++ b/tangostationcontrol/test/common/test_asyncio.py @@ -0,0 +1,72 @@ +# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from concurrent.futures import CancelledError +from unittest.mock import MagicMock + +from tangostationcontrol.common.asyncio import EventLoopThread, PeriodicTask + +from test import base + + +class TestEventLoopThread(base.TestCase): + def test_clean_construct_teardown(self): + """Whether the EventLoopThread cleanly constructs and tears down""" + + elt = EventLoopThread() + elt.stop() + + def test_call_soon_threadsafe(self): + """Whether the call_soon_threadsafe function indeed executes the given function.""" + + # a callback that tracks how it was called + callback = MagicMock() + + elt = EventLoopThread() + handle = elt.call_soon_threadsafe(callback) + elt.stop() + + # check that the callback was indeed called + callback.assert_called() + + def test_run_coroutine_threadsafe(self): + """Whether the run_coroutine_threadsafe function indeed executes the given function.""" + + async def callback(): + # make sure we yield to force asyncio magic + await asyncio.sleep(0.0) + + return 42 + + elt = EventLoopThread() + future = elt.run_coroutine_threadsafe(callback()) + self.assertEqual(42, future.result()) + elt.stop() + + +class TestPeriodicTask(base.TestCase): + def test(self): + """Test whether PeriodicTask calls the given function repeatingly.""" + + call_counter = [0] + + async def callback(): + call_counter[0] += 1 + if call_counter[0] >= 5: + pt.cancel() + return + + elt = EventLoopThread() + pt = PeriodicTask(elt.event_loop, callback, interval=0.01) + + # callback cancels the future, so if future is cancelled, + # we know the callback was indeed called often enough. + future = elt.run_coroutine_threadsafe(pt.join()) + with self.assertRaises(CancelledError): + _ = future.result() + + self.assertEqual(5, call_counter[0]) + + pt.stop() + elt.stop() diff --git a/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py b/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py index 047c565c3c21bcb1af83f42db5569f4bfa19ba6a..1ed43ce2cf77b8e0207eabf55c24f7b240001a88 100644 --- a/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py +++ b/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py @@ -171,6 +171,56 @@ class TestLofarDevice(device_base.DeviceTestCase): # call from poll_attributes, but that's what we're testing so it's ok. self.assertGreaterEqual(proxy.A_read_counter, 1) + def test_attributes_polled(self): + """Test whether attributes are actually polled.""" + + if issubclass(self.test_device, AsyncDevice): + # AsyncDevices replace init_device (etc) with an async version, + # and has its own tests. + return + + class MyLofarDevice(self.test_device): + def init_device(self): + super().init_device() + + self._A_read_counter = 0 + + self.attribute_poller.register("A") + + @attribute(dtype=float) + def A(self): + self._A_read_counter += 1 + return 42.0 + + @attribute(dtype=int) + def A_read_counter(self): + return self._A_read_counter + + # The first (and possibly only) call to poll() + # is done when the device is still in the INIT state, because + # that is when polling is set up. + # + # So we make sure polling is allowed then already. + def is_attribute_access_allowed(self, _): + return True + + with DeviceTestContext(MyLofarDevice, process=False, timeout=10) as proxy: + # make sure we don't get a cached result if the + # poll_attribute was already called by Tango's polling loop. + proxy.set_source(DevSource.DEV) + + # turn device ON to allow polling + proxy.initialise() + proxy.on() + + # turn device OFF to force polling to be gracefully terminated, + # ensuring at least one poll. + proxy.off() + + # check whether A was read. It could have been read by a periodic + # call from poll_attributes, but that's what we're testing so it's ok. + self.assertGreaterEqual(proxy.A_read_counter, 1) + def test_disable_state_transitions(self): with DeviceTestContext(self.test_device, process=False, timeout=10) as proxy: proxy.off() diff --git a/tangostationcontrol/test/devices/test_station_manager_device.py b/tangostationcontrol/test/devices/test_station_manager_device.py index 6f5f6df88feff3de02f22ce0a41fc415a935ba81..32af9aaf693417dee2774a510a0c51b64ebb9ba8 100644 --- a/tangostationcontrol/test/devices/test_station_manager_device.py +++ b/tangostationcontrol/test/devices/test_station_manager_device.py @@ -25,7 +25,7 @@ class TestStationManagerDevice(device_base.DeviceTestCase): self.assertEqual(proxy.state(), DevState.ON) - async def test_transitions_lock(self): + def test_transitions_lock(self): """Test whether the lock mechanism ensure only one transition at a time is executed""" @@ -55,7 +55,7 @@ class TestStationManagerDevice(device_base.DeviceTestCase): def test_lock_serialization(lock, shared_var): """Test whether lock serialization is correctly working""" # Execute concurrent transitions multiple times to test serialization - for _ in range(100): + for _ in range(10): shared_var = 0 # reset variable asyncio.run(test_concurrent_transitions(lock, shared_var)) self.assertEqual(