Skip to content
Snippets Groups Projects
Commit ae351682 authored by Jan David Mol's avatar Jan David Mol
Browse files

Added comments and improved naming.

parent a8970e5e
No related branches found
No related tags found
1 merge request!56L2SS-244: Expose the SSTs in MPs
...@@ -55,10 +55,10 @@ class Statistics(hardware_device): ...@@ -55,10 +55,10 @@ class Statistics(hardware_device):
queue_fill_percentage_R = attribute_wrapper(comms_annotation={"parameter": "queue_fill_percentage"}, datatype=numpy.float32) queue_fill_percentage_R = attribute_wrapper(comms_annotation={"parameter": "queue_fill_percentage"}, datatype=numpy.float32)
nof_invalid_packets_R = attribute_wrapper(comms_annotation={"parameter": "nof_invalid_packets"}, datatype=numpy.uint64) nof_invalid_packets_R = attribute_wrapper(comms_annotation={"parameter": "nof_invalid_packets"}, datatype=numpy.uint64)
nof_valid_packets_R = attribute_wrapper(comms_annotation={"parameter": "nof_valid_packets"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) nof_valid_payloads_R = attribute_wrapper(comms_annotation={"parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
nof_payload_errors_R = attribute_wrapper(comms_annotation={"parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) nof_payload_errors_R = attribute_wrapper(comms_annotation={"parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
last_value_R = attribute_wrapper(comms_annotation={"parameter": "last_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64) sst_R = attribute_wrapper(comms_annotation={"parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64)
last_timestamp_R = attribute_wrapper(comms_annotation={"parameter": "last_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) sst_timestamp_R = attribute_wrapper(comms_annotation={"parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
integration_interval_R = attribute_wrapper(comms_annotation={"parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32) integration_interval_R = attribute_wrapper(comms_annotation={"parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32)
# -------- # --------
......
...@@ -56,6 +56,10 @@ class sst_client(CommClient): ...@@ -56,6 +56,10 @@ class sst_client(CommClient):
return super().connect() return super().connect()
def disconnect(self): def disconnect(self):
# explicit disconnect, instead of waiting for the GC to kick in after "del" below
self.sst.join()
self.udp.join()
del self.udp del self.udp
del self.sst del self.sst
del self.queue del self.queue
...@@ -113,10 +117,16 @@ class UDP_Receiver(Thread): ...@@ -113,10 +117,16 @@ class UDP_Receiver(Thread):
self.host = host self.host = host
self.port = port self.port = port
logger.debug("binding a socket on UDP port {} and host {}".format(self.port, self.host)) logger.debug("binding a socket on UDP port {}:{}".format(self.host, self.port))
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# allow binding even if there are still lingering packets for a previous listener
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# specify what host and port to listen on
self.sock.bind((self.host, self.port)) self.sock.bind((self.host, self.port))
# make sure we can stop receiving packets even if none arrive
self.sock.settimeout(timeout) self.sock.settimeout(timeout)
self.stream_on = True self.stream_on = True
...@@ -126,7 +136,7 @@ class UDP_Receiver(Thread): ...@@ -126,7 +136,7 @@ class UDP_Receiver(Thread):
def run(self): def run(self):
# all variables are manually defined and are updated each time # all variables are manually defined and are updated each time
logger.debug("starting UDP thread with port {} and host {}".format(self.port, self.host)) logger.info("Starting UDP thread for {}:{}".format(self.host, self.port))
while self.stream_on: while self.stream_on:
try: try:
...@@ -135,12 +145,18 @@ class UDP_Receiver(Thread): ...@@ -135,12 +145,18 @@ class UDP_Receiver(Thread):
except socket.timeout: except socket.timeout:
pass pass
except queue.Full: except queue.Full:
# overflow -- just discard
pass pass
def disconnect(self): logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port))
def join(self, timeout=0):
logging.info("Sending shutdown to UDP thread for {}:{}".format(self.host, self.port))
self.stream_on = False self.stream_on = False
self.join()
self.join(timeout)
# shutdown the socket so that others can listen on this port
self.sock.shutdown(socket.SHUT_RDWR) self.sock.shutdown(socket.SHUT_RDWR)
def __del__(self): def __del__(self):
...@@ -166,15 +182,15 @@ class SST_collector(Thread): ...@@ -166,15 +182,15 @@ class SST_collector(Thread):
# Packet count for packets that could not be parsed as SSTs # Packet count for packets that could not be parsed as SSTs
"nof_invalid_packets": numpy.uint64(0), "nof_invalid_packets": numpy.uint64(0),
# Number of packets received so far that we could parse correctly # Number of packets received so far that we could parse correctly and do not have a payload error
"nof_valid_packets": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), "nof_valid_payloads": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64),
# Packets that reported a payload error # Packets that reported a payload error
"nof_payload_errors": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), "nof_payload_errors": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64),
# Last value array we've constructed out of the packets # Last value array we've constructed out of the packets
"last_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64), "sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64),
"last_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64), "sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64),
"integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32), "integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32),
} }
...@@ -182,7 +198,7 @@ class SST_collector(Thread): ...@@ -182,7 +198,7 @@ class SST_collector(Thread):
self.start() self.start()
def run(self): def run(self):
logging.info("starting SST thread") logging.info("Starting SST thread")
while True: while True:
self.last_packet = self.queue.get() self.last_packet = self.queue.get()
...@@ -192,11 +208,12 @@ class SST_collector(Thread): ...@@ -192,11 +208,12 @@ class SST_collector(Thread):
self.process_packet(self.last_packet) self.process_packet(self.last_packet)
logging.info("shutting down SST thread") logging.info("Stopping SST thread")
def join(self, timeout=0): def join(self, timeout=0):
logging.info("sending shutdown to SST thread") logging.info("Sending shutdown to SST thread")
self.queue.put(None) self.queue.put(None)
super().join(timeout) super().join(timeout)
def process_packet(self, packet): def process_packet(self, packet):
...@@ -218,6 +235,7 @@ class SST_collector(Thread): ...@@ -218,6 +235,7 @@ class SST_collector(Thread):
self.parameters["nof_invalid_packets"] += numpy.uint64(1) self.parameters["nof_invalid_packets"] += numpy.uint64(1)
return return
# which input this packet contains data for
input_index = fields.sst_signal_input_index input_index = fields.sst_signal_input_index
if input_index >= self.MAX_INPUTS: if input_index >= self.MAX_INPUTS:
...@@ -231,12 +249,12 @@ class SST_collector(Thread): ...@@ -231,12 +249,12 @@ class SST_collector(Thread):
return return
# process the packet # process the packet
self.parameters["nof_valid_packets"][input_index] += numpy.uint64(1) self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1)
self.parameters["last_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload_sst self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload_sst
self.parameters["last_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp())
self.parameters["integration_intervals"][input_index] = fields.integration_interval() self.parameters["integration_intervals"][input_index] = fields.integration_interval()
except Exception as e: except Exception as e:
# This is unexpected, so print a stack trace # This is unexpected, so print a stack trace
logging.exception("Could not parse UDP packet") logging.exception("Could not parse SST UDP packet")
self.parameters["nof_invalid_packets"] += numpy.uint64(1) self.parameters["nof_invalid_packets"] += numpy.uint64(1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment