Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
udp_receiver.py 3.66 KiB
import ctypes
import pathlib
from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_RCVBUF
from threading import Thread
from queue import Queue
import numpy
class UdpReceiver(Thread):
""" Receives UDP packets at high speed. """
def __init__(self, port, batch_size, max_packet_size=9000):
""" Listen on port "port" and receive UDP packets in batches
of "batch_size", and with a maximum size of "max_packet_size".
"""
super().__init__(name="UDP receive thread", daemon=True)
self.socket = self._open_socket(port)
self.batch_size = batch_size
self.max_packet_size = max_packet_size
# get recvmmsg function from the C library
c_lib = self._load_c_library("recvmmsg.so")
self.recvmmsg = c_lib.recvmmsg_python
# packet queue
self.queue = Queue()
# start receiving
self.start()
def run(self):
# create our buffers
packet_buffer_type = ctypes.c_char * self.max_packet_size * self.batch_size
packet_buffer = packet_buffer_type()
packet_lengths_type = ctypes.c_int * self.batch_size
packet_lengths = packet_lengths_type()
# recvmmsg works on file descriptors, which is constant per socket
fd = self.socket.fileno()
while True:
# receive the UDP packets.
# NB: will block until "self.batch_size" packets have been received
num_received = self.recvmmsg(fd, self.batch_size, self.max_packet_size, packet_buffer, packet_lengths)
if num_received <= 0:
break
# NB: we have to do as little as possible now, to return to recvmmsg as fast as possible
# put a copy in the queue, so we can reuse our buffer
self.queue.put((packet_buffer[:num_received], packet_lengths[:num_received]))
def pop(self):
""" Return a tuple (packets, packet_lengths) of received packets.
The returned types are ctype arrays. """
return self.queue.get()
def pop_numpy(self):
""" Return a tuple (packets, packet_lengths) of received packets.
The returned types are numpy arrays. """
packets, packet_lengths = self.pop_raw()
return (numpy.ctypeslib.as_array(packets).view(numpy.uint8), numpy.ctypeslib.as_array(packet_lengths))
def _open_socket(self, port):
""" Open a socket to receive UDP packets on the given port. """
# bind a new socket to the given port, listening on all interfaces
s = socket(AF_INET, SOCK_DGRAM)
s.bind(('', port))
# use 128 MByte receive buffer, which is needed to
# collect UDP packets between recvmmsg calls.
#
# increase this if packets are dropped
s.setsockopt(SOL_SOCKET, SO_RCVBUF, 128 * 1024 * 1024)
return s
def _load_c_library(self, libname):
""" Load the given C library. """
libpath = pathlib.Path().absolute() / libname
c_lib = ctypes.CDLL(libpath)
return c_lib
if __name__ == "__main__":
# receive on port 9999, in batches of 10k packets
receiver = UdpReceiver(9999, 10240)
import time
bytes_received = 0
while True:
# wait for a batch of UDP packets
packets, packet_lengths = receiver.pop()
if bytes_received == 0:
# start counting when we receive the first packet,
# NB: causing an initial overestimation of the speed.
begin_time = time.time()
# update byte count
bytes_received += sum(packet_lengths)
# report average speed since start
print(f"{bytes_received/(time.time() - begin_time)/1e6*8} MBit/s")