Skip to content
Snippets Groups Projects

create TCPReplicator for StatisticsClient

Merged Corné Lukken requested to merge L2SS-340-tcp-statisticsclient-server into master
Compare and
4 files
+ 491
1
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 233
0
from threading import Condition
from threading import Thread
from threading import Semaphore
import asyncio
import logging
import time
logger = logging.getLogger()
class TCPReplicator(Thread):
"""TCP replicator intended to fan out incoming UDP packets
There are three different processing layers in this class, several
methods can be called from the context of the thread that spawned this
class (main thread). These include: __init__, transmit, join and start.
When start is called, the thread will launch, this will call run from the
context of this new thread. This thread will create the new event loop as
this can only be done from the context of the thread you desire to use the
event loop in. A semaphore is used to prevent a potential race between this
new thread setting up the event loop and the main thread trying to tear it
down by calling join. Similarly, transmit also uses this semaphore to
prevent scheduling transmissions before the thread has fully started.
The final layer is the event loop itself, it handles instances of the
TCPServerProtocol. These can be found in the _connected_clients list.
However, only async task are allowed to call methods on these objects!
The async methods are _transmit, _disconnect, _condtional_stop and
_run_server.
Tearing down the thread in __del__ is not needed as upon deconstruction
Python will always call join.
"""
"""Default options for TCPReplicator
we kindly ask to not change this static variable at runtime.
"""
_options = {
"tcp_bind": '127.0.0.1',
"tcp_port": 6666,
"tcp_buffer_size": 128000000, # In bytes
}
def __init__(self, options: dict = None):
super().__init__()
"""Reserve asyncio event loop attribute but don't create it yet.
This event loop is created inside the new Thread, the result is that
the thread owns the event loop! EVENT LOOPS ARE NOT THREAD SAFE ALL
CALLS TO THE EVENT LOOP OBJECT MUST USE THE call_soon_threadsafe
FUNCTION!!
"""
self._loop = None
# Create and acquire lock to prevent premature termination in join
self.initialization_semaphore = Semaphore()
self.initialization_semaphore.acquire()
# Create condition to orchestrate clean shutdown
self.shutdown_condition = Condition()
# Connected clients the event loop is managing
self._connected_clients = []
# Shallow copy the options, native data types and strings are immutable
self.options = self._options.copy()
if not options:
return
# Find all matching keys in the options arguments and override
for option, value in options.items():
if option in self.options:
self.options[option] = value
class TCPServerProtocol(asyncio.Protocol):
"""TCP protocol used for connected clients"""
def __init__(self, options: dict, connected_clients: list):
self.options = options
# Make connected_clients reflect the TCPReplicator connected_clients
self.connected_clients = connected_clients
def connection_made(self, transport):
"""Setup client connection and add entry to connected_clients"""
peername = transport.get_extra_info('peername')
logger.debug('TCP connection from {}'.format(peername))
self.transport = transport
# Set the TCP buffer limit
self.transport.set_write_buffer_limits(
high=self.options['tcp_buffer_size'])
self.connected_clients.append(self)
def pause_writing(self):
"""Called when TCP buffer for the specific connection is full
Upon encountering a full TCP buffer we deem the client to slow and
forcefully close its connection.
"""
self.transport.abort()
def connection_lost(self, exc):
"""Called when connection is lost
Used to remove entries from connected_clients
"""
peername = self.transport.get_extra_info('peername')
logger.debug('TCP connection lost from {}'.format(peername))
self.connected_clients.remove(self)
def eof_received(self):
"""After eof_received, connection_lost is still called"""
pass
def run(self):
"""Run is launched by calling .start() on TCPReplicator
It manages an asyncio event loop to orchestrate our TCPServerProtocol.
"""
logger.info("Starting TCPReplicator thread")
# Create the event loop, must be done in the new thread
self._loop = asyncio.new_event_loop()
# TODO(Corne): REMOVE ME
self._loop.set_debug(True)
# Schedule the task to create the server
self._loop.create_task(TCPReplicator._run_server(
self.options, self._connected_clients))
# Everything is initialized, join can now safely be called
self.initialization_semaphore.release()
# Keep running event loop until self._loop.stop() is called
self._loop.run_forever()
# Stop must have been called, close the event loop
with self.shutdown_condition:
logger.debug("Closing TCPReplicator event loop")
self._loop.close()
self.shutdown_condition.notify()
return
def transmit(self, data: bytes):
"""Transmit data to connected clients"""
if not isinstance(data, (bytes, bytearray)):
raise TypeError("Data must be byte-like object")
with self.initialization_semaphore:
if not self._loop.is_running():
logger.warning("Attempt to transmit with TCPReplicator before"
"fully started.")
return
self._loop.call_soon_threadsafe(
self._loop.create_task, self._transmit(data))
def join(self, timeout=None):
with self.initialization_semaphore:
logging.info("Received shutdown request on TCPReplicator thread")
self._clean_shutdown()
# Only call join at the end otherwise Thread will falsely assume
# all child 'processes' have stopped
super().join(timeout)
async def _transmit(self, data):
logger.debug("Transmitting")
for client in self._connected_clients:
client.transport.write(data)
async def _disconnect(self):
with self.shutdown_condition:
for client in self._connected_clients:
peername = client.transport.get_extra_info('peername')
logger.debug('Disconnecting client {}'.format(peername))
client.transport.abort()
self.shutdown_condition.notify()
async def _conditional_stop(self):
with self.shutdown_condition:
self._loop.stop()
@staticmethod
async def _run_server(options: dict, connected_clients: list):
"""Retrieve the event loop created in run() and launch the server"""
loop = asyncio.get_event_loop()
tcp_server = await loop.create_server(
lambda: TCPReplicator.TCPServerProtocol(options, connected_clients),
options['tcp_bind'], options['tcp_port'])
def _clean_shutdown(self):
"""Disconnect clients, stop the event loop and wait for it to close"""
# This should never ever happen, semaphore race condition
if not self._loop:
logging.error(
"TCPReplicator event loop unset, early termination?!")
return
with self.shutdown_condition:
self._loop.call_soon_threadsafe(
self._loop.create_task, self._disconnect())
self.shutdown_condition.wait()
if self._loop.is_running():
with self.shutdown_condition:
logging.debug("Stopping TCPReplicator event loop")
self._loop.call_soon_threadsafe(
self._loop.create_task, self._conditional_stop())
self.shutdown_condition.wait()
# Should never happen, conditional race condition
while self._loop.is_running():
logging.error("TCPReplicator event loop still running after"
"returning from condition.wait!")
time.sleep(1)
# Should never happen, conditional race condition
while not self._loop.is_closed():
logging.error("TCPReplicator event loop not closed after"
"returning from condition.wait!")
time.sleep(1)
Loading