Skip to content
Snippets Groups Projects
Select Git revision
  • cbd627aec952c5cede3e0de2f0af772ff4e4c1f4
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

postgres.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_tcp_replicator.py 2.95 KiB
    # -*- coding: utf-8 -*-
    #
    # This file is part of the LOFAR 2.0 Station Software
    #
    #
    #
    # Distributed under the terms of the APACHE license.
    # See LICENSE.txt for more info.
    
    import logging
    import time
    import socket
    import sys
    
    from clients.tcp_replicator import TCPReplicator
    
    from integration_test import base
    
    import timeout_decorator
    
    logger = logging.getLogger()
    
    
    class TestTCPReplicator(base.IntegrationTestCase):
    
        def setUp(self):
    
            super(TestTCPReplicator, self).setUp()
    
        def test_start_stop(self):
            """Test start and stopping the server gracefully"""
    
            test_options = {
                "tcp_port": 56565,  # Pick some port with low change of collision
            }
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
        def test_start_transmit_empty_stop(self):
            """Test transmitting without clients"""
    
            test_options = {
                "tcp_port": 56566,  # Pick some port with low change of collision
            }
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            replicator.transmit("Hello World!".encode('utf-8'))
    
        def test_start_connect_close(self):
            test_options = {
                "tcp_port": 56567,  # Pick some port with low change of collision
            }
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            time.sleep(2)
    
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(("127.0.0.1", test_options['tcp_port']))
    
            time.sleep(2)
    
            replicator.join()
    
            self.assertEquals(b'', s.recv(9000))
    
        def test_start_connect_receive(self):
            test_options = {
                "tcp_port": 56568,  # Pick some port with low change of collision
            }
    
            m_data = "hello world".encode("utf-8")
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            time.sleep(2)
    
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(("127.0.0.1", test_options['tcp_port']))
    
            time.sleep(2)
    
            replicator.transmit(m_data)
    
            data = s.recv(sys.getsizeof(m_data))
            s.close()
    
            self.assertEqual(m_data, data)
    
        def test_start_connect_receive_multiple(self):
            test_options = {
                "tcp_port": 56569,  # Pick some port with low change of collision
            }
    
            m_data = "hello world".encode("utf-8")
    
            replicator = TCPReplicator(test_options)
            replicator.start()
    
            time.sleep(2)
    
            s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s1.connect(("127.0.0.1", test_options['tcp_port']))
    
            s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s2.connect(("127.0.0.1", test_options['tcp_port']))
    
            time.sleep(3)
    
            replicator.transmit(m_data)
    
            data1 = s1.recv(sys.getsizeof(m_data))
            s1.close()
    
            data2 = s2.recv(sys.getsizeof(m_data))
            s2.close()
    
            self.assertEqual(m_data, data1)
            self.assertEqual(m_data, data2)