diff --git a/README.md b/README.md index 8d6575fe9ef3e7d476bf6ec14ad411fda000a46f..01a1f0f3bd5346be3abfee03e1ef89fc4290066f 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,7 @@ Next change the version in the following places: # Release Notes +* 0.32.1 Do not serve stale metrics * 0.32.0 Add available_in_power_state_R attribute to determine from which station state a device will be available * 0.31.4 Bugfixes for DTS configuration, Fixes spurious errors when a station state transition fails diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION index 9eb2aa3f1095de8c5a2c8c73d33a06f09b323520..fd9620c08c1510d4be7d653d09b499dec0f4eb20 100644 --- a/tangostationcontrol/VERSION +++ b/tangostationcontrol/VERSION @@ -1 +1 @@ -0.32.0 +0.32.1 diff --git a/tangostationcontrol/tangostationcontrol/clients/opcua_client.py b/tangostationcontrol/tangostationcontrol/clients/opcua_client.py index 61d8bcc73a6f84c9879264b1b96f4b4ddae98e03..542b947b5c36fcd5f8a4b80da4704e14c7f2f6ec 100644 --- a/tangostationcontrol/tangostationcontrol/clients/opcua_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/opcua_client.py @@ -12,6 +12,7 @@ import numpy from asyncua import Client from tangostationcontrol.clients.comms_client import AsyncCommClient +from tangostationcontrol.common.lofar_logging import exception_to_str logger = logging.getLogger() @@ -357,7 +358,9 @@ class OPCUAConnection(AsyncCommClient): prot_attr.read_function(), self.event_loop ).result() except Exception as e: - logger.exception("Failed to read attribute") + logger.error( + f"Failed to read attribute {prot_attr.name}: {exception_to_str(e)}" + ) asyncio.run_coroutine_threadsafe( self.handle_connection_exception(e), self.event_loop @@ -369,7 +372,9 @@ class OPCUAConnection(AsyncCommClient): prot_attr.write_function(value), self.event_loop ).result() except Exception as e: - logger.exception("Failed to write attribute") + logger.error( + f"Failed to write attribute {prot_attr.name}: {exception_to_str(e)}" + ) asyncio.run_coroutine_threadsafe( self.handle_connection_exception(e), self.event_loop diff --git a/tangostationcontrol/tangostationcontrol/common/asyncio.py b/tangostationcontrol/tangostationcontrol/common/asyncio.py index 9e8ce6daa09e86a8c4df2cc7ce632265029dc886..e5c0c40e7b591af7847114779e59ff4d006b35c3 100644 --- a/tangostationcontrol/tangostationcontrol/common/asyncio.py +++ b/tangostationcontrol/tangostationcontrol/common/asyncio.py @@ -40,6 +40,9 @@ class EventLoopThread: self.event_loop_thread.join() self.event_loop_thread = None + def is_running(self): + return self.event_loop.is_running() + def __del__(self): self.stop() @@ -102,3 +105,8 @@ class PeriodicTask: self.event_loop.call_soon_threadsafe(self.task.cancel) self.stop() + + def is_running(self): + """Return whether the periodic call is still scheduled.""" + + return self.task and not self.task.done() and not self.done diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py index bbd7ea309370982a24936be12e7f2ff2b0500abd..8a8ede8e7f9ffcca30b4351a2268ded4b350722f 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py @@ -40,7 +40,10 @@ from tangostationcontrol.common.device_decorators import ( debugit, DurationMetric, ) -from tangostationcontrol.common.constants import DEFAULT_METRICS_POLLING_PERIOD +from tangostationcontrol.common.constants import ( + DEFAULT_METRICS_POLLING_PERIOD, + DEFAULT_POLLING_PERIOD, +) from tangostationcontrol.common.events import EventSubscriptions from tangostationcontrol.common.lofar_logging import ( log_exceptions, @@ -107,6 +110,12 @@ class AttributePoller: async def _read_attribute(self, attr_name: str): return await self.device.async_read_attribute(attr_name) + def clear_all(self): + """Clear all metrics to prevent maintaining stale values.""" + for attr_data in self._poll_list.values(): + if attr_data["metric"]: + attr_data["metric"].clear() + @DurationMetric() async def _poll(self): first_exception = None @@ -142,6 +151,9 @@ class AttributePoller: raise first_exception async def poll(self): + # invalidate all metrics, in case reading fails + self.clear_all() + if not self.device.is_attribute_access_allowed(AttReqType.READ_REQ): # TODO(JDM): Poll attributes based on their individual is_allowed states return @@ -219,6 +231,30 @@ class LOFARDevice(Device): fget=lambda self: self._read_hardware_powered_R(), ) + event_thread_running_R = attribute( + doc="Whether the event thread is running." "", + dtype=bool, + fget=lambda self: self.event_loop_thread + and self.event_loop_thread.is_running(), + # Tango needs to poll this, as otherwise this attribute will never + # be exposed as "False" as the event thread must run to do so. + polling_period=DEFAULT_POLLING_PERIOD, + ) + + poll_thread_running_R = attribute( + doc="Whether the attributes are being polled." "", + dtype=bool, + fget=lambda self: ( + self.event_loop_thread + and self.event_loop_thread.is_running() + and self.poll_task + and self.poll_task.is_running() + ), + # Tango needs to poll this, as otherwise this attribute will never + # be exposed as "False" as the event thread must run to do so. + polling_period=DEFAULT_POLLING_PERIOD, + ) + # list of translator property names to be set by set_translator_defaults TRANSLATOR_DEFAULT_SETTINGS = [] @@ -527,6 +563,9 @@ class LOFARDevice(Device): if self.poll_task: self.poll_task.stop() + # clear metrics, as they will all be stale + self.attribute_poller.clear_all() + self.configure_for_off() # stop event thread diff --git a/tangostationcontrol/tangostationcontrol/metrics/__init__.py b/tangostationcontrol/tangostationcontrol/metrics/__init__.py index 6fff3817d8fe3198d9a2c2b7a166b5ac68ecdce4..2f6e07e36d8dd8e5d03bdf0c00c368b9320c68bb 100644 --- a/tangostationcontrol/tangostationcontrol/metrics/__init__.py +++ b/tangostationcontrol/tangostationcontrol/metrics/__init__.py @@ -9,7 +9,7 @@ from ._metrics import ( SpectrumAttributeMetric, ImageAttributeMetric, ) -from prometheus_client import start_http_server +from prometheus_client import start_http_server, disable_created_metrics __all__ = [ "device_labels", @@ -23,4 +23,8 @@ __all__ = [ def start_metrics_server(port: int = 8000): + # configure + disable_created_metrics() + + # start server start_http_server(port) diff --git a/tangostationcontrol/tangostationcontrol/metrics/_metrics.py b/tangostationcontrol/tangostationcontrol/metrics/_metrics.py index 3216565f3f3b2749d9dfc1bab706213aef9be1c7..21062c93841fcd71be49fdab6bd6977bd948f268 100644 --- a/tangostationcontrol/tangostationcontrol/metrics/_metrics.py +++ b/tangostationcontrol/tangostationcontrol/metrics/_metrics.py @@ -136,6 +136,11 @@ class AttributeMetric: self.metric.__class__ == metric_class ), f"Metric {self.name} was previously provided as {self.metric.__class__} but is now needed as {metric_class}" + def clear(self): + """Remove all cached metrics.""" + + self.metric.clear() + def label_keys(self) -> List[str]: """Return the list of labels that we will use.""" diff --git a/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py b/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py index 1ed43ce2cf77b8e0207eabf55c24f7b240001a88..f8bc7c0badc692bb5dd313396b73f865768a1d29 100644 --- a/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py +++ b/tangostationcontrol/test/devices/base_device_classes/test_lofar_device.py @@ -1,7 +1,7 @@ # Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 -from unittest import mock +from unittest import mock, IsolatedAsyncioTestCase from unittest.mock import ANY import numpy @@ -11,21 +11,95 @@ from tango import DevSource from tango import DevVarBooleanArray from tango.server import attribute from tango.server import command +from tango.server import Device from tango.server import device_property from tango.test_context import DeviceTestContext +from tangostationcontrol.metrics import AttributeMetric from tangostationcontrol.devices.base_device_classes import lofar_device from tangostationcontrol.devices.base_device_classes.async_device import AsyncDevice from test.devices import device_base -class TestAttributePoller(device_base.DeviceTestCase): - def setUp(self): - # DeviceTestCase setUp patches lofar_device DeviceProxy - super(TestAttributePoller, self).setUp() +class TestAttributePoller(IsolatedAsyncioTestCase): + class DeviceMockBase(Device): + """Device providing the necessary interface for AttributePoller and metrics.""" - self.test_device = lofar_device.LOFARDevice + def __init__(self): + self.metric_labels = {"label_name": "label_value"} + + def get_name(self): + return "DeviceMock" + + def is_attribute_access_allowed(self, _): + return True + + async def test_poll_updates_metrics(self): + """Does poll() really read the attribute and update the metric?""" + + class DeviceMock(self.DeviceMockBase): + def __init__(self): + super().__init__() + self.attr_read_counter = 0 + + async def async_read_attribute(self, _): + self.attr_read_counter += 1 + return 42.0 + + device = DeviceMock() + ap = lofar_device.AttributePoller(device) + + # register attribute with metric + metric = AttributeMetric("metric", "description", device.metric_labels) + ap.register("A", metric) + + # poll + await ap.poll() + + # check whether attribute was read + self.assertEqual(1, device.attr_read_counter) + + # check whether metric got updated + samples = metric.metric.collect()[0] + self.assertEqual(1, len(samples.samples)) + self.assertEqual(42.0, samples.samples[0].value) + + async def test_poll_stale_removes_metric(self): + """Does poll() really remove the metric if the attribute cannot be read?""" + + class DeviceMock(self.DeviceMockBase): + def __init__(self): + super().__init__() + self.stale = False + + async def async_read_attribute(self, _): + if self.stale: + # claim we lost connnection to the OPC-UA translator + raise ConnectionError() + return 42.0 + + device = DeviceMock() + ap = lofar_device.AttributePoller(device) + + # register attribute with metric + metric = AttributeMetric("metric", "description", device.metric_labels) + ap.register("A", metric) + + # poll + await ap.poll() + + # check whether metric got updated + samples = metric.metric.collect()[0] + self.assertEqual(1, len(samples.samples)) + + # make attribute stale & poll again + device.stale = True + await ap.poll() + + # check whether metric got removed + samples = metric.metric.collect()[0] + self.assertEqual(0, len(samples.samples)) class TestLofarDevice(device_base.DeviceTestCase):