Skip to content
Snippets Groups Projects
Select Git revision
  • ca31a2d1bef23aa4888171b5a57d40d543052688
  • master default protected
  • revert-cs032-ccd-ip
  • deploy-components-parallel
  • fix-chrony-exporter
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • v0.55.5-r2 protected
  • v0.52.8-rc1 protected
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
41 results

test_tcp_replicator.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_tcp_replicator.py 2.95 KiB
    # -*- coding: utf-8 -*-
    #
    # This file is part of the LOFAR 2.0 Station Software
    #
    #
    #
    # Distributed under the terms of the APACHE license.
    # See LICENSE.txt for more info.
    
    import logging
    import time
    import socket
    import sys
    
    from clients.tcp_replicator import TCPReplicator
    
    from integration_test import base
    
    import timeout_decorator
    
    logger = logging.getLogger()
    
    
    class TestTCPReplicator(base.IntegrationTestCase):
    
        def setUp(self):
    
            super(TestTCPReplicator, self).setUp()
    
        def test_start_stop(self):
            """Test start and stopping the server gracefully"""
    
            test_options = {
                "tcp_port": 56565,  # Pick some port with low change of collision
            }
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
        def test_start_transmit_empty_stop(self):
            """Test transmitting without clients"""
    
            test_options = {
                "tcp_port": 56566,  # Pick some port with low change of collision
            }
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            replicator.transmit("Hello World!".encode('utf-8'))
    
        def test_start_connect_close(self):
            test_options = {
                "tcp_port": 56567,  # Pick some port with low change of collision
            }
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            time.sleep(2)
    
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(("127.0.0.1", test_options['tcp_port']))
    
            time.sleep(2)
    
            replicator.join()
    
            self.assertEquals(b'', s.recv(9000))
    
        def test_start_connect_receive(self):
            test_options = {
                "tcp_port": 56568,  # Pick some port with low change of collision
            }
    
            m_data = "hello world".encode("utf-8")
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            time.sleep(2)
    
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(("127.0.0.1", test_options['tcp_port']))
    
            time.sleep(2)
    
            replicator.transmit(m_data)
    
            data = s.recv(sys.getsizeof(m_data))
            s.close()
    
            self.assertEqual(m_data, data)
    
        def test_start_connect_receive_multiple(self):
            test_options = {
                "tcp_port": 56569,  # Pick some port with low change of collision
            }
    
            m_data = "hello world".encode("utf-8")
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            time.sleep(2)
    
            s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s1.connect(("127.0.0.1", test_options['tcp_port']))
    
            s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s2.connect(("127.0.0.1", test_options['tcp_port']))
    
            time.sleep(3)
    
            replicator.transmit(m_data)
    
            data1 = s1.recv(sys.getsizeof(m_data))
            s1.close()
    
            data2 = s2.recv(sys.getsizeof(m_data))
            s2.close()
    
            self.assertEqual(m_data, data1)
            self.assertEqual(m_data, data2)