Skip to content
Snippets Groups Projects
Commit 3974a696 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-244: Fetch last UDP packet from UDP receiver, not client. Updated comments.

parent 44c819b7
No related branches found
No related tags found
1 merge request!56L2SS-244: Expose the SSTs in MPs
...@@ -47,19 +47,32 @@ class SST(hardware_device): ...@@ -47,19 +47,32 @@ class SST(hardware_device):
version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version()) version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version())
# SST client annotation consists of a dict that contains the parameter name that needs to be read. # number of UDP packets that were received
# Example: comms_annotation={"parameter": "this_value_R"} nof_packets_received_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64)
nof_packets_R = attribute_wrapper(comms_annotation={"parameter": "nof_packets"}, datatype=numpy.uint64) # number of UDP packets that were dropped because we couldn't keep up with processing
last_packet_R = attribute_wrapper(comms_annotation={"parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8) nof_packets_dropped_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "nof_packets_dropped"}, datatype=numpy.uint64)
last_packet_timestamp_R = attribute_wrapper(comms_annotation={"parameter": "last_packet_timestamp"}, datatype=numpy.uint64) # last packet we processed
queue_fill_percentage_R = attribute_wrapper(comms_annotation={"parameter": "queue_fill_percentage"}, datatype=numpy.float32) last_packet_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8)
# when last packet was received
nof_invalid_packets_R = attribute_wrapper(comms_annotation={"parameter": "nof_invalid_packets"}, datatype=numpy.uint64) last_packet_timestamp_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64)
nof_valid_payloads_R = attribute_wrapper(comms_annotation={"parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
nof_payload_errors_R = attribute_wrapper(comms_annotation={"parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) # number of UDP packets that were processed
sst_R = attribute_wrapper(comms_annotation={"parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64) nof_packets_processed_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_packets"}, datatype=numpy.uint64)
sst_timestamp_R = attribute_wrapper(comms_annotation={"parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) # queue fill percentage, as reported by the consumer
integration_interval_R = attribute_wrapper(comms_annotation={"parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32) queue_fill_percentage_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "queue_fill_percentage"}, datatype=numpy.float32)
# number of invalid (non-SST) packets received
nof_invalid_packets_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64)
# number of packets with valid payloads
nof_valid_payloads_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
# number of packets with invalid payloads
nof_payload_errors_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
# latest SSTs
sst_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64)
# reported timestamp for each row in the latest SSTs
sst_timestamp_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
# integration interval for each row in the latest SSTs
integration_interval_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32)
# -------- # --------
# overloaded functions # overloaded functions
......
...@@ -33,7 +33,8 @@ class sst_client(CommClient): ...@@ -33,7 +33,8 @@ class sst_client(CommClient):
""" """
self.host = host self.host = host
self.port = port self.port = port
self.timeout = 0.1 self.poll_timeout = 0.1
self.disconnect_timeout = 10.0
self.queuesize = queuesize self.queuesize = queuesize
super().__init__(fault_func, streams, try_interval) super().__init__(fault_func, streams, try_interval)
...@@ -50,15 +51,15 @@ class sst_client(CommClient): ...@@ -50,15 +51,15 @@ class sst_client(CommClient):
""" """
if not self.connected: if not self.connected:
self.queue = Queue(maxsize=self.queuesize) self.queue = Queue(maxsize=self.queuesize)
self.udp = UDP_Receiver(self.host, self.port, self.queue, self.timeout) self.udp = UDP_Receiver(self.host, self.port, self.queue, self.poll_timeout, self.disconnect_timeout)
self.sst = SST_collector(self.queue) self.sst = SST_collector(self.queue, self.disconnect_timeout)
return super().connect() return super().connect()
def disconnect(self): def disconnect(self):
# explicit disconnect, instead of waiting for the GC to kick in after "del" below # explicit disconnect, instead of waiting for the GC to kick in after "del" below
self.sst.join() self.sst.disconnect()
self.udp.join() self.udp.disconnect()
del self.udp del self.udp
del self.sst del self.sst
...@@ -95,9 +96,13 @@ class sst_client(CommClient): ...@@ -95,9 +96,13 @@ class sst_client(CommClient):
# get all the necessary data to set up the read/write functions from the attribute_wrapper # get all the necessary data to set up the read/write functions from the attribute_wrapper
self.setup_value_conversion(attribute) self.setup_value_conversion(attribute)
# redirect to right object. this works as long as the parameter names are unique among them.
if annotation["type"] == "sst":
def read_function(): def read_function():
return self.sst.parameters[SST_param] return self.sst.parameters[SST_param]
elif annotation["type"] == "udp":
def read_function():
return self.udp.parameters[SST_param]
def write_function(value): def write_function(value):
""" """
...@@ -112,22 +117,31 @@ class UDP_Receiver(Thread): ...@@ -112,22 +117,31 @@ class UDP_Receiver(Thread):
This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
""" """
def __init__(self, host, port, queue, timeout=0.1): def __init__(self, host, port, queue, poll_timeout=0.1, disconnect_timeout=10.0):
self.queue = queue self.queue = queue
self.host = host self.host = host
self.port = port self.port = port
self.disconnect_timeout = disconnect_timeout
self.parameters = {
"nof_packets_received": numpy.uint64(0),
"nof_packets_dropped": numpy.uint64(0),
}
logger.debug("binding a socket on UDP port {}:{}".format(self.host, self.port)) logger.debug("binding a socket on UDP port {}:{}".format(self.host, self.port))
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# allow binding even if there are still lingering packets for a previous listener # Allow binding even if there are still lingering packets in the kernel for a
# previous listener that already died. If not, we get an "Address already in use".
# This is stock socket usage.
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# specify what host and port to listen on # specify what host and port to listen on
self.sock.bind((self.host, self.port)) self.sock.bind((self.host, self.port))
# make sure we can stop receiving packets even if none arrive # Make sure we can stop receiving packets even if none arrive.
self.sock.settimeout(timeout) # Without this, the recvmsg() call blocks indefinitely if no packet arrives.
self.sock.settimeout(poll_timeout)
self.stream_on = True self.stream_on = True
super().__init__() super().__init__()
...@@ -140,13 +154,21 @@ class UDP_Receiver(Thread): ...@@ -140,13 +154,21 @@ class UDP_Receiver(Thread):
while self.stream_on: while self.stream_on:
try: try:
# Packets are at most 9000 bytes, the largest payload (well, MTU) of an Ethernet Jumbo frame
packet, _, _, _ = self.sock.recvmsg(9000) packet, _, _, _ = self.sock.recvmsg(9000)
self.parameters["nof_packets_received"] += numpy.uint64(1)
self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
self.parameters["last_packet_timestamp"] = numpy.uint64(int(time.time()))
# Forward packet to processing thread
self.queue.put(packet) self.queue.put(packet)
except socket.timeout: except socket.timeout:
# timeout -- expected, allows us to check whether to stop
pass pass
except queue.Full: except queue.Full:
# overflow -- just discard # overflow -- just discard
pass self.parameters["nof_packets_dropped"] += numpy.uint64(1)
logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port)) logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port))
...@@ -159,6 +181,16 @@ class UDP_Receiver(Thread): ...@@ -159,6 +181,16 @@ class UDP_Receiver(Thread):
# shutdown the socket so that others can listen on this port # shutdown the socket so that others can listen on this port
self.sock.shutdown(socket.SHUT_RDWR) self.sock.shutdown(socket.SHUT_RDWR)
def disconnect(self):
if not self.isAlive():
return
# try to get the thread shutdown, but don't stall forever
self.join(self.disconnect_timeout)
if self.isAlive:
logger.error("UDP thread not shutting down for {}:{}".format(self.host, self.port))
def __del__(self): def __del__(self):
self.disconnect() self.disconnect()
...@@ -169,9 +201,10 @@ class SST_collector(Thread): ...@@ -169,9 +201,10 @@ class SST_collector(Thread):
# Maximum number of subbands we support (used to determine array sizes) # Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512 MAX_SUBBANDS = 512
def __init__(self, queue): def __init__(self, queue, disconnect_timeout=10.0):
self.queue = queue self.queue = queue
self.last_packet = None self.last_packet = None
self.disconnect_timeout = disconnect_timeout
self.parameters = { self.parameters = {
"nof_packets": numpy.uint64(0), "nof_packets": numpy.uint64(0),
...@@ -216,10 +249,18 @@ class SST_collector(Thread): ...@@ -216,10 +249,18 @@ class SST_collector(Thread):
super().join(timeout) super().join(timeout)
def disconnect(self):
if not self.isAlive():
return
# try to get the thread shutdown, but don't stall forever
self.join(self.disconnect_timeout)
if self.isAlive:
logger.error("SST thread not shutting down")
def process_packet(self, packet): def process_packet(self, packet):
self.parameters["nof_packets"] += numpy.uint64(1) self.parameters["nof_packets"] += numpy.uint64(1)
self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
self.parameters["last_packet_timestamp"] = numpy.uint64(int(time.time()))
self.parameters["queue_fill_percentage"] = numpy.double(100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0) self.parameters["queue_fill_percentage"] = numpy.double(100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0)
try: try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment