Skip to content
Snippets Groups Projects
Commit d78b92ec authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'enhance-event-subscription-api' into 'master'

Improving event subscription interface

See merge request !925
parents c31e5c41 6bf9d87d
Branches
Tags
1 merge request!925Improving event subscription interface
...@@ -161,6 +161,7 @@ Next change the version in the following places: ...@@ -161,6 +161,7 @@ Next change the version in the following places:
# Release Notes # Release Notes
* 0.37.2 Improved event-subscription interface, avoid overlap between polling loops.
* 0.37.1 Improved asyncio resource teardown when devices go Off. * 0.37.1 Improved asyncio resource teardown when devices go Off.
* 0.37.0-1 Fix for deploying on DTS Lab * 0.37.0-1 Fix for deploying on DTS Lab
* 0.37.0 Run casacore in separate processes, increasing beam-tracking performance * 0.37.0 Run casacore in separate processes, increasing beam-tracking performance
......
0.37.1 0.37.2
...@@ -54,15 +54,21 @@ class EventLoopThread: ...@@ -54,15 +54,21 @@ class EventLoopThread:
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
try: try:
logger.debug(f"EventLoop: starting")
loop.run_forever() loop.run_forever()
logger.debug(f"EventLoop: loop stopped gracefully")
finally: finally:
# stop any tasks still running # stop any tasks still running
for task in asyncio.all_tasks(loop): for task in asyncio.all_tasks(loop):
logger.debug(f"EventLoop: cancelling {task}")
task.cancel() task.cancel()
# properly clean up, see https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.shutdown_asyncgens # properly clean up, see https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.shutdown_asyncgens
logger.debug(f"EventLoop: finishing cancelled tasks")
loop.run_until_complete(loop.shutdown_asyncgens()) loop.run_until_complete(loop.shutdown_asyncgens())
logger.debug(f"EventLoop: closing")
loop.close() loop.close()
logger.debug(f"EventLoop: done")
class PeriodicTask: class PeriodicTask:
......
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy) # Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from typing import Dict
import numpy import numpy
from tangostationcontrol.common.case_insensitive_string import CaseInsensitiveString
from tangostationcontrol.common.case_insensitive_dict import CaseInsensitiveDict
from tango.server import Device from tango.server import Device
...@@ -17,7 +17,15 @@ class ChangeEvents: ...@@ -17,7 +17,15 @@ class ChangeEvents:
def __init__(self, device: Device): def __init__(self, device: Device):
self.device = device self.device = device
self.prev_values: Dict[str, object] = {} # keep track of which attributes we manage
self.attributes: list[CaseInsensitiveString] = []
# previous values of attributes, to avoid
# emitting CHANGE_EVENTs when nothing changed.
self.prev_values: CaseInsensitiveDict[str, object] = {}
def is_configured(self, attr_name: str) -> bool:
return attr_name in self.attributes
def configure_attribute(self, attr_name: str): def configure_attribute(self, attr_name: str):
"""Prepares an attribute for emitting custom change events.""" """Prepares an attribute for emitting custom change events."""
...@@ -33,6 +41,8 @@ class ChangeEvents: ...@@ -33,6 +41,8 @@ class ChangeEvents:
# so detecting it ourselves is easier. # so detecting it ourselves is easier.
self.device.set_change_event(attr_name, True, False) self.device.set_change_event(attr_name, True, False)
self.attributes.append(CaseInsensitiveString(attr_name))
def send_change_event(self, attr_name: str, value: object | None): def send_change_event(self, attr_name: str, value: object | None):
"""Emits a CHANGE_EVENT if the attribute has changed value.""" """Emits a CHANGE_EVENT if the attribute has changed value."""
......
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy) # Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from dataclasses import dataclass
from functools import partial
import logging import logging
from typing import List, Tuple, Dict, Callable from typing import List, Dict, Callable
from tango import DeviceProxy, EventType from tango import DeviceProxy, EventType, DevFailed
from prometheus_client import Counter from prometheus_client import Counter
from tangostationcontrol.common.lofar_logging import log_exceptions from tangostationcontrol.common.lofar_logging import exception_to_str
from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD_MS from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD_MS
from tangostationcontrol.common.case_insensitive_string import CaseInsensitiveString
from tangostationcontrol.metrics import AttributeMetric from tangostationcontrol.metrics import AttributeMetric
logger = logging.getLogger() logger = logging.getLogger()
EventCallbackType = Callable[[DeviceProxy, str, object], None]
@dataclass(frozen=False)
class Subscription:
proxy: DeviceProxy
attribute_name: CaseInsensitiveString
event_id: int | None
callbacks: List[EventCallbackType]
class EventSubscriptions: class EventSubscriptions:
"""Manages a set of subscriptions to Tango events.""" """Manages a set of subscriptions to Tango events."""
def __init__(self, device_labels: Dict[str, str] | None = None): def __init__(self, device_labels: Dict[str, str] | None = None):
# events we're subscribed to # events we're subscribed to
self.subscriptions: List[Tuple[DeviceProxy, int]] = [] self.subscriptions: List[Subscription] = []
# metric to count errors # metric to count errors
self.error_count_metric = AttributeMetric( self.error_count_metric = AttributeMetric(
...@@ -30,25 +43,20 @@ class EventSubscriptions: ...@@ -30,25 +43,20 @@ class EventSubscriptions:
dynamic_labels=["event_device", "event_attribute"], dynamic_labels=["event_device", "event_attribute"],
) )
def subscribe_change_event( def _get_subscription(self, dev_name: str, attr_name: str) -> bool:
self, for sub in self.subscriptions:
proxy: DeviceProxy, if (
attr_name: str, CaseInsensitiveString(sub.proxy.name()) == dev_name
callback: Callable[[DeviceProxy, str, object], None], and sub.attribute_name == attr_name
): ):
"""Subscribe to changes to an attribute of another device. return sub
Immediately and on change, the provided callback will be called as
callback(device, attribute_name, value) return None
Where the attribute_name can be in either the original or lower case. def is_subscribed(self, dev_name: str, attr_name: str) -> bool:
return self._get_subscription(dev_name, attr_name) is not None
Event errors are logged, counted, and dropped without invoking def _event_callback_wrapper(self, sub, event):
the callback.
"""
@log_exceptions(suppress=True)
def event_callback_wrapper(event):
device = event.device device = event.device
# some errors report about the device in general, and thus are not # some errors report about the device in general, and thus are not
...@@ -84,27 +92,84 @@ class EventSubscriptions: ...@@ -84,27 +92,84 @@ class EventSubscriptions:
) )
# Call requested callback # Call requested callback
for callback in sub.callbacks:
try:
callback(device, attribute_name, value) callback(device, attribute_name, value)
except Exception as ex:
logger.exception(
f"Callback {callback} for device {device} attribute {attribute_name} threw an exception: {exception_to_str(ex)}"
)
def subscribe_change_event(
self,
proxy: DeviceProxy,
attr_name: str,
callback: EventCallbackType,
):
"""Subscribe to changes to an attribute of another device.
Immediately and on change, the provided callback will be called as
callback(device, attribute_name, value)
Where the attribute_name can be in either the original or lower case.
Event errors are logged, counted, and dropped without invoking
the callback.
"""
if sub := self._get_subscription(proxy.name(), attr_name):
# extend existing subscription
logger.debug(
f"Adding callback {callback} to CHANGE_EVENT on device {proxy} attribute {attr_name}"
)
sub.callbacks.append(callback)
# manually trigger first call, mimicking Tango doing so when subscribing
try:
value = proxy.read_attribute(attr_name).value
callback(proxy, attr_name, value)
except Exception as ex:
logger.exception(
f"Callback {callback} for device {proxy} attribute {attr_name} threw an exception"
)
else:
# make new subscription
sub = Subscription(
proxy=proxy,
attribute_name=CaseInsensitiveString(attr_name),
event_id=None,
callbacks=[callback],
)
# make sure the attribute is polled, otherwise we wont receive event # make sure the attribute is polled, otherwise we wont receive event
if not proxy.is_attribute_polled_by_lofardevice(attr_name):
# LOFARDevice does not poll this attribute, but maybe Tango does
if not proxy.is_attribute_polled(attr_name): if not proxy.is_attribute_polled(attr_name):
logger.info(f"Enabling polling for {proxy.name()}/{attr_name}.") logger.info(f"Enabling polling for {proxy.name()}/{attr_name}.")
proxy.poll_attribute(attr_name, DEFAULT_POLLING_PERIOD_MS) proxy.poll_attribute(attr_name, DEFAULT_POLLING_PERIOD_MS)
# subscribe # subscribe
event_id = proxy.subscribe_event( logger.debug(
f"Subscribing callback {callback} to CHANGE_EVENT on device {proxy} attribute {attr_name}"
)
sub.event_id = proxy.subscribe_event(
attr_name, attr_name,
EventType.CHANGE_EVENT, EventType.CHANGE_EVENT,
event_callback_wrapper, partial(self._event_callback_wrapper, sub),
stateless=True, stateless=True,
) )
self.subscriptions.append((proxy, event_id)) self.subscriptions.append(sub)
def unsubscribe_all(self): def unsubscribe_all(self):
# unsubscribe from all events # unsubscribe from all events
try: try:
for proxy, event_id in self.subscriptions: for sub in self.subscriptions:
proxy.unsubscribe_event(event_id) try:
sub.proxy.unsubscribe_event(sub.event_id)
except DevFailed:
logger.error(f"Failed to unsubscribe from event {sub}")
finally: finally:
self.subscriptions = [] self.subscriptions = []
...@@ -45,6 +45,7 @@ from tangostationcontrol.common.constants import ( ...@@ -45,6 +45,7 @@ from tangostationcontrol.common.constants import (
DEFAULT_METRICS_POLLING_PERIOD_MS, DEFAULT_METRICS_POLLING_PERIOD_MS,
DEFAULT_POLLING_PERIOD_MS, DEFAULT_POLLING_PERIOD_MS,
) )
from tangostationcontrol.common.case_insensitive_dict import CaseInsensitiveDict
from tangostationcontrol.common.events import EventSubscriptions, ChangeEvents from tangostationcontrol.common.events import EventSubscriptions, ChangeEvents
from tangostationcontrol.common.lofar_logging import ( from tangostationcontrol.common.lofar_logging import (
log_exceptions, log_exceptions,
...@@ -102,7 +103,7 @@ class AttributePoller: ...@@ -102,7 +103,7 @@ class AttributePoller:
self.device = device self.device = device
# attributes to poll # attributes to poll
self._poll_list: Dict[str, Dict[str, object]] = {} self._poll_list: CaseInsensitiveDict[str, Dict[str, object]] = {}
# change event system # change event system
self.change_events = ChangeEvents(self.device) self.change_events = ChangeEvents(self.device)
...@@ -121,6 +122,9 @@ class AttributePoller: ...@@ -121,6 +122,9 @@ class AttributePoller:
if send_change_events: if send_change_events:
self.change_events.configure_attribute(attr_name) self.change_events.configure_attribute(attr_name)
def is_registered(self, attr_name: str) -> bool:
return attr_name in self._poll_list
@DurationMetric() @DurationMetric()
async def _read_attribute(self, attr_name: str): async def _read_attribute(self, attr_name: str):
return await self.device.async_read_attribute(attr_name) return await self.device.async_read_attribute(attr_name)
...@@ -183,8 +187,16 @@ class AttributePoller: ...@@ -183,8 +187,16 @@ class AttributePoller:
# even if it functions correctly. # even if it functions correctly.
for attr_name, attr_data in self._poll_list.items(): for attr_name, attr_data in self._poll_list.items():
# stop polling if we turned to OFF/FAULT during this loop
if not self.polling_allowed():
return
value = await self._read_attribute_nothrow(attr_name) value = await self._read_attribute_nothrow(attr_name)
# stop polling if we turned to OFF/FAULT during this loop
if not self.polling_allowed():
return
# Update Prometheus metric, if any # Update Prometheus metric, if any
if attr_data["metric"]: if attr_data["metric"]:
self._update_metric(attr_data["metric"], value) self._update_metric(attr_data["metric"], value)
...@@ -441,6 +453,10 @@ class LOFARDevice(Device): ...@@ -441,6 +453,10 @@ class LOFARDevice(Device):
) )
_ = future.result() _ = future.result()
@command(dtype_in=str, dtype_out=bool)
def is_attribute_polled_by_lofardevice(self, attr_name: str) -> bool:
return self.attribute_poller.is_registered(attr_name)
def __init__(self, cl, name): def __init__(self, cl, name):
# a proxy to ourself. can only be constructed in or after init_device # a proxy to ourself. can only be constructed in or after init_device
# is called, during super().__init__(). # is called, during super().__init__().
...@@ -601,6 +617,7 @@ class LOFARDevice(Device): ...@@ -601,6 +617,7 @@ class LOFARDevice(Device):
self.set_state(DevState.OFF) self.set_state(DevState.OFF)
self.set_status("Device is in the OFF state.") self.set_status("Device is in the OFF state.")
logger.info("Unsubscribing events from other devices")
self.events.unsubscribe_all() self.events.unsubscribe_all()
# stop polling (ungracefully, as it may take too long) # stop polling (ungracefully, as it may take too long)
...@@ -611,13 +628,17 @@ class LOFARDevice(Device): ...@@ -611,13 +628,17 @@ class LOFARDevice(Device):
self.poll_task.join() self.poll_task.join()
# clear metrics, as they will all be stale # clear metrics, as they will all be stale
logger.info("Clearing metrics")
self.attribute_poller.clear_all() self.attribute_poller.clear_all()
logger.info(">>> configure_for_off()")
self.configure_for_off() self.configure_for_off()
logger.info("<<< configure_for_off()")
# stop event thread # stop event thread
try: try:
if self.event_loop_thread: if self.event_loop_thread:
logger.info("Stopping EventLoop thread")
self.event_loop_thread.stop() self.event_loop_thread.stop()
finally: finally:
self.event_loop_thread = None self.event_loop_thread = None
......
...@@ -8,7 +8,7 @@ from tangostationcontrol.common.events import ( ...@@ -8,7 +8,7 @@ from tangostationcontrol.common.events import (
ChangeEvents, ChangeEvents,
) )
from tango.server import Device, attribute, AttrWriteType from tango.server import Device, attribute, AttrWriteType, command
from tango.test_context import DeviceTestContext from tango.test_context import DeviceTestContext
from test import base from test import base
...@@ -58,6 +58,11 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase): ...@@ -58,6 +58,11 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase):
def write_A(self, value): def write_A(self, value):
self._A = value self._A = value
@command(dtype_in=str, dtype_out=bool)
def is_attribute_polled_by_lofardevice(self, attr_name):
# used by EventSubscriptions. we poll nothing ourselves
return False
def test_initial_event(self): def test_initial_event(self):
"""Do we receive the initial event emitted when subscribing?""" """Do we receive the initial event emitted when subscribing?"""
...@@ -71,6 +76,18 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase): ...@@ -71,6 +76,18 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase):
# check call sequence # check call sequence
self.assertListEqual([("A", 0)], self.callback_called_values) self.assertListEqual([("A", 0)], self.callback_called_values)
def test_is_subscribed(self):
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
self.assertFalse(self.events.is_subscribed(proxy.name(), "A"))
self.events.subscribe_change_event(proxy, "A", self.callback)
self.assertTrue(self.events.is_subscribed(proxy.name(), "A"))
self.events.unsubscribe_all()
self.assertFalse(self.events.is_subscribed(proxy.name(), "A"))
def test_change_event(self): def test_change_event(self):
"""Do we also receive an event emitted after the value changes (plus the initial)?""" """Do we also receive an event emitted after the value changes (plus the initial)?"""
...@@ -88,6 +105,30 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase): ...@@ -88,6 +105,30 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase):
# check call sequence # check call sequence
self.assertListEqual([("A", 0), ("a", 42)], self.callback_called_values) self.assertListEqual([("A", 0), ("a", 42)], self.callback_called_values)
def test_multiple_callbacks(self):
"""Can we register multiple callbacks on the same attribute?"""
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
# subscribe (triggering a change event)
self.events.subscribe_change_event(proxy, "A", self.callback)
# subscribe again (triggering a change event)
self.events.subscribe_change_event(proxy, "A", self.callback)
# change (triggering a change event)
proxy.A = 42
# wait for 4 change events
self.wait_for_callback()
self.wait_for_callback()
self.wait_for_callback()
self.wait_for_callback()
# check call sequence
self.assertListEqual(
[("A", 0), ("A", 0), ("a", 42), ("a", 42)], self.callback_called_values
)
def test_unsubscribe(self): def test_unsubscribe(self):
"""Do we stop receiving if we unsubscribe?""" """Do we stop receiving if we unsubscribe?"""
...@@ -176,6 +217,11 @@ class TestChangeEvents(EventSubscriptionMixin, base.TestCase): ...@@ -176,6 +217,11 @@ class TestChangeEvents(EventSubscriptionMixin, base.TestCase):
self._int = value self._int = value
self.custom_change_events.send_change_event("int_attr", value) self.custom_change_events.send_change_event("int_attr", value)
@command(dtype_in=str, dtype_out=bool)
def is_attribute_polled_by_lofardevice(self, attr_name):
# used by EventSubscriptions. we poll everything ourselves
return True
def test_bool_change_event(self): def test_bool_change_event(self):
"""Do we also receive an event emitted after the value changes (plus the initial)?""" """Do we also receive an event emitted after the value changes (plus the initial)?"""
......
...@@ -36,6 +36,29 @@ class TestAttributePoller(IsolatedAsyncioTestCase): ...@@ -36,6 +36,29 @@ class TestAttributePoller(IsolatedAsyncioTestCase):
def is_attribute_access_allowed(self, _): def is_attribute_access_allowed(self, _):
return True return True
async def test_is_registered(self):
"""Does is_registered() reflect which attributes are registered for polling?"""
class DeviceMock(self.DeviceMockBase):
def __init__(self):
super().__init__()
self.attr_read_counter = 0
async def async_read_attribute(self, _):
self.attr_read_counter += 1
return 42.0
device = DeviceMock()
ap = lofar_device.AttributePoller(device)
self.assertFalse(ap.is_registered("A"))
# register attribute with metric
metric = AttributeMetric("metric", "description", device.metric_labels)
ap.register("A", metric, False)
self.assertTrue(ap.is_registered("A"))
async def test_poll_updates_metrics(self): async def test_poll_updates_metrics(self):
"""Does poll() really read the attribute and update the metric?""" """Does poll() really read the attribute and update the metric?"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment