Skip to content
Snippets Groups Projects
Commit d2ed5158 authored by Hannes Feldt's avatar Hannes Feldt
Browse files

Resolve L2SS-960 "Unit tests deadlock starting python 3.7.5"

parent 7fb3ff82
No related branches found
No related tags found
1 merge request!442Resolve L2SS-960 "Unit tests deadlock starting python 3.7.5"
import atexit
from threading import Condition from threading import Condition
from threading import Semaphore from threading import Semaphore
from threading import Thread from threading import Thread
...@@ -54,7 +54,7 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -54,7 +54,7 @@ class TCPReplicator(Thread, StatisticsClientThread):
} }
def __init__(self, options: dict = None, queuesize=0): def __init__(self, options: dict = None, queuesize=0):
super().__init__() super().__init__(daemon=True)
self.queuesize = queuesize self.queuesize = queuesize
...@@ -162,7 +162,8 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -162,7 +162,8 @@ class TCPReplicator(Thread, StatisticsClientThread):
self._loop = asyncio.new_event_loop() self._loop = asyncio.new_event_loop()
# Create the input queue # Create the input queue
self.queue = asyncio.Queue(maxsize=self.queuesize, loop=self._loop) asyncio.set_event_loop(self._loop)
self.queue = asyncio.Queue(maxsize=self.queuesize)
# When wanting to debug event loop behavior, uncomment this # When wanting to debug event loop behavior, uncomment this
# self._loop.set_debug(True) # self._loop.set_debug(True)
...@@ -178,6 +179,10 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -178,6 +179,10 @@ class TCPReplicator(Thread, StatisticsClientThread):
# call self._loop.stop() # call self._loop.stop()
server_task.add_done_callback(self._server_start_callback) server_task.add_done_callback(self._server_start_callback)
# Register _clean_shutdown to be executed at termination to make
# sure all tcp connections are cleaned up.
atexit.register(self._clean_shutdown)
# Keep running event loop until self._loop.stop() is called. # Keep running event loop until self._loop.stop() is called.
# Calling this will lose control flow to the event loop # Calling this will lose control flow to the event loop
# indefinitely, upon self._loop.stop() control flow is returned # indefinitely, upon self._loop.stop() control flow is returned
...@@ -325,6 +330,10 @@ class TCPReplicator(Thread, StatisticsClientThread): ...@@ -325,6 +330,10 @@ class TCPReplicator(Thread, StatisticsClientThread):
def _clean_shutdown(self): def _clean_shutdown(self):
"""Disconnect clients, stop the event loop and wait for it to close""" """Disconnect clients, stop the event loop and wait for it to close"""
# Unregister _clean_shutdown to prevent double execution and make
# sure the thread gets cleaned up on stop/join
atexit.unregister(self._clean_shutdown)
# The event loop is not running anymore, we can't send tasks to shut # The event loop is not running anymore, we can't send tasks to shut
# it down further. # it down further.
if not self._loop.is_running(): if not self._loop.is_running():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment