diff --git a/devices/clients/sst_client.py b/devices/clients/sst_client.py index 11749ec18187af8854feab1d6425d462e22f745c..66835a69c755d202235dabcfa19a5cf2e2f7ade0 100644 --- a/devices/clients/sst_client.py +++ b/devices/clients/sst_client.py @@ -173,11 +173,15 @@ class UDP_Receiver(Thread): logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port)) def join(self, timeout=0): - logging.info("Sending shutdown to UDP thread for {}:{}".format(self.host, self.port)) self.stream_on = False + logging.info("Sent shutdown to UDP thread for {}:{}".format(self.host, self.port)) self.join(timeout) + if self.isAlive(): + # happens if timeout is hit + return + # shutdown the socket so that others can listen on this port self.sock.shutdown(socket.SHUT_RDWR) @@ -210,7 +214,7 @@ class SST_collector(Thread): "nof_packets": numpy.uint64(0), "last_packet": numpy.zeros((9000,), dtype=numpy.uint8), "last_packet_timestamp": numpy.uint64(0), - "queue_fill_percentage": numpy.float32(100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0), + "queue_fill_percentage": numpy.float32(self.queue_fill_percentage()), # Packet count for packets that could not be parsed as SSTs "nof_invalid_packets": numpy.uint64(0), @@ -230,6 +234,13 @@ class SST_collector(Thread): super().__init__() self.start() + def queue_fill_percentage(self): + try: + return 100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0 + except NotImplementedError: + # some platforms don't have qsize(), nothing we can do here + return 0 + def run(self): logging.info("Starting SST thread") @@ -237,6 +248,7 @@ class SST_collector(Thread): self.last_packet = self.queue.get() if self.last_packet is None: + # None is the magic marker to stop processing break self.process_packet(self.last_packet) @@ -244,8 +256,9 @@ class SST_collector(Thread): logging.info("Stopping SST thread") def join(self, timeout=0): - logging.info("Sending shutdown to SST thread") + # insert magic marker self.queue.put(None) + logging.info("Sent shutdown to SST thread") super().join(timeout) @@ -261,7 +274,7 @@ class SST_collector(Thread): def process_packet(self, packet): self.parameters["nof_packets"] += numpy.uint64(1) - self.parameters["queue_fill_percentage"] = numpy.double(100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0) + self.parameters["queue_fill_percentage"] = queue_fill_percentage() try: try: @@ -271,14 +284,14 @@ class SST_collector(Thread): self.parameters["nof_invalid_packets"] += numpy.uint64(1) return - # which input this packet contains data for - input_index = fields.signal_input_index - - if input_index >= self.MAX_INPUTS: + # determine which input this packet contains data for + if fields.signal_input_index >= self.MAX_INPUTS: # packet describes an input that is out of bounds for us self.parameters["nof_invalid_packets"] += numpy.uint64(1) return + input_index = fields.signal_input_index + if fields.payload_error: # cannot trust the data if a payload error is reported self.parameters["nof_payload_errors"][input_index] += numpy.uint64(1)