Select Git revision
io_ddr_driver.vhd
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
_metrics.py 8.77 KiB
from tango.server import Device
from tango import AttrWriteType
from tango import CmdArgType
from tango import Attribute
from tango import DevFailed
from tango import DevState
from prometheus_client import Gauge, Info, Enum
from asyncio import iscoroutinefunction
from enum import IntEnum
from typing import List, Dict, Callable, Union
import functools
import logging
from lofar_lotus.metrics import Metric
__all__ = [
"wrap_method",
"device_labels",
"AttributeMetric",
"ScalarAttributeMetric",
"SpectrumAttributeMetric",
"ImageAttributeMetric",
]
logger = logging.getLogger()
# Global cache to have devices share their metrics for their own attributes,
# as metrics with the same name must exist only once.
METRICS = {}
# metric label values representing attribute access levels
access_label_values = {
AttrWriteType.READ: "r",
AttrWriteType.READ_WITH_WRITE: "rw",
AttrWriteType.READ_WRITE: "rw",
AttrWriteType.WRITE: "w",
}
def wrap_method(
obj: object,
func: Union[Callable, str],
wrapper: Callable,
post_execute: bool = True,
double_wrap: bool = False,
):
if not double_wrap and hasattr(func, "__wrapped__"):
return
if type(func) is str:
func_name = func
func = getattr(obj, func_name)
else:
func_name = func.__name__
if iscoroutinefunction(func):
# make sure the wrapped functions remain coroutines
async def decorated_pre(instance, *args, **kwargs):
wrapper(instance, *args, **kwargs)
return await func(instance, *args, **kwargs)
async def decorated_post(instance, *args, **kwargs):
return_value = await func(instance, *args, **kwargs)
wrapper.return_value = return_value # allow wrapper to access the return value of the original function
wrapper(instance, *args, **kwargs)
return return_value
else:
def decorated_pre(instance, *args, **kwargs):
wrapper(instance, *args, **kwargs)
return func(instance, *args, **kwargs)
def decorated_post(instance, *args, **kwargs):
return_value = func(instance, *args, **kwargs)
wrapper.return_value = return_value # allow wrapper to access the return value of the original function
wrapper(instance, *args, **kwargs)
return return_value
setattr(
obj,
func_name,
functools.update_wrapper(
decorated_post if post_execute else decorated_pre,
func,
),
)
def device_labels(device: Device) -> Dict[str, str]:
domain, family, member = device.get_name().split("/", 2)
return {
"domain": domain,
"family": family,
"member": member,
"device_class": device.get_device_class().get_name(),
}
def metric_name(attribute_name: str) -> str:
"""Return the name of the Prometheus metric to track the given attribute."""
# strip _R and _RW suffixes. the associated access level is already provided in the labels.
metric_name = attribute_name.removesuffix("_R").removesuffix("_RW")
# only export lower-case metric names
return f"ds_{metric_name}".lower()
class AttributeMetric(Metric):
"""Manage a Prometheus Metric object for Tango devices."""
def __init__(
self,
name: str,
description: str,
static_labels: Dict[str, str],
metric_class=Gauge,
metric_class_init_kwargs: Dict[str, object] | None = None,
dynamic_labels: List[str] | None = None,
):
super().__init__(
metric_name(name),
description,
static_labels,
metric_class,
metric_class_init_kwargs,
dynamic_labels,
)
def _metric_enum_value(self, value: str | IntEnum | DevState, labels: List[str]):
value = value.name if isinstance(value, DevState) else value
super()._metric_enum_value(value, labels)
class ScalarAttributeMetric(AttributeMetric):
"""Manage a Prometheus Metric object attached to a Scalar Tango Attribute."""
def __init__(self, device: Device, attribute: Attribute, wrap_read: bool = True):
self.device = device
self.attribute = attribute
self.data_type = self.attribute.get_data_type()
self.access = self.attribute.get_writable()
try:
description = attribute.get_properties().description
except DevFailed:
# get_properties() can fail for devices not registered in the TangoDB
# TODO: get this value another way?
description = ""
static_labels = device_labels(device)
static_labels["access"] = access_label_values[self.access]
if self.data_type == CmdArgType.DevString:
super().__init__(attribute.get_name(), description, static_labels, Info)
elif self.data_type == CmdArgType.DevEnum:
# evil PyTango foo to obtain enum labels from class attribute
enum_labels = getattr(
device.__class__, attribute.get_name()
).att_prop.enum_labels.split(",")
super().__init__(
attribute.get_name(),
description,
static_labels,
Enum,
metric_class_init_kwargs={"states": enum_labels},
)
else:
super().__init__(attribute.get_name(), description, static_labels)
if wrap_read:
self._wrap_attribute_read(device)
self._wrap_attribute_write(device)
def _set_value(self, value: object, labels):
"""Update the metric with the given value and labels.
Make sure all labels added to `label_keys()` are provided."""
if self.data_type == CmdArgType.DevString:
super()._set_value(dict(str_value=value), labels)
else:
super()._set_value(value, labels)
def _wrap_attribute_read(self, device: Device):
"""Wrap the attribute's read functions to feed new values to set_value()."""
name = self.attribute.get_name()
def read_func_wrapper(device):
try:
value = read_func_wrapper.return_value
self.set_value(value)
except Exception:
logger.exception(
f"Metrics: error updating metrics for {name} after reading it"
)
wrap_method(device, f"__read_{name}_wrapper__", read_func_wrapper)
def _wrap_attribute_write(self, device: Device):
"""Wrap the attribute's write functions to feed new values to set_value()."""
name = self.attribute.get_name()
def write_func_wrapper(device):
try:
value = self.attribute.get_write_value()
self.set_value(value)
except Exception:
logger.exception(
f"Metrics: error updating metrics for {name} after writing it"
)
if hasattr(device, f"__write_{name}_wrapper__"):
wrap_method(device, f"__write_{name}_wrapper__", write_func_wrapper)
class SpectrumAttributeMetric(ScalarAttributeMetric):
"""Manage a Prometheus Metric object attached to a Spectrum (1D) Tango Attribute."""
def label_keys(self):
return super().label_keys() + ["x"]
def _set_value(self, value, labels):
for x, element in enumerate(value):
super()._set_value(element, labels + [f"{x:02}"])
class ImageAttributeMetric(ScalarAttributeMetric):
"""Manage a Prometheus Metric object attached to an Image (2D) Tango Attribute."""
# Maximum number of elements in an attribute before we warn about it
WARN_ARRAY_SIZE = 1024
def __init__(self, device: Device, attribute: Attribute, wrap_read: bool = True):
super().__init__(device, attribute, wrap_read)
if self.max_dim_x * self.max_dim_y >= self.WARN_ARRAY_SIZE:
logger.warning(
f"Metrics: exposing attribute {self.attribute.get_name()} of device {self.device.get_name()} with {self.max_dim_x} x {self.max_dim_y} elements. Performance may suffer."
)
@property
def max_dim_x(self):
return self.attribute.get_max_dim_x()
@property
def max_dim_y(self):
return self.attribute.get_max_dim_y()
def label_keys(self):
return super().label_keys() + ["x", "y", "idx"]
def _set_value(self, value, labels):
max_dim_x = self.max_dim_x
for y, row in enumerate(value):
for x, element in enumerate(row):
idx = y * max_dim_x + x
super()._set_value(
# swap (x,y) to have x in Prometheus consistently be
# the major dimension in both Spectrum and Image attributes.
element,
labels + [f"{y:02}", f"{x:02}", f"{idx:03}"],
)