Skip to content
Snippets Groups Projects
Commit 2448f22b authored by Corné Lukken's avatar Corné Lukken
Browse files

L2SS-340: Start integration TCPReplicator into StatsticsClient

parent ca31a2d1
Branches
Tags
1 merge request!117create TCPReplicator for StatisticsClient
from queue import Queue
from threading import Thread
import logging
import numpy
import queue
from .comms_client import CommClient
from .tcp_replicator import TCPReplicator
from .udp_receiver import UDPReceiver
logger = logging.getLogger()
......@@ -19,7 +18,7 @@ class StatisticsClient(CommClient):
def start(self):
super().start()
def __init__(self, statistics_collector_class, host, port, fault_func, streams, try_interval=2, queuesize=1024):
def __init__(self, statistics_collector_class, udp_options, tcp_options, fault_func, streams, try_interval=2, queuesize=1024):
"""
Create the statistics client and connect() to it and get the object node.
......@@ -27,9 +26,8 @@ class StatisticsClient(CommClient):
host: hostname to listen on
port: port number to listen on
"""
self.host = host
self.port = port
self.poll_timeout = 0.1
self.udp_options = udp_options
self.tcp_options = tcp_options
self.queuesize = queuesize
self.statistics_collector_class = statistics_collector_class
......@@ -54,7 +52,8 @@ class StatisticsClient(CommClient):
"""
if not self.connected:
self.queue = Queue(maxsize=self.queuesize)
self.udp = UDPReceiver(self.host, self.port, self.queue, self.poll_timeout)
self.udp = UDPReceiver(self.queue, self.udp_options)
self.tcp = TCPReplicator(self.tcp_options)
self.statistics = self.statistics_collector_class(self.queue)
return super().connect()
......
from abc import ABC
from abc import abstractmethod
import logging
logger = logging.getLogger()
class StatisticsClientThread(ABC):
# Maximum time to wait for the Thread to get unstuck, if we want to stop
DISCONNECT_TIMEOUT = 10
@property
@abstractmethod
def _options(self) -> dict:
"""Implement me to return reasonable defaults
Don't create the variable inside this property, instead create a class
variable inside the child class and return that."""
pass
def _parse_options(self, options: dict) -> dict:
"""Parse the arguments"""
# Parse options if any otherwise return defaults
if not options:
return self._options
# Shallow copy the options, native data types and strings are immutable
temp_options = self._options.copy()
# Find all matching keys in the options arguments and override
for option, value in options.items():
if option in temp_options:
temp_options[option] = value
return temp_options
@abstractmethod
def disconnect(self):
"""Should call join with DISCONNECT_TIMEOUT, only if still alive"""
pass
......@@ -6,10 +6,12 @@ import asyncio
import logging
import time
from clients.statistics_client_thread import StatisticsClientThread
logger = logging.getLogger()
class TCPReplicator(Thread):
class TCPReplicator(Thread, StatisticsClientThread):
"""TCP replicator intended to fan out incoming UDP packets
There are three different processing layers in this class, several
......@@ -38,7 +40,7 @@ class TCPReplicator(Thread):
"""Default options for TCPReplicator
we kindly ask to not change this static variable at runtime.
"""
_options = {
_default_options = {
"tcp_bind": '127.0.0.1',
"tcp_port": 6666,
"tcp_buffer_size": 128000000, # In bytes
......@@ -65,16 +67,11 @@ class TCPReplicator(Thread):
# Connected clients the event loop is managing
self._connected_clients = []
# Shallow copy the options, native data types and strings are immutable
self.options = self._options.copy()
if not options:
return
self.options = self._parse_options(options)
# Find all matching keys in the options arguments and override
for option, value in options.items():
if option in self.options:
self.options[option] = value
@property
def _options(self) -> dict:
return TCPReplicator._default_options
class TCPServerProtocol(asyncio.Protocol):
"""TCP protocol used for connected clients"""
......@@ -173,6 +170,22 @@ class TCPReplicator(Thread):
# all child 'processes' have stopped
super().join(timeout)
def disconnect(self):
# TODO(Corne): Prevent duplicate code across TCPReplicator, UDPReceiver
# and StatisticsCollector.
if not self.is_alive():
return
self.join(self.DISCONNECT_TIMEOUT)
if self.is_alive():
# there is nothing we can do except wait (stall) longer, which
# could be indefinitely.
logger.error(
f"UDP thread for {self.host}:{self.port} did not shutdown after"
f"{self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling."
f"Please attach a debugger to thread ID {self.ident}.")
async def _transmit(self, data):
logger.debug("Transmitting")
for client in self._connected_clients:
......
......@@ -5,21 +5,37 @@ import logging
import socket
import time
from devices.clients.statistics_client_thread import StatisticsClientThread
logger = logging.getLogger()
class UDPReceiver(Thread):
class UDPReceiver(Thread, StatisticsClientThread):
"""
This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
"""
# How long to wait for a stuck Thread
DISCONNECT_TIMEOUT = 10.0
# Default options for UDPReceiver
_default_options = {
"poll_timeout": 0.1,
}
def __init__(self, host, port, queue, poll_timeout=0.1):
def __init__(self, queue, options: dict = None):
self.queue = queue
self.host = host
self.port = port
self.options = self._parse_options(options)
try:
self.host = self.options['udp_host']
except Exception:
raise
try:
self.port = self.options['udp_port']
except Exception:
raise
self.poll_timeout = self.options['poll_timeout']
self.parameters = {
# Number of packets we received
......@@ -48,13 +64,17 @@ class UDPReceiver(Thread):
# Make sure we can stop receiving packets even if none arrive.
# Without this, the recvmsg() call blocks indefinitely if no packet arrives.
self.sock.settimeout(poll_timeout)
self.sock.settimeout(self.poll_timeout)
self.stream_on = True
super().__init__()
self.start()
@property
def _options(self) -> dict:
return UDPReceiver._default_options
def run(self):
# all variables are manually defined and are updated each time
logger.info("Starting UDP thread for {}:{}".format(self.host, self.port))
......@@ -92,6 +112,8 @@ class UDPReceiver(Thread):
self.sock.shutdown(socket.SHUT_RDWR)
def disconnect(self):
# TODO(Corne): Prevent duplicate code across TCPReplicator, UDPReceiver
# and StatisticsCollector.
if not self.is_alive():
return
......
......@@ -66,6 +66,11 @@ class Statistics(hardware_device, metaclass=ABCMeta):
mandatory=True
)
Statistics_Client_UDP_Port = device_property(
dtype='DevUShort',
mandatory=True
)
Statistics_Client_Port = device_property(
dtype='DevUShort',
mandatory=True
......@@ -91,7 +96,6 @@ class Statistics(hardware_device, metaclass=ABCMeta):
# number of UDP packets that were processed
nof_packets_processed_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_packets"}, datatype=numpy.uint64)
# number of invalid (non-SST) packets received
nof_invalid_packets_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64)
# last packet that could not be parsed
......
......@@ -4,10 +4,12 @@ import logging
import numpy
from .statistics_packet import SSTPacket
from ...clients.statistics_client_thread import StatisticsClientThread
logger = logging.getLogger()
class StatisticsCollector(Thread):
class StatisticsCollector(Thread, StatisticsClientThread):
""" Base class to process statistics packets from a queue, asynchronously. """
# Maximum number of antenna inputs we support (used to determine array sizes)
......@@ -16,8 +18,8 @@ class StatisticsCollector(Thread):
# Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512
# Maximum time to wait for the Thread to get unstuck, if we want to stop
DISCONNECT_TIMEOUT = 10.0
# No default options required, for now?
_default_options = {}
def __init__(self, queue: Queue):
self.queue = queue
......@@ -28,6 +30,10 @@ class StatisticsCollector(Thread):
super().__init__()
self.start()
@property
def _options(self) -> dict:
return StatisticsCollector._default_options
def _default_parameters(self):
return {
"nof_packets": numpy.uint64(0),
......@@ -70,6 +76,8 @@ class StatisticsCollector(Thread):
super().join(timeout)
def disconnect(self):
# TODO(Corne): Prevent duplicate code across TCPReplicator, UDPReceiver
# and StatisticsCollector.
if not self.is_alive():
return
......
......@@ -29,7 +29,7 @@ class TestTCPReplicator(base.TestCase):
"""Validate option parsing"""
# Perform string copy of current tcp_bind value
t_tcp_bind = str(TCPReplicator._options['tcp_bind'])
t_tcp_bind = str(TCPReplicator._default_options['tcp_bind'])
test_options = {
"random": 12346, # I should be ignored
......@@ -39,7 +39,7 @@ class TestTCPReplicator(base.TestCase):
replicator = TCPReplicator(test_options)
# Ensure replicator initialization does not modify static variable
self.assertEqual(t_tcp_bind, TCPReplicator._options['tcp_bind'])
self.assertEqual(t_tcp_bind, TCPReplicator._default_options['tcp_bind'])
# Ensure options are correctly updated upon initialization
self.assertEqual(test_options['tcp_bind'], replicator.options['tcp_bind'])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment