diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 55b7ceabce52c3e49c8e12f20827a5317401eafd..ca3a617c1b052564c46e2a5e426fe9a1e86787d6 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -9,7 +9,6 @@ variables: cache: paths: - .cache/pip - - devices/.tox stages: - building - linting @@ -45,6 +44,8 @@ unit_test: before_script: - sudo apt-get update - sudo apt-get install -y git + - pip3 install -r devices/test-requirements.txt + - pip3 install -r docker-compose/itango/lofar-requirements.txt script: - cd devices - tox -e py37 diff --git a/CDB/LOFAR_ConfigDb.json b/CDB/LOFAR_ConfigDb.json index 9fcc8cf3514a113eb293bd45a9476cc1769f1bf3..98c3a994428d5a23a93ae842f3cdadf609cb7eca 100644 --- a/CDB/LOFAR_ConfigDb.json +++ b/CDB/LOFAR_ConfigDb.json @@ -742,9 +742,12 @@ "SST": { "LTS/SST/1": { "properties": { - "Statistics_Client_Port": [ + "Statistics_Client_UDP_Port": [ "5001" ], + "Statistics_Client_TCP_Port": [ + "5101" + ], "OPC_Server_Name": [ "dop36.astron.nl" ], diff --git a/CDB/integration_ConfigDb.json b/CDB/integration_ConfigDb.json index c9d928fed394054427950ec85b41b934428ceca1..debeae5263b85b15821ed515521223751ff9d37c 100644 --- a/CDB/integration_ConfigDb.json +++ b/CDB/integration_ConfigDb.json @@ -32,6 +32,54 @@ ], "OPC_Time_Out": [ "5.0" + ], + "FPGA_sdp_info_station_id_RW_default": [ + "901", + "901", + "901", + "901", + "901", + "901", + "901", + "901", + "901", + "901", + "901", + "901", + "901", + "901", + "901", + "901" + ], + "polled_attr": [ + "fpga_temp_r", + "1000", + "state", + "1000", + "status", + "1000", + "fpga_mask_rw", + "1000", + "fpga_scrap_r", + "1000", + "fpga_scrap_rw", + "1000", + "fpga_status_r", + "1000", + "fpga_version_r", + "1000", + "fpga_weights_r", + "1000", + "fpga_weights_rw", + "1000", + "tr_busy_r", + "1000", + "tr_reload_rw", + "1000", + "tr_tod_r", + "1000", + "tr_uptime_r", + "1000" ] } } @@ -43,9 +91,12 @@ "SST": { "LTS/SST/1": { "properties": { - "SST_Client_Port": [ + "Statistics_Client_UDP_Port": [ "5001" ], + "Statistics_Client_TCP_Port": [ + "5101" + ], "OPC_Server_Name": [ "sdptr-sim" ], @@ -54,6 +105,60 @@ ], "OPC_Time_Out": [ "5.0" + ], + "FPGA_sst_offload_hdr_eth_destination_mac_RW_default": [ + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de", + "6c:2b:59:97:cb:de" + ], + "FPGA_sst_offload_hdr_ip_destination_address_RW_default": [ + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250" + ], + "FPGA_sst_offload_hdr_udp_destination_port_RW_default": [ + "5001", + "5001", + "5001", + "5001", + "5001", + "5001", + "5001", + "5001", + "5001", + "5001", + "5001", + "5001", + "5001", + "5001", + "5001", + "5001" ] } } diff --git a/CDB/sdp-sim-config.json b/CDB/sdp-sim-config.json index 64b841e1dacf36e1de9b3e20ea068d36f0011478..31232e7701074e19044660af1fd27c6c025b4f81 100644 --- a/CDB/sdp-sim-config.json +++ b/CDB/sdp-sim-config.json @@ -24,9 +24,12 @@ "SST": { "LTS/SST/1": { "properties": { - "Statistics_Client_Port": [ + "Statistics_Client_UDP_Port": [ "5001" ], + "Statistics_Client_TCP_Port": [ + "5101" + ], "OPC_Server_Name": [ "sdptr-sim" ], diff --git a/CDB/thijs_ConfigDb.json b/CDB/thijs_ConfigDb.json index 35c644789655fe577b41536e26d88ab46bc55720..95fa70578a94531454684fdc5ee5bb6df7e8e3a7 100644 --- a/CDB/thijs_ConfigDb.json +++ b/CDB/thijs_ConfigDb.json @@ -94,9 +94,12 @@ "SST": { "LTS/SST/1": { "properties": { - "Statistics_Client_Port": [ + "Statistics_Client_UDP_Port": [ "5001" ], + "Statistics_Client_TCP_Port": [ + "5101" + ], "OPC_Server_Name": [ "dop36.astron.nl" ], diff --git a/devices/clients/statistics_client.py b/devices/clients/statistics_client.py index 20ef7da7d38efcdfbc4bb8ff01f6877b6411787e..05aa28d39ebb6c6f7ea5dc4fb8e0c908856046b1 100644 --- a/devices/clients/statistics_client.py +++ b/devices/clients/statistics_client.py @@ -1,10 +1,9 @@ from queue import Queue -from threading import Thread import logging import numpy -import queue from .comms_client import CommClient +from .tcp_replicator import TCPReplicator from .udp_receiver import UDPReceiver from devices.sdp.statistics_collector import StatisticsConsumer @@ -21,7 +20,7 @@ class StatisticsClient(CommClient): def start(self): super().start() - def __init__(self, collector, host, port, fault_func, streams, try_interval=2, queuesize=1024): + def __init__(self, collector, udp_options, tcp_options, fault_func, streams, try_interval=2, queuesize=1024): """ Create the statistics client and connect() to it and get the object node. @@ -29,9 +28,9 @@ class StatisticsClient(CommClient): host: hostname to listen on port: port number to listen on """ - self.host = host - self.port = port - self.poll_timeout = 0.1 + + self.udp_options = udp_options + self.tcp_options = tcp_options self.queuesize = queuesize self.collector = collector @@ -43,9 +42,10 @@ class StatisticsClient(CommClient): fault_func() return - def queue_fill_percentage(self): + @staticmethod + def _queue_fill_percentage(queue: Queue): try: - return 100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0 + return 100 * queue.qsize() / queue.maxsize if queue.maxsize else 0 except NotImplementedError: # some platforms don't have qsize(), nothing we can do here return 0 @@ -55,9 +55,13 @@ class StatisticsClient(CommClient): Function used to connect to the client. """ if not self.connected: - self.queue = Queue(maxsize=self.queuesize) - self.udp = UDPReceiver(self.host, self.port, self.queue, self.poll_timeout) - self.statistics = StatisticsConsumer(self.queue, self.collector) + self.collector_queue = Queue(maxsize=self.queuesize) + + self.tcp = TCPReplicator(self.tcp_options, self.queuesize) + self.statistics = StatisticsConsumer(self.collector_queue, self.collector) + + self.udp = UDPReceiver([self.collector_queue, self.tcp], + self.udp_options) return super().connect() @@ -68,6 +72,9 @@ class StatisticsClient(CommClient): if not self.udp.is_alive(): raise Exception("UDP thread died unexpectedly") + if not self.tcp.is_alive(): + raise Exception("TCPReplicator thread died unexpectedly") + def disconnect(self): # explicit disconnect, instead of waiting for the GC to kick in after "del" below try: @@ -78,11 +85,19 @@ class StatisticsClient(CommClient): try: self.udp.disconnect() except Exception: + # nothing we can do, but we should continue cleaning up logger.exception("Could not disconnect UDP receiver class") - + + try: + self.tcp.disconnect() + except Exception: + logger.exception("Could not disconnect TCPReplicator class") + #logger.log_exception("Could not disconnect TCPReplicator class") + + del self.tcp del self.udp del self.statistics - del self.queue + del self.collector_queue return super().disconnect() @@ -111,11 +126,26 @@ class StatisticsClient(CommClient): def read_function(): return self.udp.parameters[parameter] elif annotation["type"] == "queue": - if parameter == "fill_percentage": + if parameter == "collector_fill_percentage": + def read_function(): + return numpy.uint64(self._queue_fill_percentage(self.collector_queue)) + elif parameter == "replicator_fill_percentage": def read_function(): - return numpy.uint64(self.queue_fill_percentage()) + return numpy.uint64(self._queue_fill_percentage(self.tcp.queue)) else: raise ValueError("Unknown queue parameter requested: %s" % parameter) + elif annotation["type"] == "replicator": + if parameter == "clients": + def read_function(): + return numpy.array(self.tcp.clients(),dtype=numpy.str_) + elif parameter == "nof_packets_sent": + def read_function(): + return numpy.uint64(self.tcp.nof_packets_sent) + elif parameter == "nof_tasks_pending": + def read_function(): + return numpy.uint64(self.tcp.nof_tasks_pending) + else: + raise ValueError("Unknown replicator parameter requested: %s" % parameter) def write_function(value): """ diff --git a/devices/clients/statistics_client_thread.py b/devices/clients/statistics_client_thread.py new file mode 100644 index 0000000000000000000000000000000000000000..3da8f76ac135fd4fb631f1de98518ff74f9ec2f9 --- /dev/null +++ b/devices/clients/statistics_client_thread.py @@ -0,0 +1,45 @@ +from abc import ABC +from abc import abstractmethod +import logging + +logger = logging.getLogger() + + +class StatisticsClientThread(ABC): + + # Maximum time to wait for the Thread to get unstuck, if we want to stop + DISCONNECT_TIMEOUT = 10 + + @property + @abstractmethod + def _options(self) -> dict: + """Implement me to return reasonable defaults + + Don't create the variable inside this property, instead create a class + variable inside the child class and return that.""" + pass + + def _parse_options(self, options: dict) -> dict: + """Parse the arguments""" + + # Parse options if any otherwise return defaults + if not options: + return self._options + + # Shallow copy the options, native data types and strings are immutable + temp_options = self._options.copy() + + # Find all matching keys in the options arguments and override + for option, value in options.items(): + if option in temp_options: + temp_options[option] = value + + return temp_options + + def __del__(self): + self.disconnect() + + @abstractmethod + def disconnect(self): + """Should call join with DISCONNECT_TIMEOUT, only if still alive""" + pass diff --git a/devices/clients/tcp_replicator.py b/devices/clients/tcp_replicator.py new file mode 100644 index 0000000000000000000000000000000000000000..37d5a4f78221c1e1d234447f250dc0c4a27b8ad0 --- /dev/null +++ b/devices/clients/tcp_replicator.py @@ -0,0 +1,354 @@ +from queue import Empty +from queue import Queue +from threading import Condition +from threading import Semaphore +from threading import Thread +import asyncio +import logging + +from clients.statistics_client_thread import StatisticsClientThread + +logger = logging.getLogger() + + +class TCPReplicator(Thread, StatisticsClientThread): + """TCP replicator intended to fan out incoming UDP packets + + There are three different processing layers in this class, several + methods can be called from the context of the thread that spawned this + class (main thread). These include: __init__, transmit, join. + + When constructed start is called, the thread will launch, this will call run + from the context of this new thread. This thread will create the new event + loop as this can only be done from the context of the thread you desire to + use the event loop in. A semaphore is used to prevent a potential race + between this new thread setting up the event loop and the main thread trying + to tear it down by calling join. The constructor waits on this semaphore + which will always be released either by _server_start_callback or by the + finally clause in run. + + The final layer is the event loop itself, it handles instances of the + TCPServerProtocol. These can be found in the _connected_clients list. + However, only async task are allowed to call methods on these objects! + The async methods are _transmit, _disconnect, _stop_event_loop, + _process_queue and _run_server. + + _process_queue takes elements of the queue and transmits them across clients. + It uses an asyncio.Queue to process elements, given to the replicator through + the put method. + + To cleanly shutdown this loop in _stop_event_loop, we insert a None magic marker + into the queue, causing the _process_task to return. + + Disconnecting the clients and stopping of the server is handled in _disconnect. + + """ + + """Default options for TCPReplicator + we kindly ask to not change this static variable at runtime. + """ + _default_options = { + "tcp_bind": '0.0.0.0', + "tcp_port": 6666, + "tcp_buffer_size": 128000000, # In bytes + } + + def __init__(self, options: dict = None, queuesize=0): + super().__init__() + + self.queuesize = queuesize + + # statistics + self.nof_packets_sent = 0 + + """Reserve asyncio event loop attribute but don't create it yet. + This event loop is created inside the new Thread, the result is that + the thread owns the event loop! EVENT LOOPS ARE NOT THREAD SAFE ALL + CALLS TO THE EVENT LOOP OBJECT MUST USE THE call_soon_threadsafe + FUNCTION!! + """ + self._loop = None + + # Used to maintain a reference to the server object so we can stop + # listening cleanly + self._server = None + + # Maintain a reference to the current _process_queue task so we can + # cleanly cancel it. This reduces a lot of logging chatter. + self._process_task = None + + # Create and acquire lock to prevent leaving the constructor without + # starting the thread. + self.initialization_semaphore = Semaphore() + self.initialization_semaphore.acquire() + + # Create condition to orchestrate clean disconnecting and shutdown + # They are actually the same object, just with different names for + # clarity. + self.disconnect_condition = Condition() + self.shutdown_condition = self.disconnect_condition + + # Connected clients the event loop is managing + self._connected_clients = [] + + # Parse the configured options + self.options = self._parse_options(options) + + # We start ourselves immediately to reduce amount of possible states. + self.start() + + # Wait until we can hold the semaphore, this indicates the thread has + # initialized or encountered an exception. + with self.initialization_semaphore: + if not self.is_alive(): + raise RuntimeError("TCPReplicator failed to initialize") + + logging.debug("TCPReplicator initialization completed") + + @property + def _options(self) -> dict: + return TCPReplicator._default_options + + class TCPServerProtocol(asyncio.Protocol): + """TCP protocol used for connected clients""" + + def __init__(self, options: dict, connected_clients: list): + self.options = options + + # Make connected_clients reflect the TCPReplicator connected_clients + self.connected_clients = connected_clients + + def connection_made(self, transport): + """Setup client connection and add entry to connected_clients""" + peername = transport.get_extra_info('peername') + logger.debug('TCP connection from {}'.format(peername)) + self.transport = transport + # Set the TCP buffer limit + self.transport.set_write_buffer_limits( + high=self.options['tcp_buffer_size']) + self.connected_clients.append(self) + + def pause_writing(self): + """Called when TCP buffer for the specific connection is full + + Upon encountering a full TCP buffer we deem the client to slow and + forcefully close its connection. + """ + self.transport.abort() + + def connection_lost(self, exc): + """Called when connection is lost + + Used to remove entries from connected_clients + """ + peername = self.transport.get_extra_info('peername') + logger.debug('TCP connection lost from {}'.format(peername)) + self.connected_clients.remove(self) + + def eof_received(self): + """After eof_received, connection_lost is still called""" + pass + + def run(self): + """Run is launched from constructor of TCPReplicator + + It manages an asyncio event loop to orchestrate our TCPServerProtocol. + """ + try: + logger.info("Starting TCPReplicator thread for {}:{}".format(self.options["tcp_bind"], self.options["tcp_port"])) + + # Create the event loop, must be done in the new thread + self._loop = asyncio.new_event_loop() + + # Create the input queue + self.queue = asyncio.Queue(maxsize=self.queuesize, loop=self._loop) + + # When wanting to debug event loop behavior, uncomment this + # self._loop.set_debug(True) + + self._process_task = self._loop.create_task(self._process_queue()) + + # Schedule the task to create the server + server_task = self._loop.create_task(self._run_server( + self.options, self._connected_clients)) + + # Callback monitors server startup and releases + # initialization_semaphore. If server fails to start this callback + # call self._loop.stop() + server_task.add_done_callback(self._server_start_callback) + + # Keep running event loop until self._loop.stop() is called. + # Calling this will lose control flow to the event loop + # indefinitely, upon self._loop.stop() control flow is returned + # here. + self._loop.run_forever() + + # Stop must have been called, close the event loop + with self.shutdown_condition: + logger.debug("Closing TCPReplicator event loop") + self._loop.close() + self.shutdown_condition.notify() + except Exception as e: + # Log the exception as thread exceptions won't be returned to us + # on the main thread. + logging.exception("TCPReplicator thread encountered fatal exception") + + # We will lose the exception and the original stacktrace of the + # thread. Once we use a threadpool it will be much easier to + # retrieve this so I propose to not bother implementing it now. + # For the pattern to do this see anyway: + # https://stackoverflow.com/a/6894023 + + # Due to the exception the run method will return making is_alive() + # false + finally: + # Always release the lock upon error so the constructor can return + if self.initialization_semaphore.acquire(blocking=False) is False: + self.initialization_semaphore.release() + + def transmit(self, data: bytes): + """Transmit data to connected clients""" + + if not isinstance(data, (bytes, bytearray)): + raise TypeError("Data must be byte-like object") + + self._loop.call_soon_threadsafe( + self._loop.create_task, self._transmit(data)) + + def join(self, timeout=None): + logging.info("Received shutdown request on TCPReplicator thread for {}:{}".format(self.options["tcp_bind"], self.options["tcp_port"])) + + self._clean_shutdown() + + # Only call join at the end otherwise Thread will falsely assume + # all child 'processes' have stopped + super().join(timeout) + + def disconnect(self): + if not self.is_alive(): + return + + # TODO(Corne): Prevent duplicate code across TCPReplicator, UDPReceiver + # and StatisticsCollector. + self.join(self.DISCONNECT_TIMEOUT) + + if self.is_alive(): + # there is nothing we can do except wait (stall) longer, which + # could be indefinitely. + logger.error( + f"UDP thread for {self.host}:{self.port} did not shutdown after" + f"{self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling." + f"Please attach a debugger to thread ID {self.ident}.") + + async def _run_server(self, options: dict, connected_clients: list): + """Retrieve the event loop created in run() and launch the server""" + loop = asyncio.get_event_loop() + + self._server = await loop.create_server( + lambda: TCPReplicator.TCPServerProtocol(options, connected_clients), + options['tcp_bind'], options['tcp_port'], reuse_address=True) + + def put(self, packet): + """ Put a packet in the queue to be scheduled for transmission. """ + + # check hereif our queue clogged up, since we'll schedule self.queue.put + # asynchronously. + if self.queue.full(): + raise asyncio.QueueFull("asyncio queue full") + + # if we cannot process fast enough, our task list may clog up instead. + # just use the same limit here, as the task list will be dominated by the + # packet transmission count. + if self.queuesize > 0 and self.nof_tasks_pending > self.queuesize: + raise asyncio.QueueFull("asyncio loop task list full") + + self._loop.call_soon_threadsafe( + self._loop.create_task, self.queue.put(packet)) + + async def _process_queue(self): + """ Take packets from the queue and transmit them across our clients. """ + while True: + packet = await self.queue.get() + + if packet is None: + # Magic marker from caller to terminate + break + + self._loop.create_task(self._transmit(packet)) + + async def _transmit(self, data): + logger.debug("Transmitting") + for client in self._connected_clients: + client.transport.write(data) + + self.nof_packets_sent += 1 + + async def _disconnect(self): + with self.disconnect_condition: + self._server.close() + await self._server.wait_closed() + + for client in self._connected_clients: + peername = client.transport.get_extra_info('peername') + logger.debug('Disconnecting client {}'.format(peername)) + client.transport.abort() + + self.disconnect_condition.notify() + + async def _stop_event_loop(self): + with self.shutdown_condition: + + # Stop the current _process_queue task if it exists + if self._process_task: + # insert magic marker, if the caller hasn't already + await self.queue.put(None) + + # wait for task to finish + await self._process_task + + # Calling stop() will return control flow to self._loop.run_*() + self._loop.stop() + + def _server_start_callback(self, future): + # Server started without exception release initialization semaphore + if not future.exception(): + self.initialization_semaphore.release() + return + + logging.warning("TCPReplicator server raised unexpected exception") + # Stop the loop so run() can fallthrough from self._loop.run_* + self._loop.stop() + # Raise the original exceptions captured from the start_server task + raise future.exception() + + def _clean_shutdown(self): + """Disconnect clients, stop the event loop and wait for it to close""" + + # The event loop is not running anymore, we can't send tasks to shut + # it down further. + if not self._loop.is_running(): + return + + # Shutdown server and disconnect clients + with self.disconnect_condition: + self._loop.call_soon_threadsafe( + self._loop.create_task, self._disconnect()) + self.disconnect_condition.wait() + + # Stop and close the event loop + with self.shutdown_condition: + logging.debug("Stopping TCPReplicator event loop") + self._loop.call_soon_threadsafe( + self._loop.create_task, self._stop_event_loop()) + self.shutdown_condition.wait() + + def clients(self): + """ Return the list of connected clients. """ + + return ["%s:%s" % client.transport.get_extra_info('peername') for client in self._connected_clients] + + @property + def nof_tasks_pending(self): + """ Return the number of pending tasks in our event loop. """ + + return len(asyncio.all_tasks(self._loop)) diff --git a/devices/clients/udp_receiver.py b/devices/clients/udp_receiver.py index c8bc44eb1965b0fa769528b381dbaee5b2fcd5d0..bf86c363bd55d461c5c80d3b11060f19cf4e970e 100644 --- a/devices/clients/udp_receiver.py +++ b/devices/clients/udp_receiver.py @@ -1,25 +1,47 @@ +from queue import Full from queue import Queue from threading import Thread -import numpy import logging +import numpy import socket import time +from typing import List # not needed for python3.9+, where we can use the type "list[Queue]" directly + +from clients.statistics_client_thread import StatisticsClientThread logger = logging.getLogger() -class UDPReceiver(Thread): +class UDPReceiver(Thread, StatisticsClientThread): """ This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code """ - # How long to wait for a stuck Thread - DISCONNECT_TIMEOUT = 10.0 + # Default options for UDPReceiver + _default_options = { + "udp_host": None, + "udp_port": None, + "poll_timeout": 0.1, + } + + def __init__(self, queues: List[Queue], options: dict = None): + self.queues = queues + + try: + options['udp_host'] + except KeyError: + raise - def __init__(self, host, port, queue, poll_timeout=0.1): - self.queue = queue - self.host = host - self.port = port + try: + options['udp_port'] + except KeyError: + raise + + self.options = self._parse_options(options) + + self.host = self.options['udp_host'] + self.port = self.options['udp_port'] + self.poll_timeout = self.options['poll_timeout'] self.parameters = { # Number of packets we received @@ -48,13 +70,17 @@ class UDPReceiver(Thread): # Make sure we can stop receiving packets even if none arrive. # Without this, the recvmsg() call blocks indefinitely if no packet arrives. - self.sock.settimeout(poll_timeout) + self.sock.settimeout(self.poll_timeout) self.stream_on = True super().__init__() self.start() + @property + def _options(self) -> dict: + return UDPReceiver._default_options + def run(self): # all variables are manually defined and are updated each time logger.info("Starting UDP thread for {}:{}".format(self.host, self.port)) @@ -67,12 +93,13 @@ class UDPReceiver(Thread): self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8) self.parameters["last_packet_timestamp"] = numpy.uint64(int(time.time())) - # Forward packet to processing thread - self.queue.put(packet) + # Forward packet to processing threads + for queue in self.queues: + queue.put(packet) except socket.timeout: # timeout -- expected, allows us to check whether to stop pass - except queue.Full: + except Full: # overflow -- just discard self.parameters["nof_packets_dropped"] += numpy.uint64(1) @@ -88,10 +115,12 @@ class UDPReceiver(Thread): # happens if timeout is hit return - # shutdown the socket so that others can listen on this port - self.sock.shutdown(socket.SHUT_RDWR) + # close the socket so that others can listen on this port + self.sock.close() def disconnect(self): + # TODO(Corne): Prevent duplicate code across TCPReplicator, UDPReceiver + # and StatisticsCollector. if not self.is_alive(): return @@ -101,6 +130,3 @@ class UDPReceiver(Thread): if self.is_alive(): # there is nothing we can do except wait (stall) longer, which could be indefinitely. logger.error(f"UDP thread for {self.host}:{self.port} did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.") - - def __del__(self): - self.disconnect() diff --git a/devices/devices/hardware_device.py b/devices/devices/hardware_device.py index 986d0b0df84e6706e06fa58f63c6a238f7671b2b..589eaa7cf9b06ce4b0a4d3d068d8eb17dd7e9eb8 100644 --- a/devices/devices/hardware_device.py +++ b/devices/devices/hardware_device.py @@ -95,6 +95,9 @@ class hardware_device(Device, metaclass=AbstractDeviceMetas): self.set_state(DevState.INIT) self.setup_value_dict() + # reload our class & device properties from the Tango database + self.get_device_properties() + self.configure_for_initialise() self.set_state(DevState.STANDBY) diff --git a/devices/devices/sdp/sdp.py b/devices/devices/sdp/sdp.py index 9bc94e6c811ac4e63a88c035ef62eba3998df895..3c78775d2773e080c0849bad2a1a39ae9812234f 100644 --- a/devices/devices/sdp/sdp.py +++ b/devices/devices/sdp/sdp.py @@ -162,7 +162,7 @@ class SDP(hardware_device): # Stop keep-alive try: - self.opcua_connection.stop() + self.OPCua_client.stop() except Exception as e: self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) diff --git a/devices/devices/sdp/sst.py b/devices/devices/sdp/sst.py index 97c7463e2ac3728be2418e639d75140c421a11c1..79fb6fb272b199d3069be03825cfd395f9d18929 100644 --- a/devices/devices/sdp/sst.py +++ b/devices/devices/sdp/sst.py @@ -27,12 +27,6 @@ from tango import AttrWriteType from clients.attribute_wrapper import attribute_wrapper from clients.opcua_client import OPCUAConnection from clients.statistics_client import StatisticsClient - -from devices.hardware_device import hardware_device - -from common.lofar_git import get_version -from common.lofar_logging import device_logging_to_python, log_exceptions - from devices.sdp.statistics import Statistics from devices.sdp.statistics_collector import SSTCollector diff --git a/devices/devices/sdp/statistics.py b/devices/devices/sdp/statistics.py index b1a2681052c95650056afb08b8dd4dbf0e1dcf00..a19335c7a835b5903e0d08ede7e84d3492dc0331 100644 --- a/devices/devices/sdp/statistics.py +++ b/devices/devices/sdp/statistics.py @@ -21,7 +21,6 @@ sys.path.append(parentdir) from abc import ABCMeta, abstractmethod # PyTango imports -from tango.server import run from tango.server import device_property, attribute from tango import AttrWriteType # Additional import @@ -34,6 +33,9 @@ from devices.hardware_device import hardware_device from common.lofar_git import get_version from common.lofar_logging import device_logging_to_python, log_exceptions +import logging + +logger = logging.getLogger() import numpy @@ -66,7 +68,12 @@ class Statistics(hardware_device, metaclass=ABCMeta): mandatory=True ) - Statistics_Client_Port = device_property( + Statistics_Client_UDP_Port = device_property( + dtype='DevUShort', + mandatory=True + ) + + Statistics_Client_TCP_Port = device_property( dtype='DevUShort', mandatory=True ) @@ -87,11 +94,14 @@ class Statistics(hardware_device, metaclass=ABCMeta): last_packet_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64) # queue fill percentage, as reported by the consumer - queue_fill_percentage_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "queue", "parameter": "fill_percentage"}, datatype=numpy.uint64) + queue_collector_fill_percentage_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "queue", "parameter": "collector_fill_percentage"}, datatype=numpy.uint64) + queue_replicator_fill_percentage_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "queue", "parameter": "replicator_fill_percentage"}, datatype=numpy.uint64) + replicator_clients_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "replicator", "parameter": "clients"}, dims=(128,), datatype=numpy.str_) + replicator_nof_packets_sent_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "replicator", "parameter": "nof_packets_sent"}, datatype=numpy.uint64) + replicator_nof_tasks_pending_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "replicator", "parameter": "nof_tasks_pending"}, datatype=numpy.uint64) # number of UDP packets that were processed nof_packets_processed_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_packets"}, datatype=numpy.uint64) - # number of invalid (non-SST) packets received nof_invalid_packets_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64) # last packet that could not be parsed @@ -108,20 +118,32 @@ class Statistics(hardware_device, metaclass=ABCMeta): try: self.statistics_client.stop() except Exception as e: - self.warn_stream("Exception while stopping statistics_client in configure_for_off function: {}. Exception ignored".format(e)) + logger.exception("Exception while stopping statistics_client in configure_for_off. Exception ignored") try: self.OPCUA_client.stop() except Exception as e: - self.warn_stream("Exception while stopping OPC UA connection in configure_for_off function: {}. Exception ignored".format(e)) + logger.exception("Exception while stopping OPC UA connection in configure_for_off. Exception ignored") @log_exceptions() def configure_for_initialise(self): """ user code here. is called when the sate is set to INIT """ """Initialises the attributes and properties of the statistics device.""" + # Options for UDPReceiver + udp_options = { + "udp_port": self.Statistics_Client_UDP_Port, + "udp_host": "0.0.0.0" + } + + # Options for TCPReplicator + tcp_options = { + "tcp_port": self.Statistics_Client_TCP_Port + # tcp_host has default value + } + self.statistics_collector = self.STATISTICS_COLLECTOR_CLASS() - self.statistics_client = StatisticsClient(self.statistics_collector, "0.0.0.0", self.Statistics_Client_Port, self.Fault, self) + self.statistics_client = StatisticsClient(self.statistics_collector, udp_options, tcp_options, self.Fault, self) self.OPCUA_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) diff --git a/devices/devices/sdp/statistics_collector.py b/devices/devices/sdp/statistics_collector.py index 4d845b13f1b70785443f2783d8de9f7d85f9d9d9..d19ad01b2a10096ebc637b5a24d51917317afc9f 100644 --- a/devices/devices/sdp/statistics_collector.py +++ b/devices/devices/sdp/statistics_collector.py @@ -4,6 +4,7 @@ import logging import numpy from .statistics_packet import SSTPacket +from clients.statistics_client_thread import StatisticsClientThread logger = logging.getLogger() @@ -98,12 +99,15 @@ class SSTCollector(StatisticsCollector): self.parameters["integration_intervals"][input_index] = fields.integration_interval() -class StatisticsConsumer(Thread): +class StatisticsConsumer(Thread, StatisticsClientThread): """ Base class to process statistics packets from a queue, asynchronously. """ # Maximum time to wait for the Thread to get unstuck, if we want to stop DISCONNECT_TIMEOUT = 10.0 + # No default options required, for now? + _default_options = {} + def __init__(self, queue: Queue, collector: StatisticsCollector): self.queue = queue self.collector = collector @@ -112,6 +116,10 @@ class StatisticsConsumer(Thread): super().__init__() self.start() + @property + def _options(self) -> dict: + return StatisticsConsumer._default_options + def run(self): logger.info("Starting statistics thread") @@ -123,7 +131,6 @@ class StatisticsConsumer(Thread): # None is the magic marker to stop processing break - try: self.collector.process_packet(self.last_packet) except ValueError as e: @@ -141,6 +148,8 @@ class StatisticsConsumer(Thread): super().join(timeout) def disconnect(self): + # TODO(Corne): Prevent duplicate code across TCPReplicator, UDPReceiver + # and StatisticsConsumer. if not self.is_alive(): return diff --git a/devices/integration_test/client/test_tcp_replicator.py b/devices/integration_test/client/test_tcp_replicator.py new file mode 100644 index 0000000000000000000000000000000000000000..ca45c4c52ab7f5e379c484b964a05225950fc9e1 --- /dev/null +++ b/devices/integration_test/client/test_tcp_replicator.py @@ -0,0 +1,172 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +from asyncio import Queue + +import logging +import time +import socket +import sys + +from clients.tcp_replicator import TCPReplicator + +from integration_test import base + +import timeout_decorator + +logger = logging.getLogger() + + +class TestTCPReplicator(base.IntegrationTestCase): + + def setUp(self): + + super(TestTCPReplicator, self).setUp() + + def test_start_stop(self): + """Test start and stopping the server gracefully""" + + test_options = { + "tcp_port": 56565, # Pick some port with low change of collision + } + + replicator = TCPReplicator(test_options) + self.assertTrue(replicator.is_alive()) + + def test_start_except(self): + """Test start and stopping the server gracefully""" + + test_options = { + "tcp_port": 56566, # Pick some port with low change of collision + } + + replicator = TCPReplicator(test_options) + self.assertTrue(replicator.is_alive()) + + self.assertRaises(RuntimeError, TCPReplicator, test_options) + + def test_start_transmit_empty_stop(self): + """Test transmitting without clients""" + + test_options = { + "tcp_port": 56567, # Pick some port with low change of collision + } + + replicator = TCPReplicator(test_options) + self.assertTrue(replicator.is_alive()) + + replicator.transmit("Hello World!".encode('utf-8')) + + def test_start_connect_close(self): + test_options = { + "tcp_port": 56568, # Pick some port with low change of collision + } + + replicator = TCPReplicator(test_options) + self.assertTrue(replicator.is_alive()) + + time.sleep(2) + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(("127.0.0.1", test_options['tcp_port'])) + + time.sleep(2) + + replicator.join() + + self.assertEquals(b'', s.recv(9000)) + + @timeout_decorator.timeout(15) + def test_start_connect_receive(self): + test_options = { + "tcp_port": 56569, # Pick some port with low change of collision + } + + m_data = "hello world".encode("utf-8") + + replicator = TCPReplicator(test_options) + self.assertTrue(replicator.is_alive()) + + time.sleep(2) + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(("127.0.0.1", test_options['tcp_port'])) + + time.sleep(2) + + replicator.transmit(m_data) + + data = s.recv(sys.getsizeof(m_data)) + s.close() + + self.assertEqual(m_data, data) + + @timeout_decorator.timeout(15) + def test_start_connect_receive_multiple(self): + test_options = { + "tcp_port": 56570, # Pick some port with low change of collision + } + + m_data = "hello world".encode("utf-8") + + replicator = TCPReplicator(test_options) + self.assertTrue(replicator.is_alive()) + + time.sleep(2) + + s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s1.connect(("127.0.0.1", test_options['tcp_port'])) + + s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s2.connect(("127.0.0.1", test_options['tcp_port'])) + + time.sleep(3) + + replicator.transmit(m_data) + + data1 = s1.recv(sys.getsizeof(m_data)) + s1.close() + + data2 = s2.recv(sys.getsizeof(m_data)) + s2.close() + + self.assertEqual(m_data, data1) + self.assertEqual(m_data, data2) + + @timeout_decorator.timeout(15) + def test_start_connect_receive_multiple_queue(self): + test_options = { + "tcp_port": 56571, # Pick some port with low change of collision + } + + m_data = "hello world".encode("utf-8") + + replicator = TCPReplicator(test_options) + self.assertTrue(replicator.is_alive()) + + time.sleep(2) + + s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s1.connect(("127.0.0.1", test_options['tcp_port'])) + + s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s2.connect(("127.0.0.1", test_options['tcp_port'])) + + time.sleep(3) + + replicator.put(m_data) + + data1 = s1.recv(sys.getsizeof(m_data)) + s1.close() + + data2 = s2.recv(sys.getsizeof(m_data)) + s2.close() + + self.assertEqual(m_data, data1) + self.assertEqual(m_data, data2) diff --git a/devices/integration_test/devices/test_device_sst.py b/devices/integration_test/devices/test_device_sst.py new file mode 100644 index 0000000000000000000000000000000000000000..a6b71d328305f2dafed46f9e4f3ea9209df9601d --- /dev/null +++ b/devices/integration_test/devices/test_device_sst.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. +import socket +import sys +import time + +from tango import DeviceProxy +from tango._tango import DevState + +from integration_test import base + + +class TestDeviceSST(base.IntegrationTestCase): + + def setUp(self): + """Intentionally recreate the device object in each test""" + super(TestDeviceSST, self).setUp() + + def tearDown(self): + """Turn device Off in teardown to prevent blocking tests""" + d = DeviceProxy("LTS/SST/1") + + try: + d.Off() + except Exception as e: + """Failing to turn Off devices should not raise errors here""" + print(f"Failed to turn device off in teardown {e}") + + def test_device_proxy_sst(self): + """Test if we can successfully create a DeviceProxy and fetch state""" + + d = DeviceProxy("LTS/SST/1") + + self.assertEqual(DevState.OFF, d.state()) + + def test_device_sst_initialize(self): + """Test if we can transition to standby""" + + d = DeviceProxy("LTS/SST/1") + + d.initialise() + + self.assertEqual(DevState.STANDBY, d.state()) + + def test_device_sst_on(self): + """Test if we can transition to on""" + + port_property = {"Statistics_Client_TCP_Port": "4999"} + + d = DeviceProxy("LTS/SST/1") + + self.assertEqual(DevState.OFF, d.state(), + "Prerequisite could not be met " + "this test can not continue") + + d.put_property(port_property) + + d.initialise() + + self.assertEqual(DevState.STANDBY, d.state()) + + d.on() + + self.assertEqual(DevState.ON, d.state()) + + def test_device_sst_send_udp(self): + port_property = {"Statistics_Client_TCP_Port": "4998"} + + d = DeviceProxy("LTS/SST/1") + + self.assertEqual(DevState.OFF, d.state(), + "Prerequisite could not be met " + "this test can not continue") + + d.put_property(port_property) + + d.initialise() + + self.assertEqual(DevState.STANDBY, d.state()) + + d.on() + + self.assertEqual(DevState.ON, d.state()) + + s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s1.connect(("device-sst", 5001)) + + # TODO(Corne): Change me into an actual SST packet + s1.send("Hello World!".encode("UTF-8")) + + s1.close() + + def test_device_sst_connect_tcp_receive(self): + port_property = {"Statistics_Client_TCP_Port": "5101"} + + m_data = "Hello World!".encode("UTF-8") + + d = DeviceProxy("LTS/SST/1") + + self.assertEqual(DevState.OFF, d.state(), + "Prerequisite could not be met " + "this test can not continue") + + d.put_property(port_property) + + d.initialise() + + self.assertEqual(DevState.STANDBY, d.state()) + + d.on() + + self.assertEqual(DevState.ON, d.state()) + + time.sleep(2) + + s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s1.connect(("device-sst", 5001)) + + s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s2.connect(("device-sst", 5101)) + + time.sleep(2) + + # TODO(Corne): Change me into an actual SST packet + s1.send(m_data) + + time.sleep(2) + + data = s2.recv(sys.getsizeof(m_data)) + + s1.close() + s2.close() + + self.assertEqual(m_data, data) diff --git a/devices/test-requirements.txt b/devices/test-requirements.txt index af6d9e4218ad53b977b444f7db95ead52d649b21..20ed449cd8f17f9110ebe1b70774916abe8c00cb 100644 --- a/devices/test-requirements.txt +++ b/devices/test-requirements.txt @@ -2,17 +2,17 @@ # order of appearance. Changing the order has an impact on the overall # integration process, which may cause wedges in the gate later. +bandit>=1.6.0 # Apache-2.0 +coverage>=5.2.0 # Apache-2.0 doc8>=0.8.0 # Apache-2.0 flake8>=3.8.0 # MIT flake8-breakpoint>=1.1.0 # MIT flake8-debugger>=4.0.0 #MIT flake8-mock>=0.3 #GPL -bandit>=1.6.0 # Apache-2.0 hacking>=3.2.0,<3.3.0 # Apache-2.0 -coverage>=5.2.0 # Apache-2.0 python-subunit>=1.4.0 # Apache-2.0/BSD Pygments>=2.6.0 stestr>=3.0.0 # Apache-2.0 testscenarios>=0.5.0 # Apache-2.0/BSD testtools>=2.4.0 # MIT - +timeout-decorator>=0.5 # MIT diff --git a/devices/test/clients/test_statistics_client_thread.py b/devices/test/clients/test_statistics_client_thread.py new file mode 100644 index 0000000000000000000000000000000000000000..fd7ce0701f9d792863909b9f8ee4a9d39a2b1fd1 --- /dev/null +++ b/devices/test/clients/test_statistics_client_thread.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +import logging +from unittest import mock + +from clients.statistics_client_thread import StatisticsClientThread + +from test import base + +logger = logging.getLogger() + + +class TestStatisticsClientThread(base.TestCase): + + def setUp(self): + super(TestStatisticsClientThread, self).setUp() + + class DummySCThread(StatisticsClientThread): + + def disconnect(self): + pass + + @property + def _options(self) -> dict: + return {} + + @mock.patch.object(DummySCThread, "disconnect") + def test_del_disconnect(self, m_disconnect): + """Ensure that __del__ calls disconnect() of child class""" + + t_test = TestStatisticsClientThread.DummySCThread() + del t_test + + m_disconnect.assert_called_once_with() diff --git a/devices/test/clients/test_tcp_replicator.py b/devices/test/clients/test_tcp_replicator.py new file mode 100644 index 0000000000000000000000000000000000000000..a9babed0eb7af7a58544b3ff7535c3113ed12ca3 --- /dev/null +++ b/devices/test/clients/test_tcp_replicator.py @@ -0,0 +1,219 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +import logging +import time +from queue import Queue +from unittest import mock + +from clients.tcp_replicator import TCPReplicator +from clients import tcp_replicator + +from test import base + +import timeout_decorator + +logger = logging.getLogger() + + +class TestTCPReplicator(base.TestCase): + + @staticmethod + async def dummy_task(): + pass + + def setUp(self): + super(TestTCPReplicator, self).setUp() + + self.m_server = mock.Mock() + self.m_server.wait_closed.return_value = self.dummy_task() + + async def dummy_create_server(): + return self.m_server + + # Create reusable test fixture for unit tests + self.m_tcp_replicator = TCPReplicator + + # Patch _run_server and force match spec + event_loop_patcher = mock.patch.object( + tcp_replicator.asyncio, 'get_event_loop') + self.m_event_loop = event_loop_patcher.start() + self.m_event_loop.return_value.create_server.return_value = \ + dummy_create_server() + self.addCleanup(event_loop_patcher.stop) + + # Stash _process_queue before mocking + self.t_process_queue = TCPReplicator._process_queue + + # Patch _process_queue and force match spec + process_queue_patcher = mock.patch.object( + self.m_tcp_replicator, '_process_queue', + autospec=True, return_value=self.dummy_task()) + self.m_process_queue = process_queue_patcher.start() + self.addCleanup(process_queue_patcher.stop) + + def test_parse_options(self): + """Validate option parsing""" + + # Perform string copy of current tcp_bind value + t_tcp_bind = str(TCPReplicator._default_options['tcp_bind']) + + test_options = { + "random": 12346, # I should be ignored + "tcp_bind": '0.0.0.0', # I should get set + } + + replicator = self.m_tcp_replicator(options=test_options) + self.assertTrue(replicator.is_alive()) + + # Ensure replicator initialization does not modify static variable + self.assertEqual(t_tcp_bind, TCPReplicator._default_options['tcp_bind']) + + # Ensure options are correctly updated upon initialization + self.assertEqual(test_options['tcp_bind'], replicator.options['tcp_bind']) + + # Ensure non existing keys don't propagate into options + self.assertFalse('random' in replicator.options) + + def test_connected_clients(self): + """Validate shared list behavior between TCPServerProtocol and thread""" + + m_client = mock.Mock() + + # Create both a TCPReplicator and TCPServerProtocol separately + replicator = self.m_tcp_replicator() + self.assertTrue(replicator.is_alive()) + protocol = TCPReplicator.TCPServerProtocol( + replicator._options, replicator._connected_clients) + + # Add a mocked client to replicators list + replicator._connected_clients.append(m_client) + + # Ensure the mocked client appears in the protocols list + self.assertTrue(m_client in protocol.connected_clients) + + def test_start_stop(self): + """Verify threading behavior, being able to start and stop the thread""" + + replicator = self.m_tcp_replicator() + self.assertTrue(replicator.is_alive()) + + # Give the thread 5 seconds to stop + replicator.join(5) + + # Thread should now be dead + self.assertFalse(replicator.is_alive()) + + @timeout_decorator.timeout(5) + def test_start_except_eventloop(self): + """Verify exception handling inside run() for eventloop creation""" + + m_loop = mock.Mock() + m_loop.create_task.side_effect = RuntimeError("Test Error") + + # Signal to _clean_shutdown that the exception has caused the loop to + # stop + m_loop.is_running.return_value = False + + m_replicator_import = tcp_replicator + + with mock.patch.object(m_replicator_import, 'asyncio') as run_patcher: + run_patcher.new_event_loop.return_value = m_loop + + # Constructor should raise an exception if the thread dies early + self.assertRaises(RuntimeError, self.m_tcp_replicator) + + @timeout_decorator.timeout(5) + def test_start_except_server(self): + """Verify exception handling inside run() for starting server""" + + self.m_event_loop.return_value.create_server.side_effect =\ + RuntimeError("Test Error") + + # Constructor should raise an exception if the thread dies early + self.assertRaises(RuntimeError, self.m_tcp_replicator) + + @timeout_decorator.timeout(5) + def test_start_stop_delete(self): + """Verify that deleting the TCPReplicator object safely halts thread""" + + replicator = self.m_tcp_replicator() + self.assertTrue(replicator.is_alive()) + + del replicator + + def test_transmit(self): + """Test that clients are getting data written to their transport""" + + m_data = "Hello World!".encode('utf-8') + + m_client = mock.Mock() + + replicator = self.m_tcp_replicator() + self.assertTrue(replicator.is_alive()) + + replicator._connected_clients.append(m_client) + + replicator.transmit(m_data) + + # TODO(Corne): Find suitable primitive to synchronize async task update + # with main thread. + time.sleep(1) + time.sleep(1) + time.sleep(1) + time.sleep(1) + time.sleep(1) + time.sleep(1) + + m_client.transport.write.assert_called_once_with(m_data) + + def test_queue_start(self): + replicator = self.m_tcp_replicator() + + self.m_process_queue.assert_called_once_with(replicator) + + def test_transmit_queue(self): + m_data = "Hello World!".encode('utf-8') + + m_client = mock.Mock() + + replicator = self.m_tcp_replicator() + self.assertTrue(replicator.is_alive()) + + # Patch _process_queue back into object and jump start it + replicator._process_queue = self.t_process_queue + replicator._loop.call_soon_threadsafe( + replicator._loop.create_task, replicator._process_queue(replicator)) + + replicator._connected_clients.append(m_client) + + replicator.put(m_data) + + # TODO(Corne): Find suitable primitive to synchronize async task update + # with main thread. + time.sleep(1) + time.sleep(1) + time.sleep(1) + time.sleep(1) + time.sleep(1) + time.sleep(1) + + m_client.transport.write.assert_called_once_with(m_data) + + def test_disconnect(self,): + m_client = mock.Mock() + + replicator = self.m_tcp_replicator() + self.assertTrue(replicator.is_alive()) + + replicator._connected_clients.append(m_client) + + replicator.join(5) + + m_client.transport.abort.assert_called_once_with() diff --git a/devices/tox.ini b/devices/tox.ini index 74931523de0505511cf0eaf8331d06871598441b..59d2347f3ff42ccb084033aea67d478fd63513cb 100644 --- a/devices/tox.ini +++ b/devices/tox.ini @@ -13,7 +13,8 @@ setenv = OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_TEST_TIMEOUT=60 -deps = -r{toxinidir}/test-requirements.txt +deps = + -r{toxinidir}/test-requirements.txt -r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt commands = stestr run {posargs} diff --git a/docker-compose/device-sst.yml b/docker-compose/device-sst.yml index c620ba206b6091b1544582e62128575fc231b03c..a7f2e867bc4075d002d764189ef3906ff81fb12a 100644 --- a/docker-compose/device-sst.yml +++ b/docker-compose/device-sst.yml @@ -27,6 +27,7 @@ services: - data ports: - "5001:5001/udp" # port to receive SST UDP packets on + - "5101:5101/tcp" # port to emit SST TCP packets on - "5702:5702" # unique port for this DS volumes: - ${TANGO_LOFAR_CONTAINER_MOUNT} diff --git a/docker-compose/itango/lofar-requirements.txt b/docker-compose/itango/lofar-requirements.txt index 0e869add1a8113a1f63f84e9348321dad5a5c4f2..29942e272353180f3622f4ad6d36fb7c31307eb1 100644 --- a/docker-compose/itango/lofar-requirements.txt +++ b/docker-compose/itango/lofar-requirements.txt @@ -6,3 +6,4 @@ python-logstash-async gitpython PyMySQL[rsa] sqlalchemy +timeout-decorator diff --git a/sbin/run_integration_test.sh b/sbin/run_integration_test.sh index 9eb465a25d070bcca73ad3a2c45eb79a7ef6c48a..a919b892767ef6a7d4eec83ba400cae79294190b 100755 --- a/sbin/run_integration_test.sh +++ b/sbin/run_integration_test.sh @@ -6,14 +6,29 @@ if [ -z "$LOFAR20_DIR" ]; then exit 1 fi -# Start all required containers +# Start and stop sequence cd "$LOFAR20_DIR/docker-compose" || exit 1 -make start databaseds dsconfig device-sdp device-recv jupyter elk sdptr-sim recv-sim +make stop device-sdp device-pcc device-sst sdptr-sim recv-sim +make start databaseds dsconfig jupyter elk + +# Give dsconfig and databaseds time to start +sleep 15 # Update the dsconfig cd "$TANGO_LOFAR_LOCAL_DIR" || exit 1 sbin/update_ConfigDb.sh CDB/integration_ConfigDb.json +cd "$LOFAR20_DIR/docker-compose" || exit 1 +make start sdptr-sim recv-sim + +# Give the simulators time to start +sleep 5 + +make start device-sdp device-pcc device-sst + +# Give the devices time to start +sleep 5 + # Start the integration test cd "$LOFAR20_DIR/docker-compose" || exit 1 make start integration-test