Skip to content
Snippets Groups Projects
Commit 2fecd42f authored by Hannes Feldt's avatar Hannes Feldt
Browse files

Merge branch 'L2SS-960-fix-unittest-deadlock' into 'master'

Resolve L2SS-960 "Unit tests deadlock starting python 3.7.5"

Closes L2SS-960

See merge request !442
parents 233b5bcd d2ed5158
Branches
Tags
1 merge request!442Resolve L2SS-960 "Unit tests deadlock starting python 3.7.5"
import atexit
from threading import Condition from threading import Condition
from threading import Semaphore from threading import Semaphore
from threading import Thread from threading import Thread
...@@ -54,7 +54,7 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -54,7 +54,7 @@ class TCPReplicator(Thread, StatisticsClientThread):
} }
def __init__(self, options: dict = None, queuesize=0): def __init__(self, options: dict = None, queuesize=0):
super().__init__() super().__init__(daemon=True)
self.queuesize = queuesize self.queuesize = queuesize
...@@ -162,7 +162,8 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -162,7 +162,8 @@ class TCPReplicator(Thread, StatisticsClientThread):
self._loop = asyncio.new_event_loop() self._loop = asyncio.new_event_loop()
# Create the input queue # Create the input queue
self.queue = asyncio.Queue(maxsize=self.queuesize, loop=self._loop) asyncio.set_event_loop(self._loop)
self.queue = asyncio.Queue(maxsize=self.queuesize)
# When wanting to debug event loop behavior, uncomment this # When wanting to debug event loop behavior, uncomment this
# self._loop.set_debug(True) # self._loop.set_debug(True)
...@@ -178,6 +179,10 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -178,6 +179,10 @@ class TCPReplicator(Thread, StatisticsClientThread):
# call self._loop.stop() # call self._loop.stop()
server_task.add_done_callback(self._server_start_callback) server_task.add_done_callback(self._server_start_callback)
# Register _clean_shutdown to be executed at termination to make
# sure all tcp connections are cleaned up.
atexit.register(self._clean_shutdown)
# Keep running event loop until self._loop.stop() is called. # Keep running event loop until self._loop.stop() is called.
# Calling this will lose control flow to the event loop # Calling this will lose control flow to the event loop
# indefinitely, upon self._loop.stop() control flow is returned # indefinitely, upon self._loop.stop() control flow is returned
...@@ -325,6 +330,10 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -325,6 +330,10 @@ class TCPReplicator(Thread, StatisticsClientThread):
def _clean_shutdown(self): def _clean_shutdown(self):
"""Disconnect clients, stop the event loop and wait for it to close""" """Disconnect clients, stop the event loop and wait for it to close"""
# Unregister _clean_shutdown to prevent double execution and make
# sure the thread gets cleaned up on stop/join
atexit.unregister(self._clean_shutdown)
# The event loop is not running anymore, we can't send tasks to shut # The event loop is not running anymore, we can't send tasks to shut
# it down further. # it down further.
if not self._loop.is_running(): if not self._loop.is_running():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment