Skip to content
Snippets Groups Projects
Commit 43fe3d7d authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-412: Ported RECV device to asyncua. Other devices still broken.

parent d0f2f8de
Branches
Tags
1 merge request!142L2SS-412: Use asyncio for opcua and other clients
...@@ -146,13 +146,13 @@ class attribute_wrapper(attribute): ...@@ -146,13 +146,13 @@ class attribute_wrapper(attribute):
return value return value
def set_comm_client(self, client): async def set_comm_client(self, client):
""" """
takes a communications client as input arguments This client should be of a class containing a "get_mapping" function takes a communications client as input arguments This client should be of a class containing a "get_mapping" function
and return a read and write function that the wrapper will use to get/set data. and return a read and write function that the wrapper will use to get/set data.
""" """
try: try:
self.read_function, self.write_function = client.setup_attribute(self.comms_annotation, self) self.read_function, self.write_function = await client.setup_attribute(self.comms_annotation, self)
except Exception as e: except Exception as e:
logger.error("Exception while setting {} attribute with annotation: '{}' {}".format(client.__class__.__name__, self.comms_annotation, e)) logger.error("Exception while setting {} attribute with annotation: '{}' {}".format(client.__class__.__name__, self.comms_annotation, e))
......
from threading import Thread from threading import Thread
import socket import socket
import numpy import numpy
import opcua import asyncua
from opcua import Client import asyncio
from asyncua import Client
from clients.comms_client import CommClient from clients.comms_client import CommClient
__all__ = ["OPCUAConnection"] import logging
logger = logging.getLogger()
__all__ = ["OPCUAConnection", "event_loop"]
numpy_to_OPCua_dict = { numpy_to_OPCua_dict = {
numpy.bool_: opcua.ua.VariantType.Boolean, numpy.bool_: asyncua.ua.VariantType.Boolean,
numpy.int8: opcua.ua.VariantType.SByte, numpy.int8: asyncua.ua.VariantType.SByte,
numpy.uint8: opcua.ua.VariantType.Byte, numpy.uint8: asyncua.ua.VariantType.Byte,
numpy.int16: opcua.ua.VariantType.Int16, numpy.int16: asyncua.ua.VariantType.Int16,
numpy.uint16: opcua.ua.VariantType.UInt16, numpy.uint16: asyncua.ua.VariantType.UInt16,
numpy.int32: opcua.ua.VariantType.Int32, numpy.int32: asyncua.ua.VariantType.Int32,
numpy.uint32: opcua.ua.VariantType.UInt32, numpy.uint32: asyncua.ua.VariantType.UInt32,
numpy.int64: opcua.ua.VariantType.Int64, numpy.int64: asyncua.ua.VariantType.Int64,
numpy.uint64: opcua.ua.VariantType.UInt64, numpy.uint64: asyncua.ua.VariantType.UInt64,
numpy.float32: opcua.ua.VariantType.Float, numpy.float32: asyncua.ua.VariantType.Float,
numpy.double: opcua.ua.VariantType.Double, numpy.double: asyncua.ua.VariantType.Double,
numpy.float64: opcua.ua.VariantType.Double, numpy.float64: asyncua.ua.VariantType.Double,
numpy.str: opcua.ua.VariantType.String numpy.str: asyncua.ua.VariantType.String
} }
# <class 'numpy.bool_'> # Run a dedicated event loop for OPC-UA communications
#
# All co-routines need to be called through this event loop,
# for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop).
def run_loop(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()
event_loop = asyncio.new_event_loop()
event_loop_thread = Thread(target=run_loop, args=(event_loop,), name="OPC-UA asyncio event loop", daemon=True)
event_loop_thread.start()
class OPCUAConnection(CommClient): class OPCUAConnection(object):
""" """
Connects to OPC-UA in the foreground or background, and sends HELLO Connects to OPC-UA in the foreground or background, and sends HELLO
messages to keep a check on the connection. On connection failure, reconnects once. messages to keep a check on the connection. On connection failure, reconnects once.
""" """
def start(self):
super().start()
def __init__(self, address, namespace, timeout, fault_func, streams, try_interval=2): def __init__(self, address, namespace, timeout, fault_func, streams, try_interval=2):
""" """
Create the OPC ua client and connect() to it and get the object node Create the OPC ua client and connect() to it and get the object node
""" """
super().__init__(fault_func, streams, try_interval)
self.client = Client(address, timeout) self.client = Client(address, int(timeout))
self.streams = streams
# Explicitly connect self.fault_func = fault_func
if not self.connect(): self.namespace = namespace
# hardware or infra is down -- needs fixing first
fault_func()
return
async def start(self):
# connect
await self.connect()
# determine namespace used # determine namespace used
if type(namespace) is str: if type(namespace) is str:
...@@ -59,64 +70,99 @@ class OPCUAConnection(CommClient): ...@@ -59,64 +70,99 @@ class OPCUAConnection(CommClient):
raise TypeError(f"namespace must be of type str or int, but is of type {type(namespace).__name__}") raise TypeError(f"namespace must be of type str or int, but is of type {type(namespace).__name__}")
self.obj = self.client.get_objects_node() self.obj = self.client.get_objects_node()
self.check_nodes()
def _servername(self): def _servername(self):
return self.client.server_url.geturl() return self.client.server_url.geturl()
def connect(self): async def connect(self):
""" """
Try to connect to the client Try to connect to the client
""" """
if self.connected:
logger.debug(f"Already connected to server {self._servername()}")
return
logger.debug(f"Connecting to server {self._servername()}")
try: try:
self.streams.debug_stream("Connecting to server %s", self._servername()) await self.client.connect()
self.client.connect()
self.connected = True self.connected = True
self.streams.debug_stream("Connected to %s. Initialising.", self._servername())
return True
except socket.error as e: except socket.error as e:
self.streams.error_stream("Could not connect to server %s: %s", self._servername(), e) raise IOError(f"Could not connect to OPC-UA server {self._servername()}") from e
raise Exception("Could not connect to server %s", self._servername()) from e
def check_nodes(self): logger.debug(f"Connected to OPC-UA server {self._servername()}")
"""
function purely for debugging/development only. Simply lists all top level nodes and the nodes below that
"""
for i in self.obj.get_children():
print(i.get_browse_name())
for j in i.get_children():
try:
print(j.get_browse_name(), j.get_data_type_as_variant_type(), j.get_value())
except:
print(j.get_browse_name())
finally:
pass
# watch connection
self.watch_connection_task = asyncio.create_task(self.watch_connection())
def disconnect(self): async def disconnect(self):
""" """
disconnect from the client disconnect from the client
""" """
self.connected = False # always force a reconnect, regardless of a successful disconnect if not self.connected:
logger.debug(f"Already disconnected from server {self._servername()}")
return
self.connected = False
# cancel & reap watcher
self.watch_connection_task.cancel()
try:
await self.watch_connection_task
except Exception as e:
logger.exception(f"Watcher thread for {self._servername()} raised exception")
# disconnect client explictly (will throw if the other side already disconnected)
try: try:
self.client.disconnect() await self.client.disconnect()
except Exception as e: except Exception as e:
self.streams.error_stream("Disconnect from OPC-UA server %s failed: %s", self._servername(), e) logger.exception(f"Could not disconnect from OPC-UA server {self._servername()}") from e
async def watch_connection(self):
""" Notice when the connection goes down. """
try:
logger.info(f"Start watching OPC-UA connection to {self._servername()}")
while self.connected:
# ping will throw in case of connection issues
try:
await self.ping()
except Exception as e:
logger.exception(f"OPC-UA connection to {self._servername()} lost")
# connection error, go to fault
self.fault_func()
# disconnect will cancel us
await self.disconnect()
# always have a backup plan
return
def ping(self): # don't spin, sleep for a while
await asyncio.sleep(1)
except asyncio.CancelledError as e:
pass
except Exception as e:
# log immediately, or the exception will only be printed once this task is awaited
logger.exception(f"Exception raised while watching OPC-UA connection to {self._servername()}")
raise
finally:
logger.info(f"Stop watching OPC-UA connection to {self._servername()}")
async def ping(self):
""" """
ping the client to make sure the connection with the client is still functional. ping the client to make sure the connection with the client is still functional.
""" """
try: try:
#self.client.send_hello() # <-- this crashes when communicating with open62541 v1.2.2+ await self.client.send_hello()
pass
except Exception as e: except Exception as e:
raise Exception("Lost connection to server %s: %s", self._servername(), e) raise IOError("Lost connection to server %s: %s", self._servername(), e)
def _setup_annotation(self, annotation): async def _setup_annotation(self, annotation):
""" """
This class's Implementation of the get_mapping function. returns the read and write functions This class's Implementation of the get_mapping function. returns the read and write functions
""" """
...@@ -136,7 +182,7 @@ class OPCUAConnection(CommClient): ...@@ -136,7 +182,7 @@ class OPCUAConnection(CommClient):
path = [name if ':' in name else f'{self.name_space_index}:{name}' for name in path] path = [name if ':' in name else f'{self.name_space_index}:{name}' for name in path]
try: try:
node = self.obj.get_child(path) node = await self.obj.get_child(path)
except Exception as e: except Exception as e:
self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e) self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e)
raise Exception("Could not get node: %s on server %s", path, self._servername()) from e raise Exception("Could not get node: %s on server %s", path, self._servername()) from e
...@@ -155,13 +201,13 @@ class OPCUAConnection(CommClient): ...@@ -155,13 +201,13 @@ class OPCUAConnection(CommClient):
return dim_x, dim_y, ua_type return dim_x, dim_y, ua_type
def setup_attribute(self, annotation, attribute): async def setup_attribute(self, annotation, attribute):
""" """
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
""" """
# process the annotation # process the annotation
node = self._setup_annotation(annotation) node = await self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper # get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x, dim_y, ua_type = self.setup_value_conversion(attribute) dim_x, dim_y, ua_type = self.setup_value_conversion(attribute)
...@@ -179,6 +225,11 @@ class OPCUAConnection(CommClient): ...@@ -179,6 +225,11 @@ class OPCUAConnection(CommClient):
return prot_attr.read_function, prot_attr.write_function return prot_attr.read_function, prot_attr.write_function
async def call_method(self, method_path, *args):
node = await self.obj.get_child(method_path[:-1])
return await node.call_method(method_path[-1], *args)
class ProtocolAttribute: class ProtocolAttribute:
""" """
This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
...@@ -190,41 +241,37 @@ class ProtocolAttribute: ...@@ -190,41 +241,37 @@ class ProtocolAttribute:
self.dim_x = dim_x self.dim_x = dim_x
self.ua_type = ua_type self.ua_type = ua_type
async def _read_value(self):
return await self.node.get_value()
def read_function(self): def read_function(self):
""" """
Read_R function Read_R function
""" """
value = self.node.get_value() future = asyncio.run_coroutine_threadsafe(self._read_value(), event_loop)
value = future.result()
if self.dim_y + self.dim_x == 1: try:
# scalar if self.dim_y + self.dim_x == 1:
return value # scalar
elif self.dim_y != 0: return value
# 2D array elif self.dim_y != 0:
value = numpy.array(numpy.split(numpy.array(value), indices_or_sections=self.dim_y)) # 2D array
else: value = numpy.array(numpy.split(numpy.array(value), indices_or_sections=self.dim_y))
# 1D array else:
value = numpy.array(value) # 1D array
value = numpy.array(value)
return value
return value
except Exception as e:
# Log "value" that gave us this issue
raise ValueError(f"Failed to parse atribute value retrieved from OPC-UA: {value}") from e
def write_function(self, value): async def _write_value(self, value):
"""
write_RW function
"""
if self.dim_y != 0:
# flatten array, convert to python array
value = numpy.concatenate(value).tolist()
elif self.dim_x != 1:
# make sure it is a python array
value = value.tolist() if type(value) == numpy.ndarray else value
try: try:
self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type)) await self.node.set_data_value(asyncua.ua.uatypes.Variant(Value=value, VariantType=self.ua_type))
except (TypeError, opcua.ua.uaerrors.BadTypeMismatch) as e: except (TypeError, asyncua.ua.uaerrors.BadTypeMismatch) as e:
# A type conversion went wrong or there is a type mismatch. # A type conversion went wrong or there is a type mismatch.
# #
# This is either the conversion us -> opcua in our client, or client -> server. # This is either the conversion us -> opcua in our client, or client -> server.
...@@ -248,10 +295,26 @@ class ProtocolAttribute: ...@@ -248,10 +295,26 @@ class ProtocolAttribute:
dim_x=self.dim_x, dim_x=self.dim_x,
dim_y=self.dim_y) dim_y=self.dim_y)
actual_server_type = "{dtype} {dimensions}".format( actual_server_type = "{dtype} x {dimensions}".format(
dtype=self.node.get_data_type_as_variant_type(), dtype=await self.node.read_data_type_as_variant_type(),
dimensions=(self.node.get_array_dimensions() or "???")) dimensions=(await self.node.read_array_dimensions()) or "(dimensions unknown)")
attribute_name = self.node.get_display_name().to_string() attribute_name = (await self.node.read_display_name()).to_string()
raise TypeError(f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}") from e raise TypeError(f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}") from e
def write_function(self, value):
"""
write_RW function
"""
if self.dim_y != 0:
# flatten array, convert to python array
value = numpy.concatenate(value).tolist()
elif self.dim_x != 1:
# make sure it is a python array
value = value.tolist() if type(value) == numpy.ndarray else value
future = asyncio.run_coroutine_threadsafe(self._write_value(value), event_loop)
_ = future.result()
...@@ -30,7 +30,13 @@ class TangoLoggingHandler(logging.Handler): ...@@ -30,7 +30,13 @@ class TangoLoggingHandler(logging.Handler):
stream = self.level_to_device_stream[record.levelno] stream = self.level_to_device_stream[record.levelno]
# send the log message to Tango # send the log message to Tango
stream(record.tango_device, record.msg, *record.args) try:
record_msg = record.msg % record.args
stream(record.tango_device, record.msg, *record.args)
except TypeError:
# Tango's logger barfs on mal-formed log lines, f.e. if msg % args is not possible
record_msg = f"{record.msg} {record.args}".replace("%","%%")
stream(record.tango_device, record_msg)
self.flush() self.flush()
...@@ -112,7 +118,7 @@ def configure_logger(logger: logging.Logger=None, log_extra=None): ...@@ -112,7 +118,7 @@ def configure_logger(logger: logging.Logger=None, log_extra=None):
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
# remove spam from the OPC-UA client connection # remove spam from the OPC-UA client connection
logging.getLogger("opcua").setLevel(logging.WARN) logging.getLogger("asyncua").setLevel(logging.WARN)
# Log to ELK stack # Log to ELK stack
try: try:
......
...@@ -23,11 +23,12 @@ from tango.server import run, command ...@@ -23,11 +23,12 @@ from tango.server import run, command
from tango.server import device_property, attribute from tango.server import device_property, attribute
from tango import AttrWriteType from tango import AttrWriteType
import numpy import numpy
import asyncio
# Additional import # Additional import
from device_decorators import * from device_decorators import *
from clients.opcua_client import OPCUAConnection from clients.opcua_client import OPCUAConnection, event_loop as opcua_event_loop
from clients.attribute_wrapper import attribute_wrapper from clients.attribute_wrapper import attribute_wrapper
from devices.hardware_device import hardware_device from devices.hardware_device import hardware_device
from common.lofar_logging import device_logging_to_python, log_exceptions from common.lofar_logging import device_logging_to_python, log_exceptions
...@@ -131,10 +132,11 @@ class RECV(hardware_device): ...@@ -131,10 +132,11 @@ class RECV(hardware_device):
""" user code here. is called when the state is set to OFF """ """ user code here. is called when the state is set to OFF """
# Stop keep-alive # Stop keep-alive
try: try:
self.opcua_connection.stop() self.OPCua_client.stop()
except Exception as e: except Exception as e:
self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e))
@log_exceptions() @log_exceptions()
def configure_for_initialise(self): def configure_for_initialise(self):
""" user code here. is called when the state is set to INIT """ """ user code here. is called when the state is set to INIT """
...@@ -146,22 +148,24 @@ class RECV(hardware_device): ...@@ -146,22 +148,24 @@ class RECV(hardware_device):
self.function_mapping["CLK_on"] = {} self.function_mapping["CLK_on"] = {}
self.function_mapping["CLK_off"] = {} self.function_mapping["CLK_off"] = {}
future = asyncio.run_coroutine_threadsafe(self._initialise_opcua(), opcua_event_loop)
_ = future.result()
async def _initialise_opcua(self):
# set up the OPC ua client # set up the OPC ua client
self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self)
await self.OPCua_client.start()
# map an access helper class # map an access helper class
for i in self.attr_list(): for i in self.attr_list():
try: try:
i.set_comm_client(self.OPCua_client) await i.set_comm_client(self.OPCua_client)
except Exception as e: except Exception as e:
# use the pass function instead of setting read/write fails # use the pass function instead of setting read/write fails
i.set_pass_func() i.set_pass_func()
self.warn_stream("error while setting the RECV attribute {} read/write function. {}".format(i, e)) self.warn_stream("error while setting the RECV attribute {} read/write function. {}".format(i, e))
self.OPCua_client.start()
# -------- # --------
# Commands # Commands
# -------- # --------
......
opcua >= 0.98.9 asyncua
astropy astropy
python-logstash-async python-logstash-async
gitpython gitpython
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment