Skip to content
Snippets Groups Projects
Commit 7f98e54b authored by Corné Lukken's avatar Corné Lukken
Browse files

Add TCP -> TCP mode for Python replicator

parent 19415998
Branches
No related tags found
No related merge requests found
import asyncio
import getopt
import sys
# List of currently connected clients
connected_clients = []
class Options:
tcp_bind = '127.0.0.1'
tcp_port = 666
tcp_buffer_size = 128000000
udp_mode = True
udp_bind = '127.0.0.1'
udp_port = 6666
# List of currently connected clients
connected_clients = []
class TCPServerProtocol(asyncio.Protocol):
"""TCP server used for client connections and replication data"""
"""TCP server used for client connections and sending replication data"""
def __init__(self, options: Options):
self.options = options
def connection_made(self, transport):
"""Setup client connnection and add entry to connected_clients"""
peername = transport.get_extra_info('peername')
print('TCP Connection from {}'.format(peername))
print('TCP connection from {}'.format(peername))
self.transport = transport
# Set the TCP buffer limit to 128MB
self.transport.set_write_buffer_limits(high=tcp_buffer_size)
self.transport.set_write_buffer_limits(
high=options.tcp_buffer_size)
connected_clients.append(self)
def pause_writing(self):
......@@ -30,7 +38,6 @@ class TCPServerProtocol(asyncio.Protocol):
Upon encountering a full TCP buffer we deem the client to slow and
forcefully close its connection.
"""
self.transport.abort()
def connection_lost(self, exc):
......@@ -38,15 +45,26 @@ class TCPServerProtocol(asyncio.Protocol):
Used to remove entries from connected_clients
"""
peername = self.transport.get_extra_info('peername')
print('TCP connection lost from {}'.format(peername))
connected_clients.remove(self)
async def send_datagram(self, data):
"""Async task to perform data transmission to client"""
self.transport.write(data)
class TCPClientProtocol(asyncio.Protocol):
"""TCP client connect to TCP server and retransmit data to connected_clients"""
def connection_made(self, transport):
"""Setup TCP connection"""
self.transport = transport
def data_received(self, data):
"""Replicate any received TCP data to all connected_clients"""
for connection in connected_clients:
connection.transport.write(data)
def connection_lost(self, exc):
"""Have the program exit upon losing connection so it can be restarted"""
sys.exit(2)
class UDPServerProtocol(asyncio.Protocol):
......@@ -54,30 +72,63 @@ class UDPServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
"""Setup UDP connection"""
self.transport = transport
def datagram_received(self, data, address):
"""Schedule a task to replicate data to each client for the datagram"""
"""Schedule data to be transmitted to the connected_clients"""
for connection in connected_clients:
# Submit the data as is to each connected client using async task
asyncio.create_task(connection.send_datagram(data))
connection.transport.write(data)
async def main(host, port):
async def main(options: Options):
"""Setup both UDP and TCP servers"""
loop = asyncio.get_event_loop()
tcp_server = await loop.create_server(
lambda: TCPServerProtocol(),
host, port)
lambda: TCPServerProtocol(options),
options.tcp_bind, options.tcp_port)
if not options.udp_mode:
"""None UDP mode connects to an TCP server instead of listening UDP"""
print("TCP mode establishing connection")
tcp_client = await loop.create_connection(
lambda: TCPClientProtocol(),
options.udp_bind, options.udp_port)
return
udp_transport = await loop.create_datagram_endpoint(
lambda: UDPServerProtocol(),
local_addr=(host, 6666))
local_addr=(options.udp_bind, options.udp_port))
def parse_args(options: Options):
try:
opts, args = getopt.getopt(
sys.argv[1:], 'm:u:d:p:P:', ['mode', 'upstream', 'downstream'])
for opt, arg in opts:
if opt in ('-m', '--mode'):
options.udp_mode = False if arg.lower() == 'tcp' else True
elif opt in ('-u', '--upstream'):
options.tcp_bind = arg
elif opt in ('-d', '--downstream'):
options.udp_bind = arg
elif opt in ('-p',):
options.tcp_port = int(arg)
elif opt in ('-P',):
options.udp_port = int(arg)
except getopt.GetoptError as err:
print(err)
print('usage: -m <tcp|udp> -u <upstream> -d <bind address> -p'
' <upstream port> -P <bind port>')
sys.exit(2)
"""Run this part if it is run as main scope"""
if __name__ == "__main__":
options = Options()
parse_args(options)
loop = asyncio.get_event_loop()
loop.create_task(main('127.0.0.1', 666))
loop.create_task(main(options))
loop.run_forever()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment