Skip to content
Snippets Groups Projects
Select Git revision
  • 59b9fe64225b0aa95438a0fa717160653468d73c
  • master default protected
  • revert-cs032-ccd-ip
  • deploy-components-parallel
  • fix-chrony-exporter
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • v0.55.5-r2 protected
  • v0.52.8-rc1 protected
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
41 results

tcp_replicator.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    tcp_replicator.py 8.16 KiB
    from threading import Condition
    from threading import Thread
    from threading import Semaphore
    
    import asyncio
    import logging
    import time
    
    logger = logging.getLogger()
    
    
    class TCPReplicator(Thread):
        """TCP replicator intended to fan out incoming UDP packets
    
        There are three different processing layers in this class, several
        methods can be called from the context of the thread that spawned this
        class (main thread). These include: __init__, transmit, join and start.
    
        When start is called, the thread will launch, this will call run from the
        context of this new thread. This thread will create the new event loop as
        this can only be done from the context of the thread you desire to use the
        event loop in. A semaphore is used to prevent a potential race between this
        new thread setting up the event loop and the main thread trying to tear it
        down by calling join. Similarly, transmit also uses this semaphore to
        prevent scheduling transmissions before the thread has fully started.
    
        The final layer is the event loop itself, it handles instances of the
        TCPServerProtocol. These can be found in the _connected_clients list.
        However, only async task are allowed to call methods on these objects!
        The async methods are _transmit, _disconnect, _condtional_stop and
        _run_server.
    
        Tearing down the thread in __del__ is not needed as upon deconstruction
        Python will always call join.
    
        """
    
        """Default options for TCPReplicator
        we kindly ask to not change this static variable at runtime.
        """
        _options = {
            "tcp_bind": '127.0.0.1',
            "tcp_port": 6666,
            "tcp_buffer_size": 128000000,   # In bytes
        }
    
        def __init__(self, options: dict = None):
            super().__init__()
    
            """Reserve asyncio event loop attribute but don't create it yet.
            This event loop is created inside the new Thread, the result is that
            the thread owns the event loop! EVENT LOOPS ARE NOT THREAD SAFE ALL
            CALLS TO THE EVENT LOOP OBJECT MUST USE THE call_soon_threadsafe
            FUNCTION!!
            """
            self._loop = None
    
            # Create and acquire lock to prevent premature termination in join
            self.initialization_semaphore = Semaphore()
            self.initialization_semaphore.acquire()
    
            # Create condition to orchestrate clean shutdown
            self.shutdown_condition = Condition()
    
            # Connected clients the event loop is managing
            self._connected_clients = []
    
            # Shallow copy the options, native data types and strings are immutable
            self.options = self._options.copy()
    
            if not options:
                return
    
            # Find all matching keys in the options arguments and override
            for option, value in options.items():
                if option in self.options:
                    self.options[option] = value
    
        class TCPServerProtocol(asyncio.BaseProtocol):
            """TCP protocol used for connected clients"""
    
            def __init__(self, options: dict, connected_clients: list):
                self.options = options
    
                # Make connected_clients reflect the TCPReplicator connected_clients
                self.connected_clients = connected_clients
    
            def connection_made(self, transport):
                """Setup client connection and add entry to connected_clients"""
                peername = transport.get_extra_info('peername')
                logger.debug('TCP connection from {}'.format(peername))
                self.transport = transport
                # Set the TCP buffer limit
                self.transport.set_write_buffer_limits(
                    high=options['tcp_buffer_size'])
                self.connected_clients.append(self)
    
            def pause_writing(self):
                """Called when TCP buffer for the specific connection is full
    
                Upon encountering a full TCP buffer we deem the client to slow and
                forcefully close its connection.
                """
                self.transport.abort()
    
            def connection_lost(self, exc):
                """Called when connection is lost
    
                Used to remove entries from connected_clients
                """
                peername = self.transport.get_extra_info('peername')
                logger.debug('TCP connection lost from {}'.format(peername))
                self.connected_clients.remove(self)
    
        def run(self):
            """Run is launched by calling .start() on TCPReplicator
    
            It manages an asyncio event loop to orchestrate our TCPServerProtocol.
            """
    
            logger.info("Starting TCPReplicator thread")
    
            # Create the event loop, must be done in the new thread
            self._loop = asyncio.new_event_loop()
    
            # TODO(Corne): REMOVE ME
            self._loop.set_debug(True)
    
            # Schedule the task to create the server
            self._loop.create_task(TCPReplicator._run_server(
                self.options, self._connected_clients))
    
            # Everything is initialized, join can now safely be called
            self.initialization_semaphore.release()
    
            # Keep running event loop until self._loop.stop() is called
            self._loop.run_forever()
    
            # Stop must have been called, close the event loop
            with self.shutdown_condition:
                logger.debug("Closing TCPReplicator event loop")
                self._loop.close()
                self.shutdown_condition.notify()
    
            return
    
        def transmit(self, data):
            """Transmit data to connected clients"""
            with self.initialization_semaphore:
                if not self._loop.is_running():
                    logger.warning("Attempt to transmit with TCPReplicator before"
                                   "fully started.")
                    return
    
                self._loop.call_soon_threadsafe(
                    self._loop.create_task, self._transmit(data))
    
        def join(self, timeout=None):
            with self.initialization_semaphore:
                logging.info("Received shutdown request on TCPReplicator thread")
    
                self._clean_shutdown()
    
                # Only call join at the end otherwise Thread will falsely assume
                # all child 'processes' have stopped
                super().join(timeout)
    
        async def _transmit(self, data):
            logger.debug("Transmitting")
            for client in self._connected_clients:
                client.transport.write(data)
    
        async def _disconnect(self):
            with self.shutdown_condition:
                for client in self._connected_clients:
                    client.transport.abort()
                self.shutdown_condition.notify()
    
        async def _conditional_stop(self):
            with self.shutdown_condition:
                self._loop.stop()
    
        @staticmethod
        async def _run_server(options: dict, connected_clients: list):
            """Retrieve the event loop created in run() and launch the server"""
            loop = asyncio.get_event_loop()
    
            tcp_server = await loop.create_server(
                lambda: TCPReplicator.TCPServerProtocol(options, connected_clients),
                options['tcp_bind'], options['tcp_port'])
    
        def _clean_shutdown(self):
            """Disconnect clients, stop the event loop and wait for it to close"""
    
            # This should never ever happen, semaphore race condition
            if not self._loop:
                logging.error(
                    "TCPReplicator event loop unset, early termination?!")
                return
    
            with self.shutdown_condition:
                self._loop.call_soon_threadsafe(
                    self._loop.create_task, self._disconnect())
                self.shutdown_condition.wait()
    
            if self._loop.is_running():
                with self.shutdown_condition:
                    logging.debug("Stopping TCPReplicator event loop")
                    self._loop.call_soon_threadsafe(
                        self._loop.create_task, self._conditional_stop())
                    self.shutdown_condition.wait()
    
            # Should never happen, conditional race condition
            while self._loop.is_running():
                logging.error("TCPReplicator event loop still running after"
                              "returning from condition.wait!")
                time.sleep(1)
    
            # Should never happen, conditional race condition
            while not self._loop.is_closed():
                logging.error("TCPReplicator event loop not closed after"
                              "returning from condition.wait!")
                time.sleep(1)