Skip to content
Snippets Groups Projects
Commit ebe91e63 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-244: Catch exception for qsize() on some platforms, don't assign values that might be illegal

parent 3974a696
No related branches found
No related tags found
1 merge request!56L2SS-244: Expose the SSTs in MPs
......@@ -173,11 +173,15 @@ class UDP_Receiver(Thread):
logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port))
def join(self, timeout=0):
logging.info("Sending shutdown to UDP thread for {}:{}".format(self.host, self.port))
self.stream_on = False
logging.info("Sent shutdown to UDP thread for {}:{}".format(self.host, self.port))
self.join(timeout)
if self.isAlive():
# happens if timeout is hit
return
# shutdown the socket so that others can listen on this port
self.sock.shutdown(socket.SHUT_RDWR)
......@@ -210,7 +214,7 @@ class SST_collector(Thread):
"nof_packets": numpy.uint64(0),
"last_packet": numpy.zeros((9000,), dtype=numpy.uint8),
"last_packet_timestamp": numpy.uint64(0),
"queue_fill_percentage": numpy.float32(100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0),
"queue_fill_percentage": numpy.float32(self.queue_fill_percentage()),
# Packet count for packets that could not be parsed as SSTs
"nof_invalid_packets": numpy.uint64(0),
......@@ -230,6 +234,13 @@ class SST_collector(Thread):
super().__init__()
self.start()
def queue_fill_percentage(self):
try:
return 100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0
except NotImplementedError:
# some platforms don't have qsize(), nothing we can do here
return 0
def run(self):
logging.info("Starting SST thread")
......@@ -237,6 +248,7 @@ class SST_collector(Thread):
self.last_packet = self.queue.get()
if self.last_packet is None:
# None is the magic marker to stop processing
break
self.process_packet(self.last_packet)
......@@ -244,8 +256,9 @@ class SST_collector(Thread):
logging.info("Stopping SST thread")
def join(self, timeout=0):
logging.info("Sending shutdown to SST thread")
# insert magic marker
self.queue.put(None)
logging.info("Sent shutdown to SST thread")
super().join(timeout)
......@@ -261,7 +274,7 @@ class SST_collector(Thread):
def process_packet(self, packet):
self.parameters["nof_packets"] += numpy.uint64(1)
self.parameters["queue_fill_percentage"] = numpy.double(100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0)
self.parameters["queue_fill_percentage"] = queue_fill_percentage()
try:
try:
......@@ -271,14 +284,14 @@ class SST_collector(Thread):
self.parameters["nof_invalid_packets"] += numpy.uint64(1)
return
# which input this packet contains data for
input_index = fields.signal_input_index
if input_index >= self.MAX_INPUTS:
# determine which input this packet contains data for
if fields.signal_input_index >= self.MAX_INPUTS:
# packet describes an input that is out of bounds for us
self.parameters["nof_invalid_packets"] += numpy.uint64(1)
return
input_index = fields.signal_input_index
if fields.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][input_index] += numpy.uint64(1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment