Select Git revision
test_device_sst.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_device_sst.py 1.86 KiB
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import socket
import sys
import time
from tango._tango import DevState
from .base import AbstractTestBases
class TestDeviceSST(AbstractTestBases.TestDeviceBase):
    """Integration tests for the device exposed as ``STAT/SST/1``.

    Each test stores a ``Statistics_Client_TCP_Port`` property on the proxy,
    walks the device through ``initialise()`` and ``on()`` and then exchanges
    data with the host ``device-sst`` over plain sockets.
    """

    def setUp(self):
        """Intentionally recreate the device object in each test"""
        super().setUp("STAT/SST/1")

    def test_device_sst_send_udp(self):
        """Turn the device on and send one UDP datagram to port 5001.

        Only the state transitions are asserted; the send itself just has to
        complete without raising.
        """
        port_property = {"Statistics_Client_TCP_Port": "4998"}
        self.proxy.put_property(port_property)

        self.proxy.initialise()
        self.assertEqual(DevState.STANDBY, self.proxy.state())

        self.proxy.on()
        self.assertEqual(DevState.ON, self.proxy.state())

        # The context manager closes the socket even if connect/send raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
            udp_socket.connect(("device-sst", 5001))

            # TODO(Corne): Change me into an actual SST packet
            udp_socket.send("Hello World!".encode("UTF-8"))

    def test_device_sst_connect_tcp_receive(self):
        """Send a payload over UDP and expect it back on a TCP connection.

        The payload is sent to UDP port 5001 and must be received unchanged
        from a TCP connection to port 5101, the value stored in the
        ``Statistics_Client_TCP_Port`` property.
        """
        port_property = {"Statistics_Client_TCP_Port": "5101"}
        self.proxy.put_property(port_property)

        self.proxy.initialise()
        self.assertEqual(DevState.STANDBY, self.proxy.state())

        self.proxy.on()
        self.assertEqual(DevState.ON, self.proxy.state())

        # NOTE(review): the sleeps presumably give the device time to start
        # listening / register the client / forward the packet -- confirm.
        time.sleep(2)

        # TODO(Corne): Change me into an actual SST packet
        m_data = "Hello World!".encode("UTF-8")

        # Context managers guarantee both sockets are closed, also when a
        # connect/send/recv call raises part-way through the test.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
            udp_socket.connect(("device-sst", 5001))

            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_socket:
                tcp_socket.connect(("device-sst", 5101))

                time.sleep(2)

                udp_socket.send(m_data)

                time.sleep(2)

                # Ask for exactly the payload length. sys.getsizeof() would
                # report the in-memory size of the bytes object (interpreter
                # overhead included), not the number of payload bytes.
                data = tcp_socket.recv(len(m_data))

        self.assertEqual(m_data, data)