Skip to content
Snippets Groups Projects
Commit ca31a2d1 authored by Corné Lukken's avatar Corné Lukken
Browse files

L2SS-340: Integration tests for TCPReplicator

parent 59b9fe64
Branches
No related tags found
1 merge request!117create TCPReplicator for StatisticsClient
...@@ -76,7 +76,7 @@ class TCPReplicator(Thread): ...@@ -76,7 +76,7 @@ class TCPReplicator(Thread):
if option in self.options: if option in self.options:
self.options[option] = value self.options[option] = value
class TCPServerProtocol(asyncio.BaseProtocol): class TCPServerProtocol(asyncio.Protocol):
"""TCP protocol used for connected clients""" """TCP protocol used for connected clients"""
def __init__(self, options: dict, connected_clients: list): def __init__(self, options: dict, connected_clients: list):
...@@ -92,7 +92,7 @@ class TCPReplicator(Thread): ...@@ -92,7 +92,7 @@ class TCPReplicator(Thread):
self.transport = transport self.transport = transport
# Set the TCP buffer limit # Set the TCP buffer limit
self.transport.set_write_buffer_limits( self.transport.set_write_buffer_limits(
high=options['tcp_buffer_size']) high=self.options['tcp_buffer_size'])
self.connected_clients.append(self) self.connected_clients.append(self)
def pause_writing(self): def pause_writing(self):
...@@ -112,6 +112,10 @@ class TCPReplicator(Thread): ...@@ -112,6 +112,10 @@ class TCPReplicator(Thread):
logger.debug('TCP connection lost from {}'.format(peername)) logger.debug('TCP connection lost from {}'.format(peername))
self.connected_clients.remove(self) self.connected_clients.remove(self)
def eof_received(self):
"""After eof_received, connection_lost is still called"""
pass
def run(self): def run(self):
"""Run is launched by calling .start() on TCPReplicator """Run is launched by calling .start() on TCPReplicator
...@@ -144,8 +148,12 @@ class TCPReplicator(Thread): ...@@ -144,8 +148,12 @@ class TCPReplicator(Thread):
return return
def transmit(self, data): def transmit(self, data: bytes):
"""Transmit data to connected clients""" """Transmit data to connected clients"""
if not isinstance(data, (bytes, bytearray)):
raise TypeError("Data must be byte-like object")
with self.initialization_semaphore: with self.initialization_semaphore:
if not self._loop.is_running(): if not self._loop.is_running():
logger.warning("Attempt to transmit with TCPReplicator before" logger.warning("Attempt to transmit with TCPReplicator before"
...@@ -173,6 +181,8 @@ class TCPReplicator(Thread): ...@@ -173,6 +181,8 @@ class TCPReplicator(Thread):
async def _disconnect(self): async def _disconnect(self):
with self.shutdown_condition: with self.shutdown_condition:
for client in self._connected_clients: for client in self._connected_clients:
peername = client.transport.get_extra_info('peername')
logger.debug('Disconnecting client {}'.format(peername))
client.transport.abort() client.transport.abort()
self.shutdown_condition.notify() self.shutdown_condition.notify()
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import logging
import time
import socket
import sys
from clients.tcp_replicator import TCPReplicator
from integration_test import base
import timeout_decorator
logger = logging.getLogger()
class TestTCPReplicator(base.IntegrationTestCase):
def setUp(self):
super(TestTCPReplicator, self).setUp()
def test_start_stop(self):
"""Test start and stopping the server gracefully"""
test_options = {
"tcp_port": 56565, # Pick some port with low change of collision
}
replicator = TCPReplicator(test_options)
replicator.start()
def test_start_transmit_empty_stop(self):
"""Test transmitting without clients"""
test_options = {
"tcp_port": 56566, # Pick some port with low change of collision
}
replicator = TCPReplicator(test_options)
replicator.start()
replicator.transmit("Hello World!".encode('utf-8'))
def test_start_connect_close(self):
test_options = {
"tcp_port": 56567, # Pick some port with low change of collision
}
replicator = TCPReplicator(test_options)
replicator.start()
time.sleep(2)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("127.0.0.1", test_options['tcp_port']))
time.sleep(2)
replicator.join()
self.assertEquals(b'', s.recv(9000))
def test_start_connect_receive(self):
test_options = {
"tcp_port": 56568, # Pick some port with low change of collision
}
m_data = "hello world".encode("utf-8")
replicator = TCPReplicator(test_options)
replicator.start()
time.sleep(2)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("127.0.0.1", test_options['tcp_port']))
time.sleep(2)
replicator.transmit(m_data)
data = s.recv(sys.getsizeof(m_data))
s.close()
self.assertEqual(m_data, data)
def test_start_connect_receive_multiple(self):
test_options = {
"tcp_port": 56569, # Pick some port with low change of collision
}
m_data = "hello world".encode("utf-8")
replicator = TCPReplicator(test_options)
replicator.start()
time.sleep(2)
s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s1.connect(("127.0.0.1", test_options['tcp_port']))
s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s2.connect(("127.0.0.1", test_options['tcp_port']))
time.sleep(3)
replicator.transmit(m_data)
data1 = s1.recv(sys.getsizeof(m_data))
s1.close()
data2 = s2.recv(sys.getsizeof(m_data))
s2.close()
self.assertEqual(m_data, data1)
self.assertEqual(m_data, data2)
...@@ -12,5 +12,4 @@ Pygments>=2.6.0 ...@@ -12,5 +12,4 @@ Pygments>=2.6.0
stestr>=3.0.0 # Apache-2.0 stestr>=3.0.0 # Apache-2.0
testscenarios>=0.5.0 # Apache-2.0/BSD testscenarios>=0.5.0 # Apache-2.0/BSD
testtools>=2.4.0 # MIT testtools>=2.4.0 # MIT
timeout-decorator>=0.5 timeout-decorator>=0.5 # MIT
...@@ -93,7 +93,7 @@ class TestTCPReplicator(base.TestCase): ...@@ -93,7 +93,7 @@ class TestTCPReplicator(base.TestCase):
"""Test that clients are getting data written to their transport""" """Test that clients are getting data written to their transport"""
m_run_server.return_value = self.dummy_task() m_run_server.return_value = self.dummy_task()
m_data = "Hello World!" m_data = "Hello World!".encode('utf-8')
m_client = mock.Mock() m_client = mock.Mock()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment