Skip to content
Snippets Groups Projects
Commit 64f745f6 authored by Corné Lukken's avatar Corné Lukken
Browse files

L2SS-340: Implement async transmit and disconnect with tests

parent ff5d3928
Branches
Tags
1 merge request!117create TCPReplicator for StatisticsClient
......@@ -9,7 +9,29 @@ logger = logging.getLogger()
class TCPReplicator(Thread):
"""TCP replicator intended to fan out incoming UDP packets"""
"""TCP replicator intended to fan out incoming UDP packets
There are three different processing layers in this class, several
methods can be called from the context of the thread that spawned this
class (main thread). These include: __init__, transmit, join and start.
When start is called, the thread will launch, this will call run from the
context of this new thread. This thread will create the new event loop as
this can only be done from the context of the thread you desire to use the
event loop in. A semaphore is used to prevent a potential race between this
new thread setting up the event loop and the main thread trying to tear it
down by calling join. Similarly, transmit also uses this semaphore to
prevent scheduling transmissions before the thread has fully started.
The final layer is the event loop itself, it handles instances of the
TCPServerProtocol. These can be found in the _connected_clients list.
However, only async task are allowed to call methods on these objects!
The async methods are _transmit, _disconnect and _run_server.
Tearing down the thread in __del__ is not needed as upon deconstruction
Python will always call join.
"""
"""Default options for TCPReplicator
we kindly ask to not change this static variable at runtime.
......@@ -35,7 +57,7 @@ class TCPReplicator(Thread):
self.initialization_semaphore = Semaphore()
self.initialization_semaphore.acquire()
"Connected clients the TCPReplicator thread is managing"
"Connected clients the event loop is managing"
self._connected_clients = []
"Shallow copy the options, native data types and strings are immutable"
......@@ -96,15 +118,18 @@ class TCPReplicator(Thread):
# Create the event loop, must be done in the new thread
self._loop = asyncio.new_event_loop()
# Schedule the task to create the server
self._loop.create_task(TCPReplicator._run_server(
self.options, self._connected_clients))
# Everything is initialized, join or __del__ can now safely be called
# Everything is initialized, join can now safely be called
self.initialization_semaphore.release()
# Keep running event loop until self._loop.stop() is called
self._loop.run_forever()
logger.info("Closing TCPReplicator event loop")
# Stop must have been called, close the event loop
logger.debug("Closing TCPReplicator event loop")
self._loop.close()
return
......@@ -117,7 +142,8 @@ class TCPReplicator(Thread):
"fully started.")
return
self._loop.create_task(self._transmit(data))
self._loop.call_soon_threadsafe(
self._loop.create_task, self._transmit(data))
def join(self, timeout=None):
with self.initialization_semaphore:
......@@ -130,9 +156,14 @@ class TCPReplicator(Thread):
super().join(timeout)
async def _transmit(self, data):
logger.debug("Transmitting")
for client in self._connected_clients:
client.transport.write(data)
async def _disconnect(self):
for client in self._connected_clients:
client.transport.abort()
@staticmethod
async def _run_server(options: dict, connected_clients: list):
"""Retrieve the event loop created in run() and launch the server"""
......@@ -144,8 +175,8 @@ class TCPReplicator(Thread):
def _clean_shutdown(self):
"""Disconnect clients, stop the event loop and wait for it to close"""
for client in self._connected_clients:
logging.debug("Disconnecting client")
self._loop.call_soon_threadsafe(
self._loop.create_task, self._disconnect())
# Early termination prevents unnecessary nesting
if not self._loop:
......
......@@ -83,3 +83,51 @@ class TestTCPReplicator(base.TestCase):
replicator.start()
del replicator
@staticmethod
async def dummy_task():
pass
@mock.patch.object(TCPReplicator, "_run_server")
def test_transmit(self, m_run_server):
"""Test that clients are getting data written to their transport"""
m_run_server.return_value = self.dummy_task()
m_data = "Hello World!"
m_client = mock.Mock()
replicator = TCPReplicator()
replicator.start()
replicator._connected_clients.append(m_client)
replicator.transmit(m_data)
# TODO(Corne): Find suitable primitive to synchronize async task update
# with main thread.
time.sleep(1)
time.sleep(1)
time.sleep(1)
time.sleep(1)
time.sleep(1)
time.sleep(1)
m_client.transport.write.assert_called_once_with(m_data)
@mock.patch.object(TCPReplicator, "_run_server")
def test_disconnect(self, m_run_server):
m_run_server.return_value = self.dummy_task()
m_client = mock.Mock()
replicator = TCPReplicator()
replicator.start()
replicator._connected_clients.append(m_client)
replicator.join(5)
m_client.transport.abort.assert_called_once_with()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment