Skip to content
Snippets Groups Projects
Select Git revision
  • 16ad05edcb9261f824b5c385b6ddcac508898626
  • master default protected
  • RTSD-375
  • L2SDP-1134
  • L2SDP-1137
  • L2SDP-LIFT
  • L2SDP-1113
  • HPR-158
8 results

io_ddr_driver.vhd

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    _metrics.py 8.77 KiB
    from tango.server import Device
    from tango import AttrWriteType
    from tango import CmdArgType
    from tango import Attribute
    from tango import DevFailed
    from tango import DevState
    from prometheus_client import Gauge, Info, Enum
    from asyncio import iscoroutinefunction
    from enum import IntEnum
    from typing import List, Dict, Callable, Union
    import functools
    import logging
    
    from lofar_lotus.metrics import Metric
    
    __all__ = [
        "wrap_method",
        "device_labels",
        "AttributeMetric",
        "ScalarAttributeMetric",
        "SpectrumAttributeMetric",
        "ImageAttributeMetric",
    ]
    
    logger = logging.getLogger()
    
    # Global cache to have devices share their metrics for their own attributes,
    # as metrics with the same name must exist only once.
    METRICS = {}
    
    # metric label values representing attribute access levels
    access_label_values = {
        AttrWriteType.READ: "r",
        AttrWriteType.READ_WITH_WRITE: "rw",
        AttrWriteType.READ_WRITE: "rw",
        AttrWriteType.WRITE: "w",
    }
    
    
    def wrap_method(
        obj: object,
        func: Union[Callable, str],
        wrapper: Callable,
        post_execute: bool = True,
        double_wrap: bool = False,
    ):
        if not double_wrap and hasattr(func, "__wrapped__"):
            return
    
        if type(func) is str:
            func_name = func
            func = getattr(obj, func_name)
        else:
            func_name = func.__name__
    
        if iscoroutinefunction(func):
            # make sure the wrapped functions remain coroutines
            async def decorated_pre(instance, *args, **kwargs):
                wrapper(instance, *args, **kwargs)
                return await func(instance, *args, **kwargs)
    
            async def decorated_post(instance, *args, **kwargs):
                return_value = await func(instance, *args, **kwargs)
                wrapper.return_value = return_value  # allow wrapper to access the return value of the original function
                wrapper(instance, *args, **kwargs)
                return return_value
    
        else:
    
            def decorated_pre(instance, *args, **kwargs):
                wrapper(instance, *args, **kwargs)
                return func(instance, *args, **kwargs)
    
            def decorated_post(instance, *args, **kwargs):
                return_value = func(instance, *args, **kwargs)
                wrapper.return_value = return_value  # allow wrapper to access the return value of the original function
                wrapper(instance, *args, **kwargs)
                return return_value
    
        setattr(
            obj,
            func_name,
            functools.update_wrapper(
                decorated_post if post_execute else decorated_pre,
                func,
            ),
        )
    
    
    def device_labels(device: Device) -> Dict[str, str]:
        domain, family, member = device.get_name().split("/", 2)
        return {
            "domain": domain,
            "family": family,
            "member": member,
            "device_class": device.get_device_class().get_name(),
        }
    
    
    def metric_name(attribute_name: str) -> str:
        """Return the name of the Prometheus metric to track the given attribute."""
    
        # strip _R and _RW suffixes. the associated access level is already provided in the labels.
        metric_name = attribute_name.removesuffix("_R").removesuffix("_RW")
    
        # only export lower-case metric names
        return f"ds_{metric_name}".lower()
    
    
    class AttributeMetric(Metric):
        """Manage a Prometheus Metric object for Tango devices."""
    
        def __init__(
            self,
            name: str,
            description: str,
            static_labels: Dict[str, str],
            metric_class=Gauge,
            metric_class_init_kwargs: Dict[str, object] | None = None,
            dynamic_labels: List[str] | None = None,
        ):
            super().__init__(
                metric_name(name),
                description,
                static_labels,
                metric_class,
                metric_class_init_kwargs,
                dynamic_labels,
            )
    
        def _metric_enum_value(self, value: str | IntEnum | DevState, labels: List[str]):
            value = value.name if isinstance(value, DevState) else value
    
            super()._metric_enum_value(value, labels)
    
    
    class ScalarAttributeMetric(AttributeMetric):
        """Manage a Prometheus Metric object attached to a Scalar Tango Attribute."""
    
        def __init__(self, device: Device, attribute: Attribute, wrap_read: bool = True):
            self.device = device
            self.attribute = attribute
            self.data_type = self.attribute.get_data_type()
            self.access = self.attribute.get_writable()
    
            try:
                description = attribute.get_properties().description
            except DevFailed:
                # get_properties() can fail for devices not registered in the TangoDB
                # TODO: get this value another way?
                description = ""
    
            static_labels = device_labels(device)
            static_labels["access"] = access_label_values[self.access]
    
            if self.data_type == CmdArgType.DevString:
                super().__init__(attribute.get_name(), description, static_labels, Info)
            elif self.data_type == CmdArgType.DevEnum:
                # evil PyTango foo to obtain enum labels from class attribute
                enum_labels = getattr(
                    device.__class__, attribute.get_name()
                ).att_prop.enum_labels.split(",")
    
                super().__init__(
                    attribute.get_name(),
                    description,
                    static_labels,
                    Enum,
                    metric_class_init_kwargs={"states": enum_labels},
                )
            else:
                super().__init__(attribute.get_name(), description, static_labels)
    
            if wrap_read:
                self._wrap_attribute_read(device)
            self._wrap_attribute_write(device)
    
        def _set_value(self, value: object, labels):
            """Update the metric with the given value and labels.
    
            Make sure all labels added to `label_keys()` are provided."""
    
            if self.data_type == CmdArgType.DevString:
                super()._set_value(dict(str_value=value), labels)
            else:
                super()._set_value(value, labels)
    
        def _wrap_attribute_read(self, device: Device):
            """Wrap the attribute's read functions to feed new values to set_value()."""
    
            name = self.attribute.get_name()
    
            def read_func_wrapper(device):
                try:
                    value = read_func_wrapper.return_value
    
                    self.set_value(value)
                except Exception:
                    logger.exception(
                        f"Metrics: error updating metrics for {name} after reading it"
                    )
    
            wrap_method(device, f"__read_{name}_wrapper__", read_func_wrapper)
    
        def _wrap_attribute_write(self, device: Device):
            """Wrap the attribute's write functions to feed new values to set_value()."""
    
            name = self.attribute.get_name()
    
            def write_func_wrapper(device):
                try:
                    value = self.attribute.get_write_value()
                    self.set_value(value)
                except Exception:
                    logger.exception(
                        f"Metrics: error updating metrics for {name} after writing it"
                    )
    
            if hasattr(device, f"__write_{name}_wrapper__"):
                wrap_method(device, f"__write_{name}_wrapper__", write_func_wrapper)
    
    
    class SpectrumAttributeMetric(ScalarAttributeMetric):
        """Manage a Prometheus Metric object attached to a Spectrum (1D) Tango Attribute."""
    
        def label_keys(self):
            return super().label_keys() + ["x"]
    
        def _set_value(self, value, labels):
            for x, element in enumerate(value):
                super()._set_value(element, labels + [f"{x:02}"])
    
    
    class ImageAttributeMetric(ScalarAttributeMetric):
        """Manage a Prometheus Metric object attached to an Image (2D) Tango Attribute."""
    
        # Maximum number of elements in an attribute before we warn about it
        WARN_ARRAY_SIZE = 1024
    
        def __init__(self, device: Device, attribute: Attribute, wrap_read: bool = True):
            super().__init__(device, attribute, wrap_read)
    
            if self.max_dim_x * self.max_dim_y >= self.WARN_ARRAY_SIZE:
                logger.warning(
                    f"Metrics: exposing attribute {self.attribute.get_name()} of device {self.device.get_name()} with {self.max_dim_x} x {self.max_dim_y} elements. Performance may suffer."
                )
    
        @property
        def max_dim_x(self):
            return self.attribute.get_max_dim_x()
    
        @property
        def max_dim_y(self):
            return self.attribute.get_max_dim_y()
    
        def label_keys(self):
            return super().label_keys() + ["x", "y", "idx"]
    
        def _set_value(self, value, labels):
            max_dim_x = self.max_dim_x
    
            for y, row in enumerate(value):
                for x, element in enumerate(row):
                    idx = y * max_dim_x + x
                    super()._set_value(
                        # swap (x,y) to have x in Prometheus consistently be
                        # the major dimension in both Spectrum and Image attributes.
                        element,
                        labels + [f"{y:02}", f"{x:02}", f"{idx:03}"],
                    )