From ae351682272ba3aeee8d51ce8f577a731b8fe7c5 Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Wed, 16 Jun 2021 19:29:50 +0000 Subject: [PATCH] Added comments and improved naming. --- devices/Statistics.py | 6 ++--- devices/clients/sst_client.py | 48 ++++++++++++++++++++++++----------- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/devices/Statistics.py b/devices/Statistics.py index 539b87d85..d599a9dbe 100644 --- a/devices/Statistics.py +++ b/devices/Statistics.py @@ -55,10 +55,10 @@ class Statistics(hardware_device): queue_fill_percentage_R = attribute_wrapper(comms_annotation={"parameter": "queue_fill_percentage"}, datatype=numpy.float32) nof_invalid_packets_R = attribute_wrapper(comms_annotation={"parameter": "nof_invalid_packets"}, datatype=numpy.uint64) - nof_valid_packets_R = attribute_wrapper(comms_annotation={"parameter": "nof_valid_packets"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) + nof_valid_payloads_R = attribute_wrapper(comms_annotation={"parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) nof_payload_errors_R = attribute_wrapper(comms_annotation={"parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) - last_value_R = attribute_wrapper(comms_annotation={"parameter": "last_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64) - last_timestamp_R = attribute_wrapper(comms_annotation={"parameter": "last_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) + sst_R = attribute_wrapper(comms_annotation={"parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64) + sst_timestamp_R = attribute_wrapper(comms_annotation={"parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) integration_interval_R = attribute_wrapper(comms_annotation={"parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32) # -------- diff --git a/devices/clients/sst_client.py b/devices/clients/sst_client.py index 0cb43c39f..d4823c165 100644 --- a/devices/clients/sst_client.py +++ b/devices/clients/sst_client.py @@ -56,6 +56,10 @@ class sst_client(CommClient): return super().connect() def disconnect(self): + # explicit disconnect, instead of waiting for the GC to kick in after "del" below + self.sst.join() + self.udp.join() + del self.udp del self.sst del self.queue @@ -113,10 +117,16 @@ class UDP_Receiver(Thread): self.host = host self.port = port - logger.debug("binding a socket on UDP port {} and host {}".format(self.port, self.host)) + logger.debug("binding a socket on UDP port {}:{}".format(self.host, self.port)) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + # allow binding even if there are still lingering packets for a previous listener self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + # specify what host and port to listen on self.sock.bind((self.host, self.port)) + + # make sure we can stop receiving packets even if none arrive self.sock.settimeout(timeout) self.stream_on = True @@ -126,7 +136,7 @@ class UDP_Receiver(Thread): def run(self): # all variables are manually defined and are updated each time - logger.debug("starting UDP thread with port {} and host {}".format(self.port, self.host)) + logger.info("Starting UDP thread for {}:{}".format(self.host, self.port)) while self.stream_on: try: @@ -135,12 +145,18 @@ class UDP_Receiver(Thread): except socket.timeout: pass except queue.Full: + # overflow -- just discard pass - def disconnect(self): + logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port)) + + def join(self, timeout=0): + logging.info("Sending shutdown to UDP thread for {}:{}".format(self.host, self.port)) self.stream_on = False - self.join() + self.join(timeout) + + # shutdown the socket so that others can listen on this port self.sock.shutdown(socket.SHUT_RDWR) def __del__(self): @@ -166,15 +182,15 @@ class SST_collector(Thread): # Packet count for packets that could not be parsed as SSTs "nof_invalid_packets": numpy.uint64(0), - # Number of packets received so far that we could parse correctly - "nof_valid_packets": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), + # Number of packets received so far that we could parse correctly and do not have a payload error + "nof_valid_payloads": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), # Packets that reported a payload error "nof_payload_errors": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), # Last value array we've constructed out of the packets - "last_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64), - "last_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64), + "sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64), + "sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64), "integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32), } @@ -182,7 +198,7 @@ class SST_collector(Thread): self.start() def run(self): - logging.info("starting SST thread") + logging.info("Starting SST thread") while True: self.last_packet = self.queue.get() @@ -192,11 +208,12 @@ class SST_collector(Thread): self.process_packet(self.last_packet) - logging.info("shutting down SST thread") + logging.info("Stopping SST thread") def join(self, timeout=0): - logging.info("sending shutdown to SST thread") + logging.info("Sending shutdown to SST thread") self.queue.put(None) + super().join(timeout) def process_packet(self, packet): @@ -218,6 +235,7 @@ class SST_collector(Thread): self.parameters["nof_invalid_packets"] += numpy.uint64(1) return + # which input this packet contains data for input_index = fields.sst_signal_input_index if input_index >= self.MAX_INPUTS: @@ -231,12 +249,12 @@ class SST_collector(Thread): return # process the packet - self.parameters["nof_valid_packets"][input_index] += numpy.uint64(1) - self.parameters["last_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload_sst - self.parameters["last_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) + self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1) + self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload_sst + self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) self.parameters["integration_intervals"][input_index] = fields.integration_interval() except Exception as e: # This is unexpected, so print a stack trace - logging.exception("Could not parse UDP packet") + logging.exception("Could not parse SST UDP packet") self.parameters["nof_invalid_packets"] += numpy.uint64(1) -- GitLab