Skip to content
Snippets Groups Projects
Commit d3c190d3 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-2154-migrate-common-to-lotus' into 'master'

L2SS-2154: Use LOTUS library for common metrics code

Closes L2SS-2154

See merge request !1055
parents fed65f75 f0ed5f12
No related branches found
No related tags found
1 merge request!1055L2SS-2154: Use LOTUS library for common metrics code
......@@ -30,6 +30,7 @@ matplotlib
jupyterplot
# useful LOFAR software
lofar-lotus # Apache2
pabeam@git+https://git.astron.nl/mevius/pabeam # Apache2
# user packages
......
......@@ -82,4 +82,5 @@ class TestObservation(base.IntegrationTestCase):
with grpc.insecure_channel("rpc.service.consul:50051") as channel:
stub = ObservationStub(channel)
stub.StopObservation(StopObservationRequest(observation_id=obs_id))
self.assertFalse(obs.is_observation_running(obs_id))
......@@ -4,13 +4,15 @@ from tango import CmdArgType
from tango import Attribute
from tango import DevFailed
from tango import DevState
from prometheus_client import Metric, Gauge, Info, Enum
from prometheus_client import Gauge, Info, Enum
from asyncio import iscoroutinefunction
from enum import IntEnum
from typing import List, Dict, Callable, Union
import functools
import logging
from lofar_lotus.metrics import Metric
__all__ = [
"wrap_method",
"device_labels",
......@@ -105,7 +107,7 @@ def metric_name(attribute_name: str) -> str:
return f"ds_{metric_name}".lower()
class AttributeMetric:
class AttributeMetric(Metric):
"""Manage a Prometheus Metric object for Tango devices."""
def __init__(
......@@ -117,90 +119,19 @@ class AttributeMetric:
metric_class_init_kwargs: Dict[str, object] | None = None,
dynamic_labels: List[str] | None = None,
):
self.name = metric_name(name)
self.description = description
self.metric_class = metric_class
self.static_label_keys = list(static_labels.keys())
self.static_label_values = list(static_labels.values())
self.dynamic_label_keys = dynamic_labels or []
self.metric_class_init_kwargs = metric_class_init_kwargs or {}
if self.name not in METRICS:
METRICS[self.name] = self.make_metric()
self.metric = METRICS[self.name]
assert self.metric.__class__ == metric_class, (
f"Metric {self.name} was previously provided as {self.metric.__class__} but is now needed as {metric_class}"
)
def __str__(self):
return f"{self.__class__.__name__}(name={self.name}, metric_class={self.metric_class}, static_labels={self.static_label_keys}, dynamic_labels={self.dynamic_label_keys})"
def clear(self):
"""Remove all cached metrics."""
self.metric.clear()
def label_keys(self) -> List[str]:
"""Return the list of labels that we will use."""
return self.static_label_keys + self.dynamic_label_keys
def make_metric(self) -> Metric:
"""Construct a metric that collects samples for this attribute."""
return self.metric_class(
self.name,
self.description,
labelnames=self.label_keys(),
**self.metric_class_init_kwargs,
)
def get_metric(self, dynamic_label_values: List = None) -> Metric:
"""Return the metric that uses the default labels."""
return self.metric.labels(
*self.static_label_values, *(dynamic_label_values or [])
super().__init__(
metric_name(name),
description,
static_labels,
metric_class,
metric_class_init_kwargs,
dynamic_labels,
)
def set_value(self, value: object):
"""A new value for the attribute is known. Feed it to the metric."""
# set it, this class will take care of the default labels
self._set_value(value, self.static_label_values)
def _set_value(self, value: object, labels: List[str]):
if self.metric_class == Enum:
self._metric_enum_value(value, labels)
elif self.metric_class == Info:
self._metric_info_value(value, labels)
else:
self._metric_set_value(value, labels)
def _metric_set_value(self, value: object, labels: List[str]):
if value is None:
raise ValueError(f"Invalid value for metric: {value}")
self.metric.labels(*labels).set(value)
def _metric_info_value(self, value: Dict[str, str], labels: List[str]):
if value is None or None in value.values():
raise ValueError(f"Invalid value for metric: {value}")
self.metric.labels(*labels).info(value)
def _metric_enum_value(self, value: str | IntEnum, labels: List[str]):
if value is None:
raise ValueError(f"Invalid value for metric: {value}")
self.metric.labels(*labels).state(
value.name if isinstance(value, (DevState, IntEnum)) else value
)
def _metric_enum_value(self, value: str | IntEnum | DevState, labels: List[str]):
value = value.name if isinstance(value, DevState) else value
def collect(self) -> List[Metric]:
"""Return all collected samples."""
return self.metric.collect()
super()._metric_enum_value(value, labels)
class ScalarAttributeMetric(AttributeMetric):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment