diff --git a/README.md b/README.md index 4ad8fbd01653a0162dd64780d5d2661bd9f8e56a..a30b29f86e08ae563f5f9e2c5fc74ef420aab338 100644 --- a/README.md +++ b/README.md @@ -161,6 +161,7 @@ Next change the version in the following places: # Release Notes +* 0.37.2 Improved event-subscription interface, avoid overlap between polling loops. * 0.37.1 Improved asyncio resource teardown when devices go Off. * 0.37.0-1 Fix for deploying on DTS Lab * 0.37.0 Run casacore in separate processes, increasing beam-tracking performance diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION index 9b1bb85123967f31711a58c9f475e412860269de..8570a3aeb97e531ccb49dce7e2435f844fa29ad1 100644 --- a/tangostationcontrol/VERSION +++ b/tangostationcontrol/VERSION @@ -1 +1 @@ -0.37.1 +0.37.2 diff --git a/tangostationcontrol/tangostationcontrol/common/asyncio.py b/tangostationcontrol/tangostationcontrol/common/asyncio.py index 1beb91169d37b3aa47081647e86de9de4f3b000f..38b64efe364cbb727f24ad934d4b0f8c9a0d8948 100644 --- a/tangostationcontrol/tangostationcontrol/common/asyncio.py +++ b/tangostationcontrol/tangostationcontrol/common/asyncio.py @@ -54,15 +54,21 @@ class EventLoopThread: asyncio.set_event_loop(loop) try: + logger.debug(f"EventLoop: starting") loop.run_forever() + logger.debug(f"EventLoop: loop stopped gracefully") finally: # stop any tasks still running for task in asyncio.all_tasks(loop): + logger.debug(f"EventLoop: cancelling {task}") task.cancel() # properly clean up, see https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.shutdown_asyncgens + logger.debug(f"EventLoop: finishing cancelled tasks") loop.run_until_complete(loop.shutdown_asyncgens()) + logger.debug(f"EventLoop: closing") loop.close() + logger.debug(f"EventLoop: done") class PeriodicTask: diff --git a/tangostationcontrol/tangostationcontrol/common/events/change_events.py b/tangostationcontrol/tangostationcontrol/common/events/change_events.py index d95403b4cd63cefdad22e32a184a9d53a79615da..e59c633987601a5df5fb67f0aa0d7e3928b05a54 100644 --- a/tangostationcontrol/tangostationcontrol/common/events/change_events.py +++ b/tangostationcontrol/tangostationcontrol/common/events/change_events.py @@ -1,10 +1,10 @@ # Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 -from typing import Dict - import numpy +from tangostationcontrol.common.case_insensitive_string import CaseInsensitiveString +from tangostationcontrol.common.case_insensitive_dict import CaseInsensitiveDict from tango.server import Device @@ -17,7 +17,15 @@ class ChangeEvents: def __init__(self, device: Device): self.device = device - self.prev_values: Dict[str, object] = {} + # keep track of which attributes we manage + self.attributes: list[CaseInsensitiveString] = [] + + # previous values of attributes, to avoid + # emitting CHANGE_EVENTs when nothing changed. + self.prev_values: CaseInsensitiveDict[str, object] = {} + + def is_configured(self, attr_name: str) -> bool: + return attr_name in self.attributes def configure_attribute(self, attr_name: str): """Prepares an attribute for emitting custom change events.""" @@ -33,6 +41,8 @@ class ChangeEvents: # so detecting it ourselves is easier. self.device.set_change_event(attr_name, True, False) + self.attributes.append(CaseInsensitiveString(attr_name)) + def send_change_event(self, attr_name: str, value: object | None): """Emits a CHANGE_EVENT if the attribute has changed value.""" diff --git a/tangostationcontrol/tangostationcontrol/common/events/subscriptions.py b/tangostationcontrol/tangostationcontrol/common/events/subscriptions.py index e29519c6dc6f5d3b5b0962bc2e2e0d960da5272c..e48003150aa207a379f475eefe62f706eb48f902 100644 --- a/tangostationcontrol/tangostationcontrol/common/events/subscriptions.py +++ b/tangostationcontrol/tangostationcontrol/common/events/subscriptions.py @@ -1,25 +1,38 @@ # Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 +from dataclasses import dataclass +from functools import partial import logging -from typing import List, Tuple, Dict, Callable +from typing import List, Dict, Callable -from tango import DeviceProxy, EventType +from tango import DeviceProxy, EventType, DevFailed from prometheus_client import Counter -from tangostationcontrol.common.lofar_logging import log_exceptions +from tangostationcontrol.common.lofar_logging import exception_to_str from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD_MS +from tangostationcontrol.common.case_insensitive_string import CaseInsensitiveString from tangostationcontrol.metrics import AttributeMetric logger = logging.getLogger() +EventCallbackType = Callable[[DeviceProxy, str, object], None] + + +@dataclass(frozen=False) +class Subscription: + proxy: DeviceProxy + attribute_name: CaseInsensitiveString + event_id: int | None + callbacks: List[EventCallbackType] + class EventSubscriptions: """Manages a set of subscriptions to Tango events.""" def __init__(self, device_labels: Dict[str, str] | None = None): # events we're subscribed to - self.subscriptions: List[Tuple[DeviceProxy, int]] = [] + self.subscriptions: List[Subscription] = [] # metric to count errors self.error_count_metric = AttributeMetric( @@ -30,11 +43,68 @@ class EventSubscriptions: dynamic_labels=["event_device", "event_attribute"], ) + def _get_subscription(self, dev_name: str, attr_name: str) -> bool: + for sub in self.subscriptions: + if ( + CaseInsensitiveString(sub.proxy.name()) == dev_name + and sub.attribute_name == attr_name + ): + return sub + + return None + + def is_subscribed(self, dev_name: str, attr_name: str) -> bool: + return self._get_subscription(dev_name, attr_name) is not None + + def _event_callback_wrapper(self, sub, event): + device = event.device + + # some errors report about the device in general, and thus are not + # annotated with any attr_value. + attribute_name = event.attr_value.name if event.attr_value else None + + # Count reported errors. These can also occur if the device + # is down, intentional or not. So event errors do not necessarily + # indicate a problem. + if event.err: + self.error_count_metric.get_metric( + [device.name(), str(attribute_name)] + ).inc() + + logger.debug( + "Received attribute change event ERROR from %s for attribute %s: %s", + device, + attribute_name, + event.errors, + ) + + # Little we can do here + return + + value = event.attr_value.value + + # Log succesful changes + logger.info( + "Received attribute change event from %s: %s := %s", + device, + attribute_name, + value, + ) + + # Call requested callback + for callback in sub.callbacks: + try: + callback(device, attribute_name, value) + except Exception as ex: + logger.exception( + f"Callback {callback} for device {device} attribute {attribute_name} threw an exception: {exception_to_str(ex)}" + ) + def subscribe_change_event( self, proxy: DeviceProxy, attr_name: str, - callback: Callable[[DeviceProxy, str, object], None], + callback: EventCallbackType, ): """Subscribe to changes to an attribute of another device. Immediately and on change, the provided callback will be called as @@ -47,64 +117,59 @@ class EventSubscriptions: the callback. """ - @log_exceptions(suppress=True) - def event_callback_wrapper(event): - device = event.device - - # some errors report about the device in general, and thus are not - # annotated with any attr_value. - attribute_name = event.attr_value.name if event.attr_value else None - - # Count reported errors. These can also occur if the device - # is down, intentional or not. So event errors do not necessarily - # indicate a problem. - if event.err: - self.error_count_metric.get_metric( - [device.name(), str(attribute_name)] - ).inc() - - logger.debug( - "Received attribute change event ERROR from %s for attribute %s: %s", - device, - attribute_name, - event.errors, - ) + if sub := self._get_subscription(proxy.name(), attr_name): + # extend existing subscription + logger.debug( + f"Adding callback {callback} to CHANGE_EVENT on device {proxy} attribute {attr_name}" + ) + sub.callbacks.append(callback) - # Little we can do here - return + # manually trigger first call, mimicking Tango doing so when subscribing + try: + value = proxy.read_attribute(attr_name).value - value = event.attr_value.value + callback(proxy, attr_name, value) + except Exception as ex: + logger.exception( + f"Callback {callback} for device {proxy} attribute {attr_name} threw an exception" + ) - # Log succesful changes - logger.info( - "Received attribute change event from %s: %s := %s", - device, - attribute_name, - value, + else: + # make new subscription + sub = Subscription( + proxy=proxy, + attribute_name=CaseInsensitiveString(attr_name), + event_id=None, + callbacks=[callback], ) - # Call requested callback - callback(device, attribute_name, value) - - # make sure the attribute is polled, otherwise we wont receive event - if not proxy.is_attribute_polled(attr_name): - logger.info(f"Enabling polling for {proxy.name()}/{attr_name}.") - proxy.poll_attribute(attr_name, DEFAULT_POLLING_PERIOD_MS) + # make sure the attribute is polled, otherwise we wont receive event + if not proxy.is_attribute_polled_by_lofardevice(attr_name): + # LOFARDevice does not poll this attribute, but maybe Tango does + if not proxy.is_attribute_polled(attr_name): + logger.info(f"Enabling polling for {proxy.name()}/{attr_name}.") + proxy.poll_attribute(attr_name, DEFAULT_POLLING_PERIOD_MS) - # subscribe - event_id = proxy.subscribe_event( - attr_name, - EventType.CHANGE_EVENT, - event_callback_wrapper, - stateless=True, - ) + # subscribe + logger.debug( + f"Subscribing callback {callback} to CHANGE_EVENT on device {proxy} attribute {attr_name}" + ) + sub.event_id = proxy.subscribe_event( + attr_name, + EventType.CHANGE_EVENT, + partial(self._event_callback_wrapper, sub), + stateless=True, + ) - self.subscriptions.append((proxy, event_id)) + self.subscriptions.append(sub) def unsubscribe_all(self): # unsubscribe from all events try: - for proxy, event_id in self.subscriptions: - proxy.unsubscribe_event(event_id) + for sub in self.subscriptions: + try: + sub.proxy.unsubscribe_event(sub.event_id) + except DevFailed: + logger.error(f"Failed to unsubscribe from event {sub}") finally: self.subscriptions = [] diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py index e74b21877c3a9e0c31436b255f6e44dd3605e105..e65c152135b862de58c59dbf353db6d48aa2bc15 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py @@ -45,6 +45,7 @@ from tangostationcontrol.common.constants import ( DEFAULT_METRICS_POLLING_PERIOD_MS, DEFAULT_POLLING_PERIOD_MS, ) +from tangostationcontrol.common.case_insensitive_dict import CaseInsensitiveDict from tangostationcontrol.common.events import EventSubscriptions, ChangeEvents from tangostationcontrol.common.lofar_logging import ( log_exceptions, @@ -102,7 +103,7 @@ class AttributePoller: self.device = device # attributes to poll - self._poll_list: Dict[str, Dict[str, object]] = {} + self._poll_list: CaseInsensitiveDict[str, Dict[str, object]] = {} # change event system self.change_events = ChangeEvents(self.device) @@ -121,6 +122,9 @@ class AttributePoller: if send_change_events: self.change_events.configure_attribute(attr_name) + def is_registered(self, attr_name: str) -> bool: + return attr_name in self._poll_list + @DurationMetric() async def _read_attribute(self, attr_name: str): return await self.device.async_read_attribute(attr_name) @@ -183,8 +187,16 @@ class AttributePoller: # even if it functions correctly. for attr_name, attr_data in self._poll_list.items(): + # stop polling if we turned to OFF/FAULT during this loop + if not self.polling_allowed(): + return + value = await self._read_attribute_nothrow(attr_name) + # stop polling if we turned to OFF/FAULT during this loop + if not self.polling_allowed(): + return + # Update Prometheus metric, if any if attr_data["metric"]: self._update_metric(attr_data["metric"], value) @@ -441,6 +453,10 @@ class LOFARDevice(Device): ) _ = future.result() + @command(dtype_in=str, dtype_out=bool) + def is_attribute_polled_by_lofardevice(self, attr_name: str) -> bool: + return self.attribute_poller.is_registered(attr_name) + def __init__(self, cl, name): # a proxy to ourself. can only be constructed in or after init_device # is called, during super().__init__(). @@ -601,6 +617,7 @@ class LOFARDevice(Device): self.set_state(DevState.OFF) self.set_status("Device is in the OFF state.") + logger.info("Unsubscribing events from other devices") self.events.unsubscribe_all() # stop polling (ungracefully, as it may take too long) @@ -611,13 +628,17 @@ class LOFARDevice(Device): self.poll_task.join() # clear metrics, as they will all be stale + logger.info("Clearing metrics") self.attribute_poller.clear_all() + logger.info(">>> configure_for_off()") self.configure_for_off() + logger.info("<<< configure_for_off()") # stop event thread try: if self.event_loop_thread: + logger.info("Stopping EventLoop thread") self.event_loop_thread.stop() finally: self.event_loop_thread = None diff --git a/tangostationcontrol/test/common/test_events.py b/tangostationcontrol/test/common/test_events.py index 0bf7497408779f25bc109279bb657928106ead29..fbd49fe717d4fbdf25cde9698d2f8d2ccc24495d 100644 --- a/tangostationcontrol/test/common/test_events.py +++ b/tangostationcontrol/test/common/test_events.py @@ -8,7 +8,7 @@ from tangostationcontrol.common.events import ( ChangeEvents, ) -from tango.server import Device, attribute, AttrWriteType +from tango.server import Device, attribute, AttrWriteType, command from tango.test_context import DeviceTestContext from test import base @@ -58,6 +58,11 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase): def write_A(self, value): self._A = value + @command(dtype_in=str, dtype_out=bool) + def is_attribute_polled_by_lofardevice(self, attr_name): + # used by EventSubscriptions. we poll nothing ourselves + return False + def test_initial_event(self): """Do we receive the initial event emitted when subscribing?""" @@ -71,6 +76,18 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase): # check call sequence self.assertListEqual([("A", 0)], self.callback_called_values) + def test_is_subscribed(self): + with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy: + self.assertFalse(self.events.is_subscribed(proxy.name(), "A")) + + self.events.subscribe_change_event(proxy, "A", self.callback) + + self.assertTrue(self.events.is_subscribed(proxy.name(), "A")) + + self.events.unsubscribe_all() + + self.assertFalse(self.events.is_subscribed(proxy.name(), "A")) + def test_change_event(self): """Do we also receive an event emitted after the value changes (plus the initial)?""" @@ -88,6 +105,30 @@ class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase): # check call sequence self.assertListEqual([("A", 0), ("a", 42)], self.callback_called_values) + def test_multiple_callbacks(self): + """Can we register multiple callbacks on the same attribute?""" + + with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy: + # subscribe (triggering a change event) + self.events.subscribe_change_event(proxy, "A", self.callback) + + # subscribe again (triggering a change event) + self.events.subscribe_change_event(proxy, "A", self.callback) + + # change (triggering a change event) + proxy.A = 42 + + # wait for 4 change events + self.wait_for_callback() + self.wait_for_callback() + self.wait_for_callback() + self.wait_for_callback() + + # check call sequence + self.assertListEqual( + [("A", 0), ("A", 0), ("a", 42), ("a", 42)], self.callback_called_values + ) + def test_unsubscribe(self): """Do we stop receiving if we unsubscribe?""" @@ -176,6 +217,11 @@ class TestChangeEvents(EventSubscriptionMixin, base.TestCase): self._int = value self.custom_change_events.send_change_event("int_attr", value) + @command(dtype_in=str, dtype_out=bool) + def is_attribute_polled_by_lofardevice(self, attr_name): + # used by EventSubscriptions. we poll everything ourselves + return True + def test_bool_change_event(self): """Do we also receive an event emitted after the value changes (plus the initial)?""" diff --git a/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py b/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py index ad17e828d22e13ec6f1ceb62c210afc9d2edc796..7eceb2630836a0cc1b1b768a937fbcdf14b68fe7 100644 --- a/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py +++ b/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py @@ -36,6 +36,29 @@ class TestAttributePoller(IsolatedAsyncioTestCase): def is_attribute_access_allowed(self, _): return True + async def test_is_registered(self): + """Does is_registered() reflect which attributes are registered for polling?""" + + class DeviceMock(self.DeviceMockBase): + def __init__(self): + super().__init__() + self.attr_read_counter = 0 + + async def async_read_attribute(self, _): + self.attr_read_counter += 1 + return 42.0 + + device = DeviceMock() + ap = lofar_device.AttributePoller(device) + + self.assertFalse(ap.is_registered("A")) + + # register attribute with metric + metric = AttributeMetric("metric", "description", device.metric_labels) + ap.register("A", metric, False) + + self.assertTrue(ap.is_registered("A")) + async def test_poll_updates_metrics(self): """Does poll() really read the attribute and update the metric?"""