Select Git revision
t_tmssapp_specification_REST_API.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
tcp_replicator.py 11.62 KiB
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import asyncio
import atexit
import logging
import threading
from asyncio import StreamWriter, AbstractEventLoop, CancelledError, Task, QueueFull
from concurrent import futures
from threading import Semaphore
from threading import Thread
from typing import Optional
from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread
# Module-level logger; records propagate to the root logger's handlers.
logger = logging.getLogger(__name__)
class TCPReplicator(Thread, StatisticsClientThread):
    """TCP replicator intended to fan out incoming UDP packets to TCP clients.

    Work is split over three execution contexts:

    1. The thread that constructs this object (the "main" thread). It calls
       ``__init__``, ``transmit``/``put``, ``join``/``disconnect`` and the
       statistics accessors.
    2. The replicator thread. The constructor calls ``start``, which runs
       ``run`` in a new thread. ``run`` creates a fresh asyncio event loop
       there, because an event loop must be created in the thread that runs
       it. A semaphore prevents a race between this thread setting the loop up
       and the main thread tearing it down via ``join``. The constructor blocks
       on it until it is released, either by ``_serve`` once the server
       listens, or by ``run`` when it exits (also on failure).
    3. The event loop. It owns the ``TcpReplicatorClient`` instances in
       ``_connected_clients``; only coroutines running on the loop may call
       methods on them. The main thread hands work to the loop exclusively
       through ``asyncio.run_coroutine_threadsafe``.

    Each client has its own ``asyncio.Queue`` (``queue_size``; 0 means
    unbounded) drained by a per-client worker task. A client whose queue is
    full, or which has died, is closed in the background and dropped.

    Shutdown (``_clean_shutdown``, driven by ``join``) closes all clients,
    cancels pending background close tasks and finally stops the listening
    server, which ends ``_serve`` and with it the thread.
    """

    class Stats:
        """Packet/byte counters; updates are serialized by a lock."""

        def __init__(self):
            self._nof_packets_sent = 0
            self._nof_bytes_sent = 0
            self._lock = threading.Lock()

        def packet_sent(self, size: int):
            """Record that one packet of ``size`` bytes was sent."""
            with self._lock:
                self._nof_packets_sent += 1
                self._nof_bytes_sent += size

        @property
        def nof_packets_sent(self) -> int:
            """Number of packets sent so far."""
            return self._nof_packets_sent

        @property
        def nof_bytes_sent(self) -> int:
            """Number of bytes sent so far."""
            return self._nof_bytes_sent

    # Default options for TCPReplicator. This class-level dict is shared by
    # all instances; please do not modify it at runtime.
    _DEFAULT_OPTIONS = {
        "tcp_bind": "0.0.0.0",
        "tcp_port": 6666,
        "tcp_buffer_size": 128000000,  # In bytes
    }

    def __init__(self, options: dict = None, queue_size=0):
        """Create the replicator, start its thread and wait until it serves.

        :param options: configuration passed to ``_parse_options`` (defined
            outside this class; presumably merged over ``_options``).
        :param queue_size: maximum number of packets queued per client; 0
            means unbounded.
        :raises RuntimeError: if the thread failed to start the server.
        """
        super().__init__(daemon=True)

        self.queue_size = queue_size

        # statistics
        self.stats: TCPReplicator.Stats = TCPReplicator.Stats()

        # Reserve the event loop attribute but don't create the loop yet: it
        # is created inside the new thread, which then owns it. EVENT LOOPS
        # ARE NOT THREAD SAFE: other threads must only interact with it via
        # the *_threadsafe functions!
        self._loop: Optional[AbstractEventLoop] = None

        # Strong references to background tasks created by
        # _transmit_to_client, so they are not garbage collected mid-flight.
        self._tasks: set[Task] = set()

        # Reference to the server object, so we can stop listening cleanly.
        # Stays None if the server could not be started.
        self._server: Optional[asyncio.AbstractServer] = None

        # Create and acquire lock to prevent leaving the constructor without
        # starting the thread.
        self._initialization_semaphore = Semaphore()
        self._initialization_semaphore.acquire()

        # Connected clients the event loop is managing
        self._connected_clients: list[TCPReplicator.TcpReplicatorClient] = []

        # Parse the configured options
        self.options = self._parse_options(options)

        # We start ourselves immediately to reduce amount of possible states.
        self.start()

        # Wait until we can hold the semaphore. This indicates the thread has
        # either started serving or encountered an exception. After a failure
        # the thread can still be alive for a moment (run releases the
        # semaphore before returning), so also check the server was created.
        with self._initialization_semaphore:
            if self._server is None or not self.is_alive():
                raise RuntimeError("TCPReplicator failed to initialize")

        logger.debug("TCPReplicator initialization completed")

    @property
    def _options(self) -> dict:
        """Default options (presumably consumed by ``_parse_options``)."""
        return TCPReplicator._DEFAULT_OPTIONS

    class TcpReplicatorClient:
        """One connected TCP client with its own send queue and worker task.

        Must be constructed inside the running event loop: the constructor
        calls ``asyncio.create_task``.
        """

        def __init__(self, stats, writer: StreamWriter, queue_size):
            self._writer = writer
            self._stats = stats
            self.is_alive = True
            self.addr = writer.get_extra_info("peername")
            self.queue = asyncio.Queue(maxsize=queue_size)

            # Keep a reference to the worker task so it is not garbage
            # collected and can be cancelled on close.
            self.tasks = set()
            self.tasks.add(
                asyncio.create_task(
                    self.worker(), name=f"TcpReplicatorClient.worker for {self.addr}"
                )
            )

        @property
        def queue_fill_percentage(self):
            """Queue occupancy in percent; 0 for an unbounded queue."""
            return (
                100 * self.queue.qsize() / self.queue.maxsize
                if self.queue.maxsize
                else 0
            )

        async def worker(self):
            """Send queued packets to this client until it fails.

            On a ConnectionError or RuntimeError the client is marked dead
            (``is_alive = False``) and the worker stops.
            """
            while True:
                try:
                    packet = await self.queue.get()
                    try:
                        self._writer.write(packet)
                        # Back-pressure: wait until the transport's write
                        # buffer is below its high-water mark again.
                        await self._writer.drain()
                        self._stats.packet_sent(len(packet))
                    except ConnectionError:
                        logger.debug("Client closed connection")
                        self.is_alive = False
                        break
                    finally:
                        self.queue.task_done()
                except RuntimeError as _e:
                    logger.debug(_e)
                    self.is_alive = False
                    break

        async def put(self, packet):
            """Queue ``packet`` for sending without waiting.

            :raises asyncio.CancelledError: if the client is dead.
            :raises asyncio.QueueFull: if the client's queue is full.
            """
            if not self.is_alive:
                raise asyncio.CancelledError("Client is dead")

            self.queue.put_nowait(packet)

        async def _clean_shutdown(self):
            """Cancel the worker task(s) and wait for them to finish."""
            for task in self.tasks:
                task.cancel()

            await asyncio.gather(*self.tasks, return_exceptions=True)

        async def close(self):
            """Close the connection and stop the worker."""
            self._writer.close()
            if self.is_alive:
                # this is due to a bug that causes a deadlock if wait_closed is called after
                # the connection is already closed https://github.com/python/cpython/pull/98620
                await self._writer.wait_closed()
            await self._clean_shutdown()

    def run(self):
        """Thread body: create this thread's event loop and serve until stopped.

        Launched by ``start`` from the constructor. Exceptions are logged
        because exceptions raised in a thread do not reach the main thread.
        The initialization semaphore is always released on exit, so the
        constructor can never block forever.
        """
        try:
            logger.info(
                "Starting TCPReplicator thread for %s:%s",
                self.options["tcp_bind"],
                self.options["tcp_port"],
            )

            atexit.register(self.join)

            # Create the event loop, must be done in the new thread
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
            self._loop.run_until_complete(self._serve())
        except CancelledError:
            # Closing the server cancels serve_forever: the normal shutdown
            # path.
            pass
        except Exception as _e:
            # Log the exception as thread exceptions won't be returned to us
            # on the main thread.
            logger.exception(
                "TCPReplicator thread encountered fatal exception: %s", _e
            )
        finally:
            # Never leave the constructor waiting, whatever happened above.
            self._initialization_semaphore.release()
            if self._loop is not None:
                # The loop is never run again; release its selector and
                # self-pipe instead of leaking them.
                self._loop.close()

    def _client_connected(self):
        """Return the ``client_connected_cb`` for ``asyncio.start_server``.

        Every new connection is wrapped in a ``TcpReplicatorClient``; the
        stream reader is ignored.
        """

        async def _cb(_, writer):
            self._connected_clients.append(
                TCPReplicator.TcpReplicatorClient(self.stats, writer, self.queue_size)
            )

        return _cb

    async def _serve(self):
        """Start listening, unblock the constructor, then serve until closed."""
        self._server = await asyncio.start_server(
            self._client_connected(),
            self.options["tcp_bind"],
            self.options["tcp_port"],
            reuse_address=True,
            start_serving=True,
        )
        self._initialization_semaphore.release()
        # Leaving this block (also on cancellation) closes the server and
        # awaits wait_closed().
        async with self._server:
            await self._server.serve_forever()

    async def _transmit_to_client(self, client, data):
        """Queue ``data`` for ``client``; drop the client if that fails.

        A full queue (``QueueFull``) or a dead client (``CancelledError`` from
        ``TcpReplicatorClient.put``) removes the client from
        ``_connected_clients`` and closes it in a background task.
        """
        try:
            await client.put(data)
        except (QueueFull, CancelledError):
            # The client may already have been dropped by a concurrent
            # transmit or by _clean_shutdown; removing it twice would raise
            # ValueError.
            if client not in self._connected_clients:
                return
            self._connected_clients.remove(client)

            t = self._loop.create_task(
                client.close(), name=f"TCPReplicator.close for {client}"
            )
            self._tasks.add(t)
            t.add_done_callback(self._tasks.discard)

    def transmit(self, data: bytes):
        """Transmit data to connected clients.

        Called from outside the event loop thread. Blocks until ``data`` has
        been queued (not necessarily sent) for every client. Once the event
        loop has stopped this is a no-op.

        :raises TypeError: if ``data`` is not ``bytes`` or ``bytearray``.
        """
        if not isinstance(data, (bytes, bytearray)):
            raise TypeError("Data must be byte-like object")

        # On a stopped loop the scheduled coroutines would never run and
        # futures.wait below would block forever.
        if self._loop is None or not self._loop.is_running():
            return

        # Snapshot: the event loop thread adds/removes clients concurrently.
        clients = list(self._connected_clients)
        futures.wait(
            [
                asyncio.run_coroutine_threadsafe(
                    self._transmit_to_client(c, data), self._loop
                )
                for c in clients
            ]
        )

    def join(self, timeout=None):
        """Shut down the event loop (if running) and join the thread.

        ``run`` registers this method with ``atexit``. If the thread is still
        alive after ``timeout`` seconds it is registered again.
        """
        logger.info(
            "Received shutdown request on TCPReplicator thread for %s:%s",
            self.options["tcp_bind"],
            self.options["tcp_port"],
        )

        # Unregister ourselves from atexit to prevent double execution and
        # make sure the thread gets cleaned up on stop/join.
        atexit.unregister(self.join)

        if self._loop is not None and self._loop.is_running():
            asyncio.run_coroutine_threadsafe(
                self._clean_shutdown(), self._loop
            ).result()

        # Only call join at the end otherwise Thread will falsely assume
        # all child 'processes' have stopped
        super().join(timeout)

        if self.is_alive():
            # re-register because thread is not fully shutdown yet.
            atexit.register(self.join)

    def disconnect(self):
        """Join with ``DISCONNECT_TIMEOUT``; log an error if the thread hangs.

        ``DISCONNECT_TIMEOUT`` is not defined in this class (presumably
        provided by ``StatisticsClientThread``).
        """
        if not self.is_alive():
            return

        self.join(self.DISCONNECT_TIMEOUT)

        if self.is_alive():
            # there is nothing we can do except wait (stall) longer, which
            # could be indefinitely.
            logger.error(
                "TCP thread for %s:%s did not shutdown after %s seconds, just leaving it dangling. "
                "Please attach a debugger to thread ID %s.",
                self.options["tcp_bind"],
                self.options["tcp_port"],
                self.DISCONNECT_TIMEOUT,
                self.ident,
            )

    def put(self, packet):
        """Alias of ``transmit``."""
        self.transmit(packet)

    async def _clean_shutdown(self):
        """Disconnect clients, cancel background tasks, then stop the server."""
        # Pop one client at a time: clients connecting while we await are
        # closed too, and the list is left empty so later transmits do
        # nothing.
        while self._connected_clients:
            await self._connected_clients.pop().close()

        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)

        # Stop listening last, with no await afterwards. Closing the server
        # cancels serve_forever, which completes _serve and stops the loop. If
        # we awaited after this, the loop could stop before this coroutine
        # finished, and join() would wait forever. _serve's `async with`
        # awaits wait_closed() before the thread exits.
        if self._server is not None:
            self._server.close()

    def clients(self):
        """Return the list of connected clients."""
        return [f"{client.addr}" for client in self._connected_clients]

    @property
    def client_queue_fill_percentage(self):
        """Queue occupancy (percent) of every connected client."""
        return [c.queue_fill_percentage for c in self._connected_clients]

    @property
    def nof_tasks_pending(self):
        """Return the number of pending tasks in our event loop."""
        return len(asyncio.all_tasks(self._loop))