diff --git a/README.md b/README.md index 7d0dcb9f5216fc2bd2d822d377afa3264104e76d..c8e3c6d763601bed286bcf89033de8b94270a66d 100644 --- a/README.md +++ b/README.md @@ -151,6 +151,7 @@ Next change the version in the following places: # Release Notes +* 0.43.5 Fix reconnect storm to OPC/UA servers * 0.43.4 Drain nomad jobs at startup to prevent misinitialisation after ungraceful shutdown * 0.43.3 Fix lingering connections to OPC/UA servers. * 0.43.2 Fix jumppad integration test using custom nomad image that includes consul diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION index ccee441ba5b0b411bcf89a2f72918e628a052b55..64a2ba658ffe42f7754272b80d03e72debfac3d0 100644 --- a/tangostationcontrol/VERSION +++ b/tangostationcontrol/VERSION @@ -1 +1 @@ -0.43.4 +0.43.5 diff --git a/tangostationcontrol/tangostationcontrol/clients/comms_client.py b/tangostationcontrol/tangostationcontrol/clients/comms_client.py index f937e81769ec43aeb254d68260ae2b5b6e5da8a1..82ab64e1d732f66a75234c830568114933bc0a14 100644 --- a/tangostationcontrol/tangostationcontrol/clients/comms_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/comms_client.py @@ -5,6 +5,7 @@ import asyncio import logging import time from abc import ABC, abstractmethod +from typing import Optional from tangostationcontrol.common.threading import OmniThread from tangostationcontrol.asyncio import EventLoopThread @@ -64,18 +65,28 @@ class CommClient(AbstractCommClient, OmniThread): super().__init__(daemon=True, target=self._run) + def _connect(self): + pass + + def _disconnect(self): + pass + def connect(self): """ Function used to connect to the client. Throws an Exception if the connection cannot be established. """ + + self._connect() self.connected = True def disconnect(self): """ Function used to connect to the client. """ + + self._disconnect() self.connected = False def _run(self): @@ -184,7 +195,7 @@ class AsyncCommClient(object): """ @abstractmethod - async def disconnect(self): + async def disconnect(self, reason: Optional[Exception]): """ Function used to disconnect from the client. """ @@ -195,9 +206,6 @@ class AsyncCommClient(object): interval whenever the connection is considered lost. """ - # disconnect will cancel us - await self.disconnect() - while self.running: try: await self.connect() @@ -208,7 +216,7 @@ class AsyncCommClient(object): await asyncio.sleep(self.reconnect_retry_time) async def watch_connection(self): - """Notice when the connection goes down.""" + """Notice when the connection goes down, and auto-reconnect.""" try: logger.info(f"[AsyncCommClient {self.name()}] Start watching") @@ -217,12 +225,19 @@ class AsyncCommClient(object): # ping will throw in case of connection issues try: await self.ping() - except (OSError, asyncio.TimeoutError): + except (OSError, asyncio.TimeoutError) as e: logger.warning( - f"[AsyncCommClient {self.name()}] Ping failed: connection considered lost" + f"[AsyncCommClient {self.name()}] Ping failed: connection considered lost -- reconnecting" ) - # try reconnecting + # reset the connection + try: + await self.disconnect(e) + except Exception as e: + logger.info( + f"[AsyncCommClient {self.name()}] Disconnect failed -- leaking resource" + ) + await self.reconnect() # don't spin, sleep for a while @@ -263,9 +278,10 @@ class AsyncCommClient(object): self.running = False - # cancel & reap watcher - self.watch_connection_task.cancel() try: + # cancel & reap watcher + self.watch_connection_task.cancel() + await self.watch_connection_task except asyncio.CancelledError as e: pass @@ -276,7 +292,7 @@ class AsyncCommClient(object): # the task stopped either way, so no need to bother our caller with this - await self.disconnect() + await self.disconnect(None) def sync_stop(self): """Synchronous version of stop().""" diff --git a/tangostationcontrol/tangostationcontrol/clients/opcua_client.py b/tangostationcontrol/tangostationcontrol/clients/opcua_client.py index 3bf67bc20d4314a2218f33867af40bae32b92a81..4e9d038e1f9491d3a35ff32863b71ac808663786 100644 --- a/tangostationcontrol/tangostationcontrol/clients/opcua_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/opcua_client.py @@ -5,7 +5,7 @@ import asyncio import logging import socket import textwrap -from typing import Dict, List +from typing import Dict, List, Optional import asyncua from datetime import datetime @@ -93,20 +93,31 @@ class OPCUAConnection(AsyncCommClient): self.timeout = int(timeout) self.namespace = namespace - self.client = None + self._clear_client() + + self.io_lock = asyncio.Lock() self.device = device # prefix path to all nodes with this. this allows the user to switch trees more easily. self.node_path_prefix = [] - # caches the protocol attributes based on their paths - self._prot_attr_cache = {} - self.connection_status = connection_status self.connection_status.connected = False super().__init__(fault_func, event_loop) + def _clear_client(self): + """(Re)initialise members relating to the OPC/UA connection.""" + + self.client = None + self.obj = None + + # caches the protocol attributes based on their paths + self._prot_attr_cache = {} + + # cache of looked up child node lists for each comma-separated parent path + self._node_cache = {} + def _servername(self): return self.client.server_url.geturl() @@ -120,9 +131,9 @@ class OPCUAConnection(AsyncCommClient): logger.debug(f"Connecting to server {self._servername()}") try: - await self.client.connect() + async with self.io_lock: + await self.client.connect() self.connection_status.connected = True - except (socket.error, IOError, OSError) as e: logger.warning( f"Could not connect to OPC-UA server {self._servername()} ", @@ -148,78 +159,41 @@ class OPCUAConnection(AsyncCommClient): self.obj = self.client.get_objects_node() - # cache of looked up child node lists for each comma-separated parent path - self._node_cache = {} - # updates the protocol attributes after a reconnect await self.update_protocol_attributes() - async def disconnect(self): + async def disconnect(self, reason: Optional[Exception]): """ disconnect from the client """ - if not self.connection_status.connected: - return - logger.info(f"Disconnecting to OPC-UA server {self._servername()}") - await self.client.disconnect() self.connection_status.connected = False + if reason: + self.connection_status.last_disconnect_exception = reason + self.connection_status.last_disconnect_time = datetime.now().strftime( + "%Y-%m-%d %H:%M:%S" + ) + self.connection_status.connection_lost() + + try: + async with self.io_lock: + await self.client.disconnect() + finally: + self._clear_client() + async def ping(self): """ ping the client to make sure the connection with the client is still functional. """ try: - # do a cheap call. NOTE: send_hello is not allowed after establishing a connection, - # so cannot be used here. see https://reference.opcfoundation.org/v104/Core/docs/Part6/7.1.3/ - _ = await self.client.get_namespace_array() + async with self.io_lock: + # do a cheap call. NOTE: send_hello is not allowed after establishing a connection, + # so cannot be used here. see https://reference.opcfoundation.org/v104/Core/docs/Part6/7.1.3/ + _ = await self.client.get_namespace_array() except Exception as e: - if self.connection_status.connected: - await self.handle_connection_exception(e) - raise IOError(f"Lost connection to server {self._servername()}: {e}") - - async def handle_connection_exception(self, _e: Exception): - """ - Called whenever an exception occurs while comunicating with the OPCua server - Determines whether the exception is connection related, and if so, - disconnects from the server which will cause the client to go in to reconnect - mode automatically. - """ - - # if the exception is not connection related, just raise it. - if isinstance( - _e, - ( - asyncua.ua.uaerrors.BadNoMatch, - asyncua.ua.uaerrors.BadTypeMismatch, - ValueError, - TypeError, - RuntimeError, - SyntaxError, - NameError, - LookupError, - AttributeError, - AssertionError, - ArithmeticError, - ImportError, - MemoryError, - ), - ): - raise _e - - # Do not handle the exception if we are not connected - if not self.connection_status.connected: - return - - self.connection_status.last_disconnect_exception = _e - self.connection_status.last_disconnect_time = datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ) - self.connection_status.connection_lost() - - # Disconnect to trigger `AsyncCommClient.watch_connection` to go in to reconnect mode - await self.disconnect() + raise IOError(f"Lost connection to server {self._servername()}: {e}") from e def get_full_node_path(self, path: List[str]): """ @@ -249,9 +223,12 @@ class OPCUAConnection(AsyncCommClient): return self._node_cache[cache_key] # cache it and all of its siblings to save us the round trips for them later on. - parent_path = path[:-1] - parent_node = await self.obj.get_child(parent_path) if parent_path else self.obj - child_nodes = await parent_node.get_children_descriptions() + async with self.io_lock: + parent_path = path[:-1] + parent_node = ( + await self.obj.get_child(parent_path) if parent_path else self.obj + ) + child_nodes = await parent_node.get_children_descriptions() for child_node in child_nodes: # add node to the cache @@ -267,7 +244,8 @@ class OPCUAConnection(AsyncCommClient): return self._node_cache[cache_key] # we couldnt find the requested child, ask server directly to get the appropriate error - return await self.obj.get_child(path) + async with self.io_lock: + return await self.obj.get_child(path) async def update_protocol_attributes(self): """ @@ -302,6 +280,23 @@ class OPCUAConnection(AsyncCommClient): annotation = self._fix_annotation(annotation) path = self.get_full_node_path(annotation["path"]) + prot_attr = await self._protocol_attribute( + path, + attribute.dim_x, + attribute.dim_y, + attribute.datatype, + log_writes=annotation.get("log_writes", True), + ) + + # cache the protocol attributes in case we need to update them after a reconnect + self._prot_attr_cache[path[0]] = prot_attr + + return prot_attr + + async def _protocol_attribute( + self, path, dim_x, dim_y, datatype, log_writes: bool = True + ): + try: node = await self.get_node(path) except Exception as e: @@ -313,20 +308,19 @@ class OPCUAConnection(AsyncCommClient): ) from e # get all the necessary data to set up the read/write functions from the AttributeWrapper - dim_x = attribute.dim_x - dim_y = attribute.dim_y ua_type = numpy_to_OPCua_dict[ - attribute.datatype + datatype ] # convert the numpy type to a corresponding UA type # configure and return the read/write functions prot_attr = ProtocolAttribute( node, + self.io_lock, dim_x, dim_y, ua_type, name=",".join(path), - log_writes=annotation.get("log_writes", True), + log_writes=log_writes, ) try: @@ -345,9 +339,6 @@ class OPCUAConnection(AsyncCommClient): "like memory errors, interrupts, system exit" pass - # cache the protocol attributes in case we need to update them after a reconnect - self._prot_attr_cache[path[0]] = prot_attr - return prot_attr async def setup_attribute(self, annotation, attribute): @@ -364,9 +355,7 @@ class OPCUAConnection(AsyncCommClient): f"Failed to read attribute {prot_attr.name}: {exception_to_str(e)}" ) - asyncio.run_coroutine_threadsafe( - self.handle_connection_exception(e), self.event_loop - ) + raise def write_function(value): try: @@ -378,9 +367,7 @@ class OPCUAConnection(AsyncCommClient): f"Failed to write attribute {prot_attr.name}: {exception_to_str(e)}" ) - asyncio.run_coroutine_threadsafe( - self.handle_connection_exception(e), self.event_loop - ) + raise # return the read/write functions return read_function, write_function @@ -396,9 +383,10 @@ class OPCUAConnection(AsyncCommClient): node = await self.get_node(method_path[:-1]) result = await node.call_method(method_path[-1], *args) except Exception as e: - exc = RuntimeError(f"Calling method {method_path} failed") - exc.__cause__ = e - await self.handle_connection_exception(exc) + logger.error(f"Failed to call method {method_path}: {exception_to_str(e)}") + + raise + return result def call_method(self, method_path, *args): @@ -413,9 +401,17 @@ class ProtocolAttribute: """ def __init__( - self, node, dim_x, dim_y, ua_type, name: str = "", log_writes: bool = True + self, + node, + io_lock, + dim_x, + dim_y, + ua_type, + name: str = "", + log_writes: bool = True, ): self.node = node + self.io_lock = io_lock self.dim_y = dim_y self.dim_x = dim_x self.ua_type = ua_type @@ -431,7 +427,8 @@ class ProtocolAttribute: Read_R function """ - value = await self.node.get_value() + async with self.io_lock: + value = await self.node.get_value() try: # Pytango strings are Latin-1, and will crash on receiving Unicode strings @@ -485,43 +482,46 @@ class ProtocolAttribute: f"OPC-UA write: {self.name} := {textwrap.shorten(str(value), 150)}" ) - try: - await self.node.set_data_value( - asyncua.ua.uatypes.Variant(Value=value, VariantType=self.ua_type) - ) - except (TypeError, asyncua.ua.uaerrors.BadTypeMismatch) as e: - # A type conversion went wrong or there is a type mismatch. - # - # This is either the conversion us -> opcua in our client, or client -> server. - # Report all types involved to allow assessment of the location of the error. - if type(value) == list: - our_type = "list({dtype}) x ({dimensions})".format( - dtype=(type(value[0]).__name__ if value else ""), - dimensions=len(value), + async with self.io_lock: + try: + await self.node.set_data_value( + asyncua.ua.uatypes.Variant(Value=value, VariantType=self.ua_type) ) - else: - our_type = "{dtype}".format(dtype=type(value)) - - is_scalar = (self.dim_x + self.dim_y) == 1 - - if is_scalar: - expected_server_type = "{dtype} (scalar)".format(dtype=self.ua_type) - else: - expected_server_type = "{dtype} x ({dim_x}, {dim_y})".format( - dtype=self.ua_type, dim_x=self.dim_x, dim_y=self.dim_y + except (TypeError, asyncua.ua.uaerrors.BadTypeMismatch) as e: + # A type conversion went wrong or there is a type mismatch. + # + # This is either the conversion us -> opcua in our client, or client -> server. + # Report all types involved to allow assessment of the location of the error. + if type(value) == list: + our_type = "list({dtype}) x ({dimensions})".format( + dtype=(type(value[0]).__name__ if value else ""), + dimensions=len(value), + ) + else: + our_type = "{dtype}".format(dtype=type(value)) + + is_scalar = (self.dim_x + self.dim_y) == 1 + + if is_scalar: + expected_server_type = "{dtype} (scalar)".format(dtype=self.ua_type) + else: + expected_server_type = "{dtype} x ({dim_x}, {dim_y})".format( + dtype=self.ua_type, dim_x=self.dim_x, dim_y=self.dim_y + ) + + actual_server_type = "{dtype} x {dimensions}".format( + dtype=await self.node.read_data_type_as_variant_type(), + dimensions=(await self.node.read_array_dimensions()) + or "(dimensions unknown)", ) - actual_server_type = "{dtype} x {dimensions}".format( - dtype=await self.node.read_data_type_as_variant_type(), - dimensions=(await self.node.read_array_dimensions()) - or "(dimensions unknown)", - ) - - attribute_name = (await self.node.read_display_name()).to_string() + attribute_name = (await self.node.read_display_name()).to_string() - raise TypeError( - f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}" - ) from e + raise TypeError( + f"Cannot write value to OPC-UA attribute '{attribute_name}': " + f"tried to convert data type {our_type} to expected server type " + f"{expected_server_type}, server reports type {actual_server_type}" + ) from e async def OPCUA_connection_tester(server, port): diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics/client.py b/tangostationcontrol/tangostationcontrol/clients/statistics/client.py index c9af9e802dca2c6c66a3880ec61249bba664d2ad..40d950e9a3921c8923f81cd406b2fd963df4667e 100644 --- a/tangostationcontrol/tangostationcontrol/clients/statistics/client.py +++ b/tangostationcontrol/tangostationcontrol/clients/statistics/client.py @@ -80,32 +80,39 @@ class StatisticsClient(AsyncCommClient): if not self.tcp.is_alive(): raise Exception("TCPReplicator thread died unexpectedly") - async def disconnect(self): + async def disconnect(self, reason: Optional[Exception]): # explicit disconnect, instead of waiting for the GC to kick in after "del" below if self.statistics: try: self.statistics.disconnect() + del self.statistics except Exception: logger.exception("Could not disconnect statistics processing class") - del self.statistics + self.statistics = None if self.udp: try: self.udp.disconnect() + del self.udp except Exception: # nothing we can do, but we should continue cleaning up logger.exception("Could not disconnect UDP receiver class") - del self.udp + self.udp = None if self.tcp: try: self.tcp.disconnect() + del self.tcp except Exception: logger.exception("Could not disconnect TCPReplicator class") - # logger.log_exception("Could not disconnect TCPReplicator class") - del self.tcp + self.tcp = None - del self.collector_queue + if self.collector_queue: + try: + del self.collector_queue + except Exception: + logger.exception("Could not clean up collector queue") + self.collector_queue = None async def setup_attribute(self, annotation, attribute): """ diff --git a/tangostationcontrol/tangostationcontrol/common/lofar_logging.py b/tangostationcontrol/tangostationcontrol/common/lofar_logging.py index 4645cd372c557fbbd1fa4695701424f6089095e0..dbbd263896d654cfbd1fb1f584cfc47ba338eabe 100644 --- a/tangostationcontrol/tangostationcontrol/common/lofar_logging.py +++ b/tangostationcontrol/tangostationcontrol/common/lofar_logging.py @@ -25,7 +25,7 @@ def exception_to_str(ex: Exception) -> str: ) else: return "{klass}: {args}".format( - klass=ex.__class__.__name__, args=": ".join(ex.args) + klass=ex.__class__.__name__, args=": ".join([str(arg) for arg in ex.args]) ) diff --git a/tangostationcontrol/test/clients/test_opcua_client.py b/tangostationcontrol/test/clients/test_opcua_client.py index e780b7b32670ed594eb90e2d48204073f6af0d58..9a9ab183789448e6f90ea4aa64490abaeb9a53c5 100644 --- a/tangostationcontrol/test/clients/test_opcua_client.py +++ b/tangostationcontrol/test/clients/test_opcua_client.py @@ -73,6 +73,12 @@ class TestOPCuaAgainstServer(unittest.IsolatedAsyncioTestCase): ] ) namespace_idx = await server.register_namespace("namespace") + server_interfaces = await server.nodes.objects.add_folder( + namespace_idx, "ServerInterfaces" + ) + obj = await server_interfaces.add_object(namespace_idx, "TestDevice") + + await obj.add_variable(namespace_idx, "float_R", 20.0) return server @@ -92,7 +98,6 @@ class TestOPCuaAgainstServer(unittest.IsolatedAsyncioTestCase): process = psutil.Process() connections_before = process.connections() - try: yield finally: @@ -115,7 +120,13 @@ class TestOPCuaAgainstServer(unittest.IsolatedAsyncioTestCase): client = self.make_client() await client.connect() - await client.disconnect() + + attr = await client._protocol_attribute( + ["2:ServerInterfaces", "2:TestDevice", "2:float_R"], 1, 0, numpy.float32 + ) + self.assertEqual(20.0, await attr.read_function()) + + await client.disconnect(None) @patch.object(OPCUAConnectionStatus, "connection_lost") async def test_handle_connecton_exception(self, m_connection_lost): @@ -125,7 +136,27 @@ class TestOPCuaAgainstServer(unittest.IsolatedAsyncioTestCase): client = self.make_client() await client.connect() - await client.handle_connection_exception(Exception()) + await client.disconnect(Exception()) + + async def test_reconnect(self): + """Try reconnecting.""" + + async with self.server, self.assert_no_connection_leak(): + client = self.make_client() + + await client.connect() + client.running = ( + True # simulate client.start() without starting the watcher thread + ) + await client.disconnect(None) + await client.reconnect() + + attr = await client._protocol_attribute( + ["2:ServerInterfaces", "2:TestDevice", "2:float_R"], 1, 0, numpy.float32 + ) + self.assertEqual(20.0, await attr.read_function()) + + await client.disconnect(None) class TestOPCua(unittest.IsolatedAsyncioTestCase): @@ -264,7 +295,9 @@ class TestOPCua(unittest.IsolatedAsyncioTestCase): dims = (j[1], j[0]) ua_type = opcua_client.numpy_to_OPCua_dict[i.numpy_type] - test = opcua_client.ProtocolAttribute(node, dims[0], dims[1], ua_type) + test = opcua_client.ProtocolAttribute( + node, asyncio.Lock(), dims[0], dims[1], ua_type + ) print(test.dim_y, test.dim_x, test.ua_type) """ @@ -325,11 +358,16 @@ class TestOPCua(unittest.IsolatedAsyncioTestCase): if len(j) == 1: test = opcua_client.ProtocolAttribute( - m_node, j[0], 0, opcua_client.numpy_to_OPCua_dict[i.numpy_type] + m_node, + asyncio.Lock(), + j[0], + 0, + opcua_client.numpy_to_OPCua_dict[i.numpy_type], ) else: test = opcua_client.ProtocolAttribute( m_node, + asyncio.Lock(), j[1], j[0], opcua_client.numpy_to_OPCua_dict[i.numpy_type], @@ -360,7 +398,7 @@ class TestOPCua(unittest.IsolatedAsyncioTestCase): # create the ProtocolAttribute to test test = opcua_client.ProtocolAttribute( - m_node, 1, 0, opcua_client.numpy_to_OPCua_dict[str] + m_node, asyncio.Lock(), 1, 0, opcua_client.numpy_to_OPCua_dict[str] ) # check if unicode is replaced by ? @@ -465,11 +503,16 @@ class TestOPCua(unittest.IsolatedAsyncioTestCase): # create the protocolattribute if len(j) == 1: test = opcua_client.ProtocolAttribute( - m_node, j[0], 0, opcua_client.numpy_to_OPCua_dict[i.numpy_type] + m_node, + asyncio.Lock(), + j[0], + 0, + opcua_client.numpy_to_OPCua_dict[i.numpy_type], ) else: test = opcua_client.ProtocolAttribute( m_node, + asyncio.Lock(), j[1], j[0], opcua_client.numpy_to_OPCua_dict[i.numpy_type],