Select Git revision
common_pkg.vhd
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
statistics.py 8.04 KiB
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Base device for Statistics (SST/BST/XST)"""
# Additional import
import asyncio
import concurrent.futures
from contextlib import suppress
import ipaddress
import json
import logging
import socket
import zmq
from typing import Dict
from prometheus_client import Counter, Gauge
from lofar_lotus.zeromq import AsyncZeroMQSubscriber
# PyTango imports
from tango.server import device_property, attribute
from tangostationcontrol.common.constants import N_pn
from tangostationcontrol.devices.base_device_classes.opcua_device import OPCUADevice
from tangostationcontrol.metrics import device_metrics, device_labels, AttributeMetric
logger = logging.getLogger()
__all__ = ["Statistics"]
@device_metrics()
class Statistics(OPCUADevice):
    """Base device for Statistics (SST/BST/XST).

    On top of the OPC-UA device behaviour inherited from ``OPCUADevice``, this
    class:

    * derives the destination MAC/IP/UDP port values for the FPGA statistics
      offload headers from the configured client hostname, and
    * runs a coroutine on the device's event loop that subscribes to a ZeroMQ
      publisher, decodes each payload as JSON and passes it on to
      ``handle_statistics_message``, which subclasses must implement.
    """

    # -----------------
    # Device Properties
    # -----------------

    # UDP port that is written into the FPGA offload headers
    # (see _statistics_defaults).
    Statistics_Client_UDP_Port = device_property(
        dtype="DevUShort", mandatory=False, default_value=5001
    )

    # TCP port of the ZeroMQ publisher to subscribe to
    # (see zmq_message_handler).
    Statistics_Client_ZMQ_Port = device_property(
        dtype="DevUShort", mandatory=False, default_value=6001
    )

    # Containers in CNI get a MAC address consisting of a prefix, followed by a hex
    # representation of their IPs. The default prefix is "0A:58", see
    # https://github.com/k8snetworkplumbingwg/ovs-cni/blob/main/pkg/plugin/plugin.go.
    #
    # For example, a container with IP 1.2.3.4 gets the MAC address 0A:58:01:02:03:04.
    Statistics_Client_MAC_prefix = device_property(
        dtype="DevString", mandatory=False, default_value="0A:58:"
    )

    # Host name of the statistics client. It is resolved to an IPv4 address
    # for the offload headers and used as the ZeroMQ publisher host.
    Statistics_Client_Hostname = device_property(
        dtype="DevString",
        mandatory=True,
    )

    # ----------
    # Attributes
    # ----------

    zmq_thread_running_R = attribute(
        doc="Whether the ZMQ messages are being listened to.",
        dtype=bool,
        # bool() guarantees a real boolean: the bare and-chain would otherwise
        # yield None or the future object itself when a link is falsy/truthy.
        fget=lambda self: bool(
            self.event_loop_thread
            and self.event_loop_thread.is_running()
            and self.zmq_message_handler_future
            and not self.zmq_message_handler_future.done()
        ),
    )

    # --------
    # Support functions
    # --------

    @staticmethod
    def _ip_to_hex(ip: ipaddress.IPv4Address) -> str:
        """Return the octets of the given IP address as a hex string,
        separated by :.

        For example, 1.2.3.4 becomes "01:02:03:04".
        """
        octets = ipaddress.v4_int_to_packed(int(ip))
        return ":".join(f"{octet:02X}" for octet in octets)

    def _statistics_defaults(
        self, statistics_type: str, properties: Dict[str, object]
    ) -> list:
        """Return the offload header settings that target the statistics client.

        :param statistics_type: name fragment used to build the attribute
            names, as in ``FPGA_<statistics_type>_offload_hdr_...``.
        :param properties: property values; must contain the keys
            ``Statistics_Client_Hostname``, ``Statistics_Client_MAC_prefix``
            and ``Statistics_Client_UDP_Port``.
        :return: a list of ``(attribute name, values)`` tuples, where
            ``values`` is a list holding the same value ``N_pn`` times.
        :raises KeyError: if one of the required properties is missing.
        :raises OSError: if the host name cannot be resolved
            (``socket.gethostbyname``).
        """
        host = properties["Statistics_Client_Hostname"]
        ip = ipaddress.IPv4Address(socket.gethostbyname(host))

        # The MAC address is the configured prefix followed by the IP octets
        # in hex, mirroring how CNI assigns MAC addresses to containers.
        mac = properties["Statistics_Client_MAC_prefix"] + self._ip_to_hex(ip)
        port = properties["Statistics_Client_UDP_Port"]

        prefix = f"FPGA_{statistics_type}_offload_hdr"

        return [
            (f"{prefix}_eth_destination_mac_RW", [mac] * N_pn),
            (f"{prefix}_ip_destination_address_RW", [str(ip)] * N_pn),
            (f"{prefix}_udp_destination_port_RW", [port] * N_pn),
        ]

    # --------
    # Overloaded functions
    # --------

    def __init__(self, cl, name):
        """Initialise the device and create the ZeroMQ related metrics.

        ``cl`` and ``name`` are passed on unchanged to the parent constructor.
        """
        super().__init__(cl, name)

        # Future of the running zmq_message_handler coroutine, or None when
        # no handler has been scheduled.
        self.zmq_message_handler_future = None

        self.zmq_message_count_metric = AttributeMetric(
            "zmq_message_count",
            "Number of messages received over ZMQ",
            device_labels(self),
            Counter,
            dynamic_labels=["topic"],
        )

        self.zmq_is_connected_metric = AttributeMetric(
            "zmq_connected",
            "Is the ZMQ receiver connected to a publisher",
            device_labels(self),
            Gauge,
        )

        self.zmq_nr_connects_metric = AttributeMetric(
            "zmq_nr_connects",
            "Number of times the ZMQ receiver connected to a publisher",
            device_labels(self),
            Gauge,
        )

        self.zmq_nr_disconnects_metric = AttributeMetric(
            "zmq_nr_disconnects",
            "Number of times the ZMQ receiver disconnected to a publisher",
            device_labels(self),
            Gauge,
        )

        self.zmq_message_decode_error_count_metric = AttributeMetric(
            "zmq_decode_error_count",
            "Number of messages received over ZMQ that could not be decoded as JSON",
            device_labels(self),
            Counter,
            dynamic_labels=["topic"],
        )

        self.zmq_message_handling_error_count_metric = AttributeMetric(
            "zmq_handling_error_count",
            "Number of messages received over ZMQ that raised an exception on processing",
            device_labels(self),
            Counter,
            dynamic_labels=["topic"],
        )

    def configure_for_initialise(self):
        """Run the parent initialisation, then schedule the ZeroMQ handler.

        The handler coroutine is submitted to the event loop of
        ``self.event_loop_thread``; its future is kept so that
        ``configure_for_off`` can cancel it.
        """
        super().configure_for_initialise()

        self.zmq_message_handler_future = asyncio.run_coroutine_threadsafe(
            self.zmq_message_handler(), self.event_loop_thread.event_loop
        )

    async def zmq_message_handler(self):
        """Receive ZeroMQ messages and dispatch them until stopped.

        Subscribes to ``tcp://<Statistics_Client_Hostname>:<Statistics_Client_ZMQ_Port>``
        and, for every message received: counts it per topic, decodes the
        payload as JSON and calls ``handle_statistics_message``. Decode and
        handling failures are logged and counted per topic, after which the
        loop continues with the next message.

        :raises asyncio.CancelledError: re-raised when the coroutine is
            cancelled. A ``zmq.ContextTerminated`` or any other exception
            escaping the receive loop is logged and ends the handler without
            being propagated.
        """
        logger.debug("ZeroMQ message handler thread started")

        try:
            async with AsyncZeroMQSubscriber(
                f"tcp://{self.Statistics_Client_Hostname}:{self.Statistics_Client_ZMQ_Port}",
                [""],
            ) as zmq_subscriber:
                # register metrics
                self.zmq_is_connected_metric.get_metric().set_function(
                    lambda: zmq_subscriber.is_connected
                )
                self.zmq_nr_connects_metric.get_metric().set_function(
                    lambda: zmq_subscriber.nr_connects
                )
                self.zmq_nr_disconnects_metric.get_metric().set_function(
                    lambda: zmq_subscriber.nr_disconnects
                )

                while True:
                    # receive message
                    (topic, timestamp, payload) = await zmq_subscriber.async_recv()

                    self.zmq_message_count_metric.get_metric([topic]).inc()

                    # decode payload
                    try:
                        message = json.loads(payload)
                    except (json.decoder.JSONDecodeError, UnicodeDecodeError):
                        # Log the raw payload: "message" is not (re)assigned
                        # when decoding fails, so it would be unbound on the
                        # first message or stale from a previous iteration.
                        logger.exception(
                            f"Could not decode message payload as JSON {topic=} {payload=}"
                        )

                        self.zmq_message_decode_error_count_metric.get_metric(
                            [topic]
                        ).inc()
                        continue

                    # handle message
                    try:
                        self.handle_statistics_message(topic, message)
                    except Exception:
                        # A failing message must not stop the receive loop.
                        logger.exception(f"Error handling message {topic=} {message=}")

                        self.zmq_message_handling_error_count_metric.get_metric(
                            [topic]
                        ).inc()
                        continue
        except (
            asyncio.CancelledError,
            concurrent.futures.CancelledError,
        ):
            logger.debug("ZeroMQ message handler thread cancelled")
            raise
        except zmq.ContextTerminated:
            # benign exception from a forced stop
            logger.debug("ZeroMQ message handler ZMQ context terminated")
        except Exception:
            logger.exception("Error handling ZeroMQ messages")

        logger.debug("ZeroMQ message handler thread stopped")

    def configure_for_off(self):
        """Run the parent shutdown, then cancel the ZeroMQ handler if present."""
        super().configure_for_off()

        if self.zmq_message_handler_future:
            logger.info("Cancelling ZeroMQ message handler thread")

            self.zmq_message_handler_future.cancel()

            # Collect the outcome; the cancellation itself is expected.
            with suppress(concurrent.futures.CancelledError):
                _ = self.zmq_message_handler_future.result()

            self.zmq_message_handler_future = None

            logger.debug("Cancelled ZeroMQ message handler thread")

    def handle_statistics_message(self, topic: str, message: dict):
        """Process one decoded statistics message; must be overridden.

        Called by ``zmq_message_handler`` for every message whose payload was
        decoded successfully. Exceptions raised here are logged and counted by
        the caller.

        :param topic: topic the message was received on.
        :param message: the JSON-decoded payload.
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
# --------
# Commands
# --------