Skip to content
Snippets Groups Projects
Select Git revision
  • master
1 result

README.md

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    messagehandler.py 5.33 KiB
    # Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
    # SPDX-License-Identifier: Apache-2.0
    
    from datetime import datetime
    import json
    from threading import Thread
    from typing import Callable, Dict
    
    from prometheus_client import Counter, Gauge
    import zmq
    
    from lofar_lotus.zeromq import ZeroMQSubscriber
    from tangostationcontrol.metrics import AttributeMetric
    
    import logging
    
    logger = logging.getLogger()
    
    
    class ZeroMQSubscriberMetrics:
        def __init__(self, static_labels: Dict[str, str]):
            self.message_count_metric = AttributeMetric(
                "zmq_message_count",
                "Number of messages received over ZMQ",
                static_labels,
                Counter,
                dynamic_labels=["topic"],
            )
    
            self.is_connected_metric = AttributeMetric(
                "zmq_connected",
                "Is the ZMQ receiver connected to a publisher",
                static_labels,
                Gauge,
            )
    
            self.nr_connects_metric = AttributeMetric(
                "zmq_nr_connects",
                "Number of times the ZMQ receiver connected to a publisher",
                static_labels,
                Gauge,
            )
    
            self.nr_disconnects_metric = AttributeMetric(
                "zmq_nr_disconnects",
                "Number of times the ZMQ receiver disconnected to a publisher",
                static_labels,
                Gauge,
            )
    
            self.message_decode_error_count_metric = AttributeMetric(
                "zmq_decode_error_count",
                "Number of messages received over ZMQ that could not be decoded as JSON",
                static_labels,
                Counter,
                dynamic_labels=["topic"],
            )
    
            self.message_handling_error_count_metric = AttributeMetric(
                "zmq_handling_error_count",
                "Number of messages received over ZMQ that raised an exception on processing",
                static_labels,
                Counter,
                dynamic_labels=["topic"],
            )
    
    
    class MultiEndpointZMQMessageHandler:
        """Handles messages from ZMQ from multiple endpoints in separate threads."""
    
        def __init__(
            self,
            handle_message: Callable[[str, datetime, dict], None],
            metric_labels: Dict[str, str] | None = None,
        ):
            self.handle_message = handle_message
            self.receiver_threads: dict[str, Thread] = {}
            self._subscribers: dict[str, ZeroMQSubscriber] = {}
            self.metric_labels = metric_labels
    
            self.closing = False
    
        def add_receiver(self, uri: str, topics: list[str]):
            """Spawn a thread to subscribe to the given URI and topics."""
    
            if uri in self.receiver_threads:
                raise RuntimeError(f"Already listening to {uri}")
    
            thread = Thread(target=self._run_receiver, args=(uri, topics))
            thread.start()
    
            self.receiver_threads[uri] = thread
    
            logger.debug(f"Added received thread for {uri=} {topics=}")
    
        def __enter__(self):
            return self
    
        def __exit__(self, exc_type, exc_value, traceback):
            self.close()
            return True
    
        def close(self):
            self.closing = True
    
            for subscriber in self._subscribers.values():
                subscriber.close()
    
        def _run_receiver(self, uri, topics):
            with ZeroMQSubscriber(uri, topics) as subscriber:
                logger.info(f"Subscribed to {uri=} {topics=}")
    
                if self.closing:
                    return
                self._subscribers[uri] = subscriber
    
                metric_labels = self.metric_labels or {}
                metric_labels["uri"] = uri
    
                metrics = ZeroMQSubscriberMetrics(metric_labels)
    
                metrics.is_connected_metric.get_metric().set_function(
                    lambda: subscriber.is_connected
                )
                metrics.nr_connects_metric.get_metric().set_function(
                    lambda: subscriber.nr_connects
                )
                metrics.nr_disconnects_metric.get_metric().set_function(
                    lambda: subscriber.nr_disconnects
                )
    
                while True:
                    try:
                        # receive message
                        topic, timestamp, payload = subscriber.recv()
                        logger.debug(f"Received message {uri=} {topic=} {timestamp=}")
                        metrics.message_count_metric.get_metric([topic]).inc()
    
                        # decode payload
                        try:
                            message = json.loads(payload)
                        except (json.decoder.JSONDecodeError, UnicodeDecodeError):
                            logger.exception(
                                f"Could not decode message payload as JSON {uri=} {topic=} {payload=}"
                            )
                            metrics.message_decode_error_count_metric.get_metric(
                                [topic]
                            ).inc()
                            continue
    
                        try:
                            self.handle_message(topic, timestamp, message)
                        except Exception:
                            logger.exception(
                                f"Exception handling message {topic=} {timestamp=}) {message=}"
                            )
                            metrics.message_handling_error_count_metric.get_metric(
                                [topic]
                            ).inc()
                    except zmq.ZMQError as e:
                        if e.errno == zmq.ETERM:
                            break
                        else:
                            raise