diff --git a/CDB/LOFAR_ConfigDb.json b/CDB/LOFAR_ConfigDb.json index bbfb3c47ef31db0c647bbc2c5a62b908915601fa..f4032fb611d54f261676041658af859a69f37559 100644 --- a/CDB/LOFAR_ConfigDb.json +++ b/CDB/LOFAR_ConfigDb.json @@ -722,7 +722,7 @@ "SST": { "LTS/SST/1": { "properties": { - "SST_Client_Port": [ + "Statistics_Client_Port": [ "5001" ], "OPC_Server_Name": [ diff --git a/CDB/sdp-sim-config.json b/CDB/sdp-sim-config.json index 75fb9998cbc210bdfee04adf41e6c854bcf13358..64b841e1dacf36e1de9b3e20ea068d36f0011478 100644 --- a/CDB/sdp-sim-config.json +++ b/CDB/sdp-sim-config.json @@ -24,7 +24,7 @@ "SST": { "LTS/SST/1": { "properties": { - "SST_Client_Port": [ + "Statistics_Client_Port": [ "5001" ], "OPC_Server_Name": [ diff --git a/CDB/thijs_ConfigDb.json b/CDB/thijs_ConfigDb.json index e60ce20eacdf24bad708009c12259bdcdf8d1cbd..37ae6d7b66acb4bbb0be1fd36bfc78e2f93eba8e 100644 --- a/CDB/thijs_ConfigDb.json +++ b/CDB/thijs_ConfigDb.json @@ -94,7 +94,7 @@ "SST": { "LTS/SST/1": { "properties": { - "SST_Client_Port": [ + "Statistics_Client_Port": [ "5001" ], "OPC_Server_Name": [ diff --git a/devices/clients/sst_client.py b/devices/clients/sst_client.py deleted file mode 100644 index af021d2a0a33dc9a6bf6f278de7d03a7cb758f31..0000000000000000000000000000000000000000 --- a/devices/clients/sst_client.py +++ /dev/null @@ -1,304 +0,0 @@ -from queue import Queue -from threading import Thread -import logging -import numpy -import queue -import socket -import time - -from clients.comms_client import CommClient -from devices.sdp.statistics_packet import SSTPacket - -logger = logging.getLogger() - - -class sst_client(CommClient): - """ - Connects to OPC-UA in the foreground or background, and sends HELLO - messages to keep a check on the connection. On connection failure, reconnects once. - """ - - def start(self): - super().start() - - def __init__(self, host, port, fault_func, streams, try_interval=2, queuesize=1024): - """ - Create the sst client and connect() to it and get the object node - """ - self.host = host - self.port = port - self.poll_timeout = 0.1 - self.disconnect_timeout = 10.0 - self.queuesize = queuesize - - super().__init__(fault_func, streams, try_interval) - - # Explicitly connect - if not self.connect(): - # hardware or infra is down -- needs fixing first - fault_func() - return - - def queue_fill_percentage(self): - try: - return 100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0 - except NotImplementedError: - # some platforms don't have qsize(), nothing we can do here - return 0 - - def connect(self): - """ - Function used to connect to the client. - """ - if not self.connected: - self.queue = Queue(maxsize=self.queuesize) - self.udp = UDP_Receiver(self.host, self.port, self.queue, self.poll_timeout, self.disconnect_timeout) - self.sst = SST_collector(self.queue, self.disconnect_timeout) - - return super().connect() - - def ping(self): - if not self.sst.isAlive(): - raise Exception("SST thread died unexpectedly") - - if not self.udp.isAlive(): - raise Exception("UDP thread died unexpectedly") - - def disconnect(self): - # explicit disconnect, instead of waiting for the GC to kick in after "del" below - self.sst.disconnect() - self.udp.disconnect() - - del self.udp - del self.sst - del self.queue - - return super().disconnect() - - def setup_value_conversion(self, attribute): - """ - gives the client access to the attribute_wrapper object in order to access all data it could potentially need. - the OPC ua read/write functions require the dimensionality and the type to be known - """ - return - - def setup_attribute(self, annotation, attribute): - """ - MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions - """ - - parameter = annotation["parameter"] - - # get all the necessary data to set up the read/write functions from the attribute_wrapper - self.setup_value_conversion(attribute) - - # redirect to right object. this works as long as the parameter names are unique among them. - if annotation["type"] == "sst": - def read_function(): - return self.sst.parameters[parameter] - elif annotation["type"] == "udp": - def read_function(): - return self.udp.parameters[parameter] - elif annotation["type"] == "queue": - if parameter == "fill_percentage": - def read_function(): - return numpy.uint64(self.queue_fill_percentage()) - else: - raise ValueError("Unknown queue parameter requested: %s" % parameter) - - def write_function(value): - """ - Not used here - """ - pass - - return read_function, write_function - -class UDP_Receiver(Thread): - """ - This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code - """ - - def __init__(self, host, port, queue, poll_timeout=0.1, disconnect_timeout=10.0): - self.queue = queue - self.host = host - self.port = port - self.disconnect_timeout = disconnect_timeout - - self.parameters = { - # Number of packets we received - "nof_packets_received": numpy.uint64(0), - # Number of packets we had to drop due to a full queue - "nof_packets_dropped": numpy.uint64(0), - # Packets are at most 9000 bytes, the largest payload (well, MTU) of an Ethernet Jumbo frame - "last_packet": numpy.zeros((9000,), dtype=numpy.uint8), - # Timestamp of when the last packet was received - "last_packet_timestamp": numpy.uint64(0), - } - - logger.debug("binding a socket on UDP port {}:{}".format(self.host, self.port)) - self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - - # Allow binding even if there are still lingering packets in the kernel for a - # previous listener that already died. If not, we get an "Address already in use". - # This is stock socket usage. - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - # specify what host and port to listen on - self.sock.bind((self.host, self.port)) - - # Make sure we can stop receiving packets even if none arrive. - # Without this, the recvmsg() call blocks indefinitely if no packet arrives. - self.sock.settimeout(poll_timeout) - - self.stream_on = True - super().__init__() - - self.start() - - def run(self): - # all variables are manually defined and are updated each time - logger.info("Starting UDP thread for {}:{}".format(self.host, self.port)) - - while self.stream_on: - try: - packet, _, _, _ = self.sock.recvmsg(9000) - - self.parameters["nof_packets_received"] += numpy.uint64(1) - self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8) - self.parameters["last_packet_timestamp"] = numpy.uint64(int(time.time())) - - # Forward packet to processing thread - self.queue.put(packet) - except socket.timeout: - # timeout -- expected, allows us to check whether to stop - pass - except queue.Full: - # overflow -- just discard - self.parameters["nof_packets_dropped"] += numpy.uint64(1) - - logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port)) - - def join(self, timeout=0): - self.stream_on = False - logging.info("Sent shutdown to UDP thread for {}:{}".format(self.host, self.port)) - - super().join(timeout) - - if self.isAlive(): - # happens if timeout is hit - return - - # shutdown the socket so that others can listen on this port - self.sock.shutdown(socket.SHUT_RDWR) - - def disconnect(self): - if not self.isAlive(): - return - - # try to get the thread shutdown, but don't stall forever - self.join(self.disconnect_timeout) - - if self.isAlive(): - logger.error("UDP thread not shutting down for {}:{}".format(self.host, self.port)) - - def __del__(self): - self.disconnect() - -class SST_collector(Thread): - # Maximum number of antenna inputs we support (used to determine array sizes) - MAX_INPUTS = 192 - - # Maximum number of subbands we support (used to determine array sizes) - MAX_SUBBANDS = 512 - - def __init__(self, queue, disconnect_timeout=10.0): - self.queue = queue - self.last_packet = None - self.disconnect_timeout = disconnect_timeout - - self.parameters = { - "nof_packets": numpy.uint64(0), - - # Packet count for packets that could not be parsed as SSTs - "nof_invalid_packets": numpy.uint64(0), - - # Full contents of the latest packet we deemed invalid. - "last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8), - - # Number of packets received so far that we could parse correctly and do not have a payload error - "nof_valid_payloads": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), - - # Packets that reported a payload error - "nof_payload_errors": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), - - # Last value array we've constructed out of the packets - "sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64), - "sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64), - "integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32), - } - - super().__init__() - self.start() - - def run(self): - logging.info("Starting SST thread") - - while True: - self.last_packet = self.queue.get() - - # This is the exception/slow path, but python doesn't allow us to optimise that - if self.last_packet is None: - # None is the magic marker to stop processing - break - - self.process_packet(self.last_packet) - - logging.info("Stopping SST thread") - - def join(self, timeout=0): - # insert magic marker - self.queue.put(None) - logging.info("Sent shutdown to SST thread") - - super().join(timeout) - - def disconnect(self): - if not self.isAlive(): - return - - # try to get the thread shutdown, but don't stall forever - self.join(self.disconnect_timeout) - - if self.isAlive: - logger.error("SST thread not shutting down") - - def process_packet(self, packet): - self.parameters["nof_packets"] += numpy.uint64(1) - - try: - fields = SSTPacket(packet) - - # determine which input this packet contains data for - if fields.signal_input_index >= self.MAX_INPUTS: - # packet describes an input that is out of bounds for us - raise ValueError("Packet describes input %d, but we are limited to describing MAX_INPUTS=%d" % (fields.signal_input_index, self.MAX_INPUTS)) - - input_index = fields.signal_input_index - - if fields.payload_error: - # cannot trust the data if a payload error is reported - self.parameters["nof_payload_errors"][input_index] += numpy.uint64(1) - return - - # process the packet - self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1) - self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload - self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) - self.parameters["integration_intervals"][input_index] = fields.integration_interval() - except Exception as e: - # This is unexpected, so print a stack trace - logging.exception("Could not parse SST UDP packet") - - self.parameters["nof_invalid_packets"] += numpy.uint64(1) - self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8) diff --git a/devices/clients/statistics_client.py b/devices/clients/statistics_client.py new file mode 100644 index 0000000000000000000000000000000000000000..5d45ac472b52ac2f024dfd4a338cb3d03f4d3c77 --- /dev/null +++ b/devices/clients/statistics_client.py @@ -0,0 +1,126 @@ +from queue import Queue +from threading import Thread +import logging +import numpy +import queue + +from .comms_client import CommClient +from .udp_receiver import UDPReceiver + +logger = logging.getLogger() + + +class StatisticsClient(CommClient): + """ + Collects statistics packets over UDP, forwards them to a StatisticsCollector, + and provides a CommClient interface to expose points to a Device Server. + """ + + def start(self): + super().start() + + def __init__(self, statistics_collector_class, host, port, fault_func, streams, try_interval=2, queuesize=1024): + """ + Create the statistics client and connect() to it and get the object node. + + statistics_collector_class: a subclass of StatisticsCollector that specialises in processing the received packets. + host: hostname to listen on + port: port number to listen on + """ + self.host = host + self.port = port + self.poll_timeout = 0.1 + self.queuesize = queuesize + self.statistics_collector_class = statistics_collector_class + + super().__init__(fault_func, streams, try_interval) + + # Explicitly connect + if not self.connect(): + # hardware or infra is down -- needs fixing first + fault_func() + return + + def queue_fill_percentage(self): + try: + return 100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0 + except NotImplementedError: + # some platforms don't have qsize(), nothing we can do here + return 0 + + def connect(self): + """ + Function used to connect to the client. + """ + if not self.connected: + self.queue = Queue(maxsize=self.queuesize) + self.udp = UDPReceiver(self.host, self.port, self.queue, self.poll_timeout) + self.statistics = self.statistics_collector_class(self.queue) + + return super().connect() + + def ping(self): + if not self.statistics.is_alive(): + raise Exception("Statistics processing thread died unexpectedly") + + if not self.udp.is_alive(): + raise Exception("UDP thread died unexpectedly") + + def disconnect(self): + # explicit disconnect, instead of waiting for the GC to kick in after "del" below + try: + self.statistics.disconnect() + except Exception: + # nothing we can do, but we should continue cleaning up + logger.log_exception("Could not disconnect statistics processing class") + + try: + self.udp.disconnect() + except Exception: + # nothing we can do, but we should continue cleaning up + logger.log_exception("Could not disconnect UDP receiver class") + + del self.udp + del self.statistics + del self.queue + + return super().disconnect() + + def setup_value_conversion(self, attribute): + """ + gives the client access to the attribute_wrapper object in order to access all data it could potentially need. + the OPC ua read/write functions require the dimensionality and the type to be known + """ + return + + def setup_attribute(self, annotation, attribute): + """ + MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions + """ + + parameter = annotation["parameter"] + + # get all the necessary data to set up the read/write functions from the attribute_wrapper + self.setup_value_conversion(attribute) + + # redirect to right object. this works as long as the parameter names are unique among them. + if annotation["type"] == "statistics": + def read_function(): + return self.statistics.parameters[parameter] + elif annotation["type"] == "udp": + def read_function(): + return self.udp.parameters[parameter] + elif annotation["type"] == "queue": + if parameter == "fill_percentage": + def read_function(): + return numpy.uint64(self.queue_fill_percentage()) + else: + raise ValueError("Unknown queue parameter requested: %s" % parameter) + + def write_function(value): + """ + Not used here + """ + pass + + return read_function, write_function diff --git a/devices/clients/udp_receiver.py b/devices/clients/udp_receiver.py new file mode 100644 index 0000000000000000000000000000000000000000..13f68f509ede31ac69c6fa0ab9b9d023cbda349b --- /dev/null +++ b/devices/clients/udp_receiver.py @@ -0,0 +1,103 @@ +from queue import Queue +from threading import Thread +import numpy +import logging +import socket +import time + +logger = logging.getLogger() + + +class UDPReceiver(Thread): + """ + This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code + """ + + # How long to wait for a stuck Thread + DISCONNECT_TIMEOUT = 10.0 + + def __init__(self, host, port, queue, poll_timeout=0.1): + self.queue = queue + self.host = host + self.port = port + + self.parameters = { + # Number of packets we received + "nof_packets_received": numpy.uint64(0), + # Number of packets we had to drop due to a full queue + "nof_packets_dropped": numpy.uint64(0), + # Packets are at most 9000 bytes, the largest payload (well, MTU) of an Ethernet Jumbo frame + "last_packet": numpy.zeros((9000,), dtype=numpy.uint8), + # Timestamp of when the last packet was received + "last_packet_timestamp": numpy.uint64(0), + } + + logger.debug("binding a socket on UDP port {}:{}".format(self.host, self.port)) + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + # Allow binding even if there are still lingering packets in the kernel for a + # previous listener that already died. If not, we get an "Address already in use". + # This is stock socket usage. + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + # specify what host and port to listen on + self.sock.bind((self.host, self.port)) + + # Make sure we can stop receiving packets even if none arrive. + # Without this, the recvmsg() call blocks indefinitely if no packet arrives. + self.sock.settimeout(poll_timeout) + + self.stream_on = True + super().__init__() + + self.start() + + def run(self): + # all variables are manually defined and are updated each time + logger.info("Starting UDP thread for {}:{}".format(self.host, self.port)) + + while self.stream_on: + try: + packet, _, _, _ = self.sock.recvmsg(9000) + + self.parameters["nof_packets_received"] += numpy.uint64(1) + self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8) + self.parameters["last_packet_timestamp"] = numpy.uint64(int(time.time())) + + # Forward packet to processing thread + self.queue.put(packet) + except socket.timeout: + # timeout -- expected, allows us to check whether to stop + pass + except queue.Full: + # overflow -- just discard + self.parameters["nof_packets_dropped"] += numpy.uint64(1) + + logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port)) + + def join(self, timeout=0): + self.stream_on = False + logging.info("Sent shutdown to UDP thread for {}:{}".format(self.host, self.port)) + + super().join(timeout) + + if self.is_alive(): + # happens if timeout is hit + return + + # shutdown the socket so that others can listen on this port + self.sock.shutdown(socket.SHUT_RDWR) + + def disconnect(self): + if not self.is_alive(): + return + + # try to get the thread shutdown, but don't stall forever + self.join(self.DISCONNECT_TIMEOUT) + + if self.is_alive(): + # there is nothing we can do except wait (stall) longer, which could be indefinitely. + logger.error(f"UDP thread for {self.host}:{self.port} did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.") + + def __del__(self): + self.disconnect() diff --git a/devices/devices/hardware_device.py b/devices/devices/hardware_device.py index 4bdc63386076a53caa57d5fd639661ee949f7b8e..a4da09297a6696c4fb5a31e2359b63958cb4eb4d 100644 --- a/devices/devices/hardware_device.py +++ b/devices/devices/hardware_device.py @@ -11,8 +11,10 @@ """ +from abc import ABCMeta, abstractmethod + # PyTango imports -from tango.server import Device, command +from tango.server import Device, command, DeviceMeta from tango import DevState, DebugIt # Additional import @@ -26,8 +28,12 @@ from devices.device_decorators import only_in_states, fault_on_error import logging logger = logging.getLogger() +class AbstractDeviceMetas(DeviceMeta, ABCMeta): + ''' Collects meta classes to allow hardware_device to be both a Device and an ABC. ''' + pass + #@log_exceptions() -class hardware_device(Device): +class hardware_device(Device, metaclass=AbstractDeviceMetas): """ **Properties:** @@ -155,13 +161,18 @@ class hardware_device(Device): self.set_state(DevState.FAULT) - # functions that can be overloaded + # functions that can or must be overloaded def configure_for_fault(self): pass + + @abstractmethod def configure_for_off(self): pass + def configure_for_on(self): pass + + @abstractmethod def configure_for_initialise(self): pass diff --git a/devices/devices/sdp/sst.py b/devices/devices/sdp/sst.py index e6b5ef4015b64dd3bf546cfc24b3287745a19bf4..2dc32c64a6fae3688d4ce78970490dafe0c618fd 100644 --- a/devices/devices/sdp/sst.py +++ b/devices/devices/sdp/sst.py @@ -24,52 +24,34 @@ from tango.server import device_property, attribute from tango import AttrWriteType # Additional import -from clients.sst_client import sst_client, SST_collector -from clients.opcua_client import OPCUAConnection from clients.attribute_wrapper import attribute_wrapper +from clients.opcua_client import OPCUAConnection +from clients.StatisticsClient import StatisticsClient from devices.hardware_device import hardware_device from common.lofar_git import get_version from common.lofar_logging import device_logging_to_python, log_exceptions +from devices.sdp.statistics import Statistics +from devices.sdp.statistics_collector import SSTCollector + import numpy __all__ = ["SST", "main"] -@device_logging_to_python() -class SST(hardware_device): +class SST(Statistics): + + STATISTICS_COLLECTOR_CLASS = SSTCollector # ----------------- # Device Properties # ----------------- - OPC_Server_Name = device_property( - dtype='DevString', - mandatory=True - ) - - OPC_Server_Port = device_property( - dtype='DevULong', - mandatory=True - ) - - OPC_Time_Out = device_property( - dtype='DevDouble', - mandatory=True - ) - - SST_Client_Port = device_property( - dtype='DevUShort', - mandatory=True - ) - # ---------- # Attributes # ---------- - version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version()) - # FPGA control points for SSTs FPGA_sst_offload_enable_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_enable_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_sst_offload_enable_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_enable_R"], datatype=numpy.bool_, dims=(16,)) @@ -82,76 +64,20 @@ class SST(hardware_device): FPGA_sst_offload_selector_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_sst_offload_selector_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_R"], datatype=numpy.bool_, dims=(16,)) - # number of UDP packets that were received - nof_packets_received_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64) - # number of UDP packets that were dropped because we couldn't keep up with processing - nof_packets_dropped_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "nof_packets_dropped"}, datatype=numpy.uint64) - # last packet we processed - last_packet_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8) - # when last packet was received - last_packet_timestamp_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64) - - # number of UDP packets that were processed - nof_packets_processed_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_packets"}, datatype=numpy.uint64) - # queue fill percentage, as reported by the consumer - queue_fill_percentage_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "queue", "parameter": "fill_percentage"}, datatype=numpy.uint64) - - # number of invalid (non-SST) packets received - nof_invalid_packets_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64) - # last packet that could not be parsed - last_invalid_packet_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "last_invalid_packet"}, dims=(9000,), datatype=numpy.uint8) # number of packets with valid payloads - nof_valid_payloads_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) + nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64) # number of packets with invalid payloads - nof_payload_errors_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) + nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64) # latest SSTs - sst_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64) + sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(SSTCollector.MAX_SUBBANDS, SSTCollector.MAX_INPUTS), datatype=numpy.uint64) # reported timestamp for each row in the latest SSTs - sst_timestamp_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) + sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64) # integration interval for each row in the latest SSTs - integration_interval_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32) + integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.float32) # -------- - # overloaded functions - def configure_for_off(self): - """ user code here. is called when the state is set to OFF """ - - # Stop keep-alive - try: - self.sst_client.stop() - except Exception as e: - self.warn_stream("Exception while stopping sst_client in configure_for_off function: {}. Exception ignored".format(e)) - - try: - self.opcua_connection.stop() - except Exception as e: - self.warn_stream("Exception while stopping OPC UA connection in configure_for_off function: {}. Exception ignored".format(e)) - - @log_exceptions() - def configure_for_initialise(self): - """ user code here. is called when the sate is set to INIT """ - """Initialises the attributes and properties of the statistics device.""" - - self.sst_client = sst_client("0.0.0.0", self.SST_Client_Port, self.Fault, self) - - self.OPCUA_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) - - # map an access helper class - for i in self.attr_list(): - try: - if i.comms_id == sst_client: - i.set_comm_client(self.sst_client) - if i.comms_id == OPCUAConnection: - i.set_comm_client(self.OPCUA_client) - except Exception as e: - # use the pass function instead of setting read/write fails - i.set_pass_func() - self.warn_stream("error while setting the sst attribute {} read/write function. {}. using pass function instead".format(i, e)) - pass - - self.sst_client.start() - - self.OPCUA_client.start() + # Overloaded functions + # -------- # -------- # Commands diff --git a/devices/devices/sdp/statistics.py b/devices/devices/sdp/statistics.py new file mode 100644 index 0000000000000000000000000000000000000000..2a9ddb9ec91e8f3bd55ef2d2d2fed50e1b637c9f --- /dev/null +++ b/devices/devices/sdp/statistics.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the SST project +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" Base device for Statistics (SST/BST/XST) + +""" + +# TODO(Corne): Remove sys.path.append hack once packaging is in place! +import os, sys +currentdir = os.path.dirname(os.path.realpath(__file__)) +parentdir = os.path.dirname(currentdir) +parentdir = os.path.dirname(parentdir) +sys.path.append(parentdir) + +from abc import ABCMeta, abstractmethod + +# PyTango imports +from tango.server import run +from tango.server import device_property, attribute +from tango import AttrWriteType +# Additional import + +from clients.statistics_client import StatisticsClient +from clients.opcua_client import OPCUAConnection +from clients.attribute_wrapper import attribute_wrapper + +from devices.hardware_device import hardware_device + +from common.lofar_git import get_version +from common.lofar_logging import device_logging_to_python, log_exceptions + +import numpy + +__all__ = ["Statistics"] + +class Statistics(hardware_device, metaclass=ABCMeta): + + # In derived classes, set this to a subclass of StatisticsCollector + @property + @abstractmethod + def STATISTICS_COLLECTOR_CLASS(self): + pass + + # ----------------- + # Device Properties + # ----------------- + + OPC_Server_Name = device_property( + dtype='DevString', + mandatory=True + ) + + OPC_Server_Port = device_property( + dtype='DevULong', + mandatory=True + ) + + OPC_Time_Out = device_property( + dtype='DevDouble', + mandatory=True + ) + + Statistics_Client_Port = device_property( + dtype='DevUShort', + mandatory=True + ) + + # ---------- + # Attributes + # ---------- + + version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version()) + + # number of UDP packets that were received + nof_packets_received_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64) + # number of UDP packets that were dropped because we couldn't keep up with processing + nof_packets_dropped_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "nof_packets_dropped"}, datatype=numpy.uint64) + # last packet we processed + last_packet_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8) + # when last packet was received + last_packet_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64) + + # queue fill percentage, as reported by the consumer + queue_fill_percentage_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "queue", "parameter": "fill_percentage"}, datatype=numpy.uint64) + + # number of UDP packets that were processed + nof_packets_processed_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_packets"}, datatype=numpy.uint64) + + # number of invalid (non-SST) packets received + nof_invalid_packets_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64) + # last packet that could not be parsed + last_invalid_packet_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "last_invalid_packet"}, dims=(9000,), datatype=numpy.uint8) + + # -------- + # Overloaded functions + # -------- + + def configure_for_off(self): + """ user code here. is called when the state is set to OFF """ + + # Stop keep-alive + try: + self.statistics_client.stop() + except Exception as e: + self.warn_stream("Exception while stopping statistics_client in configure_for_off function: {}. Exception ignored".format(e)) + + try: + self.OPCUA_client.stop() + except Exception as e: + self.warn_stream("Exception while stopping OPC UA connection in configure_for_off function: {}. Exception ignored".format(e)) + + @log_exceptions() + def configure_for_initialise(self): + """ user code here. is called when the sate is set to INIT """ + """Initialises the attributes and properties of the statistics device.""" + + self.statistics_client = StasticsClient(self.STATISTICS_COLLECTOR_CLASS, "0.0.0.0", self.Statistics_Client_Port, self.Fault, self) + + self.OPCUA_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) + + # map an access helper class + for i in self.attr_list(): + try: + if i.comms_id == StatisticsClient: + i.set_comm_client(self.statistics_client) + elif i.comms_id == OPCUAConnection: + i.set_comm_client(self.OPCUA_client) + else: + raise ValueError("Cannot set comm client for attribute {}: Unknown comms_id {}".format(i, i.comms_id)) + except Exception as e: + # use the pass function instead of setting read/write fails + i.set_pass_func() + self.warn_stream("error while setting the sst attribute {} read/write function. {}. using pass function instead".format(i, e)) + pass + + self.statistics_client.start() + + self.OPCUA_client.start() + + # -------- + # Commands + # -------- diff --git a/devices/devices/sdp/statistics_collector.py b/devices/devices/sdp/statistics_collector.py new file mode 100644 index 0000000000000000000000000000000000000000..f7d01d2cb0615f157b2b6cc161cb199495d94a45 --- /dev/null +++ b/devices/devices/sdp/statistics_collector.py @@ -0,0 +1,132 @@ +from queue import Queue +from threading import Thread +import logging +import numpy + +from .statistics_packet import SSTPacket + +logger = logging.getLogger() + +class StatisticsCollector(Thread): + """ Base class to process statistics packets from a queue, asynchronously. """ + + # Maximum number of antenna inputs we support (used to determine array sizes) + MAX_INPUTS = 192 + + # Maximum number of subbands we support (used to determine array sizes) + MAX_SUBBANDS = 512 + + # Maximum time to wait for the Thread to get unstuck, if we want to stop + DISCONNECT_TIMEOUT = 10.0 + + def __init__(self, queue: Queue): + self.queue = queue + self.last_packet = None + + self.parameters = { + "nof_packets": numpy.uint64(0), + + # Packet count for packets that could not be parsed + "nof_invalid_packets": numpy.uint64(0), + + # Full contents of the latest packet we deemed invalid. + "last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8), + } + + super().__init__() + self.start() + + def run(self): + logger.info("Starting statistics thread") + + while True: + self.last_packet = self.queue.get() + + # This is the exception/slow path, but python doesn't allow us to optimise that + if self.last_packet is None: + # None is the magic marker to stop processing + break + + self.parameters["nof_packets"] += numpy.uint64(1) + + try: + self.process_packet(self.last_packet) + except Exception as e: + logger.exception("Could not parse statistics UDP packet") + + self.parameters["last_invalid_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8) + self.parameters["nof_invalid_packets"] += numpy.uint64(1) + + logger.info("Stopped statistics thread") + + def join(self, timeout=0): + # insert magic marker + self.queue.put(None) + logger.info("Sent shutdown to statistics thread") + + super().join(timeout) + + def disconnect(self): + if not self.is_alive(): + return + + # try to get the thread shutdown, but don't stall forever + self.join(self.DISCONNECT_TIMEOUT) + + if self.is_alive(): + # there is nothing we can do except wait (stall) longer, which could be indefinitely. + logger.error(f"Statistics thread did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.") + + def process_packet(self, packet): + """ Update any information based on this packet. """ + + raise NotImplementedError + + +class SSTCollector(StatisticsCollector): + """ Class to process SST statistics packets. """ + + # Maximum number of antenna inputs we support (used to determine array sizes) + MAX_INPUTS = 192 + + # Maximum number of subbands we support (used to determine array sizes) + MAX_SUBBANDS = 512 + + def __init__(self, queue): + super().__init__(queue) + + self.parameters.extend({ + # Number of packets received so far that we could parse correctly and do not have a payload error + "nof_valid_payloads": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), + + # Packets that reported a payload error + "nof_payload_errors": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), + + # Last value array we've constructed out of the packets + "sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64), + "sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64), + "integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32), + }) + + def process_packet(self, packet): + fields = SSTPacket(packet) + + # determine which input this packet contains data for + if fields.signal_input_index >= self.MAX_INPUTS: + # packet describes an input that is out of bounds for us + raise ValueError("Packet describes input %d, but we are limited to describing MAX_INPUTS=%d" % (fields.signal_input_index, self.MAX_INPUTS)) + + input_index = fields.signal_input_index + + if fields.payload_error: + # cannot trust the data if a payload error is reported + self.parameters["nof_payload_errors"][input_index] += numpy.uint64(1) + + # don't raise, as packet is valid + return + + # process the packet + self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1) + self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload + self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) + self.parameters["integration_intervals"][input_index] = fields.integration_interval()