Skip to content
Snippets Groups Projects
Commit 27837eb9 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-1816-do-not-serve-stale-values' into 'master'

Resolve L2SS-1816 "Do not serve stale values"

Closes L2SS-1816

See merge request !891
parents 84d2cbc0 d1fdc602
No related branches found
No related tags found
1 merge request!891Resolve L2SS-1816 "Do not serve stale values"
...@@ -166,6 +166,7 @@ Next change the version in the following places: ...@@ -166,6 +166,7 @@ Next change the version in the following places:
# Release Notes # Release Notes
* 0.32.1 Do not serve stale metrics
* 0.32.0 Add available_in_power_state_R attribute to determine from which station state a device will be available * 0.32.0 Add available_in_power_state_R attribute to determine from which station state a device will be available
* 0.31.4 Bugfixes for DTS configuration, * 0.31.4 Bugfixes for DTS configuration,
Fixes spurious errors when a station state transition fails Fixes spurious errors when a station state transition fails
......
0.32.0 0.32.1
...@@ -12,6 +12,7 @@ import numpy ...@@ -12,6 +12,7 @@ import numpy
from asyncua import Client from asyncua import Client
from tangostationcontrol.clients.comms_client import AsyncCommClient from tangostationcontrol.clients.comms_client import AsyncCommClient
from tangostationcontrol.common.lofar_logging import exception_to_str
logger = logging.getLogger() logger = logging.getLogger()
...@@ -357,7 +358,9 @@ class OPCUAConnection(AsyncCommClient): ...@@ -357,7 +358,9 @@ class OPCUAConnection(AsyncCommClient):
prot_attr.read_function(), self.event_loop prot_attr.read_function(), self.event_loop
).result() ).result()
except Exception as e: except Exception as e:
logger.exception("Failed to read attribute") logger.error(
f"Failed to read attribute {prot_attr.name}: {exception_to_str(e)}"
)
asyncio.run_coroutine_threadsafe( asyncio.run_coroutine_threadsafe(
self.handle_connection_exception(e), self.event_loop self.handle_connection_exception(e), self.event_loop
...@@ -369,7 +372,9 @@ class OPCUAConnection(AsyncCommClient): ...@@ -369,7 +372,9 @@ class OPCUAConnection(AsyncCommClient):
prot_attr.write_function(value), self.event_loop prot_attr.write_function(value), self.event_loop
).result() ).result()
except Exception as e: except Exception as e:
logger.exception("Failed to write attribute") logger.error(
f"Failed to write attribute {prot_attr.name}: {exception_to_str(e)}"
)
asyncio.run_coroutine_threadsafe( asyncio.run_coroutine_threadsafe(
self.handle_connection_exception(e), self.event_loop self.handle_connection_exception(e), self.event_loop
......
...@@ -40,6 +40,9 @@ class EventLoopThread: ...@@ -40,6 +40,9 @@ class EventLoopThread:
self.event_loop_thread.join() self.event_loop_thread.join()
self.event_loop_thread = None self.event_loop_thread = None
def is_running(self):
return self.event_loop.is_running()
def __del__(self): def __del__(self):
self.stop() self.stop()
...@@ -102,3 +105,8 @@ class PeriodicTask: ...@@ -102,3 +105,8 @@ class PeriodicTask:
self.event_loop.call_soon_threadsafe(self.task.cancel) self.event_loop.call_soon_threadsafe(self.task.cancel)
self.stop() self.stop()
def is_running(self):
"""Return whether the periodic call is still scheduled."""
return self.task and not self.task.done() and not self.done
...@@ -40,7 +40,10 @@ from tangostationcontrol.common.device_decorators import ( ...@@ -40,7 +40,10 @@ from tangostationcontrol.common.device_decorators import (
debugit, debugit,
DurationMetric, DurationMetric,
) )
from tangostationcontrol.common.constants import DEFAULT_METRICS_POLLING_PERIOD from tangostationcontrol.common.constants import (
DEFAULT_METRICS_POLLING_PERIOD,
DEFAULT_POLLING_PERIOD,
)
from tangostationcontrol.common.events import EventSubscriptions from tangostationcontrol.common.events import EventSubscriptions
from tangostationcontrol.common.lofar_logging import ( from tangostationcontrol.common.lofar_logging import (
log_exceptions, log_exceptions,
...@@ -107,6 +110,12 @@ class AttributePoller: ...@@ -107,6 +110,12 @@ class AttributePoller:
async def _read_attribute(self, attr_name: str): async def _read_attribute(self, attr_name: str):
return await self.device.async_read_attribute(attr_name) return await self.device.async_read_attribute(attr_name)
def clear_all(self):
"""Clear all metrics to prevent maintaining stale values."""
for attr_data in self._poll_list.values():
if attr_data["metric"]:
attr_data["metric"].clear()
@DurationMetric() @DurationMetric()
async def _poll(self): async def _poll(self):
first_exception = None first_exception = None
...@@ -142,6 +151,9 @@ class AttributePoller: ...@@ -142,6 +151,9 @@ class AttributePoller:
raise first_exception raise first_exception
async def poll(self): async def poll(self):
# invalidate all metrics, in case reading fails
self.clear_all()
if not self.device.is_attribute_access_allowed(AttReqType.READ_REQ): if not self.device.is_attribute_access_allowed(AttReqType.READ_REQ):
# TODO(JDM): Poll attributes based on their individual is_allowed states # TODO(JDM): Poll attributes based on their individual is_allowed states
return return
...@@ -219,6 +231,30 @@ class LOFARDevice(Device): ...@@ -219,6 +231,30 @@ class LOFARDevice(Device):
fget=lambda self: self._read_hardware_powered_R(), fget=lambda self: self._read_hardware_powered_R(),
) )
event_thread_running_R = attribute(
doc="Whether the event thread is running." "",
dtype=bool,
fget=lambda self: self.event_loop_thread
and self.event_loop_thread.is_running(),
# Tango needs to poll this, as otherwise this attribute will never
# be exposed as "False" as the event thread must run to do so.
polling_period=DEFAULT_POLLING_PERIOD,
)
poll_thread_running_R = attribute(
doc="Whether the attributes are being polled." "",
dtype=bool,
fget=lambda self: (
self.event_loop_thread
and self.event_loop_thread.is_running()
and self.poll_task
and self.poll_task.is_running()
),
# Tango needs to poll this, as otherwise this attribute will never
# be exposed as "False" as the event thread must run to do so.
polling_period=DEFAULT_POLLING_PERIOD,
)
# list of translator property names to be set by set_translator_defaults # list of translator property names to be set by set_translator_defaults
TRANSLATOR_DEFAULT_SETTINGS = [] TRANSLATOR_DEFAULT_SETTINGS = []
...@@ -527,6 +563,9 @@ class LOFARDevice(Device): ...@@ -527,6 +563,9 @@ class LOFARDevice(Device):
if self.poll_task: if self.poll_task:
self.poll_task.stop() self.poll_task.stop()
# clear metrics, as they will all be stale
self.attribute_poller.clear_all()
self.configure_for_off() self.configure_for_off()
# stop event thread # stop event thread
......
...@@ -9,7 +9,7 @@ from ._metrics import ( ...@@ -9,7 +9,7 @@ from ._metrics import (
SpectrumAttributeMetric, SpectrumAttributeMetric,
ImageAttributeMetric, ImageAttributeMetric,
) )
from prometheus_client import start_http_server from prometheus_client import start_http_server, disable_created_metrics
__all__ = [ __all__ = [
"device_labels", "device_labels",
...@@ -23,4 +23,8 @@ __all__ = [ ...@@ -23,4 +23,8 @@ __all__ = [
def start_metrics_server(port: int = 8000): def start_metrics_server(port: int = 8000):
# configure
disable_created_metrics()
# start server
start_http_server(port) start_http_server(port)
...@@ -136,6 +136,11 @@ class AttributeMetric: ...@@ -136,6 +136,11 @@ class AttributeMetric:
self.metric.__class__ == metric_class self.metric.__class__ == metric_class
), f"Metric {self.name} was previously provided as {self.metric.__class__} but is now needed as {metric_class}" ), f"Metric {self.name} was previously provided as {self.metric.__class__} but is now needed as {metric_class}"
def clear(self):
"""Remove all cached metrics."""
self.metric.clear()
def label_keys(self) -> List[str]: def label_keys(self) -> List[str]:
"""Return the list of labels that we will use.""" """Return the list of labels that we will use."""
......
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy) # Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from unittest import mock from unittest import mock, IsolatedAsyncioTestCase
from unittest.mock import ANY from unittest.mock import ANY
import numpy import numpy
...@@ -11,21 +11,95 @@ from tango import DevSource ...@@ -11,21 +11,95 @@ from tango import DevSource
from tango import DevVarBooleanArray from tango import DevVarBooleanArray
from tango.server import attribute from tango.server import attribute
from tango.server import command from tango.server import command
from tango.server import Device
from tango.server import device_property from tango.server import device_property
from tango.test_context import DeviceTestContext from tango.test_context import DeviceTestContext
from tangostationcontrol.metrics import AttributeMetric
from tangostationcontrol.devices.base_device_classes import lofar_device from tangostationcontrol.devices.base_device_classes import lofar_device
from tangostationcontrol.devices.base_device_classes.async_device import AsyncDevice from tangostationcontrol.devices.base_device_classes.async_device import AsyncDevice
from test.devices import device_base from test.devices import device_base
class TestAttributePoller(device_base.DeviceTestCase): class TestAttributePoller(IsolatedAsyncioTestCase):
def setUp(self): class DeviceMockBase(Device):
# DeviceTestCase setUp patches lofar_device DeviceProxy """Device providing the necessary interface for AttributePoller and metrics."""
super(TestAttributePoller, self).setUp()
self.test_device = lofar_device.LOFARDevice def __init__(self):
self.metric_labels = {"label_name": "label_value"}
def get_name(self):
return "DeviceMock"
def is_attribute_access_allowed(self, _):
return True
async def test_poll_updates_metrics(self):
"""Does poll() really read the attribute and update the metric?"""
class DeviceMock(self.DeviceMockBase):
def __init__(self):
super().__init__()
self.attr_read_counter = 0
async def async_read_attribute(self, _):
self.attr_read_counter += 1
return 42.0
device = DeviceMock()
ap = lofar_device.AttributePoller(device)
# register attribute with metric
metric = AttributeMetric("metric", "description", device.metric_labels)
ap.register("A", metric)
# poll
await ap.poll()
# check whether attribute was read
self.assertEqual(1, device.attr_read_counter)
# check whether metric got updated
samples = metric.metric.collect()[0]
self.assertEqual(1, len(samples.samples))
self.assertEqual(42.0, samples.samples[0].value)
async def test_poll_stale_removes_metric(self):
"""Does poll() really remove the metric if the attribute cannot be read?"""
class DeviceMock(self.DeviceMockBase):
def __init__(self):
super().__init__()
self.stale = False
async def async_read_attribute(self, _):
if self.stale:
# claim we lost connnection to the OPC-UA translator
raise ConnectionError()
return 42.0
device = DeviceMock()
ap = lofar_device.AttributePoller(device)
# register attribute with metric
metric = AttributeMetric("metric", "description", device.metric_labels)
ap.register("A", metric)
# poll
await ap.poll()
# check whether metric got updated
samples = metric.metric.collect()[0]
self.assertEqual(1, len(samples.samples))
# make attribute stale & poll again
device.stale = True
await ap.poll()
# check whether metric got removed
samples = metric.metric.collect()[0]
self.assertEqual(0, len(samples.samples))
class TestLofarDevice(device_base.DeviceTestCase): class TestLofarDevice(device_base.DeviceTestCase):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment