Skip to content
Snippets Groups Projects
Select Git revision
  • a6c88810a566f16c09e666b3b8ec39aea97583f9
  • master default protected
  • gec-94-bdaddecal-fixes
  • gec-94-changelog
  • integrate_predict_library
  • svp_cobalt
  • merl-112-python-release-the-gil
  • gec-94-total-bda-factor
  • merl-105-dp3-python-move-not-copy
  • rap-1132-create-dynspec-step
  • padre-filestream-input
  • ast-1238-use-xtensor-char
  • gec-110-step-infoin
  • new-everybeam-interface
  • rap-1044-data-interpolation-step
  • fix-ddecal-docs
  • two-step-faraday-constraint
  • line-search
  • azelgeo-revised
  • SVP
  • test_everybeam_multifreq
  • v6.4.1
  • v6.4
  • v6.3
  • v6.2.1
  • v6.2
  • v6.1
  • v6.0.1
  • v6.0
  • v5.3
  • v5.2
  • v5.1
  • v5.0
  • v4.2
  • v4.1
  • v4.0
  • LOFAR-Release-3_1_0
  • LOFAR-Release-3_1_1
  • LOFAR-Release-3_1_2
  • LOFAR-Release-3_1_3
  • LOFAR-Release-3_1_4
41 results

tPyDpInfo.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_tcp_replicator.py 2.95 KiB
    # -*- coding: utf-8 -*-
    #
    # This file is part of the LOFAR 2.0 Station Software
    #
    #
    #
    # Distributed under the terms of the APACHE license.
    # See LICENSE.txt for more info.
    
    import logging
    import time
    import socket
    import sys
    
    from clients.tcp_replicator import TCPReplicator
    
    from integration_test import base
    
    import timeout_decorator
    
    logger = logging.getLogger()
    
    
    class TestTCPReplicator(base.IntegrationTestCase):
    
        def setUp(self):
    
            super(TestTCPReplicator, self).setUp()
    
        def test_start_stop(self):
            """Test start and stopping the server gracefully"""
    
            test_options = {
                "tcp_port": 56565,  # Pick some port with low change of collision
            }
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
        def test_start_transmit_empty_stop(self):
            """Test transmitting without clients"""
    
            test_options = {
                "tcp_port": 56566,  # Pick some port with low change of collision
            }
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            replicator.transmit("Hello World!".encode('utf-8'))
    
        def test_start_connect_close(self):
            test_options = {
                "tcp_port": 56567,  # Pick some port with low change of collision
            }
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            time.sleep(2)
    
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(("127.0.0.1", test_options['tcp_port']))
    
            time.sleep(2)
    
            replicator.join()
    
            self.assertEquals(b'', s.recv(9000))
    
        def test_start_connect_receive(self):
            test_options = {
                "tcp_port": 56568,  # Pick some port with low change of collision
            }
    
            m_data = "hello world".encode("utf-8")
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            time.sleep(2)
    
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(("127.0.0.1", test_options['tcp_port']))
    
            time.sleep(2)
    
            replicator.transmit(m_data)
    
            data = s.recv(sys.getsizeof(m_data))
            s.close()
    
            self.assertEqual(m_data, data)
    
        def test_start_connect_receive_multiple(self):
            test_options = {
                "tcp_port": 56569,  # Pick some port with low change of collision
            }
    
            m_data = "hello world".encode("utf-8")
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            time.sleep(2)
    
            s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s1.connect(("127.0.0.1", test_options['tcp_port']))
    
            s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s2.connect(("127.0.0.1", test_options['tcp_port']))
    
            time.sleep(3)
    
            replicator.transmit(m_data)
    
            data1 = s1.recv(sys.getsizeof(m_data))
            s1.close()
    
            data2 = s2.recv(sys.getsizeof(m_data))
            s2.close()
    
            self.assertEqual(m_data, data1)
            self.assertEqual(m_data, data2)