Skip to content
Snippets Groups Projects
Select Git revision
  • 480cbcdbba7e2f9d4951f944e361e9bc36bdaf46
  • master default protected
  • refactor-control-power-properties
  • L2SS-2199-apply-dab-to-xy
  • stop-using-mesh-gateway
  • set_hba_element_power
  • test-pytango-10.0.3
  • revert-cs032-ccd-ip
  • deploy-components-parallel
  • fix-chrony-exporter
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • v0.52.8 protected
  • v0.52.7 protected
  • v0.55.5-r2 protected
  • v0.52.8-rc1 protected
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
41 results

test_device_sst.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_device_sst.py 1.86 KiB
    
    # -*- coding: utf-8 -*-
    #
    # This file is part of the LOFAR 2.0 Station Software
    #
    #
    #
    # Distributed under the terms of the APACHE license.
    # See LICENSE.txt for more info.
    import socket
    import sys
    import time
    
    from tango._tango import DevState
    
    from .base import AbstractTestBases
    
    
    class TestDeviceSST(AbstractTestBases.TestDeviceBase):
    
        def setUp(self):
            """Intentionally recreate the device object in each test"""
            super().setUp("STAT/SST/1")
    
        def test_device_sst_send_udp(self):
            port_property = {"Statistics_Client_TCP_Port": "4998"}
            self.proxy.put_property(port_property)
            self.proxy.initialise()
    
            self.assertEqual(DevState.STANDBY, self.proxy.state())
    
            self.proxy.on()
    
            self.assertEqual(DevState.ON, self.proxy.state())
    
            s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s1.connect(("device-sst", 5001))
    
            # TODO(Corne): Change me into an actual SST packet
            s1.send("Hello World!".encode("UTF-8"))
    
            s1.close()
    
        def test_device_sst_connect_tcp_receive(self):
            port_property = {"Statistics_Client_TCP_Port": "5101"}
            self.proxy.put_property(port_property)
            self.proxy.initialise()
    
            self.assertEqual(DevState.STANDBY, self.proxy.state())
    
            self.proxy.on()
    
            self.assertEqual(DevState.ON, self.proxy.state())
    
            time.sleep(2)
    
            s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s1.connect(("device-sst", 5001))
    
            s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s2.connect(("device-sst", 5101))
    
            time.sleep(2)
    
            # TODO(Corne): Change me into an actual SST packet
            m_data = "Hello World!".encode("UTF-8")
            s1.send(m_data)
    
            time.sleep(2)
    
            data = s2.recv(sys.getsizeof(m_data))
    
            s1.close()
            s2.close()
    
            self.assertEqual(m_data, data)