Skip to content
Snippets Groups Projects
Select Git revision
  • ae3445f152ef89988df4a76cce0e11a876de075e
  • MCCS-163 default
  • main
  • sar-277-update-docs-with-examples-for-lrc
  • st-946-automate
  • sar_302-log-fix
  • sar-287_subarray_commands_to_lrc
  • sar_302-POC_await_sub_device_state
  • sat_302_fix_pipelines
  • sar-286_lrc_one_subarry_command
  • sar-286_lrc_improvements
  • sar-288-async-controller
  • sar-276-combine-tango-queue
  • sar-255_remove_nexus_reference
  • sar-275-add-LRC
  • sar-273-add-lrc-attributes
  • sar-272
  • sp-1106-marvin-1230525148-ska-tango-base
  • sp-1106-marvin-813091765-ska-tango-base
  • sar-255/Publish-package-to-CAR
  • mccs-661-device-under-test-fixture
  • mccs-659-pep257-docstring-linting
  • 0.11.3
  • 0.11.2
  • 0.11.1
  • 0.11.0
  • 0.10.1
  • 0.10.0
  • 0.9.1
  • 0.9.0
  • 0.8.1
  • 0.8.0
  • 0.7.2
  • 0.7.1
  • 0.7.0
  • 0.6.6
  • 0.6.5
  • 0.6.4
  • 0.6.3
  • 0.6.2
  • 0.6.1
  • 0.6.0
42 results

purge_xmi_tree.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_tcp_replicator.py 2.95 KiB
    # -*- coding: utf-8 -*-
    #
    # This file is part of the LOFAR 2.0 Station Software
    #
    #
    #
    # Distributed under the terms of the APACHE license.
    # See LICENSE.txt for more info.
    
    import logging
    import time
    import socket
    import sys
    
    from clients.tcp_replicator import TCPReplicator
    
    from integration_test import base
    
    import timeout_decorator
    
    # Module-level logger; getLogger() called without a name returns the root
    # logger. NOTE(review): not referenced in the visible part of this file.
    logger = logging.getLogger()
    
    
    class TestTCPReplicator(base.IntegrationTestCase):
        """Integration tests for ``TCPReplicator`` using loopback TCP clients.

        Every test starts its own replicator on a distinct port (56565-56569),
        presumably so that a port still bound by an earlier test cannot make a
        later one fail.

        NOTE(review): apart from ``test_start_connect_close`` none of the tests
        stops the replicator explicitly; presumably teardown happens when the
        object is garbage collected -- confirm against ``TCPReplicator``.
        """

        # Upper bound on bytes read by a single recv(); far larger than any
        # payload sent by these tests, so unexpected trailing data would still
        # be received and make the equality assertions fail.
        RECV_BUFFER_SIZE = 9000

        def setUp(self):
            """Run the base integration test case set-up."""
            super().setUp()

        def test_start_stop(self):
            """Test start and stopping the server gracefully"""

            test_options = {
                "tcp_port": 56565,  # Pick some port with low chance of collision
            }

            # NOTE(review): only start() is exercised here; stopping is
            # presumably implicit -- confirm.
            replicator = TCPReplicator(test_options)
            replicator.start()

        def test_start_transmit_empty_stop(self):
            """Test transmitting without clients"""

            test_options = {
                "tcp_port": 56566,  # Pick some port with low chance of collision
            }

            replicator = TCPReplicator(test_options)
            replicator.start()

            replicator.transmit("Hello World!".encode('utf-8'))

        def test_start_connect_close(self):
            """Test that a connected client sees EOF after the replicator is joined"""

            test_options = {
                "tcp_port": 56567,  # Pick some port with low chance of collision
            }

            replicator = TCPReplicator(test_options)
            replicator.start()

            # Presumably gives the server time to start listening.
            time.sleep(2)

            # The context manager closes the client socket even when the
            # assertion below fails.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect(("127.0.0.1", test_options['tcp_port']))

                # Presumably gives the server time to register the client.
                time.sleep(2)

                replicator.join()

                # recv() returning b'' means the peer closed the connection.
                self.assertEqual(b'', s.recv(self.RECV_BUFFER_SIZE))

        def test_start_connect_receive(self):
            """Test that a single connected client receives the transmitted data"""

            test_options = {
                "tcp_port": 56568,  # Pick some port with low chance of collision
            }

            m_data = "hello world".encode("utf-8")

            replicator = TCPReplicator(test_options)
            replicator.start()

            # Presumably gives the server time to start listening.
            time.sleep(2)

            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect(("127.0.0.1", test_options['tcp_port']))

                # Presumably gives the server time to register the client.
                time.sleep(2)

                replicator.transmit(m_data)

                data = s.recv(self.RECV_BUFFER_SIZE)

            self.assertEqual(m_data, data)

        def test_start_connect_receive_multiple(self):
            """Test that every connected client receives the transmitted data"""

            test_options = {
                "tcp_port": 56569,  # Pick some port with low chance of collision
            }

            m_data = "hello world".encode("utf-8")

            replicator = TCPReplicator(test_options)
            replicator.start()

            # Presumably gives the server time to start listening.
            time.sleep(2)

            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s1, \
                    socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s2:
                s1.connect(("127.0.0.1", test_options['tcp_port']))
                s2.connect(("127.0.0.1", test_options['tcp_port']))

                # Presumably gives the server time to register both clients.
                time.sleep(3)

                replicator.transmit(m_data)

                data1 = s1.recv(self.RECV_BUFFER_SIZE)
                data2 = s2.recv(self.RECV_BUFFER_SIZE)

            self.assertEqual(m_data, data1)
            self.assertEqual(m_data, data2)