Select Git revision
_hdf5_utils.py
-
Hannes Feldt authored
This reverts commit f36bfe89.
Hannes Feldt authored: This reverts commit f36bfe89.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
messagehandler.py 5.33 KiB
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime
import json
from threading import Thread
from typing import Callable, Dict
from prometheus_client import Counter, Gauge
import zmq
from lofar_lotus.zeromq import ZeroMQSubscriber
from tangostationcontrol.metrics import AttributeMetric
import logging
# Module-level logger shared by everything in this file.
# NOTE(review): getLogger() without a name returns the root logger; confirm
# that logging on the root logger (rather than a named one) is intended.
logger = logging.getLogger()
class ZeroMQSubscriberMetrics:
    """Bundle of the metrics kept for one ZMQ subscriber.

    Counters carrying a dynamic ``topic`` label:
      * ``message_count_metric``
      * ``message_decode_error_count_metric``
      * ``message_handling_error_count_metric``

    Gauges without dynamic labels:
      * ``is_connected_metric``
      * ``nr_connects_metric``
      * ``nr_disconnects_metric``
    """

    def __init__(self, static_labels: Dict[str, str]):
        """Create all metrics, passing the same ``static_labels`` to each.

        Args:
            static_labels: Labels handed unchanged to every AttributeMetric.
        """

        def topic_counter(name, description):
            # Counter that is split out per topic through a dynamic label.
            return AttributeMetric(
                name,
                description,
                static_labels,
                Counter,
                dynamic_labels=["topic"],
            )

        def plain_gauge(name, description):
            # Gauge that only carries the static labels.
            return AttributeMetric(name, description, static_labels, Gauge)

        # Traffic seen by the subscriber.
        self.message_count_metric = topic_counter(
            "zmq_message_count",
            "Number of messages received over ZMQ",
        )

        # Connection state of the subscriber.
        self.is_connected_metric = plain_gauge(
            "zmq_connected",
            "Is the ZMQ receiver connected to a publisher",
        )
        self.nr_connects_metric = plain_gauge(
            "zmq_nr_connects",
            "Number of times the ZMQ receiver connected to a publisher",
        )
        self.nr_disconnects_metric = plain_gauge(
            "zmq_nr_disconnects",
            "Number of times the ZMQ receiver disconnected to a publisher",
        )

        # Failures while processing received messages.
        self.message_decode_error_count_metric = topic_counter(
            "zmq_decode_error_count",
            "Number of messages received over ZMQ that could not be decoded as JSON",
        )
        self.message_handling_error_count_metric = topic_counter(
            "zmq_handling_error_count",
            "Number of messages received over ZMQ that raised an exception on processing",
        )
class MultiEndpointZMQMessageHandler:
"""Handles messages from ZMQ from multiple endpoints in separate threads."""
def __init__(
self,
handle_message: Callable[[str, datetime, dict], None],
metric_labels: Dict[str, str] | None = None,
):
self.handle_message = handle_message
self.receiver_threads: dict[str, Thread] = {}
self._subscribers: dict[str, ZeroMQSubscriber] = {}
self.metric_labels = metric_labels
self.closing = False
def add_receiver(self, uri: str, topics: list[str]):
"""Spawn a thread to subscribe to the given URI and topics."""
if uri in self.receiver_threads:
raise RuntimeError(f"Already listening to {uri}")
thread = Thread(target=self._run_receiver, args=(uri, topics))
thread.start()
self.receiver_threads[uri] = thread
logger.debug(f"Added received thread for {uri=} {topics=}")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
return True
def close(self):
self.closing = True
for subscriber in self._subscribers.values():
subscriber.close()
def _run_receiver(self, uri, topics):
with ZeroMQSubscriber(uri, topics) as subscriber:
logger.info(f"Subscribed to {uri=} {topics=}")
if self.closing:
return
self._subscribers[uri] = subscriber
metric_labels = self.metric_labels or {}
metric_labels["uri"] = uri
metrics = ZeroMQSubscriberMetrics(metric_labels)
metrics.is_connected_metric.get_metric().set_function(
lambda: subscriber.is_connected
)
metrics.nr_connects_metric.get_metric().set_function(
lambda: subscriber.nr_connects
)
metrics.nr_disconnects_metric.get_metric().set_function(
lambda: subscriber.nr_disconnects
)
while True:
try:
# receive message
topic, timestamp, payload = subscriber.recv()
logger.debug(f"Received message {uri=} {topic=} {timestamp=}")
metrics.message_count_metric.get_metric([topic]).inc()
# decode payload
try:
message = json.loads(payload)
except (json.decoder.JSONDecodeError, UnicodeDecodeError):
logger.exception(
f"Could not decode message payload as JSON {uri=} {topic=} {payload=}"
)
metrics.message_decode_error_count_metric.get_metric(
[topic]
).inc()
continue
try:
self.handle_message(topic, timestamp, message)
except Exception:
logger.exception(
f"Exception handling message {topic=} {timestamp=}) {message=}"
)
metrics.message_handling_error_count_metric.get_metric(
[topic]
).inc()
except zmq.ZMQError as e:
if e.errno == zmq.ETERM:
break
else:
raise