Skip to content
Snippets Groups Projects
Select Git revision
  • f08b843392d8695d9cf079d0cb9e2a6c1388f688
  • master default protected
  • L2SDP-1131
  • L2SDP-LIFT
  • L2SDP-1137
  • HPR-158
6 results

common_pkg.vhd

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    statistics.py 8.04 KiB
    # Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
    # SPDX-License-Identifier: Apache-2.0
    
    """Base device for Statistics (SST/BST/XST)"""
    
    # Additional import
    import asyncio
    import concurrent.futures
    from contextlib import suppress
    import ipaddress
    import json
    import logging
    import socket
    from typing import Dict, List, Tuple

    import zmq
    from prometheus_client import Counter, Gauge

    from lofar_lotus.zeromq import AsyncZeroMQSubscriber

    # PyTango imports
    from tango.server import device_property, attribute
    from tangostationcontrol.common.constants import N_pn
    from tangostationcontrol.devices.base_device_classes.opcua_device import OPCUADevice
    from tangostationcontrol.metrics import device_metrics, device_labels, AttributeMetric
    
    # No logger name is passed, so this is the root logger.
    logger = logging.getLogger()

    # Only the Statistics class is exported on a star-import of this module.
    __all__ = ["Statistics"]
    
    
    @device_metrics()
    class Statistics(OPCUADevice):
        """Base device for Statistics (SST/BST/XST).

        Provides the device properties that describe the statistics client, a
        helper that derives the FPGA offload header defaults from them, and a
        background coroutine that receives messages over ZeroMQ and passes each
        decoded message to ``handle_statistics_message``, which raises
        ``NotImplementedError`` in this base class.
        """
    
        # -----------------
        # Device Properties
        # -----------------
    
        # UDP port of the statistics client. _statistics_defaults uses it as the
        # UDP destination port in the FPGA offload header defaults.
        Statistics_Client_UDP_Port = device_property(
            dtype="DevUShort", mandatory=False, default_value=5001
        )

        # TCP port of the ZeroMQ endpoint on the statistics client host that
        # zmq_message_handler subscribes to.
        Statistics_Client_ZMQ_Port = device_property(
            dtype="DevUShort", mandatory=False, default_value=6001
        )

        # Containers in CNI get a MAC address consisting of a prefix, followed by a
        # hex representation of their IP. The default prefix is "0A:58", see
        # https://github.com/k8snetworkplumbingwg/ovs-cni/blob/main/pkg/plugin/plugin.go.
        #
        # For example, a container with IP 1.2.3.4 gets the MAC address
        # 0A:58:01:02:03:04.
        #
        # The property value includes the trailing ":" because
        # _statistics_defaults appends the hex octets of the IP directly to it.
        Statistics_Client_MAC_prefix = device_property(
            dtype="DevString", mandatory=False, default_value="0A:58:"
        )

        # Hostname of the statistics client. It is resolved to an IPv4 address in
        # _statistics_defaults and used as the host of the ZeroMQ endpoint in
        # zmq_message_handler. This is the only mandatory property here.
        Statistics_Client_Hostname = device_property(
            dtype="DevString",
            mandatory=True,
        )
    
        # ----------
        # Attributes
        # ----------
    
        # True only while the event loop thread is running and the ZMQ message
        # handler has been scheduled and has not finished yet.
        zmq_thread_running_R = attribute(
            doc="Whether the ZMQ messages are being listened to.",
            dtype=bool,
            # bool(): the bare and-chain evaluates to None (not False) when there
            # is no event loop thread or no handler future, e.g. after
            # configure_for_off, while this attribute is declared as a boolean.
            fget=lambda self: bool(
                self.event_loop_thread
                and self.event_loop_thread.is_running()
                and self.zmq_message_handler_future
                and not self.zmq_message_handler_future.done()
            ),
        )
    
        # --------
        # Support functions
        # --------
    
        @staticmethod
        def _ip_to_hex(ip: ipaddress.IPv4Address) -> str:
            """Return the octets of the given IP address as a hex string,
            separated by :."""
    
            return ":".join(["%02X" % c for c in ipaddress.v4_int_to_packed(int(ip))])
    
        def _statistics_defaults(
            self, statistics_type: str, properties: Dict[str, object]
        ) -> List[Tuple[str, list]]:
            """Return the default FPGA offload header settings for a statistics type.

            The statistics client hostname is resolved to an IPv4 address, and the
            destination MAC address is built from the configured MAC prefix
            followed by the hex octets of that address.

            :param statistics_type: name inserted into the attribute names, which
                are built as ``FPGA_<statistics_type>_offload_hdr_...``.
            :param properties: property values; must provide
                ``Statistics_Client_Hostname``, ``Statistics_Client_MAC_prefix``
                and ``Statistics_Client_UDP_Port``.
            :return: list of (attribute name, values) pairs, where each value is
                repeated ``N_pn`` times.
            :raises KeyError: if one of the required properties is missing.
            :raises OSError: if the hostname cannot be resolved.
            """
            host = properties["Statistics_Client_Hostname"]
            ip = ipaddress.IPv4Address(socket.gethostbyname(host))
            mac = properties["Statistics_Client_MAC_prefix"] + self._ip_to_hex(ip)

            port = properties["Statistics_Client_UDP_Port"]

            prefix = f"FPGA_{statistics_type}_offload_hdr"

            # A list of pairs (not a dict), so the order of the settings is kept.
            return [
                (f"{prefix}_eth_destination_mac_RW", [mac] * N_pn),
                (f"{prefix}_ip_destination_address_RW", [str(ip)] * N_pn),
                (f"{prefix}_udp_destination_port_RW", [port] * N_pn),
            ]
    
        # --------
        # Overloaded functions
        # --------
    
        def __init__(self, cl, name):
            """Initialise the device and create its ZeroMQ-related metrics."""
            super().__init__(cl, name)

            # Assigned in configure_for_initialise, reset in configure_for_off.
            self.zmq_message_handler_future = None

            def build_metric(metric_name, description, metric_class, **options):
                # Every metric gets the labels returned by device_labels(self).
                return AttributeMetric(
                    metric_name,
                    description,
                    device_labels(self),
                    metric_class,
                    **options,
                )

            # Per-topic counter of everything that was received.
            self.zmq_message_count_metric = build_metric(
                "zmq_message_count",
                "Number of messages received over ZMQ",
                Counter,
                dynamic_labels=["topic"],
            )

            # Connection state gauges; zmq_message_handler hooks them up to the
            # subscriber through set_function.
            self.zmq_is_connected_metric = build_metric(
                "zmq_connected",
                "Is the ZMQ receiver connected to a publisher",
                Gauge,
            )
            self.zmq_nr_connects_metric = build_metric(
                "zmq_nr_connects",
                "Number of times the ZMQ receiver connected to a publisher",
                Gauge,
            )
            self.zmq_nr_disconnects_metric = build_metric(
                "zmq_nr_disconnects",
                "Number of times the ZMQ receiver disconnected to a publisher",
                Gauge,
            )

            # Per-topic error counters for the two failure modes of a message.
            self.zmq_message_decode_error_count_metric = build_metric(
                "zmq_decode_error_count",
                "Number of messages received over ZMQ that could not be decoded as JSON",
                Counter,
                dynamic_labels=["topic"],
            )
            self.zmq_message_handling_error_count_metric = build_metric(
                "zmq_handling_error_count",
                "Number of messages received over ZMQ that raised an exception on processing",
                Counter,
                dynamic_labels=["topic"],
            )
    
        def configure_for_initialise(self):
            """Schedule the ZeroMQ message handler on the event loop thread."""
            super().configure_for_initialise()

            # Create the coroutine first, then look up the loop it will run on.
            handler = self.zmq_message_handler()
            loop = self.event_loop_thread.event_loop

            # Keep the returned future so configure_for_off can cancel it.
            self.zmq_message_handler_future = asyncio.run_coroutine_threadsafe(
                handler, loop
            )
    
        async def zmq_message_handler(self):
            """Receive statistics messages over ZeroMQ and dispatch them.

            Opens a subscriber on
            ``tcp://<Statistics_Client_Hostname>:<Statistics_Client_ZMQ_Port>`` and,
            for each message received, decodes the payload as JSON and passes it
            to ``handle_statistics_message``. A payload that cannot be decoded, or
            a message whose handling raises, is logged and counted per topic,
            after which the loop carries on with the next message.

            Any other exception ends the handler after being logged, except for a
            cancellation, which is re-raised.

            :raises asyncio.CancelledError: when the handler is cancelled.
            """
            logger.debug("ZeroMQ message handler thread started")

            try:
                async with AsyncZeroMQSubscriber(
                    f"tcp://{self.Statistics_Client_Hostname}:{self.Statistics_Client_ZMQ_Port}",
                    # presumably an empty topic prefix subscribes to all topics;
                    # verify against AsyncZeroMQSubscriber
                    [""],
                ) as zmq_subscriber:
                    # register metrics: their values are read from the subscriber
                    # through the registered callbacks
                    self.zmq_is_connected_metric.get_metric().set_function(
                        lambda: zmq_subscriber.is_connected
                    )
                    self.zmq_nr_connects_metric.get_metric().set_function(
                        lambda: zmq_subscriber.nr_connects
                    )
                    self.zmq_nr_disconnects_metric.get_metric().set_function(
                        lambda: zmq_subscriber.nr_disconnects
                    )

                    while True:
                        # receive message (the timestamp is not used here)
                        (topic, _timestamp, payload) = await zmq_subscriber.async_recv()
                        self.zmq_message_count_metric.get_metric([topic]).inc()

                        # decode payload
                        try:
                            message = json.loads(payload)
                        except (json.decoder.JSONDecodeError, UnicodeDecodeError):
                            # Log the raw payload: "message" is not assigned when
                            # decoding fails, so formatting it here would raise on
                            # the first message (ending this handler through the
                            # outer "except Exception") or show a previous message.
                            logger.exception(
                                f"Could not decode message payload as JSON {topic=} {payload=}"
                            )
                            self.zmq_message_decode_error_count_metric.get_metric(
                                [topic]
                            ).inc()
                            continue

                        # handle message; a failure must not stop the receive loop
                        try:
                            self.handle_statistics_message(topic, message)
                        except Exception:
                            logger.exception(f"Error handling message {topic=} {message=}")
                            self.zmq_message_handling_error_count_metric.get_metric(
                                [topic]
                            ).inc()
            except (
                asyncio.CancelledError,
                concurrent.futures.CancelledError,
            ):
                logger.debug("ZeroMQ message handler thread cancelled")
                raise
            except zmq.ContextTerminated:
                # benign exception from a forced stop
                logger.debug("ZeroMQ message handler ZMQ context terminated")
            except Exception:
                logger.exception("Error handling ZeroMQ messages")

            logger.debug("ZeroMQ message handler thread stopped")
    
        def configure_for_off(self):
            """Cancel the ZeroMQ message handler, if one was scheduled."""
            super().configure_for_off()

            handler_future = self.zmq_message_handler_future
            if not handler_future:
                # Nothing was scheduled, so there is nothing to stop.
                return

            logger.info("Cancelling ZeroMQ message handler thread")
            handler_future.cancel()

            # Collect the outcome; the cancellation we just requested is expected.
            try:
                handler_future.result()
            except concurrent.futures.CancelledError:
                pass

            self.zmq_message_handler_future = None
            logger.debug("Cancelled ZeroMQ message handler thread")
    
        def handle_statistics_message(self, topic: str, message: dict):
            raise NotImplementedError
    
        # --------
        # Commands
        # --------