Skip to content
Snippets Groups Projects
Commit d89ca190 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-1793-poll-asynchronously' into 'master'

L2SS-1793: Poll asynchronously

Closes L2SS-1793

See merge request !882
parents 7e4b239b 9ffbb51f
Branches
Tags v0.31.0 v0.31.0-1
1 merge request: !882 "L2SS-1793: Poll asynchronously"
Showing with 282 additions and 45 deletions
...@@ -166,6 +166,7 @@ Next change the version in the following places: ...@@ -166,6 +166,7 @@ Next change the version in the following places:
# Release Notes # Release Notes
* 0.31.0 Poll attributes independently from Tango
* 0.30.5 Log and count event subscription errors * 0.30.5 Log and count event subscription errors
* 0.30.4 Fix Tango attribute parameter types * 0.30.4 Fix Tango attribute parameter types
* 0.30.3 Configure FPGA_beamlet_output_nof_destinations in SDP before enabling FPGA_processing * 0.30.3 Configure FPGA_beamlet_output_nof_destinations in SDP before enabling FPGA_processing
......
0.30.5 0.31.0
...@@ -7,6 +7,8 @@ import time ...@@ -7,6 +7,8 @@ import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from threading import Thread from threading import Thread
from tangostationcontrol.common.asyncio import EventLoopThread
logger = logging.getLogger() logger = logging.getLogger()
...@@ -164,30 +166,11 @@ class AsyncCommClient(object): ...@@ -164,30 +166,11 @@ class AsyncCommClient(object):
# #
# All co-routines need to be called through this event loop, # All co-routines need to be called through this event loop,
# for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop). # for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop).
self.event_loop_thread = EventLoopThread(f"AsyncCommClient {self.name()}")
def run_loop(loop: asyncio.AbstractEventLoop) -> None: self.event_loop = self.event_loop_thread.event_loop
asyncio.set_event_loop(loop)
loop.run_forever()
self.event_loop = asyncio.new_event_loop()
self.event_loop_thread = Thread(
target=run_loop,
args=(self.event_loop,),
name=f"AsyncCommClient {self.name()} event loop",
daemon=True,
)
self.event_loop_thread.start()
else: else:
self.event_loop = event_loop
self.event_loop_thread = None self.event_loop_thread = None
self.event_loop = event_loop
def __del__(self):
if self.event_loop_thread is not None:
# signal our event loop thread to stop
self.event_loop.call_soon_threadsafe(self.event_loop.stop)
# reap our event loop thread once it is done processing tasks
self.event_loop_thread.join()
def name(self): def name(self):
"""The name of this CommClient, for use in logs.""" """The name of this CommClient, for use in logs."""
......
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import asyncio
from contextlib import suppress
from concurrent.futures import Future, CancelledError
from threading import Thread
from typing import Callable
class EventLoopThread:
    """A forever running thread that keeps executing the given event_loop.

    The thread is a daemon thread that is started on construction. It runs
    the event loop until stop() is called, after which remaining tasks are
    cancelled, given the chance to finish their cancellation, and the loop
    is closed.
    """

    def __init__(self, name: str = "anonymous", event_loop=None):
        """Start a thread running an event loop.

        name:       used to name the thread ("<name> event loop").
        event_loop: the loop to run. A new event loop is created if omitted.
        """
        self.name = name
        self.event_loop = event_loop or asyncio.new_event_loop()

        self.event_loop_thread = Thread(
            target=self._run_loop,
            args=(self.event_loop,),
            name=f"{self.name} event loop",
            daemon=True,
        )
        self.event_loop_thread.start()

    def call_soon_threadsafe(self, coro, *args) -> asyncio.Handle:
        """Schedules a plain callback to run in the event loop (fire & forget).

        Despite the parameter name, `coro` must be a regular callable, not a
        coroutine: it is called as coro(*args) by the loop. Use
        run_coroutine_threadsafe() for coroutines.

        Returns a Handle to allow cancelling."""
        return self.event_loop.call_soon_threadsafe(coro, *args)

    def run_coroutine_threadsafe(self, coro) -> Future:
        """Schedules the coroutine to run and returns a Future for the result."""
        return asyncio.run_coroutine_threadsafe(coro, self.event_loop)

    def stop(self):
        """Stop the event loop and wait for its thread to finish.

        Safe to call more than once; subsequent calls do nothing."""

        # getattr: stop() is also called from __del__, which runs even if
        # __init__ failed before the thread attribute was assigned.
        thread = getattr(self, "event_loop_thread", None)
        if thread is None:
            return

        try:
            # signal our event loop thread to stop
            self.event_loop.call_soon_threadsafe(self.event_loop.stop)
        except RuntimeError:
            # The loop is already closed. A running loop cannot be closed,
            # so our thread is no longer executing it and will terminate
            # (or has terminated) by itself. We still need to reap it.
            pass

        # reap our event loop thread once it is done processing tasks
        thread.join()
        self.event_loop_thread = None

    def __del__(self):
        self.stop()

    @staticmethod
    def _run_loop(loop: asyncio.AbstractEventLoop) -> None:
        """Thread body: run the loop until stopped, then clean it up."""
        asyncio.set_event_loop(loop)

        try:
            loop.run_forever()
        finally:
            try:
                # stop any tasks still running
                pending = asyncio.all_tasks(loop)
                for task in pending:
                    task.cancel()

                # Cancelling only requests cancellation. Run the loop until
                # the tasks have actually processed it, so cleanup code in
                # the tasks (except/finally blocks) completes. Exceptions
                # are collected instead of raised, as we are shutting down.
                if pending:
                    loop.run_until_complete(
                        asyncio.gather(*pending, return_exceptions=True)
                    )

                # properly clean up, see https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.shutdown_asyncgens
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                # always release the loop, even if the clean up failed
                loop.close()
class PeriodicTask:
    """Repeatedly await a coroutine function inside a given event loop.

    Construction schedules an asyncio task in `event_loop` that awaits
    func(), sleeps `interval` seconds, and repeats until stop() or cancel()
    is called. An exception raised by func() ends the task; it is stored in
    the task and surfaces when the task is awaited, see join().
    """

    def __init__(self, event_loop, func: Callable[[], None], interval: float = 1.0):
        """Create and start the periodic task.

        event_loop: loop to run in, expected to be running in another thread.
        func:       called without arguments; its result is awaited, so it
                    must return an awaitable despite the annotation.
        interval:   seconds to sleep between the end of one call and the
                    start of the next.
        """
        self.event_loop = event_loop
        self.func = func
        self.interval = interval

        self.done = False
        self.task = None

        # Tasks must be created from within the loop. Block until that has
        # happened, so self.task is set when the constructor returns.
        asyncio.run_coroutine_threadsafe(
            self._schedule_call_periodically(), self.event_loop
        ).result()

    async def _schedule_call_periodically(self):
        """Create the task that runs the periodic loop. Runs in the event loop."""
        periodic_call = self._call_periodically()
        self.task = asyncio.create_task(periodic_call)

    async def _call_periodically(self):
        """Await func() and sleep, over and over, until marked as done."""
        while True:
            if self.done:
                break

            await self.func()

            # TODO(JDM): Calculate how long to sleep to have the runtime of
            #            func be subtracted.
            await asyncio.sleep(self.interval)

    async def join(self):
        """Wait for the periodic task to stop or throw an exception, and reap that result."""

        # NOTE(review): CancelledError is whichever class the module imports
        # under that name. If that is concurrent.futures.CancelledError, it
        # is distinct from asyncio.CancelledError on Python 3.8+, in which
        # case cancellation of the task is not swallowed here but propagates
        # to whoever awaits join(). Confirm which behaviour is intended.
        try:
            await self.task
        except CancelledError:
            pass

    def stop(self):
        """Stop gracefully, to avoid cancelling self.func(), breaking their state."""

        # the periodic loop checks this flag before each call to func()
        self.done = True

    def cancel(self):
        """Stop non-gracefully."""

        # task.cancel must run in the loop's thread, so hand it to the loop
        cancel_task = self.task.cancel
        self.event_loop.call_soon_threadsafe(cancel_task)
        self.stop()
...@@ -6,11 +6,6 @@ ...@@ -6,11 +6,6 @@
from tango import GreenMode from tango import GreenMode
from tango.server import Device, command from tango.server import Device, command
from tangostationcontrol.common.device_decorators import (
DurationMetric,
)
from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice
__all__ = ["AsyncDevice"] __all__ = ["AsyncDevice"]
...@@ -49,9 +44,6 @@ class AsyncDevice(LOFARDevice): ...@@ -49,9 +44,6 @@ class AsyncDevice(LOFARDevice):
def read_attribute(self, attr_name: str): def read_attribute(self, attr_name: str):
raise NotImplementedError("Use async_read_attribute instead!") raise NotImplementedError("Use async_read_attribute instead!")
@command(polling_period=DEFAULT_POLLING_PERIOD) @command()
@log_exceptions()
@DurationMetric()
async def poll_attributes(self): async def poll_attributes(self):
if self.attribute_poller:
await self.attribute_poller.poll() await self.attribute_poller.poll()
...@@ -33,6 +33,7 @@ from tango.server import attribute, command, Device, device_property ...@@ -33,6 +33,7 @@ from tango.server import attribute, command, Device, device_property
# Additional import # Additional import
from tangostationcontrol import __version__ as version from tangostationcontrol import __version__ as version
from tangostationcontrol.common.asyncio import EventLoopThread, PeriodicTask
from tangostationcontrol.common.device_decorators import ( from tangostationcontrol.common.device_decorators import (
only_in_states, only_in_states,
fault_on_error, fault_on_error,
...@@ -103,11 +104,8 @@ class AttributePoller: ...@@ -103,11 +104,8 @@ class AttributePoller:
async def _read_attribute(self, attr_name: str): async def _read_attribute(self, attr_name: str):
return await self.device.async_read_attribute(attr_name) return await self.device.async_read_attribute(attr_name)
async def poll(self): @DurationMetric()
if not self.device.is_attribute_access_allowed(AttReqType.READ_REQ): async def _poll(self):
# TODO(JDM): Poll attributes based on their individual is_allowed states
return
first_exception = None first_exception = None
for attr_name, attr_data in self._poll_list.items(): for attr_name, attr_data in self._poll_list.items():
...@@ -140,6 +138,13 @@ class AttributePoller: ...@@ -140,6 +138,13 @@ class AttributePoller:
if first_exception: if first_exception:
raise first_exception raise first_exception
async def poll(self):
if not self.device.is_attribute_access_allowed(AttReqType.READ_REQ):
# TODO(JDM): Poll attributes based on their individual is_allowed states
return
return await self._poll()
@device_logging_to_python() @device_logging_to_python()
@device_metrics( @device_metrics(
...@@ -341,18 +346,23 @@ class LOFARDevice(Device): ...@@ -341,18 +346,23 @@ class LOFARDevice(Device):
elif not mask[idx]: elif not mask[idx]:
merge_values[idx] = current_values[idx] merge_values[idx] = current_values[idx]
@command(polling_period=DEFAULT_METRICS_POLLING_PERIOD) @command()
@log_exceptions(suppress=True)
@DurationMetric()
def poll_attributes(self): def poll_attributes(self):
if self.attribute_poller: future = self.event_loop_thread.run_coroutine_threadsafe(
asyncio.run(self.attribute_poller.poll()) self.attribute_poller.poll()
)
_ = future.result()
def __init__(self, cl, name): def __init__(self, cl, name):
# a proxy to ourself. can only be constructed in or after init_device # a proxy to ourself. can only be constructed in or after init_device
# is called, during super().__init__(). # is called, during super().__init__().
self.proxy = None self.proxy = None
self.poll_task = None
self.event_loop_thread = None
# @device_metrics() will register attributes here
# after init_device.
self.attribute_poller = AttributePoller(self) self.attribute_poller = AttributePoller(self)
super().__init__(cl, name) super().__init__(cl, name)
...@@ -409,6 +419,7 @@ class LOFARDevice(Device): ...@@ -409,6 +419,7 @@ class LOFARDevice(Device):
logger.info("Shutting down...") logger.info("Shutting down...")
self._Off() self._Off()
logger.info("Shut down. Good bye.") logger.info("Shut down. Good bye.")
# -------- # --------
...@@ -432,6 +443,10 @@ class LOFARDevice(Device): ...@@ -432,6 +443,10 @@ class LOFARDevice(Device):
self.get_device_properties() self.get_device_properties()
self.properties_changed() self.properties_changed()
# a thread running to schedule work in. This thread is active
# from configure_for_initialise up to and including configure_for_off.
self.event_loop_thread = EventLoopThread(f"LOFARDevice {self.get_name()}")
# Constructing the control hierarchy can not be part of init_device due to # Constructing the control hierarchy can not be part of init_device due to
# use of `.off(); .put_property(); .boot()` pattern in integration tests! # use of `.off(); .put_property(); .boot()` pattern in integration tests!
self.control = ControlHierarchyDevice() self.control = ControlHierarchyDevice()
...@@ -439,6 +454,13 @@ class LOFARDevice(Device): ...@@ -439,6 +454,13 @@ class LOFARDevice(Device):
self.configure_for_initialise() self.configure_for_initialise()
# start polling
self.poll_task = PeriodicTask(
self.event_loop_thread.event_loop,
self.attribute_poller.poll,
DEFAULT_METRICS_POLLING_PERIOD,
)
# WARNING: any values read so far are stale. # WARNING: any values read so far are stale.
# Proxies either need to wait for the next poll round, or # Proxies either need to wait for the next poll round, or
# use proxy.set_source(DevSource.DEV) to avoid the cache # use proxy.set_source(DevSource.DEV) to avoid the cache
...@@ -493,8 +515,19 @@ class LOFARDevice(Device): ...@@ -493,8 +515,19 @@ class LOFARDevice(Device):
self.events.unsubscribe_all() self.events.unsubscribe_all()
# stop polling (gracefully)
if self.poll_task:
self.poll_task.stop()
self.configure_for_off() self.configure_for_off()
# stop event thread
try:
if self.event_loop_thread:
self.event_loop_thread.stop()
finally:
self.event_loop_thread = None
# Turn off again, in case of race conditions through reconnecting # Turn off again, in case of race conditions through reconnecting
self.set_state(DevState.OFF) self.set_state(DevState.OFF)
self.set_status("Device is in the OFF state.") self.set_status("Device is in the OFF state.")
...@@ -633,6 +666,7 @@ class LOFARDevice(Device): ...@@ -633,6 +666,7 @@ class LOFARDevice(Device):
return {k: v[2] for k, v in self.device_property_list.items()} return {k: v[2] for k, v in self.device_property_list.items()}
@command() @command()
@only_in_states(DEFAULT_COMMAND_STATES)
@debugit() @debugit()
@log_exceptions() @log_exceptions()
def set_defaults(self): def set_defaults(self):
......
...@@ -173,6 +173,7 @@ class OPCUADevice(LOFARDevice): ...@@ -173,6 +173,7 @@ class OPCUADevice(LOFARDevice):
self.OPC_Time_Out, self.OPC_Time_Out,
self.Fault, self.Fault,
self.opcua_connection_status, self.opcua_connection_status,
event_loop=self.event_loop_thread.event_loop,
device=self, device=self,
) )
self.opcua_connection.node_path_prefix = self.OPC_Node_Path_Prefix self.opcua_connection.node_path_prefix = self.OPC_Node_Path_Prefix
......
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import asyncio
from concurrent.futures import CancelledError
from unittest.mock import MagicMock
from tangostationcontrol.common.asyncio import EventLoopThread, PeriodicTask
from test import base
class TestEventLoopThread(base.TestCase):
    """Tests for EventLoopThread.

    Every test stops the thread it started in a finally clause, so a failing
    assertion does not leave an event loop thread running."""

    def test_clean_construct_teardown(self):
        """Whether the EventLoopThread cleanly constructs and tears down"""

        elt = EventLoopThread()
        elt.stop()

    def test_call_soon_threadsafe(self):
        """Whether the call_soon_threadsafe function indeed executes the given function."""

        # a callback that tracks how it was called
        callback = MagicMock()

        elt = EventLoopThread()
        try:
            elt.call_soon_threadsafe(callback)
        finally:
            # stop() is scheduled behind the callback and joins the thread,
            # so the callback has run once stop() returns.
            elt.stop()

        # check that the callback was indeed called
        callback.assert_called()

    def test_run_coroutine_threadsafe(self):
        """Whether the run_coroutine_threadsafe function indeed executes the given function."""

        async def callback():
            # make sure we yield to force asyncio magic
            await asyncio.sleep(0.0)

            return 42

        elt = EventLoopThread()
        try:
            future = elt.run_coroutine_threadsafe(callback())

            # bounded wait: fail instead of hanging if the loop never runs
            self.assertEqual(42, future.result(timeout=10))
        finally:
            elt.stop()
class TestPeriodicTask(base.TestCase):
    """Tests for PeriodicTask."""

    def test(self):
        """Test whether PeriodicTask calls the given function repeatedly."""

        call_counter = [0]

        async def callback():
            call_counter[0] += 1
            if call_counter[0] >= 5:
                pt.cancel()
                return

        elt = EventLoopThread()
        try:
            pt = PeriodicTask(elt.event_loop, callback, interval=0.01)
            try:
                # callback cancels the future, so if future is cancelled,
                # we know the callback was indeed called often enough.
                future = elt.run_coroutine_threadsafe(pt.join())

                with self.assertRaises(CancelledError):
                    # bounded wait: fail instead of hanging if the task
                    # never gets cancelled
                    _ = future.result(timeout=10)

                self.assertEqual(5, call_counter[0])
            finally:
                pt.stop()
        finally:
            # always reap the event loop thread, also on test failure
            elt.stop()
...@@ -171,6 +171,56 @@ class TestLofarDevice(device_base.DeviceTestCase): ...@@ -171,6 +171,56 @@ class TestLofarDevice(device_base.DeviceTestCase):
# call from poll_attributes, but that's what we're testing so it's ok. # call from poll_attributes, but that's what we're testing so it's ok.
self.assertGreaterEqual(proxy.A_read_counter, 1) self.assertGreaterEqual(proxy.A_read_counter, 1)
def test_attributes_polled(self):
    """Test whether attributes are actually polled."""

    # AsyncDevices replace init_device (etc) with an async version,
    # and has its own tests.
    if issubclass(self.test_device, AsyncDevice):
        return

    class MyLofarDevice(self.test_device):
        """Device under test, with a polled attribute A that counts its reads."""

        def init_device(self):
            super().init_device()

            self._A_read_counter = 0

            # have the poller read A periodically
            self.attribute_poller.register("A")

        @attribute(dtype=float)
        def A(self):
            # every read is counted, no matter who triggers it
            self._A_read_counter += 1
            return 42.0

        @attribute(dtype=int)
        def A_read_counter(self):
            return self._A_read_counter

        # The first (and possibly only) call to poll()
        # is done when the device is still in the INIT state, because
        # that is when polling is set up.
        #
        # So we make sure polling is allowed then already.
        def is_attribute_access_allowed(self, _):
            return True

    with DeviceTestContext(
        MyLofarDevice, process=False, timeout=10
    ) as device_proxy:
        # read from the device itself, so we never get a cached result
        # from an earlier poll.
        device_proxy.set_source(DevSource.DEV)

        # turn device ON to allow polling
        device_proxy.initialise()
        device_proxy.on()

        # turn device OFF to force polling to be gracefully terminated,
        # ensuring at least one poll.
        device_proxy.off()

        # check whether A was read. It could have been read by a periodic
        # call from poll_attributes, but that's what we're testing so it's ok.
        self.assertGreaterEqual(device_proxy.A_read_counter, 1)
def test_disable_state_transitions(self): def test_disable_state_transitions(self):
with DeviceTestContext(self.test_device, process=False, timeout=10) as proxy: with DeviceTestContext(self.test_device, process=False, timeout=10) as proxy:
proxy.off() proxy.off()
......
...@@ -25,7 +25,7 @@ class TestStationManagerDevice(device_base.DeviceTestCase): ...@@ -25,7 +25,7 @@ class TestStationManagerDevice(device_base.DeviceTestCase):
self.assertEqual(proxy.state(), DevState.ON) self.assertEqual(proxy.state(), DevState.ON)
async def test_transitions_lock(self): def test_transitions_lock(self):
"""Test whether the lock mechanism ensure only one transition """Test whether the lock mechanism ensure only one transition
at a time is executed""" at a time is executed"""
...@@ -55,7 +55,7 @@ class TestStationManagerDevice(device_base.DeviceTestCase): ...@@ -55,7 +55,7 @@ class TestStationManagerDevice(device_base.DeviceTestCase):
def test_lock_serialization(lock, shared_var): def test_lock_serialization(lock, shared_var):
"""Test whether lock serialization is correctly working""" """Test whether lock serialization is correctly working"""
# Execute concurrent transitions multiple times to test serialization # Execute concurrent transitions multiple times to test serialization
for _ in range(100): for _ in range(10):
shared_var = 0 # reset variable shared_var = 0 # reset variable
asyncio.run(test_concurrent_transitions(lock, shared_var)) asyncio.run(test_concurrent_transitions(lock, shared_var))
self.assertEqual( self.assertEqual(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment