diff --git a/devices/clients/attribute_wrapper.py b/devices/clients/attribute_wrapper.py index 4cb389824750cb9d01fc836e8d65caf3656d59a4..20c2ac01feaac5569494e5ce87b0ca5ef3f35121 100644 --- a/devices/clients/attribute_wrapper.py +++ b/devices/clients/attribute_wrapper.py @@ -146,13 +146,13 @@ class attribute_wrapper(attribute): return value - def set_comm_client(self, client): + async def set_comm_client(self, client): """ takes a communications client as input arguments This client should be of a class containing a "get_mapping" function and return a read and write function that the wrapper will use to get/set data. """ try: - self.read_function, self.write_function = client.setup_attribute(self.comms_annotation, self) + self.read_function, self.write_function = await client.setup_attribute(self.comms_annotation, self) except Exception as e: logger.error("Exception while setting {} attribute with annotation: '{}' {}".format(client.__class__.__name__, self.comms_annotation, e)) diff --git a/devices/clients/opcua_client.py b/devices/clients/opcua_client.py index 7d915cbd00aee72da2a13b7bbb7365306457cf4d..634fc62ad41e136d92e2a84ce8ee8d518b3b9cfe 100644 --- a/devices/clients/opcua_client.py +++ b/devices/clients/opcua_client.py @@ -1,54 +1,65 @@ from threading import Thread import socket import numpy -import opcua -from opcua import Client +import asyncua +import asyncio +from asyncua import Client from clients.comms_client import CommClient -__all__ = ["OPCUAConnection"] +import logging +logger = logging.getLogger() + +__all__ = ["OPCUAConnection", "event_loop"] numpy_to_OPCua_dict = { - numpy.bool_: opcua.ua.VariantType.Boolean, - numpy.int8: opcua.ua.VariantType.SByte, - numpy.uint8: opcua.ua.VariantType.Byte, - numpy.int16: opcua.ua.VariantType.Int16, - numpy.uint16: opcua.ua.VariantType.UInt16, - numpy.int32: opcua.ua.VariantType.Int32, - numpy.uint32: opcua.ua.VariantType.UInt32, - numpy.int64: opcua.ua.VariantType.Int64, - numpy.uint64: opcua.ua.VariantType.UInt64, - numpy.float32: opcua.ua.VariantType.Float, - numpy.double: opcua.ua.VariantType.Double, - numpy.float64: opcua.ua.VariantType.Double, - numpy.str: opcua.ua.VariantType.String + numpy.bool_: asyncua.ua.VariantType.Boolean, + numpy.int8: asyncua.ua.VariantType.SByte, + numpy.uint8: asyncua.ua.VariantType.Byte, + numpy.int16: asyncua.ua.VariantType.Int16, + numpy.uint16: asyncua.ua.VariantType.UInt16, + numpy.int32: asyncua.ua.VariantType.Int32, + numpy.uint32: asyncua.ua.VariantType.UInt32, + numpy.int64: asyncua.ua.VariantType.Int64, + numpy.uint64: asyncua.ua.VariantType.UInt64, + numpy.float32: asyncua.ua.VariantType.Float, + numpy.double: asyncua.ua.VariantType.Double, + numpy.float64: asyncua.ua.VariantType.Double, + numpy.str: asyncua.ua.VariantType.String } -# <class 'numpy.bool_'> +# Run a dedicated event loop for OPC-UA communications +# +# All co-routines need to be called through this event loop, +# for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop). + +def run_loop(loop: asyncio.AbstractEventLoop) -> None: + asyncio.set_event_loop(loop) + loop.run_forever() + +event_loop = asyncio.new_event_loop() +event_loop_thread = Thread(target=run_loop, args=(event_loop,), name="OPC-UA asyncio event loop", daemon=True) +event_loop_thread.start() -class OPCUAConnection(CommClient): +class OPCUAConnection(object): """ Connects to OPC-UA in the foreground or background, and sends HELLO messages to keep a check on the connection. On connection failure, reconnects once. """ - def start(self): - super().start() - def __init__(self, address, namespace, timeout, fault_func, streams, try_interval=2): """ Create the OPC ua client and connect() to it and get the object node """ - super().__init__(fault_func, streams, try_interval) - self.client = Client(address, timeout) - - # Explicitly connect - if not self.connect(): - # hardware or infra is down -- needs fixing first - fault_func() - return + self.client = Client(address, int(timeout)) + self.streams = streams + self.fault_func = fault_func + self.namespace = namespace + async def start(self): + # connect + await self.connect() # determine namespace used if type(namespace) is str: @@ -59,64 +70,99 @@ class OPCUAConnection(CommClient): raise TypeError(f"namespace must be of type str or int, but is of type {type(namespace).__name__}") self.obj = self.client.get_objects_node() - self.check_nodes() def _servername(self): return self.client.server_url.geturl() - def connect(self): + async def connect(self): """ Try to connect to the client """ + if self.connected: + logger.debug(f"Already connected to server {self._servername()}") + return + + logger.debug(f"Connecting to server {self._servername()}") + try: - self.streams.debug_stream("Connecting to server %s", self._servername()) - self.client.connect() + await self.client.connect() self.connected = True - self.streams.debug_stream("Connected to %s. Initialising.", self._servername()) - return True except socket.error as e: - self.streams.error_stream("Could not connect to server %s: %s", self._servername(), e) - raise Exception("Could not connect to server %s", self._servername()) from e + raise IOError(f"Could not connect to OPC-UA server {self._servername()}") from e - def check_nodes(self): - """ - function purely for debugging/development only. Simply lists all top level nodes and the nodes below that - """ - - for i in self.obj.get_children(): - print(i.get_browse_name()) - for j in i.get_children(): - try: - print(j.get_browse_name(), j.get_data_type_as_variant_type(), j.get_value()) - except: - print(j.get_browse_name()) - finally: - pass + logger.debug(f"Connected to OPC-UA server {self._servername()}") + # watch connection + self.watch_connection_task = asyncio.create_task(self.watch_connection()) - def disconnect(self): + async def disconnect(self): """ disconnect from the client """ - self.connected = False # always force a reconnect, regardless of a successful disconnect + if not self.connected: + logger.debug(f"Already disconnected from server {self._servername()}") + return + + self.connected = False + + # cancel & reap watcher + self.watch_connection_task.cancel() + try: + await self.watch_connection_task + except Exception as e: + logger.exception(f"Watcher thread for {self._servername()} raised exception") + # disconnect client explictly (will throw if the other side already disconnected) try: - self.client.disconnect() + await self.client.disconnect() except Exception as e: - self.streams.error_stream("Disconnect from OPC-UA server %s failed: %s", self._servername(), e) + logger.exception(f"Could not disconnect from OPC-UA server {self._servername()}") from e + + async def watch_connection(self): + """ Notice when the connection goes down. """ + + try: + logger.info(f"Start watching OPC-UA connection to {self._servername()}") + + while self.connected: + # ping will throw in case of connection issues + try: + await self.ping() + except Exception as e: + logger.exception(f"OPC-UA connection to {self._servername()} lost") + + # connection error, go to fault + self.fault_func() + + # disconnect will cancel us + await self.disconnect() + + # always have a backup plan + return - def ping(self): + # don't spin, sleep for a while + await asyncio.sleep(1) + except asyncio.CancelledError as e: + pass + except Exception as e: + # log immediately, or the exception will only be printed once this task is awaited + logger.exception(f"Exception raised while watching OPC-UA connection to {self._servername()}") + + raise + finally: + logger.info(f"Stop watching OPC-UA connection to {self._servername()}") + + async def ping(self): """ ping the client to make sure the connection with the client is still functional. """ try: - #self.client.send_hello() # <-- this crashes when communicating with open62541 v1.2.2+ - pass + await self.client.send_hello() except Exception as e: - raise Exception("Lost connection to server %s: %s", self._servername(), e) + raise IOError("Lost connection to server %s: %s", self._servername(), e) - def _setup_annotation(self, annotation): + async def _setup_annotation(self, annotation): """ This class's Implementation of the get_mapping function. returns the read and write functions """ @@ -136,7 +182,7 @@ class OPCUAConnection(CommClient): path = [name if ':' in name else f'{self.name_space_index}:{name}' for name in path] try: - node = self.obj.get_child(path) + node = await self.obj.get_child(path) except Exception as e: self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e) raise Exception("Could not get node: %s on server %s", path, self._servername()) from e @@ -155,13 +201,13 @@ class OPCUAConnection(CommClient): return dim_x, dim_y, ua_type - def setup_attribute(self, annotation, attribute): + async def setup_attribute(self, annotation, attribute): """ MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions """ # process the annotation - node = self._setup_annotation(annotation) + node = await self._setup_annotation(annotation) # get all the necessary data to set up the read/write functions from the attribute_wrapper dim_x, dim_y, ua_type = self.setup_value_conversion(attribute) @@ -179,6 +225,11 @@ class OPCUAConnection(CommClient): return prot_attr.read_function, prot_attr.write_function + async def call_method(self, method_path, *args): + node = await self.obj.get_child(method_path[:-1]) + return await node.call_method(method_path[-1], *args) + + class ProtocolAttribute: """ This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code @@ -190,41 +241,37 @@ class ProtocolAttribute: self.dim_x = dim_x self.ua_type = ua_type + async def _read_value(self): + return await self.node.get_value() + def read_function(self): """ Read_R function """ - value = self.node.get_value() + future = asyncio.run_coroutine_threadsafe(self._read_value(), event_loop) + value = future.result() - if self.dim_y + self.dim_x == 1: - # scalar - return value - elif self.dim_y != 0: - # 2D array - value = numpy.array(numpy.split(numpy.array(value), indices_or_sections=self.dim_y)) - else: - # 1D array - value = numpy.array(value) - - return value + try: + if self.dim_y + self.dim_x == 1: + # scalar + return value + elif self.dim_y != 0: + # 2D array + value = numpy.array(numpy.split(numpy.array(value), indices_or_sections=self.dim_y)) + else: + # 1D array + value = numpy.array(value) + return value + except Exception as e: + # Log "value" that gave us this issue + raise ValueError(f"Failed to parse atribute value retrieved from OPC-UA: {value}") from e - def write_function(self, value): - """ - write_RW function - """ - - if self.dim_y != 0: - # flatten array, convert to python array - value = numpy.concatenate(value).tolist() - elif self.dim_x != 1: - # make sure it is a python array - value = value.tolist() if type(value) == numpy.ndarray else value - + async def _write_value(self, value): try: - self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type)) - except (TypeError, opcua.ua.uaerrors.BadTypeMismatch) as e: + await self.node.set_data_value(asyncua.ua.uatypes.Variant(Value=value, VariantType=self.ua_type)) + except (TypeError, asyncua.ua.uaerrors.BadTypeMismatch) as e: # A type conversion went wrong or there is a type mismatch. # # This is either the conversion us -> opcua in our client, or client -> server. @@ -248,10 +295,26 @@ class ProtocolAttribute: dim_x=self.dim_x, dim_y=self.dim_y) - actual_server_type = "{dtype} {dimensions}".format( - dtype=self.node.get_data_type_as_variant_type(), - dimensions=(self.node.get_array_dimensions() or "???")) + actual_server_type = "{dtype} x {dimensions}".format( + dtype=await self.node.read_data_type_as_variant_type(), + dimensions=(await self.node.read_array_dimensions()) or "(dimensions unknown)") - attribute_name = self.node.get_display_name().to_string() + attribute_name = (await self.node.read_display_name()).to_string() raise TypeError(f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}") from e + + + def write_function(self, value): + """ + write_RW function + """ + + if self.dim_y != 0: + # flatten array, convert to python array + value = numpy.concatenate(value).tolist() + elif self.dim_x != 1: + # make sure it is a python array + value = value.tolist() if type(value) == numpy.ndarray else value + + future = asyncio.run_coroutine_threadsafe(self._write_value(value), event_loop) + _ = future.result() diff --git a/devices/common/lofar_logging.py b/devices/common/lofar_logging.py index c605d8cf927f890083dafc3ec85a16c1dab70d9d..c59979636f718d233f293f1b87139c0115f9ab3d 100644 --- a/devices/common/lofar_logging.py +++ b/devices/common/lofar_logging.py @@ -30,7 +30,13 @@ class TangoLoggingHandler(logging.Handler): stream = self.level_to_device_stream[record.levelno] # send the log message to Tango - stream(record.tango_device, record.msg, *record.args) + try: + record_msg = record.msg % record.args + stream(record.tango_device, record.msg, *record.args) + except TypeError: + # Tango's logger barfs on mal-formed log lines, f.e. if msg % args is not possible + record_msg = f"{record.msg} {record.args}".replace("%","%%") + stream(record.tango_device, record_msg) self.flush() @@ -112,7 +118,7 @@ def configure_logger(logger: logging.Logger=None, log_extra=None): logger.setLevel(logging.DEBUG) # remove spam from the OPC-UA client connection - logging.getLogger("opcua").setLevel(logging.WARN) + logging.getLogger("asyncua").setLevel(logging.WARN) # Log to ELK stack try: diff --git a/devices/devices/recv.py b/devices/devices/recv.py index 6f1de6aedc9e6db463c2edcd7a1a8bdf3daf7c2e..c656c7862805a204a536fcbc051efb0067ed564a 100644 --- a/devices/devices/recv.py +++ b/devices/devices/recv.py @@ -23,11 +23,12 @@ from tango.server import run, command from tango.server import device_property, attribute from tango import AttrWriteType import numpy +import asyncio # Additional import from device_decorators import * -from clients.opcua_client import OPCUAConnection +from clients.opcua_client import OPCUAConnection, event_loop as opcua_event_loop from clients.attribute_wrapper import attribute_wrapper from devices.hardware_device import hardware_device from common.lofar_logging import device_logging_to_python, log_exceptions @@ -131,10 +132,11 @@ class RECV(hardware_device): """ user code here. is called when the state is set to OFF """ # Stop keep-alive try: - self.opcua_connection.stop() + self.OPCua_client.stop() except Exception as e: self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) + @log_exceptions() def configure_for_initialise(self): """ user code here. is called when the state is set to INIT """ @@ -146,22 +148,24 @@ class RECV(hardware_device): self.function_mapping["CLK_on"] = {} self.function_mapping["CLK_off"] = {} + future = asyncio.run_coroutine_threadsafe(self._initialise_opcua(), opcua_event_loop) + _ = future.result() + + async def _initialise_opcua(self): # set up the OPC ua client self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) + await self.OPCua_client.start() + # map an access helper class for i in self.attr_list(): try: - i.set_comm_client(self.OPCua_client) + await i.set_comm_client(self.OPCua_client) except Exception as e: # use the pass function instead of setting read/write fails i.set_pass_func() self.warn_stream("error while setting the RECV attribute {} read/write function. {}".format(i, e)) - self.OPCua_client.start() - - - # -------- # Commands # -------- diff --git a/docker-compose/lofar-device-base/lofar-requirements.txt b/docker-compose/lofar-device-base/lofar-requirements.txt index 2214412a4365f4b804d6b20b0576c390482b1481..31b22c71689b481357cef56bf4940c1575a3b01d 100644 --- a/docker-compose/lofar-device-base/lofar-requirements.txt +++ b/docker-compose/lofar-device-base/lofar-requirements.txt @@ -1,4 +1,4 @@ -opcua >= 0.98.9 +asyncua astropy python-logstash-async gitpython