Skip to content
Snippets Groups Projects
Commit 6bf9d87d authored by Jan David Mol's avatar Jan David Mol
Browse files

Improving event subscription interface

parent c31e5c41
Branches
Tags
1 merge request!925Improving event subscription interface
......@@ -161,6 +161,7 @@ Next change the version in the following places:
# Release Notes
* 0.37.2 Improved event-subscription interface, avoid overlap between polling loops.
* 0.37.1 Improved asyncio resource teardown when devices go Off.
* 0.37.0-1 Fix for deploying on DTS Lab
* 0.37.0 Run casacore in separate processes, increasing beam-tracking performance
......
0.37.1
0.37.2
......@@ -54,15 +54,21 @@ class EventLoopThread:
asyncio.set_event_loop(loop)
try:
logger.debug(f"EventLoop: starting")
loop.run_forever()
logger.debug(f"EventLoop: loop stopped gracefully")
finally:
# stop any tasks still running
for task in asyncio.all_tasks(loop):
logger.debug(f"EventLoop: cancelling {task}")
task.cancel()
# properly clean up, see https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.shutdown_asyncgens
logger.debug(f"EventLoop: finishing cancelled tasks")
loop.run_until_complete(loop.shutdown_asyncgens())
logger.debug(f"EventLoop: closing")
loop.close()
logger.debug(f"EventLoop: done")
class PeriodicTask:
......
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from typing import Dict
import numpy
from tangostationcontrol.common.case_insensitive_string import CaseInsensitiveString
from tangostationcontrol.common.case_insensitive_dict import CaseInsensitiveDict
from tango.server import Device
......@@ -17,7 +17,15 @@ class ChangeEvents:
def __init__(self, device: Device):
self.device = device
self.prev_values: Dict[str, object] = {}
# keep track of which attributes we manage
self.attributes: list[CaseInsensitiveString] = []
# previous values of attributes, to avoid
# emitting CHANGE_EVENTs when nothing changed.
self.prev_values: CaseInsensitiveDict[str, object] = {}
def is_configured(self, attr_name: str) -> bool:
return attr_name in self.attributes
def configure_attribute(self, attr_name: str):
"""Prepares an attribute for emitting custom change events."""
......@@ -33,6 +41,8 @@ class ChangeEvents:
# so detecting it ourselves is easier.
self.device.set_change_event(attr_name, True, False)
self.attributes.append(CaseInsensitiveString(attr_name))
def send_change_event(self, attr_name: str, value: object | None):
"""Emits a CHANGE_EVENT if the attribute has changed value."""
......
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from dataclasses import dataclass
from functools import partial
import logging
from typing import List, Tuple, Dict, Callable
from typing import List, Dict, Callable
from tango import DeviceProxy, EventType
from tango import DeviceProxy, EventType, DevFailed
from prometheus_client import Counter
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.common.lofar_logging import exception_to_str
from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD_MS
from tangostationcontrol.common.case_insensitive_string import CaseInsensitiveString
from tangostationcontrol.metrics import AttributeMetric
logger = logging.getLogger()
EventCallbackType = Callable[[DeviceProxy, str, object], None]
@dataclass(frozen=False)
class Subscription:
proxy: DeviceProxy
attribute_name: CaseInsensitiveString
event_id: int | None
callbacks: List[EventCallbackType]
class EventSubscriptions:
"""Manages a set of subscriptions to Tango events."""
def __init__(self, device_labels: Dict[str, str] | None = None):
# events we're subscribed to
self.subscriptions: List[Tuple[DeviceProxy, int]] = []
self.subscriptions: List[Subscription] = []
# metric to count errors
self.error_count_metric = AttributeMetric(
......@@ -30,25 +43,20 @@ class EventSubscriptions:
dynamic_labels=["event_device", "event_attribute"],
)
def subscribe_change_event(
self,
proxy: DeviceProxy,
attr_name: str,
callback: Callable[[DeviceProxy, str, object], None],
def _get_subscription(self, dev_name: str, attr_name: str) -> bool:
for sub in self.subscriptions:
if (
CaseInsensitiveString(sub.proxy.name()) == dev_name
and sub.attribute_name == attr_name
):
"""Subscribe to changes to an attribute of another device.
Immediately and on change, the provided callback will be called as
return sub
callback(device, attribute_name, value)
return None
Where the attribute_name can be in either the original or lower case.
def is_subscribed(self, dev_name: str, attr_name: str) -> bool:
return self._get_subscription(dev_name, attr_name) is not None
Event errors are logged, counted, and dropped without invoking
the callback.
"""
@log_exceptions(suppress=True)
def event_callback_wrapper(event):
def _event_callback_wrapper(self, sub, event):
device = event.device
# some errors report about the device in general, and thus are not
......@@ -84,27 +92,84 @@ class EventSubscriptions:
)
# Call requested callback
for callback in sub.callbacks:
try:
callback(device, attribute_name, value)
except Exception as ex:
logger.exception(
f"Callback {callback} for device {device} attribute {attribute_name} threw an exception: {exception_to_str(ex)}"
)
def subscribe_change_event(
self,
proxy: DeviceProxy,
attr_name: str,
callback: EventCallbackType,
):
"""Subscribe to changes to an attribute of another device.
Immediately and on change, the provided callback will be called as
callback(device, attribute_name, value)
Where the attribute_name can be in either the original or lower case.
Event errors are logged, counted, and dropped without invoking
the callback.
"""
if sub := self._get_subscription(proxy.name(), attr_name):
# extend existing subscription
logger.debug(
f"Adding callback {callback} to CHANGE_EVENT on device {proxy} attribute {attr_name}"
)
sub.callbacks.append(callback)
# manually trigger first call, mimicking Tango doing so when subscribing
try:
value = proxy.read_attribute(attr_name).value
callback(proxy, attr_name, value)
except Exception as ex:
logger.exception(
f"Callback {callback} for device {proxy} attribute {attr_name} threw an exception"
)
else:
# make new subscription
sub = Subscription(
proxy=proxy,
attribute_name=CaseInsensitiveString(attr_name),
event_id=None,
callbacks=[callback],
)
# make sure the attribute is polled, otherwise we wont receive event
if not proxy.is_attribute_polled_by_lofardevice(attr_name):
# LOFARDevice does not poll this attribute, but maybe Tango does
if not proxy.is_attribute_polled(attr_name):
logger.info(f"Enabling polling for {proxy.name()}/{attr_name}.")
proxy.poll_attribute(attr_name, DEFAULT_POLLING_PERIOD_MS)
# subscribe
event_id = proxy.subscribe_event(
logger.debug(
f"Subscribing callback {callback} to CHANGE_EVENT on device {proxy} attribute {attr_name}"
)
sub.event_id = proxy.subscribe_event(
attr_name,
EventType.CHANGE_EVENT,
event_callback_wrapper,
partial(self._event_callback_wrapper, sub),
stateless=True,
)
self.subscriptions.append((proxy, event_id))
self.subscriptions.append(sub)
def unsubscribe_all(self):
# unsubscribe from all events
try:
for proxy, event_id in self.subscriptions:
proxy.unsubscribe_event(event_id)
for sub in self.subscriptions:
try:
sub.proxy.unsubscribe_event(sub.event_id)
except DevFailed:
logger.error(f"Failed to unsubscribe from event {sub}")
finally:
self.subscriptions = []
......@@ -45,6 +45,7 @@ from tangostationcontrol.common.constants import (
DEFAULT_METRICS_POLLING_PERIOD_MS,
DEFAULT_POLLING_PERIOD_MS,
)
from tangostationcontrol.common.case_insensitive_dict import CaseInsensitiveDict
from tangostationcontrol.common.events import EventSubscriptions, ChangeEvents
from tangostationcontrol.common.lofar_logging import (
log_exceptions,
......@@ -102,7 +103,7 @@ class AttributePoller:
self.device = device
# attributes to poll
self._poll_list: Dict[str, Dict[str, object]] = {}
self._poll_list: CaseInsensitiveDict[str, Dict[str, object]] = {}
# change event system
self.change_events = ChangeEvents(self.device)
......@@ -121,6 +122,9 @@ class AttributePoller:
if send_change_events:
self.change_events.configure_attribute(attr_name)
def is_registered(self, attr_name: str) -> bool:
return attr_name in self._poll_list
@DurationMetric()
async def _read_attribute(self, attr_name: str):
return await self.device.async_read_attribute(attr_name)
......@@ -183,8 +187,16 @@ class AttributePoller:
# even if it functions correctly.
for attr_name, attr_data in self._poll_list.items():
# stop polling if we turned to OFF/FAULT during this loop
if not self.polling_allowed():
return
value = await self._read_attribute_nothrow(attr_name)
# stop polling if we turned to OFF/FAULT during this loop
if not self.polling_allowed():
return
# Update Prometheus metric, if any
if attr_data["metric"]:
self._update_metric(attr_data["metric"], value)
......@@ -441,6 +453,10 @@ class LOFARDevice(Device):
)
_ = future.result()
@command(dtype_in=str, dtype_out=bool)
def is_attribute_polled_by_lofardevice(self, attr_name: str) -> bool:
return self.attribute_poller.is_registered(attr_name)
def __init__(self, cl, name):
# a proxy to ourself. can only be constructed in or after init_device
# is called, during super().__init__().
......@@ -601,6 +617,7 @@ class LOFARDevice(Device):
self.set_state(DevState.OFF)
self.set_status("Device is in the OFF state.")
logger.info("Unsubscribing events from other devices")
self.events.unsubscribe_all()
# stop polling (ungracefully, as it may take too long)
......@@ -611,13 +628,17 @@ class LOFARDevice(Device):
self.poll_task.join()
# clear metrics, as they will all be stale
logger.info("Clearing metrics")
self.attribute_poller.clear_all()
logger.info(">>> configure_for_off()")
self.configure_for_off()
logger.info("<<< configure_for_off()")
# stop event thread
try:
if self.event_loop_thread:
logger.info("Stopping EventLoop thread")
self.event_loop_thread.stop()
finally:
self.event_loop_thread = None
......
......@@ -8,7 +8,7 @@ from tangostationcontrol.common.events import (
ChangeEvents,
)
from tango.server import Device, attribute, AttrWriteType
from tango.server import Device, attribute, AttrWriteType, command
from tango.test_context import DeviceTestContext
from test import base
......@@ -58,6 +58,11 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase):
def write_A(self, value):
self._A = value
@command(dtype_in=str, dtype_out=bool)
def is_attribute_polled_by_lofardevice(self, attr_name):
# used by EventSubscriptions. we poll nothing ourselves
return False
def test_initial_event(self):
"""Do we receive the initial event emitted when subscribing?"""
......@@ -71,6 +76,18 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase):
# check call sequence
self.assertListEqual([("A", 0)], self.callback_called_values)
def test_is_subscribed(self):
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
self.assertFalse(self.events.is_subscribed(proxy.name(), "A"))
self.events.subscribe_change_event(proxy, "A", self.callback)
self.assertTrue(self.events.is_subscribed(proxy.name(), "A"))
self.events.unsubscribe_all()
self.assertFalse(self.events.is_subscribed(proxy.name(), "A"))
def test_change_event(self):
"""Do we also receive an event emitted after the value changes (plus the initial)?"""
......@@ -88,6 +105,30 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase):
# check call sequence
self.assertListEqual([("A", 0), ("a", 42)], self.callback_called_values)
def test_multiple_callbacks(self):
"""Can we register multiple callbacks on the same attribute?"""
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
# subscribe (triggering a change event)
self.events.subscribe_change_event(proxy, "A", self.callback)
# subscribe again (triggering a change event)
self.events.subscribe_change_event(proxy, "A", self.callback)
# change (triggering a change event)
proxy.A = 42
# wait for 4 change events
self.wait_for_callback()
self.wait_for_callback()
self.wait_for_callback()
self.wait_for_callback()
# check call sequence
self.assertListEqual(
[("A", 0), ("A", 0), ("a", 42), ("a", 42)], self.callback_called_values
)
def test_unsubscribe(self):
"""Do we stop receiving if we unsubscribe?"""
......@@ -176,6 +217,11 @@ class TestChangeEvents(EventSubscriptionMixin, base.TestCase):
self._int = value
self.custom_change_events.send_change_event("int_attr", value)
@command(dtype_in=str, dtype_out=bool)
def is_attribute_polled_by_lofardevice(self, attr_name):
# used by EventSubscriptions. we poll everything ourselves
return True
def test_bool_change_event(self):
"""Do we also receive an event emitted after the value changes (plus the initial)?"""
......
......@@ -36,6 +36,29 @@ class TestAttributePoller(IsolatedAsyncioTestCase):
def is_attribute_access_allowed(self, _):
return True
async def test_is_registered(self):
"""Does is_registered() reflect which attributes are registered for polling?"""
class DeviceMock(self.DeviceMockBase):
def __init__(self):
super().__init__()
self.attr_read_counter = 0
async def async_read_attribute(self, _):
self.attr_read_counter += 1
return 42.0
device = DeviceMock()
ap = lofar_device.AttributePoller(device)
self.assertFalse(ap.is_registered("A"))
# register attribute with metric
metric = AttributeMetric("metric", "description", device.metric_labels)
ap.register("A", metric, False)
self.assertTrue(ap.is_registered("A"))
async def test_poll_updates_metrics(self):
"""Does poll() really read the attribute and update the metric?"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment