diff --git a/devices/SST.py b/devices/SST.py index f7387b5ad4a5b243fea1e6fa0eddd470c4c32dcf..2d809321b195783b0671cd4eb60fb495ba412f4e 100644 --- a/devices/SST.py +++ b/devices/SST.py @@ -47,19 +47,32 @@ class SST(hardware_device): version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version()) - # SST client annotation consists of a dict that contains the parameter name that needs to be read. - # Example: comms_annotation={"parameter": "this_value_R"} - nof_packets_R = attribute_wrapper(comms_annotation={"parameter": "nof_packets"}, datatype=numpy.uint64) - last_packet_R = attribute_wrapper(comms_annotation={"parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8) - last_packet_timestamp_R = attribute_wrapper(comms_annotation={"parameter": "last_packet_timestamp"}, datatype=numpy.uint64) - queue_fill_percentage_R = attribute_wrapper(comms_annotation={"parameter": "queue_fill_percentage"}, datatype=numpy.float32) - - nof_invalid_packets_R = attribute_wrapper(comms_annotation={"parameter": "nof_invalid_packets"}, datatype=numpy.uint64) - nof_valid_payloads_R = attribute_wrapper(comms_annotation={"parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) - nof_payload_errors_R = attribute_wrapper(comms_annotation={"parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) - sst_R = attribute_wrapper(comms_annotation={"parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64) - sst_timestamp_R = attribute_wrapper(comms_annotation={"parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) - integration_interval_R = attribute_wrapper(comms_annotation={"parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32) + # number of UDP packets that were received + nof_packets_received_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64) + # number of UDP packets that were dropped because we couldn't keep up with processing + nof_packets_dropped_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "nof_packets_dropped"}, datatype=numpy.uint64) + # last packet we processed + last_packet_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8) + # when last packet was received + last_packet_timestamp_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64) + + # number of UDP packets that were processed + nof_packets_processed_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_packets"}, datatype=numpy.uint64) + # queue fill percentage, as reported by the consumer + queue_fill_percentage_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "queue_fill_percentage"}, datatype=numpy.float32) + + # number of invalid (non-SST) packets received + nof_invalid_packets_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64) + # number of packets with valid payloads + nof_valid_payloads_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) + # number of packets with invalid payloads + nof_payload_errors_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) + # latest SSTs + sst_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64) + # reported timestamp for each row in the latest SSTs + sst_timestamp_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) + # integration interval for each row in the latest SSTs + integration_interval_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32) # -------- # overloaded functions diff --git a/devices/clients/sst_client.py b/devices/clients/sst_client.py index 5a6153fc0f38df3ab837e2881d588c65382462b1..11749ec18187af8854feab1d6425d462e22f745c 100644 --- a/devices/clients/sst_client.py +++ b/devices/clients/sst_client.py @@ -33,7 +33,8 @@ class sst_client(CommClient): """ self.host = host self.port = port - self.timeout = 0.1 + self.poll_timeout = 0.1 + self.disconnect_timeout = 10.0 self.queuesize = queuesize super().__init__(fault_func, streams, try_interval) @@ -50,15 +51,15 @@ class sst_client(CommClient): """ if not self.connected: self.queue = Queue(maxsize=self.queuesize) - self.udp = UDP_Receiver(self.host, self.port, self.queue, self.timeout) - self.sst = SST_collector(self.queue) + self.udp = UDP_Receiver(self.host, self.port, self.queue, self.poll_timeout, self.disconnect_timeout) + self.sst = SST_collector(self.queue, self.disconnect_timeout) return super().connect() def disconnect(self): # explicit disconnect, instead of waiting for the GC to kick in after "del" below - self.sst.join() - self.udp.join() + self.sst.disconnect() + self.udp.disconnect() del self.udp del self.sst @@ -95,9 +96,13 @@ class sst_client(CommClient): # get all the necessary data to set up the read/write functions from the attribute_wrapper self.setup_value_conversion(attribute) - - def read_function(): - return self.sst.parameters[SST_param] + # redirect to right object. this works as long as the parameter names are unique among them. + if annotation["type"] == "sst": + def read_function(): + return self.sst.parameters[SST_param] + elif annotation["type"] == "udp": + def read_function(): + return self.udp.parameters[SST_param] def write_function(value): """ @@ -112,22 +117,31 @@ class UDP_Receiver(Thread): This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code """ - def __init__(self, host, port, queue, timeout=0.1): + def __init__(self, host, port, queue, poll_timeout=0.1, disconnect_timeout=10.0): self.queue = queue self.host = host self.port = port + self.disconnect_timeout = disconnect_timeout + + self.parameters = { + "nof_packets_received": numpy.uint64(0), + "nof_packets_dropped": numpy.uint64(0), + } logger.debug("binding a socket on UDP port {}:{}".format(self.host, self.port)) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - # allow binding even if there are still lingering packets for a previous listener + # Allow binding even if there are still lingering packets in the kernel for a + # previous listener that already died. If not, we get an "Address already in use". + # This is stock socket usage. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # specify what host and port to listen on self.sock.bind((self.host, self.port)) - # make sure we can stop receiving packets even if none arrive - self.sock.settimeout(timeout) + # Make sure we can stop receiving packets even if none arrive. + # Without this, the recvmsg() call blocks indefinitely if no packet arrives. + self.sock.settimeout(poll_timeout) self.stream_on = True super().__init__() @@ -140,13 +154,21 @@ class UDP_Receiver(Thread): while self.stream_on: try: + # Packets are at most 9000 bytes, the largest payload (well, MTU) of an Ethernet Jumbo frame packet, _, _, _ = self.sock.recvmsg(9000) + + self.parameters["nof_packets_received"] += numpy.uint64(1) + self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8) + self.parameters["last_packet_timestamp"] = numpy.uint64(int(time.time())) + + # Forward packet to processing thread self.queue.put(packet) except socket.timeout: + # timeout -- expected, allows us to check whether to stop pass except queue.Full: # overflow -- just discard - pass + self.parameters["nof_packets_dropped"] += numpy.uint64(1) logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port)) @@ -159,6 +181,16 @@ class UDP_Receiver(Thread): # shutdown the socket so that others can listen on this port self.sock.shutdown(socket.SHUT_RDWR) + def disconnect(self): + if not self.isAlive(): + return + + # try to get the thread shutdown, but don't stall forever + self.join(self.disconnect_timeout) + + if self.isAlive: + logger.error("UDP thread not shutting down for {}:{}".format(self.host, self.port)) + def __del__(self): self.disconnect() @@ -169,9 +201,10 @@ class SST_collector(Thread): # Maximum number of subbands we support (used to determine array sizes) MAX_SUBBANDS = 512 - def __init__(self, queue): + def __init__(self, queue, disconnect_timeout=10.0): self.queue = queue self.last_packet = None + self.disconnect_timeout = disconnect_timeout self.parameters = { "nof_packets": numpy.uint64(0), @@ -216,10 +249,18 @@ class SST_collector(Thread): super().join(timeout) + def disconnect(self): + if not self.isAlive(): + return + + # try to get the thread shutdown, but don't stall forever + self.join(self.disconnect_timeout) + + if self.isAlive: + logger.error("SST thread not shutting down") + def process_packet(self, packet): self.parameters["nof_packets"] += numpy.uint64(1) - self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8) - self.parameters["last_packet_timestamp"] = numpy.uint64(int(time.time())) self.parameters["queue_fill_percentage"] = numpy.double(100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0) try: