Skip to content
Snippets Groups Projects
Commit 59b9fe64 authored by Corné Lukken's avatar Corné Lukken
Browse files

L2SS-340: Employ condition to synchronize shutdown

The event loop is owned by the thread spawned in TCPReplicator.
The result is that any access to this event loop and its tasks
is unsafe from the main thread. To 'query' the progression of
shutdown from the main thread, an condition is used.
parent 64f745f6
No related branches found
No related tags found
1 merge request!117create TCPReplicator for StatisticsClient
from threading import Condition
from threading import Thread
from threading import Semaphore
......@@ -26,7 +27,8 @@ class TCPReplicator(Thread):
The final layer is the event loop itself, it handles instances of the
TCPServerProtocol. These can be found in the _connected_clients list.
However, only async task are allowed to call methods on these objects!
The async methods are _transmit, _disconnect and _run_server.
The async methods are _transmit, _disconnect, _condtional_stop and
_run_server.
Tearing down the thread in __del__ is not needed as upon deconstruction
Python will always call join.
......@@ -53,20 +55,23 @@ class TCPReplicator(Thread):
"""
self._loop = None
"""Create and acquire lock to prevent premature termination in join"""
# Create and acquire lock to prevent premature termination in join
self.initialization_semaphore = Semaphore()
self.initialization_semaphore.acquire()
"Connected clients the event loop is managing"
# Create condition to orchestrate clean shutdown
self.shutdown_condition = Condition()
# Connected clients the event loop is managing
self._connected_clients = []
"Shallow copy the options, native data types and strings are immutable"
# Shallow copy the options, native data types and strings are immutable
self.options = self._options.copy()
if not options:
return
"Find all matching keys in the options arguments and override"
# Find all matching keys in the options arguments and override
for option, value in options.items():
if option in self.options:
self.options[option] = value
......@@ -118,6 +123,9 @@ class TCPReplicator(Thread):
# Create the event loop, must be done in the new thread
self._loop = asyncio.new_event_loop()
# TODO(Corne): REMOVE ME
self._loop.set_debug(True)
# Schedule the task to create the server
self._loop.create_task(TCPReplicator._run_server(
self.options, self._connected_clients))
......@@ -129,8 +137,10 @@ class TCPReplicator(Thread):
self._loop.run_forever()
# Stop must have been called, close the event loop
with self.shutdown_condition:
logger.debug("Closing TCPReplicator event loop")
self._loop.close()
self.shutdown_condition.notify()
return
......@@ -161,8 +171,14 @@ class TCPReplicator(Thread):
client.transport.write(data)
async def _disconnect(self):
with self.shutdown_condition:
for client in self._connected_clients:
client.transport.abort()
self.shutdown_condition.notify()
async def _conditional_stop(self):
with self.shutdown_condition:
self._loop.stop()
@staticmethod
async def _run_server(options: dict, connected_clients: list):
......@@ -175,23 +191,33 @@ class TCPReplicator(Thread):
def _clean_shutdown(self):
"""Disconnect clients, stop the event loop and wait for it to close"""
self._loop.call_soon_threadsafe(
self._loop.create_task, self._disconnect())
# Early termination prevents unnecessary nesting
# This should never ever happen, semaphore race condition
if not self._loop:
logging.warning(
logging.error(
"TCPReplicator event loop unset, early termination?!")
return
with self.shutdown_condition:
self._loop.call_soon_threadsafe(
self._loop.create_task, self._disconnect())
self.shutdown_condition.wait()
if self._loop.is_running():
with self.shutdown_condition:
logging.debug("Stopping TCPReplicator event loop")
self._loop.call_soon_threadsafe(self._loop.stop)
self._loop.call_soon_threadsafe(
self._loop.create_task, self._conditional_stop())
self.shutdown_condition.wait()
# Required to yield execution here so the event loop can stop
# Should never happen, conditional race condition
while self._loop.is_running():
logging.error("TCPReplicator event loop still running after"
"returning from condition.wait!")
time.sleep(1)
# Required to yield execution here so the event loop can close
# Should never happen, conditional race condition
while not self._loop.is_closed():
logging.error("TCPReplicator event loop not closed after"
"returning from condition.wait!")
time.sleep(1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment