Skip to content
Snippets Groups Projects
Commit 5241984a authored by Corné Lukken's avatar Corné Lukken
Browse files

L2SS-340: Separate condition names for different stages

parent d594420b
No related branches found
No related tags found
1 merge request!117create TCPReplicator for StatisticsClient
...@@ -61,8 +61,11 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -61,8 +61,11 @@ class TCPReplicator(Thread, StatisticsClientThread):
self.initialization_semaphore = Semaphore() self.initialization_semaphore = Semaphore()
self.initialization_semaphore.acquire() self.initialization_semaphore.acquire()
# Create condition to orchestrate clean shutdown # Create condition to orchestrate clean disconnecting and shutdown
self.shutdown_condition = Condition() # They are actually the same object, just with different names for
# clarity.
self.disconnect_condition = Condition()
self.shutdown_condition = self.disconnect_condition
# Connected clients the event loop is managing # Connected clients the event loop is managing
self._connected_clients = [] self._connected_clients = []
...@@ -162,11 +165,15 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -162,11 +165,15 @@ class TCPReplicator(Thread, StatisticsClientThread):
# on the main thread. # on the main thread.
logging.fatal("TCPReplicator thread encountered fatal exception: " logging.fatal("TCPReplicator thread encountered fatal exception: "
"{}".format(e)) "{}".format(e))
# We will lose the exception and the original stacktrace of the # We will lose the exception and the original stacktrace of the
# thread. Once we use a threadpool it will be much easier to # thread. Once we use a threadpool it will be much easier to
# retrieve this so I propose to not bother implementing it now. # retrieve this so I propose to not bother implementing it now.
# For the pattern to do this see anyway: # For the pattern to do this see anyway:
# https://stackoverflow.com/a/6894023 # https://stackoverflow.com/a/6894023
# Due to the exception the run method will return making is_alive()
# false
finally: finally:
# Always release the lock upon error so the constructor can return # Always release the lock upon error so the constructor can return
if self.initialization_semaphore.acquire(blocking=False) is False: if self.initialization_semaphore.acquire(blocking=False) is False:
...@@ -212,15 +219,16 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -212,15 +219,16 @@ class TCPReplicator(Thread, StatisticsClientThread):
client.transport.write(data) client.transport.write(data)
async def _disconnect(self): async def _disconnect(self):
with self.shutdown_condition: with self.disconnect_condition:
for client in self._connected_clients: for client in self._connected_clients:
peername = client.transport.get_extra_info('peername') peername = client.transport.get_extra_info('peername')
logger.debug('Disconnecting client {}'.format(peername)) logger.debug('Disconnecting client {}'.format(peername))
client.transport.abort() client.transport.abort()
self.shutdown_condition.notify() self.disconnect_condition.notify()
async def _conditional_stop(self): async def _stop_event_loop(self):
with self.shutdown_condition: with self.shutdown_condition:
# Calling stop() will return control flow to self._loop.run_*()
self._loop.stop() self._loop.stop()
@staticmethod @staticmethod
...@@ -245,14 +253,14 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -245,14 +253,14 @@ class TCPReplicator(Thread, StatisticsClientThread):
if not self._loop.is_running(): if not self._loop.is_running():
return return
with self.shutdown_condition: with self.disconnect_condition:
self._loop.call_soon_threadsafe( self._loop.call_soon_threadsafe(
self._loop.create_task, self._disconnect()) self._loop.create_task, self._disconnect())
self.shutdown_condition.wait() self.disconnect_condition.wait()
if self._loop.is_running(): if self._loop.is_running():
with self.shutdown_condition: with self.shutdown_condition:
logging.debug("Stopping TCPReplicator event loop") logging.debug("Stopping TCPReplicator event loop")
self._loop.call_soon_threadsafe( self._loop.call_soon_threadsafe(
self._loop.create_task, self._conditional_stop()) self._loop.create_task, self._stop_event_loop())
self.shutdown_condition.wait() self.shutdown_condition.wait()
...@@ -87,6 +87,7 @@ class TestTCPReplicator(base.TestCase): ...@@ -87,6 +87,7 @@ class TestTCPReplicator(base.TestCase):
# Thread should now be dead # Thread should now be dead
self.assertFalse(replicator.is_alive()) self.assertFalse(replicator.is_alive())
@timeout_decorator.timeout(5)
def test_start_exception(self): def test_start_exception(self):
"""Verify the run() methods kills the thread cleanly on exceptions""" """Verify the run() methods kills the thread cleanly on exceptions"""
m_loop = mock.Mock() m_loop = mock.Mock()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment