From 5241984a79bd6ea55bc1c676ebce9e1d49969de6 Mon Sep 17 00:00:00 2001 From: lukken <lukken@astron.nl> Date: Tue, 21 Sep 2021 10:27:33 +0000 Subject: [PATCH] L2SS-340: Separate condition names for different stages --- devices/clients/tcp_replicator.py | 24 ++++++++++++++------- devices/test/clients/test_tcp_replicator.py | 1 + 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/devices/clients/tcp_replicator.py b/devices/clients/tcp_replicator.py index 39e9e64dd..ce3ba0d3b 100644 --- a/devices/clients/tcp_replicator.py +++ b/devices/clients/tcp_replicator.py @@ -61,8 +61,11 @@ class TCPReplicator(Thread, StatisticsClientThread): self.initialization_semaphore = Semaphore() self.initialization_semaphore.acquire() - # Create condition to orchestrate clean shutdown - self.shutdown_condition = Condition() + # Create condition to orchestrate clean disconnecting and shutdown + # They are actually the same object, just with different names for + # clarity. + self.disconnect_condition = Condition() + self.shutdown_condition = self.disconnect_condition # Connected clients the event loop is managing self._connected_clients = [] @@ -162,11 +165,15 @@ class TCPReplicator(Thread, StatisticsClientThread): # on the main thread. logging.fatal("TCPReplicator thread encountered fatal exception: " "{}".format(e)) + # We will lose the exception and the original stacktrace of the # thread. Once we use a threadpool it will be much easier to # retrieve this so I propose to not bother implementing it now. # For the pattern to do this see anyway: # https://stackoverflow.com/a/6894023 + + # Due to the exception the run method will return making is_alive() + # false finally: # Always release the lock upon error so the constructor can return if self.initialization_semaphore.acquire(blocking=False) is False: @@ -212,15 +219,16 @@ class TCPReplicator(Thread, StatisticsClientThread): client.transport.write(data) async def _disconnect(self): - with self.shutdown_condition: + with self.disconnect_condition: for client in self._connected_clients: peername = client.transport.get_extra_info('peername') logger.debug('Disconnecting client {}'.format(peername)) client.transport.abort() - self.shutdown_condition.notify() + self.disconnect_condition.notify() - async def _conditional_stop(self): + async def _stop_event_loop(self): with self.shutdown_condition: + # Calling stop() will return control flow to self._loop.run_*() self._loop.stop() @staticmethod @@ -245,14 +253,14 @@ class TCPReplicator(Thread, StatisticsClientThread): if not self._loop.is_running(): return - with self.shutdown_condition: + with self.disconnect_condition: self._loop.call_soon_threadsafe( self._loop.create_task, self._disconnect()) - self.shutdown_condition.wait() + self.disconnect_condition.wait() if self._loop.is_running(): with self.shutdown_condition: logging.debug("Stopping TCPReplicator event loop") self._loop.call_soon_threadsafe( - self._loop.create_task, self._conditional_stop()) + self._loop.create_task, self._stop_event_loop()) self.shutdown_condition.wait() diff --git a/devices/test/clients/test_tcp_replicator.py b/devices/test/clients/test_tcp_replicator.py index 0b8039de7..b487a48e0 100644 --- a/devices/test/clients/test_tcp_replicator.py +++ b/devices/test/clients/test_tcp_replicator.py @@ -87,6 +87,7 @@ class TestTCPReplicator(base.TestCase): # Thread should now be dead self.assertFalse(replicator.is_alive()) + @timeout_decorator.timeout(5) def test_start_exception(self): """Verify the run() methods kills the thread cleanly on exceptions""" m_loop = mock.Mock() -- GitLab