Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
T
tango
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Jira issues
Open Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
LOFAR2.0
tango
Merge requests
!117
create TCPReplicator for StatisticsClient
Code
Review changes
Check out branch
Open in Workspace
Download
Patches
Plain diff
Merged
create TCPReplicator for StatisticsClient
L2SS-340-tcp-statisticsclient-server
into
master
Overview
25
Commits
45
Pipelines
0
Changes
4
Merged
Corné Lukken
requested to merge
L2SS-340-tcp-statisticsclient-server
into
master
3 years ago
Overview
24
Commits
45
Pipelines
0
Changes
4
Expand
Closes
L2SS-340
Edited
3 years ago
by
Corné Lukken
0
0
Merge request reports
Compare
master
version 31
45b2f370
3 years ago
version 30
ac45f922
3 years ago
version 29
53ccfe46
3 years ago
version 28
8fef299e
3 years ago
version 27
1e3a2c3a
3 years ago
version 26
8f2b739a
3 years ago
version 25
b1c53309
3 years ago
version 24
1e8df46b
3 years ago
version 23
d461a946
3 years ago
version 22
fc0a6291
3 years ago
version 21
7876dcc7
3 years ago
version 20
f7104185
3 years ago
version 19
ec26aeeb
3 years ago
version 18
9e71a065
3 years ago
version 17
a7f25724
3 years ago
version 16
9800ed38
3 years ago
version 15
01d715f7
3 years ago
version 14
3941f00e
3 years ago
version 13
ae90344a
3 years ago
version 12
dbf2e359
3 years ago
version 11
2b15b958
3 years ago
version 10
5d1847d5
3 years ago
version 9
ddc05fe0
3 years ago
version 8
893e31f8
3 years ago
version 7
5d22a1d8
3 years ago
version 6
5241984a
3 years ago
version 5
e0413213
3 years ago
version 4
2448f22b
3 years ago
version 3
ca31a2d1
3 years ago
version 2
59b9fe64
3 years ago
version 1
64f745f6
3 years ago
master (base)
and
version 3
latest version
cf1b6740
45 commits,
3 years ago
version 31
45b2f370
44 commits,
3 years ago
version 30
ac45f922
43 commits,
3 years ago
version 29
53ccfe46
42 commits,
3 years ago
version 28
8fef299e
41 commits,
3 years ago
version 27
1e3a2c3a
40 commits,
3 years ago
version 26
8f2b739a
39 commits,
3 years ago
version 25
b1c53309
38 commits,
3 years ago
version 24
1e8df46b
34 commits,
3 years ago
version 23
d461a946
33 commits,
3 years ago
version 22
fc0a6291
32 commits,
3 years ago
version 21
7876dcc7
29 commits,
3 years ago
version 20
f7104185
28 commits,
3 years ago
version 19
ec26aeeb
27 commits,
3 years ago
version 18
9e71a065
26 commits,
3 years ago
version 17
a7f25724
23 commits,
3 years ago
version 16
9800ed38
22 commits,
3 years ago
version 15
01d715f7
21 commits,
3 years ago
version 14
3941f00e
20 commits,
3 years ago
version 13
ae90344a
17 commits,
3 years ago
version 12
dbf2e359
16 commits,
3 years ago
version 11
2b15b958
15 commits,
3 years ago
version 10
5d1847d5
13 commits,
3 years ago
version 9
ddc05fe0
12 commits,
3 years ago
version 8
893e31f8
11 commits,
3 years ago
version 7
5d22a1d8
9 commits,
3 years ago
version 6
5241984a
8 commits,
3 years ago
version 5
e0413213
6 commits,
3 years ago
version 4
2448f22b
5 commits,
3 years ago
version 3
ca31a2d1
4 commits,
3 years ago
version 2
59b9fe64
3 commits,
3 years ago
version 1
64f745f6
2 commits,
3 years ago
4 files
+
491
−
1
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
4
Search (e.g. *.vue) (Ctrl+P)
devices/clients/tcp_replicator.py
0 → 100644
+
233
−
0
Options
from
threading
import
Condition
from
threading
import
Thread
from
threading
import
Semaphore
import
asyncio
import
logging
import
time
logger
=
logging
.
getLogger
()
class
TCPReplicator
(
Thread
):
"""
TCP replicator intended to fan out incoming UDP packets
There are three different processing layers in this class, several
methods can be called from the context of the thread that spawned this
class (main thread). These include: __init__, transmit, join and start.
When start is called, the thread will launch, this will call run from the
context of this new thread. This thread will create the new event loop as
this can only be done from the context of the thread you desire to use the
event loop in. A semaphore is used to prevent a potential race between this
new thread setting up the event loop and the main thread trying to tear it
down by calling join. Similarly, transmit also uses this semaphore to
prevent scheduling transmissions before the thread has fully started.
The final layer is the event loop itself, it handles instances of the
TCPServerProtocol. These can be found in the _connected_clients list.
However, only async task are allowed to call methods on these objects!
The async methods are _transmit, _disconnect, _condtional_stop and
_run_server.
Tearing down the thread in __del__ is not needed as upon deconstruction
Python will always call join.
"""
"""
Default options for TCPReplicator
we kindly ask to not change this static variable at runtime.
"""
_options
=
{
"
tcp_bind
"
:
'
127.0.0.1
'
,
"
tcp_port
"
:
6666
,
"
tcp_buffer_size
"
:
128000000
,
# In bytes
}
def
__init__
(
self
,
options
:
dict
=
None
):
super
().
__init__
()
"""
Reserve asyncio event loop attribute but don
'
t create it yet.
This event loop is created inside the new Thread, the result is that
the thread owns the event loop! EVENT LOOPS ARE NOT THREAD SAFE ALL
CALLS TO THE EVENT LOOP OBJECT MUST USE THE call_soon_threadsafe
FUNCTION!!
"""
self
.
_loop
=
None
# Create and acquire lock to prevent premature termination in join
self
.
initialization_semaphore
=
Semaphore
()
self
.
initialization_semaphore
.
acquire
()
# Create condition to orchestrate clean shutdown
self
.
shutdown_condition
=
Condition
()
# Connected clients the event loop is managing
self
.
_connected_clients
=
[]
# Shallow copy the options, native data types and strings are immutable
self
.
options
=
self
.
_options
.
copy
()
if
not
options
:
return
# Find all matching keys in the options arguments and override
for
option
,
value
in
options
.
items
():
if
option
in
self
.
options
:
self
.
options
[
option
]
=
value
class
TCPServerProtocol
(
asyncio
.
Protocol
):
"""
TCP protocol used for connected clients
"""
def
__init__
(
self
,
options
:
dict
,
connected_clients
:
list
):
self
.
options
=
options
# Make connected_clients reflect the TCPReplicator connected_clients
self
.
connected_clients
=
connected_clients
def
connection_made
(
self
,
transport
):
"""
Setup client connection and add entry to connected_clients
"""
peername
=
transport
.
get_extra_info
(
'
peername
'
)
logger
.
debug
(
'
TCP connection from {}
'
.
format
(
peername
))
self
.
transport
=
transport
# Set the TCP buffer limit
self
.
transport
.
set_write_buffer_limits
(
high
=
self
.
options
[
'
tcp_buffer_size
'
])
self
.
connected_clients
.
append
(
self
)
def
pause_writing
(
self
):
"""
Called when TCP buffer for the specific connection is full
Upon encountering a full TCP buffer we deem the client to slow and
forcefully close its connection.
"""
self
.
transport
.
abort
()
def
connection_lost
(
self
,
exc
):
"""
Called when connection is lost
Used to remove entries from connected_clients
"""
peername
=
self
.
transport
.
get_extra_info
(
'
peername
'
)
logger
.
debug
(
'
TCP connection lost from {}
'
.
format
(
peername
))
self
.
connected_clients
.
remove
(
self
)
def
eof_received
(
self
):
"""
After eof_received, connection_lost is still called
"""
pass
def
run
(
self
):
"""
Run is launched by calling .start() on TCPReplicator
It manages an asyncio event loop to orchestrate our TCPServerProtocol.
"""
logger
.
info
(
"
Starting TCPReplicator thread
"
)
# Create the event loop, must be done in the new thread
self
.
_loop
=
asyncio
.
new_event_loop
()
# TODO(Corne): REMOVE ME
self
.
_loop
.
set_debug
(
True
)
# Schedule the task to create the server
self
.
_loop
.
create_task
(
TCPReplicator
.
_run_server
(
self
.
options
,
self
.
_connected_clients
))
# Everything is initialized, join can now safely be called
self
.
initialization_semaphore
.
release
()
# Keep running event loop until self._loop.stop() is called
self
.
_loop
.
run_forever
()
# Stop must have been called, close the event loop
with
self
.
shutdown_condition
:
logger
.
debug
(
"
Closing TCPReplicator event loop
"
)
self
.
_loop
.
close
()
self
.
shutdown_condition
.
notify
()
return
def
transmit
(
self
,
data
:
bytes
):
"""
Transmit data to connected clients
"""
if
not
isinstance
(
data
,
(
bytes
,
bytearray
)):
raise
TypeError
(
"
Data must be byte-like object
"
)
with
self
.
initialization_semaphore
:
if
not
self
.
_loop
.
is_running
():
logger
.
warning
(
"
Attempt to transmit with TCPReplicator before
"
"
fully started.
"
)
return
self
.
_loop
.
call_soon_threadsafe
(
self
.
_loop
.
create_task
,
self
.
_transmit
(
data
))
def
join
(
self
,
timeout
=
None
):
with
self
.
initialization_semaphore
:
logging
.
info
(
"
Received shutdown request on TCPReplicator thread
"
)
self
.
_clean_shutdown
()
# Only call join at the end otherwise Thread will falsely assume
# all child 'processes' have stopped
super
().
join
(
timeout
)
async
def
_transmit
(
self
,
data
):
logger
.
debug
(
"
Transmitting
"
)
for
client
in
self
.
_connected_clients
:
client
.
transport
.
write
(
data
)
async
def
_disconnect
(
self
):
with
self
.
shutdown_condition
:
for
client
in
self
.
_connected_clients
:
peername
=
client
.
transport
.
get_extra_info
(
'
peername
'
)
logger
.
debug
(
'
Disconnecting client {}
'
.
format
(
peername
))
client
.
transport
.
abort
()
self
.
shutdown_condition
.
notify
()
async
def
_conditional_stop
(
self
):
with
self
.
shutdown_condition
:
self
.
_loop
.
stop
()
@staticmethod
async
def
_run_server
(
options
:
dict
,
connected_clients
:
list
):
"""
Retrieve the event loop created in run() and launch the server
"""
loop
=
asyncio
.
get_event_loop
()
tcp_server
=
await
loop
.
create_server
(
lambda
:
TCPReplicator
.
TCPServerProtocol
(
options
,
connected_clients
),
options
[
'
tcp_bind
'
],
options
[
'
tcp_port
'
])
def
_clean_shutdown
(
self
):
"""
Disconnect clients, stop the event loop and wait for it to close
"""
# This should never ever happen, semaphore race condition
if
not
self
.
_loop
:
logging
.
error
(
"
TCPReplicator event loop unset, early termination?!
"
)
return
with
self
.
shutdown_condition
:
self
.
_loop
.
call_soon_threadsafe
(
self
.
_loop
.
create_task
,
self
.
_disconnect
())
self
.
shutdown_condition
.
wait
()
if
self
.
_loop
.
is_running
():
with
self
.
shutdown_condition
:
logging
.
debug
(
"
Stopping TCPReplicator event loop
"
)
self
.
_loop
.
call_soon_threadsafe
(
self
.
_loop
.
create_task
,
self
.
_conditional_stop
())
self
.
shutdown_condition
.
wait
()
# Should never happen, conditional race condition
while
self
.
_loop
.
is_running
():
logging
.
error
(
"
TCPReplicator event loop still running after
"
"
returning from condition.wait!
"
)
time
.
sleep
(
1
)
# Should never happen, conditional race condition
while
not
self
.
_loop
.
is_closed
():
logging
.
error
(
"
TCPReplicator event loop not closed after
"
"
returning from condition.wait!
"
)
time
.
sleep
(
1
)
Loading