From 893e31f83aa4007efc0090ed4042f47f71163609 Mon Sep 17 00:00:00 2001
From: lukken <lukken@astron.nl>
Date: Tue, 21 Sep 2021 18:30:36 +0000
Subject: [PATCH] L2SS-340: test double queue functionality

---
 CDB/LOFAR_ConfigDb.json                       |  5 +-
 CDB/integration_ConfigDb.json                 | 48 +++++++++++++
 CDB/sdp-sim-config.json                       |  5 +-
 CDB/thijs_ConfigDb.json                       |  5 +-
 devices/clients/statistics_client.py          | 35 +++++++---
 devices/clients/tcp_replicator.py             | 38 +++++++---
 devices/clients/udp_receiver.py               | 13 ++--
 devices/devices/sdp/statistics.py             | 19 ++++-
 .../client/test_tcp_replicator.py             | 53 ++++++++++++--
 devices/test/clients/test_tcp_replicator.py   | 69 ++++++++++++++++---
 10 files changed, 244 insertions(+), 46 deletions(-)

diff --git a/CDB/LOFAR_ConfigDb.json b/CDB/LOFAR_ConfigDb.json
index 197686104..00a2035a6 100644
--- a/CDB/LOFAR_ConfigDb.json
+++ b/CDB/LOFAR_ConfigDb.json
@@ -742,9 +742,12 @@
                 "SST": {
                     "LTS/SST/1": {
                         "properties": {
-                            "Statistics_Client_Port": [
+                            "Statistics_Client_UDP_Port": [
                                 "5001"
                             ],
+                            "Statistics_Client_TCP_Port": [
+                                "5002"
+                            ],
                             "OPC_Server_Name": [
                                 "dop36.astron.nl"
                             ],
diff --git a/CDB/integration_ConfigDb.json b/CDB/integration_ConfigDb.json
index b2f9cca6d..cf113340e 100644
--- a/CDB/integration_ConfigDb.json
+++ b/CDB/integration_ConfigDb.json
@@ -32,6 +32,54 @@
                             ],
                             "OPC_Time_Out": [
                                 "5.0"
+                            ],
+                            "FPGA_sdp_info_station_id_RW_default": [
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901",
+                                "901"
+                            ],
+                            "polled_attr": [
+                                "fpga_temp_r",
+                                "1000",
+                                "state",
+                                "1000",
+                                "status",
+                                "1000",
+                                "fpga_mask_rw",
+                                "1000",
+                                "fpga_scrap_r",
+                                "1000",
+                                "fpga_scrap_rw",
+                                "1000",
+                                "fpga_status_r",
+                                "1000",
+                                "fpga_version_r",
+                                "1000",
+                                "fpga_weights_r",
+                                "1000",
+                                "fpga_weights_rw",
+                                "1000",
+                                "tr_busy_r",
+                                "1000",
+                                "tr_reload_rw",
+                                "1000",
+                                "tr_tod_r",
+                                "1000",
+                                "tr_uptime_r",
+                                "1000"
                             ]
                         }
                     }
diff --git a/CDB/sdp-sim-config.json b/CDB/sdp-sim-config.json
index 64b841e1d..eb36b0c5a 100644
--- a/CDB/sdp-sim-config.json
+++ b/CDB/sdp-sim-config.json
@@ -24,9 +24,12 @@
                 "SST": {
                     "LTS/SST/1": {
                         "properties": {
-                            "Statistics_Client_Port": [
+                            "Statistics_Client_UDP_Port": [
                                 "5001"
                             ],
+                            "Statistics_Client_TCP_Port": [
+                                "5002"
+                            ],
                             "OPC_Server_Name": [
                                 "sdptr-sim"
                             ],
diff --git a/CDB/thijs_ConfigDb.json b/CDB/thijs_ConfigDb.json
index 37ae6d7b6..78728bfcd 100644
--- a/CDB/thijs_ConfigDb.json
+++ b/CDB/thijs_ConfigDb.json
@@ -94,9 +94,12 @@
                 "SST": {
                     "LTS/SST/1": {
                         "properties": {
-                            "Statistics_Client_Port": [
+                            "Statistics_Client_UDP_Port": [
                                 "5001"
                             ],
+                            "Statistics_Client_TCP_Port": [
+                                "5002"
+                            ],
                             "OPC_Server_Name": [
                                 "dop36.astron.nl"
                             ],
diff --git a/devices/clients/statistics_client.py b/devices/clients/statistics_client.py
index 61a6befaf..fdbb80fe0 100644
--- a/devices/clients/statistics_client.py
+++ b/devices/clients/statistics_client.py
@@ -39,9 +39,10 @@ class StatisticsClient(CommClient):
             fault_func()
             return
 
-    def queue_fill_percentage(self):
+    @staticmethod
+    def _queue_fill_percentage(queue: Queue):
         try:
-            return 100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0
+            return 100 * queue.qsize() / queue.maxsize if queue.maxsize else 0
         except NotImplementedError:
             # some platforms don't have qsize(), nothing we can do here
             return 0
@@ -51,10 +52,14 @@ class StatisticsClient(CommClient):
         Function used to connect to the client.
         """
         if not self.connected:
-            self.queue = Queue(maxsize=self.queuesize)
-            self.udp = UDPReceiver(self.queue, self.udp_options)
-            self.tcp = TCPReplicator(self.tcp_options)
-            self.statistics = self.statistics_collector_class(self.queue)
+            self.collector_queue = Queue(maxsize=self.queuesize)
+            self.replicator_queue = Queue(maxsize=self.queuesize)
+
+            self.udp = UDPReceiver(self.collector_queue, self.replicator_queue,
+                                   self.udp_options)
+
+            self.tcp = TCPReplicator(self.replicator_queue, self.tcp_options)
+            self.statistics = self.statistics_collector_class(self.collector_queue)
 
         return super().connect()
 
@@ -65,6 +70,9 @@ class StatisticsClient(CommClient):
         if not self.udp.is_alive():
             raise Exception("UDP thread died unexpectedly")
 
+        if not self.tcp.is_alive():
+            raise Exception("TCPReplicator thread died unexpectedly")
+
     def disconnect(self):
         # explicit disconnect, instead of waiting for the GC to kick in after "del" below
         try:
@@ -78,7 +86,13 @@ class StatisticsClient(CommClient):
         except Exception:
             # nothing we can do, but we should continue cleaning up
             logger.log_exception("Could not disconnect UDP receiver class")
-        
+
+        try:
+            self.tcp.disconnect()
+        except Exception:
+            logger.log_exception("Could not disconnect TCPReplicator class")
+
+        del self.tcp
         del self.udp
         del self.statistics
         del self.queue
@@ -110,9 +124,12 @@ class StatisticsClient(CommClient):
             def read_function():
                 return self.udp.parameters[parameter]
         elif annotation["type"] == "queue":
-            if parameter == "fill_percentage":
+            if parameter == "collector_fill_percentage":
+                def read_function():
+                    return numpy.uint64(self._queue_fill_percentage(self.collector_queue))
+            elif parameter == "replicator_fill_percentage":
                 def read_function():
-                    return numpy.uint64(self.queue_fill_percentage())
+                    return numpy.uint64(self._queue_fill_percentage(self.replicator_queue))
             else:
                 raise ValueError("Unknown queue parameter requested: %s" % parameter)
 
diff --git a/devices/clients/tcp_replicator.py b/devices/clients/tcp_replicator.py
index b9738ee26..df5e173b1 100644
--- a/devices/clients/tcp_replicator.py
+++ b/devices/clients/tcp_replicator.py
@@ -1,6 +1,8 @@
 from threading import Condition
 from threading import Thread
 from threading import Semaphore
+from queue import Empty
+from queue import Queue
 
 import asyncio
 import logging
@@ -40,14 +42,16 @@ class TCPReplicator(Thread, StatisticsClientThread):
     we kindly ask to not change this static variable at runtime.
     """
     _default_options = {
-        "tcp_bind": '127.0.0.1',
+        "tcp_bind": '0.0.0.0',
         "tcp_port": 6666,
         "tcp_buffer_size": 128000000,  # In bytes
     }
 
-    def __init__(self, options: dict = None):
+    def __init__(self, queue: Queue, options: dict = None):
         super().__init__()
 
+        self.queue = queue
+
         """Reserve asyncio event loop attribute but don't create it yet.
         This event loop is created inside the new Thread, the result is that
         the thread owns the event loop! EVENT LOOPS ARE NOT THREAD SAFE ALL
@@ -143,6 +147,8 @@ class TCPReplicator(Thread, StatisticsClientThread):
             # When wanting to debug event loop behavior, uncomment this
             # self._loop.set_debug(True)
 
+            self._loop.create_task(self._process_queue())
+
             # Schedule the task to create the server
             self._loop.create_task(TCPReplicator._run_server(
                 self.options, self._connected_clients))
@@ -213,6 +219,25 @@ class TCPReplicator(Thread, StatisticsClientThread):
                 f"{self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling."
                 f"Please attach a debugger to thread ID {self.ident}.")
 
+    @staticmethod
+    async def _run_server(options: dict, connected_clients: list):
+        """Retrieve the event loop created in run() and launch the server"""
+        loop = asyncio.get_event_loop()
+
+        tcp_server = await loop.create_server(
+            lambda: TCPReplicator.TCPServerProtocol(options, connected_clients),
+            options['tcp_bind'], options['tcp_port'])
+
+    async def _process_queue(self):
+        """Take a packet from the queue and reschedule this task"""
+        try:
+            packet = self.queue.get(block=False)
+            self._loop.create_task(self._transmit(packet))
+        except Empty:
+            pass
+        finally:
+            self._loop.create_task(self._process_queue())
+
     async def _transmit(self, data):
         logger.debug("Transmitting")
         for client in self._connected_clients:
@@ -231,15 +256,6 @@ class TCPReplicator(Thread, StatisticsClientThread):
             # Calling stop() will return control flow to self._loop.run_*()
             self._loop.stop()
 
-    @staticmethod
-    async def _run_server(options: dict, connected_clients: list):
-        """Retrieve the event loop created in run() and launch the server"""
-        loop = asyncio.get_event_loop()
-
-        tcp_server = await loop.create_server(
-            lambda: TCPReplicator.TCPServerProtocol(options, connected_clients),
-            options['tcp_bind'], options['tcp_port'])
-
     def _clean_shutdown(self):
         """Disconnect clients, stop the event loop and wait for it to close"""
 
diff --git a/devices/clients/udp_receiver.py b/devices/clients/udp_receiver.py
index 160e897b9..b0717d36e 100644
--- a/devices/clients/udp_receiver.py
+++ b/devices/clients/udp_receiver.py
@@ -1,4 +1,5 @@
 from queue import Queue
+from queue import Full
 from threading import Thread
 import numpy
 import logging
@@ -20,8 +21,9 @@ class UDPReceiver(Thread, StatisticsClientThread):
         "poll_timeout": 0.1,
     }
 
-    def __init__(self, queue, options: dict = None):
-        self.queue = queue
+    def __init__(self, collector_queue: Queue, replicator_queue: Queue, options: dict = None):
+        self.collector_queue = collector_queue
+        self.replicator_queue = replicator_queue
 
         self.options = self._parse_options(options)
 
@@ -87,12 +89,13 @@ class UDPReceiver(Thread, StatisticsClientThread):
                 self.parameters["last_packet"]           = numpy.frombuffer(packet, dtype=numpy.uint8)
                 self.parameters["last_packet_timestamp"] = numpy.uint64(int(time.time()))
 
-                # Forward packet to processing thread
-                self.queue.put(packet)
+                # Forward packet to processing threads
+                self.collector_queue.put(packet)
+                self.replicator_queue.put(packet)
             except socket.timeout:
                 # timeout -- expected, allows us to check whether to stop
                 pass
-            except queue.Full:
+            except Full:
                 # overflow -- just discard
                 self.parameters["nof_packets_dropped"] += numpy.uint64(1)
 
diff --git a/devices/devices/sdp/statistics.py b/devices/devices/sdp/statistics.py
index 3d782ab42..ba6731ddc 100644
--- a/devices/devices/sdp/statistics.py
+++ b/devices/devices/sdp/statistics.py
@@ -71,7 +71,7 @@ class Statistics(hardware_device, metaclass=ABCMeta):
         mandatory=True
     )
 
-    Statistics_Client_Port = device_property(
+    Statistics_Client_TCP_Port = device_property(
         dtype='DevUShort',
         mandatory=True
     )
@@ -92,7 +92,8 @@ class Statistics(hardware_device, metaclass=ABCMeta):
     last_packet_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64)
 
     # queue fill percentage, as reported by the consumer
-    queue_fill_percentage_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "queue", "parameter": "fill_percentage"}, datatype=numpy.uint64)
+    queue_collector_fill_percentage_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "queue", "parameter": "collector_fill_percentage"}, datatype=numpy.uint64)
+    queue_replicator_fill_percentage_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "queue", "parameter": "replicator_fill_percentage"}, datatype=numpy.uint64)
 
     # number of UDP packets that were processed
     nof_packets_processed_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_packets"}, datatype=numpy.uint64)
@@ -124,7 +125,19 @@ class Statistics(hardware_device, metaclass=ABCMeta):
         """ user code here. is called when the sate is set to INIT """
         """Initialises the attributes and properties of the statistics device."""
 
-        self.statistics_client = StatisticsClient(self.STATISTICS_COLLECTOR_CLASS, "0.0.0.0", self.Statistics_Client_Port, self.Fault, self)
+        # Options for UDPReceiver
+        udp_options = {
+            "udp_port": self.Statistics_Client_UDP_Port,
+            "udp_host": "0.0.0.0"
+        }
+
+        # Options for TCPReplicator
+        tcp_options = {
+            "tcp_port": self.Statistics_Client_TCP_Port
+            # tcp_host has default value
+        }
+
+        self.statistics_client = StatisticsClient(self.STATISTICS_COLLECTOR_CLASS, udp_options, tcp_options, self.Fault, self)
 
         self.OPCUA_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self)
 
diff --git a/devices/integration_test/client/test_tcp_replicator.py b/devices/integration_test/client/test_tcp_replicator.py
index 440aa43f5..419197fb4 100644
--- a/devices/integration_test/client/test_tcp_replicator.py
+++ b/devices/integration_test/client/test_tcp_replicator.py
@@ -7,6 +7,8 @@
 # Distributed under the terms of the APACHE license.
 # See LICENSE.txt for more info.
 
+from queue import Queue
+
 import logging
 import time
 import socket
@@ -16,6 +18,8 @@ from clients.tcp_replicator import TCPReplicator
 
 from integration_test import base
 
+import timeout_decorator
+
 logger = logging.getLogger()
 
 
@@ -32,7 +36,8 @@ class TestTCPReplicator(base.IntegrationTestCase):
             "tcp_port": 56565,  # Pick some port with low change of collision
         }
 
-        replicator = TCPReplicator(test_options)
+        queue = Queue()
+        replicator = TCPReplicator(queue, test_options)
 
     def test_start_transmit_empty_stop(self):
         """Test transmitting without clients"""
@@ -41,7 +46,8 @@ class TestTCPReplicator(base.IntegrationTestCase):
             "tcp_port": 56566,  # Pick some port with low change of collision
         }
 
-        replicator = TCPReplicator(test_options)
+        queue = Queue()
+        replicator = TCPReplicator(queue, test_options)
 
         replicator.transmit("Hello World!".encode('utf-8'))
 
@@ -50,7 +56,8 @@ class TestTCPReplicator(base.IntegrationTestCase):
             "tcp_port": 56567,  # Pick some port with low change of collision
         }
 
-        replicator = TCPReplicator(test_options)
+        queue = Queue()
+        replicator = TCPReplicator(queue, test_options)
 
         time.sleep(2)
 
@@ -63,6 +70,7 @@ class TestTCPReplicator(base.IntegrationTestCase):
 
         self.assertEquals(b'', s.recv(9000))
 
+    @timeout_decorator.timeout(15)
     def test_start_connect_receive(self):
         test_options = {
             "tcp_port": 56568,  # Pick some port with low change of collision
@@ -70,7 +78,8 @@ class TestTCPReplicator(base.IntegrationTestCase):
 
         m_data = "hello world".encode("utf-8")
 
-        replicator = TCPReplicator(test_options)
+        queue = Queue()
+        replicator = TCPReplicator(queue, test_options)
 
         time.sleep(2)
 
@@ -86,6 +95,7 @@ class TestTCPReplicator(base.IntegrationTestCase):
 
         self.assertEqual(m_data, data)
 
+    @timeout_decorator.timeout(15)
     def test_start_connect_receive_multiple(self):
         test_options = {
             "tcp_port": 56569,  # Pick some port with low change of collision
@@ -93,7 +103,8 @@ class TestTCPReplicator(base.IntegrationTestCase):
 
         m_data = "hello world".encode("utf-8")
 
-        replicator = TCPReplicator(test_options)
+        queue = Queue()
+        replicator = TCPReplicator(queue, test_options)
 
         time.sleep(2)
 
@@ -115,3 +126,35 @@ class TestTCPReplicator(base.IntegrationTestCase):
 
         self.assertEqual(m_data, data1)
         self.assertEqual(m_data, data2)
+
+    @timeout_decorator.timeout(15)
+    def test_start_connect_receive_multiple_queue(self):
+        test_options = {
+            "tcp_port": 56570,  # Pick some port with low change of collision
+        }
+
+        m_data = "hello world".encode("utf-8")
+
+        queue = Queue()
+        replicator = TCPReplicator(queue, test_options)
+
+        time.sleep(2)
+
+        s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s1.connect(("127.0.0.1", test_options['tcp_port']))
+
+        s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s2.connect(("127.0.0.1", test_options['tcp_port']))
+
+        time.sleep(3)
+
+        queue.put(m_data)
+
+        data1 = s1.recv(sys.getsizeof(m_data))
+        s1.close()
+
+        data2 = s2.recv(sys.getsizeof(m_data))
+        s2.close()
+
+        self.assertEqual(m_data, data1)
+        self.assertEqual(m_data, data2)
diff --git a/devices/test/clients/test_tcp_replicator.py b/devices/test/clients/test_tcp_replicator.py
index b487a48e0..f9e2ca78a 100644
--- a/devices/test/clients/test_tcp_replicator.py
+++ b/devices/test/clients/test_tcp_replicator.py
@@ -9,6 +9,7 @@
 
 import logging
 import time
+from queue import Queue
 from unittest import mock
 
 from clients.tcp_replicator import TCPReplicator
@@ -30,13 +31,27 @@ class TestTCPReplicator(base.TestCase):
     def setUp(self):
         super(TestTCPReplicator, self).setUp()
 
+        self.m_queue = mock.Mock()
+
         # Create reusable test fixture for unit tests
         self.m_tcp_replicator = TCPReplicator
-        stat_agg_patcher = mock.patch.object(
+
+        # Patch _run_server and force match spec
+        run_server_patcher = mock.patch.object(
             self.m_tcp_replicator, '_run_server',
             spec=TCPReplicator._run_server, return_value=self.dummy_task())
-        self.mock_replicator = stat_agg_patcher.start()
-        self.addCleanup(stat_agg_patcher.stop)
+        self.m_run_server = run_server_patcher.start()
+        self.addCleanup(run_server_patcher.stop)
+
+        # Stash _process_queue before mocking
+        self.t_process_queue = TCPReplicator._process_queue
+
+        # Patch _process_queue and force match spec
+        process_queue_patcher = mock.patch.object(
+            self.m_tcp_replicator, '_process_queue',
+            spec=TCPReplicator._process_queue, return_value=self.dummy_task())
+        self.m_process_queue = process_queue_patcher.start()
+        self.addCleanup(process_queue_patcher.stop)
 
     def test_parse_options(self):
         """Validate option parsing"""
@@ -49,7 +64,7 @@ class TestTCPReplicator(base.TestCase):
             "tcp_bind": '0.0.0.0',  # I should get set
         }
 
-        replicator = self.m_tcp_replicator(test_options)
+        replicator = self.m_tcp_replicator(self.m_queue, test_options)
 
         # Ensure replicator initialization does not modify static variable
         self.assertEqual(t_tcp_bind, TCPReplicator._default_options['tcp_bind'])
@@ -66,7 +81,7 @@ class TestTCPReplicator(base.TestCase):
         m_client = mock.Mock()
 
         # Create both a TCPReplicator and TCPServerProtocol separately
-        replicator = self.m_tcp_replicator()
+        replicator = self.m_tcp_replicator(self.m_queue)
         protocol = TCPReplicator.TCPServerProtocol(
             replicator._options, replicator._connected_clients)
 
@@ -79,7 +94,7 @@ class TestTCPReplicator(base.TestCase):
     def test_start_stop(self):
         """Verify threading behavior, being able to start and stop the thread"""
 
-        replicator = self.m_tcp_replicator()
+        replicator = self.m_tcp_replicator(self.m_queue)
 
         # Give the thread 5 seconds to stop
         replicator.join(5)
@@ -103,13 +118,13 @@ class TestTCPReplicator(base.TestCase):
             run_patcher.new_event_loop.return_value = m_loop
 
             # Constructor should raise an exception if the thread is killed
-            self.assertRaises(RuntimeError, self.m_tcp_replicator)
+            self.assertRaises(RuntimeError, self.m_tcp_replicator, self.m_queue)
 
     @timeout_decorator.timeout(5)
     def test_start_stop_delete(self):
         """Verify that deleting the TCPReplicator object safely halts thread"""
 
-        replicator = self.m_tcp_replicator()
+        replicator = self.m_tcp_replicator(self.m_queue)
 
         del replicator
 
@@ -120,7 +135,7 @@ class TestTCPReplicator(base.TestCase):
 
         m_client = mock.Mock()
 
-        replicator = self.m_tcp_replicator()
+        replicator = self.m_tcp_replicator(self.m_queue)
 
         replicator._connected_clients.append(m_client)
 
@@ -137,10 +152,44 @@ class TestTCPReplicator(base.TestCase):
 
         m_client.transport.write.assert_called_once_with(m_data)
 
+    def test_queue_start(self):
+        replicator = self.m_tcp_replicator(self.m_queue)
+
+        self.m_process_queue.assert_called_once_with()
+
+    def test_transmit_queue(self):
+        m_data = "Hello World!".encode('utf-8')
+
+        m_client = mock.Mock()
+
+        t_queue = Queue()
+
+        replicator = self.m_tcp_replicator(t_queue)
+
+        # Patch _process_queue back into object and jump start it
+        replicator._process_queue = self.t_process_queue
+        replicator._loop.call_soon_threadsafe(
+            replicator._loop.create_task, replicator._process_queue(replicator))
+
+        replicator._connected_clients.append(m_client)
+
+        t_queue.put(m_data)
+
+        # TODO(Corne): Find suitable primitive to synchronize async task update
+        #              with main thread.
+        time.sleep(1)
+        time.sleep(1)
+        time.sleep(1)
+        time.sleep(1)
+        time.sleep(1)
+        time.sleep(1)
+
+        m_client.transport.write.assert_called_once_with(m_data)
+
     def test_disconnect(self,):
         m_client = mock.Mock()
 
-        replicator = self.m_tcp_replicator()
+        replicator = self.m_tcp_replicator(self.m_queue)
 
         replicator._connected_clients.append(m_client)
 
-- 
GitLab