Skip to content
Snippets Groups Projects
Select Git revision
  • 569cf223aaf8079caa0465e70d24718f1c7b0736
  • master default protected
  • dither_on_off_disabled
  • yocto
  • pypcc2
  • pypcc3
  • 2020-12-07-the_only_working_copy
  • v2.1
  • v2.0
  • v1.0
  • v0.9
  • Working-RCU_ADC,ID
  • 2020-12-11-Holiday_Season_release
13 results

pypcc2.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    udp_receiver.py 3.66 KiB
    import ctypes
    import pathlib
    from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_RCVBUF
    from threading import Thread
    from queue import Queue
    import numpy
    
    class UdpReceiver(Thread):
        """ Receives UDP packets at high speed. """
    
        def __init__(self, port, batch_size, max_packet_size=9000):
            """ Listen on port "port" and receive UDP packets in batches
                of "batch_size", and with a maximum size of "max_packet_size".
            """
                
            super().__init__(name="UDP receive thread", daemon=True)
    
            self.socket = self._open_socket(port)
            self.batch_size = batch_size
            self.max_packet_size = max_packet_size
    
            # get recvmmsg function from the C library
            c_lib = self._load_c_library("recvmmsg.so")
            self.recvmmsg = c_lib.recvmmsg_python
    
            # packet queue
            self.queue = Queue()
    
            # start receiving
            self.start()
    
        def run(self):
            # create our buffers
            packet_buffer_type = ctypes.c_char * self.max_packet_size * self.batch_size
            packet_buffer = packet_buffer_type()
    
            packet_lengths_type = ctypes.c_int * self.batch_size
            packet_lengths = packet_lengths_type()
    
            # recvmmsg works on file descriptors, which is constant per socket
            fd = self.socket.fileno()
    
            while True:
                # receive the UDP packets.
                # NB: will block until "self.batch_size" packets have been received
                num_received = self.recvmmsg(fd, self.batch_size, self.max_packet_size, packet_buffer, packet_lengths)
                if num_received <= 0:
                    break
    
                # NB: we have to do as little as possible now, to return to recvmmsg as fast as possible
    
                # put a copy in the queue, so we can reuse our buffer
                self.queue.put((packet_buffer[:num_received], packet_lengths[:num_received]))
    
        def pop(self):
            """ Return a tuple (packets, packet_lengths) of received packets.
    
                The returned types are ctype arrays. """
    
            return self.queue.get()
    
        def pop_numpy(self):
            """ Return a tuple (packets, packet_lengths) of received packets.
    
                The returned types are numpy arrays. """
    
            packets, packet_lengths = self.pop_raw()
    
            return (numpy.ctypeslib.as_array(packets).view(numpy.uint8), numpy.ctypeslib.as_array(packet_lengths))
    
        def _open_socket(self, port):
            """ Open a socket to receive UDP packets on the given port. """
    
            # bind a new socket to the given port, listening on all interfaces
            s = socket(AF_INET, SOCK_DGRAM)
            s.bind(('', port))
    
            # use 128 MByte receive buffer, which is needed to
            # collect UDP packets between recvmmsg calls.
            #
            # increase this if packets are dropped
            s.setsockopt(SOL_SOCKET, SO_RCVBUF, 128 * 1024 * 1024)
    
            return s
    
        def _load_c_library(self, libname):
            """ Load the given C library. """
    
            libpath = pathlib.Path().absolute() / libname
            c_lib = ctypes.CDLL(libpath)
    
            return c_lib
    
    if __name__ == "__main__":
        # receive on port 9999, in batches of 10k packets
        receiver = UdpReceiver(9999, 10240)
    
        import time
        bytes_received = 0
    
        while True:
            # wait for a batch of UDP packets
            packets, packet_lengths = receiver.pop()
    
            if bytes_received == 0:
               # start counting when we receive the first packet,
               # NB: causing an initial overestimation of the speed.
               begin_time = time.time()
    
            # update byte count
            bytes_received += sum(packet_lengths)
    
            # report average speed since start
            print(f"{bytes_received/(time.time() - begin_time)/1e6*8} MBit/s")