Skip to content
Snippets Groups Projects
Select Git revision
  • 516b583e2e71141e075287a289a1d27817153e25
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

t_tmssapp_specification_REST_API.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    tcp_replicator.py 11.62 KiB
    #  Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    
    import asyncio
    import atexit
    import logging
    import threading
    from asyncio import StreamWriter, AbstractEventLoop, CancelledError, Task, QueueFull
    from concurrent import futures
    from threading import Semaphore
    from threading import Thread
    from typing import Optional
    
    from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread
    
    # Logger used throughout this module. logging.getLogger() without a name
    # returns the root logger, so records are not tagged with this module name.
    # NOTE(review): consider logging.getLogger(__name__) if nothing in the
    # package relies on these records being emitted on the root logger itself.
    logger = logging.getLogger()
    
    
    class TCPReplicator(Thread, StatisticsClientThread):
        """TCP replicator intended to fan out incoming UDP packets

        Packets handed to put/transmit are copied to every connected TCP
        client. Work happens in three different contexts:

        1. The thread that constructs the replicator calls __init__,
           put/transmit and join/disconnect.
        2. The replicator's own thread (run), started from the constructor,
           creates and owns the asyncio event loop; the loop has to be created
           in the thread that uses it. The constructor blocks on
           _initialization_semaphore until either _serve is listening or run
           ended without starting the server, in which case RuntimeError is
           raised. This prevents join from racing with event-loop setup.
        3. The event loop itself manages the TcpReplicatorClient instances in
           _connected_clients. Only coroutines running on the loop may call
           methods on those objects: _transmit_to_client, _clean_shutdown and
           the callback created by _client_connected.

        Every client owns an asyncio.Queue (bounded by queue_size, 0 meaning
        unbounded) that is drained by its own worker task. A client whose queue
        is full, or whose worker has stopped, is dropped and closed in the
        background.

        join runs _clean_shutdown on the loop. That coroutine closes all
        clients, cancels background tasks and finally closes the server, which
        lets _serve, and therefore run, return.
        """

        class Stats:
            """Counters of the packets and bytes written to clients.

            Updates through packet_sent are serialized with a lock.
            """

            def __init__(self):
                self._nof_packets_sent = 0
                self._nof_bytes_sent = 0
                self._lock = threading.Lock()

            def packet_sent(self, size: int):
                """Count one sent packet of ``size`` bytes."""
                with self._lock:
                    self._nof_packets_sent += 1
                    self._nof_bytes_sent += size

            @property
            def nof_packets_sent(self):
                """Number of packets sent so far."""
                return self._nof_packets_sent

            @property
            def nof_bytes_sent(self):
                """Number of bytes sent so far."""
                return self._nof_bytes_sent

        # Default options for TCPReplicator. We kindly ask to not change this
        # static variable at runtime.
        _DEFAULT_OPTIONS = {
            "tcp_bind": "0.0.0.0",
            "tcp_port": 6666,
            "tcp_buffer_size": 128000000,  # In bytes
        }

        def __init__(self, options: dict = None, queue_size=0):
            """Start the replicator thread and wait until it listens.

            :param options: overrides for _DEFAULT_OPTIONS, parsed by
                _parse_options (presumably provided by StatisticsClientThread).
            :param queue_size: maximum number of packets buffered per client;
                0 means unbounded.
            :raises RuntimeError: if the server could not be started.
            """
            super().__init__(daemon=True)

            self.queue_size = queue_size

            # statistics
            self.stats: TCPReplicator.Stats = TCPReplicator.Stats()

            # Reserve the asyncio event loop attribute but don't create it yet.
            # The loop is created inside the new thread (see run), so that
            # thread owns it. EVENT LOOPS ARE NOT THREAD SAFE: calls into the
            # loop from other threads must go through call_soon_threadsafe /
            # run_coroutine_threadsafe.
            self._loop: Optional[AbstractEventLoop] = None

            # Background tasks (closing of dropped clients) spawned on the loop.
            self._tasks: "set[Task]" = set()

            # Used to maintain a reference to the server object, so we can stop
            # listening cleanly.
            self._server: Optional[asyncio.AbstractServer] = None

            # Set by _serve (on the loop thread) once the server listens. The
            # constructor checks this flag to decide whether start-up
            # succeeded. is_alive() alone is racy, because on failure the
            # semaphore is released while the thread is still running.
            self._server_started = False

            # Create and acquire the semaphore to prevent leaving the
            # constructor before the thread finished starting up.
            self._initialization_semaphore = Semaphore()
            self._initialization_semaphore.acquire()

            # Connected clients the event loop is managing
            self._connected_clients: "list[TCPReplicator.TcpReplicatorClient]" = []

            # Parse the configured options
            self.options = self._parse_options(options)

            # We start ourselves immediately to reduce amount of possible states.
            self.start()

            # Wait until we can hold the semaphore; it is released either by
            # _serve once listening, or by run when it ends without starting.
            with self._initialization_semaphore:
                if not self._server_started or not self.is_alive():
                    raise RuntimeError("TCPReplicator failed to initialize")

                logger.debug("TCPReplicator initialization completed")

        @property
        def _options(self) -> dict:
            """Default options, presumably consumed by _parse_options."""
            return TCPReplicator._DEFAULT_OPTIONS

        class TcpReplicatorClient:
            """One connected TCP client with its own packet queue.

            Must be constructed on the event loop, as it spawns a worker task
            that writes queued packets to ``writer``.
            """

            def __init__(self, stats, writer: StreamWriter, queue_size):
                self._writer = writer
                self._stats = stats
                self.is_alive = True
                self.addr = writer.get_extra_info("peername")
                self.queue = asyncio.Queue(maxsize=queue_size)
                self.tasks = set()
                self.tasks.add(
                    asyncio.create_task(
                        self.worker(), name=f"TcpReplicatorClient.worker for {self.addr}"
                    )
                )

            @property
            def queue_fill_percentage(self):
                """Percentage of the queue in use; 0 for an unbounded queue."""
                return (
                    100 * self.queue.qsize() / self.queue.maxsize
                    if self.queue.maxsize
                    else 0
                )

            async def worker(self):
                """Write packets from the queue to this client until it fails.

                On a connection error (or a RuntimeError) the client is marked
                dead, after which put() refuses further packets.
                """
                while True:
                    try:
                        packet = await self.queue.get()
                        try:
                            self._writer.write(packet)
                            await self._writer.drain()
                            self._stats.packet_sent(len(packet))
                        except ConnectionError:
                            logger.debug("Client closed connection")
                            self.is_alive = False
                            break
                        finally:
                            self.queue.task_done()
                    except RuntimeError as _e:
                        logger.debug(_e)
                        self.is_alive = False
                        break

            async def put(self, packet):
                """Queue ``packet`` for transmission to this client.

                :raises asyncio.CancelledError: if the worker has stopped.
                :raises asyncio.QueueFull: if the client's queue is full.
                """
                if not self.is_alive:
                    raise asyncio.CancelledError("Client is dead")

                self.queue.put_nowait(packet)

            async def _clean_shutdown(self):
                """Cancel this client's tasks and wait for them to finish."""
                for task in self.tasks:
                    task.cancel()
                await asyncio.gather(*self.tasks, return_exceptions=True)

            async def close(self):
                """Close the connection and stop the worker task."""
                self._writer.close()
                if self.is_alive:
                    # this is due to a bug that causes a deadlock if wait_closed is called after
                    # the connection is already closed https://github.com/python/cpython/pull/98620
                    await self._writer.wait_closed()
                await self._clean_shutdown()

        def run(self):
            """Thread entry point, launched from the constructor of TCPReplicator.

            Creates this thread's asyncio event loop and runs the TCP server on
            it until the server is closed (see _clean_shutdown).
            """
            try:
                logger.info(
                    "Starting TCPReplicator thread for %s:%s",
                    self.options["tcp_bind"],
                    self.options["tcp_port"],
                )

                atexit.register(self.join)

                # Create the event loop, must be done in the new thread
                self._loop = asyncio.new_event_loop()
                asyncio.set_event_loop(self._loop)

                self._loop.run_until_complete(self._serve())
            except CancelledError:
                # Presumably raised out of serve_forever once the server is
                # closed during shutdown.
                pass
            except Exception as _e:
                # Log the exception as thread exceptions won't be returned to us
                # on the main thread.
                logger.exception(
                    "TCPReplicator thread encountered fatal exception: %s", _e
                )
            finally:
                # Never leave the constructor blocked on the semaphore. If the
                # server did not start (for any reason, including
                # cancellation), release it here so __init__ can report the
                # failure. Otherwise _serve already released it.
                if not self._server_started:
                    self._initialization_semaphore.release()

        def _client_connected(self):
            """Return the start_server callback that registers new clients."""

            async def _cb(_, writer):
                self._connected_clients.append(
                    TCPReplicator.TcpReplicatorClient(self.stats, writer, self.queue_size)
                )

            return _cb

        async def _serve(self):
            """Start listening and serve clients until the server is closed."""
            self._server = await asyncio.start_server(
                self._client_connected(),
                self.options["tcp_bind"],
                self.options["tcp_port"],
                reuse_address=True,
                start_serving=True,
            )
            self._server_started = True
            # Listening: unblock the constructor.
            self._initialization_semaphore.release()
            async with self._server:
                await self._server.serve_forever()

        async def _transmit_to_client(self, client, data):
            """Queue ``data`` for ``client``.

            Drops and closes the client (in the background) when its queue is
            full or its worker has stopped.
            """
            try:
                await client.put(data)
            except (QueueFull, CancelledError):
                # Several transmit() calls, or a shutdown, may already have
                # dropped this client; drop and close it only once.
                if client not in self._connected_clients:
                    return
                self._connected_clients.remove(client)

                t = self._loop.create_task(
                    client.close(), name=f"TCPReplicator.close for {client}"
                )
                self._tasks.add(t)
                t.add_done_callback(self._tasks.discard)

        def transmit(self, data: bytes):
            """Transmit data to connected clients.

            Blocks until ``data`` has been queued for, or rejected by, every
            client that was connected when the call started.

            :raises TypeError: if ``data`` is not bytes or bytearray.
            """

            if not isinstance(data, (bytes, bytearray)):
                raise TypeError("Data must be byte-like object")

            # Snapshot the list: the event loop thread adds and removes clients
            # concurrently, which could otherwise make us skip a client.
            clients = list(self._connected_clients)

            futures.wait(
                [
                    asyncio.run_coroutine_threadsafe(
                        self._transmit_to_client(c, data), self._loop
                    )
                    for c in clients
                ]
            )

        def join(self, timeout=None):
            """Shut down the server and clients, then join the thread.

            :param timeout: seconds to wait for the clean shutdown and, again,
                for the thread to end; None waits indefinitely.
            """
            logger.info(
                "Received shutdown request on TCPReplicator thread for %s:%s",
                self.options["tcp_bind"],
                self.options["tcp_port"],
            )

            # Unregister join from atexit to prevent double execution; it is
            # registered again below if the thread did not stop.
            atexit.unregister(self.join)

            if self._loop and self._loop.is_running():
                shutdown = asyncio.run_coroutine_threadsafe(
                    self._clean_shutdown(), self._loop
                )
                try:
                    # Honour the timeout here too, so that disconnect() is not
                    # stalled indefinitely by a shutdown that does not finish.
                    shutdown.result(timeout)
                except futures.TimeoutError:
                    logger.warning(
                        "Clean shutdown of TCPReplicator for %s:%s did not finish within %s seconds",
                        self.options["tcp_bind"],
                        self.options["tcp_port"],
                        timeout,
                    )

            # Only call join at the end otherwise Thread will falsely assume
            # all child 'processes' have stopped
            super().join(timeout)

            if self.is_alive():
                # re-register because thread is not fully shutdown yet.
                atexit.register(self.join)

        def disconnect(self):
            """Stop the replicator, waiting at most DISCONNECT_TIMEOUT seconds.

            DISCONNECT_TIMEOUT is not defined in this class; presumably it is
            provided by StatisticsClientThread.
            """
            if not self.is_alive():
                return

            self.join(self.DISCONNECT_TIMEOUT)

            if self.is_alive():
                # there is nothing we can do except wait (stall) longer, which
                # could be indefinitely.
                logger.error(
                    "TCP thread for %s:%s did not shutdown after %s seconds, just leaving it dangling. "
                    "Please attach a debugger to thread ID %s.",
                    self.options["tcp_bind"],
                    self.options["tcp_port"],
                    self.DISCONNECT_TIMEOUT,
                    self.ident,
                )

        def put(self, packet):
            """Alias of transmit."""
            self.transmit(packet)

        async def _clean_shutdown(self):
            """Disconnect clients, cancel background tasks and stop the server.

            Runs on the event loop (scheduled by join). The server is closed
            last because closing it lets _serve, and with it the event loop in
            run, finish. Work still pending on the loop at that point may never
            complete, which would leave join waiting for this coroutine.
            """
            # Pop clients one at a time instead of iterating. Clients that
            # connect while we await are closed as well, and the list ends up
            # empty, so a later transmit() has nothing to schedule on the
            # stopped loop.
            while self._connected_clients:
                client = self._connected_clients.pop(0)
                try:
                    await client.close()
                except Exception:  # pylint: disable=broad-except
                    # e.g. wait_closed() may raise the connection's error; keep
                    # shutting down the remaining clients and the server.
                    logger.exception("Failed to cleanly close TCP client %s", client.addr)

            pending = list(self._tasks)
            for task in pending:
                task.cancel()
            await asyncio.gather(*pending, return_exceptions=True)

            # Stop listening last (see docstring).
            if self._server is not None:
                self._server.close()
                await self._server.wait_closed()

        def clients(self):
            """Return the list of connected clients."""

            return [f"{client.addr}" for client in self._connected_clients]

        @property
        def client_queue_fill_percentage(self):
            """Queue fill percentage of each connected client."""
            return [c.queue_fill_percentage for c in self._connected_clients]

        @property
        def nof_tasks_pending(self):
            """Return the number of pending tasks in our event loop."""
            return len(asyncio.all_tasks(self._loop))