Skip to content
Snippets Groups Projects
Commit 75ffb1c2 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-412-replace-opcua-by-asyncua' into 'master'

L2SS-412: Use asyncio for opcua and other clients

Closes L2SS-412

See merge request !142
parents e8d7b23d 61c39ec5
No related branches found
No related tags found
1 merge request!142L2SS-412: Use asyncio for opcua and other clients
Showing
with 411 additions and 269 deletions
......@@ -43,11 +43,9 @@ unit_test:
before_script:
- sudo apt-get update
- sudo apt-get install -y git
- pip3 install -r devices/test-requirements.txt
- pip3 install -r docker-compose/itango/lofar-requirements.txt
script:
- cd devices
- tox -e py37
- tox --recreate -e py37
integration_test:
stage: integration-tests
allow_failure: true
......
......@@ -5,5 +5,5 @@ numpy
opcua-client
pyqtgraph
PyQt5
opcua >= 0.98.13
asyncua
dataclasses
......@@ -154,8 +154,15 @@ class attribute_wrapper(attribute):
try:
self.read_function, self.write_function = client.setup_attribute(self.comms_annotation, self)
except Exception as e:
raise Exception("Exception while setting %s attribute with annotation: '%s'", client.__class__.__name__, self.comms_annotation) from e
logger.error("Exception while setting {} attribute with annotation: '{}' {}".format(client.__class__.__name__, self.comms_annotation, e))
async def async_set_comm_client(self, client):
"""
Asynchronous version of set_comm_client.
"""
try:
self.read_function, self.write_function = await client.setup_attribute(self.comms_annotation, self)
except Exception as e:
raise Exception("Exception while setting %s attribute with annotation: '%s'", client.__class__.__name__, self.comms_annotation) from e
def set_pass_func(self):
......
from threading import Thread
import time
import asyncio
from abc import ABC, abstractmethod
class CommClient(Thread):
import logging
logger = logging.getLogger()
class AbstractCommClient(ABC):
@abstractmethod
def start(self):
""" Start communication with the client. """
@abstractmethod
def stop(self):
""" Stop communication with the client. """
def ping(self):
""" Check whether the connection is still alive.
Clients that override this method must raise an Exception if the
connection died. """
pass
@abstractmethod
def setup_attribute(self, annotation, attribute):
"""
This function is responsible for providing the attribute_wrapper with a read/write function
How this is done is implementation specific.
The setup-attribute has access to the comms_annotation provided to the attribute wrapper to pass along to the comms client
as well as a reference to the attribute itself.
It should do this by first calling: _setup_annotation and setup_value_conversion to get all data necceacry to configure the read/write functions.
It should then return the read and write functions to the attribute.
MANDATORY:
annotation_outputs = _setup_annotation(annotation)
attribute_outputs = _setup_annotation(attribute)
(note: outputs are up to the user)
REQUIRED: provide read and write functions to return, there are no restrictions on how these should be provided,
except that the read function takes a single input value and the write function returns a single value
MANDATORY:
return read_function, write_function
Examples:
- File system: get_mapping returns functions that read/write a fixed
number of bytes at a fixed location in a file. (SEEK)
- OPC-UA: traverse the OPC-UA tree until the node is found.
Then return the read/write functions for that node which automatically
convert values between Python and OPC-UA.
"""
def _setup_annotation(self, annotation):
"""
This function is responsible for handling the annotation data provided by the attribute to configure the read/write function the client must provide.
This function should be called by setup_attribute
"""
pass
class CommClient(AbstractCommClient, Thread):
"""
The ProtocolHandler class is the generic interface class between the tango attribute_wrapper and the outside world
Abstracts communication with a client, for instance, over the network, by handling connect(), disconnect(), and ping()
primitives.
"""
def __init__(self, fault_func, streams, try_interval=2):
......@@ -69,7 +128,11 @@ class CommClient(Thread):
time.sleep(self.try_interval)
def ping(self):
return
""" Check whether the connection is still alive.
Clients that override this method must raise an Exception if the
connection died. """
pass
def stop(self):
"""
......@@ -85,47 +148,145 @@ class CommClient(Thread):
self.disconnect()
def setup_attribute(self, annotation, attribute):
"""
This function is responsible for providing the attribute_wrapper with a read/write function
How this is done is implementation specific.
The setup-attribute has access to the comms_annotation provided to the attribute wrapper to pass along to the comms client
as well as a reference to the attribute itself.
It should do this by first calling: _setup_annotation and setup_value_conversion to get all data necceacry to configure the read/write functions.
It should then return the read and write functions to the attribute.
class AsyncCommClient(object):
"""
Abstracts communication with a client, for instance, over the network, by handling connect(), disconnect(), and ping()
primitives.
MANDATORY:
annotation_outputs = _setup_annotation(annotation)
attribute_outputs = _setup_annotation(attribute)
(note: outputs are up to the user)
asyncio version of the CommClient. Also does not reconnect if the connection is lost.
"""
REQUIRED: provide read and write functions to return, there are no restrictions on how these should be provided,
except that the read function takes a single input value and the write function returns a single value
def __init__(self, fault_func=lambda: None, event_loop=None):
"""
Create an Asynchronous communication client.
MANDATORY:
return read_function, write_function
fault_func: Function to call to put the device to FAULT if an error is detected.
event_loop: Aysncio event loop to use. If None, a new event loop is created and
run in a separate thread. Only share event loops if any of the functions
executed doesn't stall, as asyncio used a cooperative multitasking model.
Examples:
- File system: get_mapping returns functions that read/write a fixed
number of bytes at a fixed location in a file. (SEEK)
- OPC-UA: traverse the OPC-UA tree until the node is found.
Then return the read/write functions for that node which automatically
convert values between Python and OPC-UA.
If the executed functions can stall (for a bit), use a dedicated loop to avoid
interfering with other users of the event loop.
All coroutines need to be executed in this loop, which wil also be stored
as the `event_loop` member of this object.
"""
raise NotImplementedError("the setup_attribute must be implemented and provide return a valid read/write function for the attribute")
self.fault_func = fault_func
self.running = False
def _setup_annotation(self, annotation):
if event_loop is None:
# Run a dedicated event loop for communications
#
# All co-routines need to be called through this event loop,
# for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop).
def run_loop(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()
self.event_loop = asyncio.new_event_loop()
self.event_loop_thread = Thread(target=run_loop, args=(self.event_loop,), name=f"AsyncCommClient {self.name()} event loop", daemon=True)
self.event_loop_thread.start()
else:
self.event_loop = event_loop
self.event_loop_thread = None
def __del__(self):
if self.event_loop_thread is not None:
# signal our event loop thread to stop
self.event_loop.call_soon_threadsafe(self.event_loop.stop)
# reap our event loop thread once it is done processing tasks
self.event_loop_thread.join()
def name(self):
""" The name of this CommClient, for use in logs. """
return self.__class__.__name__
@abstractmethod
async def connect(self):
"""
This function is responsible for handling the annotation data provided by the attribute to configure the read/write function the client must provide.
This function should be called by setup_attribute
Function used to connect to the client, and any
post init.
"""
raise NotImplementedError("the _setup_annotation must be implemented, content and outputs are up to the user")
def setup_value_conversion(self, attribute):
@abstractmethod
async def disconnect(self):
"""
this function is responsible for setting up the value conversion between the client and the attribute.
This function should be called by setup_attribute
Function used to disconnect from the client.
"""
raise NotImplementedError("the setup_value_conversion must be implemented, content and outputs are up to the user")
async def watch_connection(self):
""" Notice when the connection goes down. """
try:
logger.info(f"[AsyncCommClient {self.name()}] Start watching")
while self.running:
# ping will throw in case of connection issues
try:
await self.ping()
except Exception as e:
logger.exception(f"[AsyncCommClient {self.name()}] Ping failed: connection considered lost")
# connection error, go to fault
self.fault_func()
# disconnect will cancel us
await self.disconnect()
# always have a backup plan
return
# don't spin, sleep for a while
await asyncio.sleep(2)
except asyncio.CancelledError as e:
pass
except Exception as e:
# log immediately, or the exception will only be printed once this task is awaited
logger.exception(f"[AsyncCommClient {self.name()}] Exception raised while watching")
raise
finally:
logger.info(f"[AsyncCommClient {self.name()}] Stop watching")
async def ping(self):
return
async def start(self):
if self.running:
# already running
return
await self.connect()
self.running = True
# watch connection
self.watch_connection_task = asyncio.create_task(self.watch_connection())
async def stop(self):
if not self.running:
# already stopped
return
self.running = False
# cancel & reap watcher
self.watch_connection_task.cancel()
try:
await self.watch_connection_task
except asyncio.CancelledError as e:
pass
except Exception as e:
logger.exception(f"[AsyncCommClient {self.name()}] Watcher thread raised exception")
# the task stopped eithr way, so no need to bother our caller with this
await self.disconnect()
def sync_stop(self):
""" Synchronous version of stop(). """
future = asyncio.run_coroutine_threadsafe(self.stop(), self.event_loop)
return future.result()
import logging
import docker
from .comms_client import CommClient
from .comms_client import AsyncCommClient
logger = logging.getLogger()
class DockerClient(CommClient):
class DockerClient(AsyncCommClient):
"""
Controls & queries running docker containers.
"""
def start(self):
super().start()
def __init__(self, base_url, fault_func, streams):
super().__init__(fault_func, streams)
def __init__(self, base_url, fault_func, event_loop=None):
super().__init__(fault_func, event_loop)
self.base_url = base_url
def connect(self):
"""
Function used to connect to the client.
"""
if not self.connected:
self.client = docker.DockerClient(self.base_url)
return super().connect()
async def connect(self):
self.client = docker.DockerClient(self.base_url)
def ping(self):
return True
async def ping(self):
# Raises if the server is unresponsive
self.client.ping()
def disconnect(self):
async def disconnect(self):
self.client = None
return super().disconnect()
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
the OPC ua read/write functions require the dimensionality and the type to be known
"""
return
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
async def setup_attribute(self, annotation, attribute):
container_name = annotation["container"]
# get all the necessary data to set up the read/write functions from the attribute_wrapper
self.setup_value_conversion(attribute)
def read_function():
try:
container = self.client.containers.get(container_name)
except docker.errors.NotFound:
return False
# expected values: running, restarting, paused, exited, created
return container.status == 'running'
def write_function(value):
......
from threading import Thread
import socket
import numpy
import opcua
from opcua import Client
import asyncua
import asyncio
from asyncua import Client
from clients.comms_client import CommClient
from clients.comms_client import AsyncCommClient
__all__ = ["OPCUAConnection"]
import logging
logger = logging.getLogger()
__all__ = ["OPCUAConnection", "event_loop"]
numpy_to_OPCua_dict = {
numpy.bool_: opcua.ua.VariantType.Boolean,
numpy.int8: opcua.ua.VariantType.SByte,
numpy.uint8: opcua.ua.VariantType.Byte,
numpy.int16: opcua.ua.VariantType.Int16,
numpy.uint16: opcua.ua.VariantType.UInt16,
numpy.int32: opcua.ua.VariantType.Int32,
numpy.uint32: opcua.ua.VariantType.UInt32,
numpy.int64: opcua.ua.VariantType.Int64,
numpy.uint64: opcua.ua.VariantType.UInt64,
numpy.float32: opcua.ua.VariantType.Float,
numpy.double: opcua.ua.VariantType.Double,
numpy.float64: opcua.ua.VariantType.Double,
numpy.str: opcua.ua.VariantType.String
numpy.bool_: asyncua.ua.VariantType.Boolean,
numpy.int8: asyncua.ua.VariantType.SByte,
numpy.uint8: asyncua.ua.VariantType.Byte,
numpy.int16: asyncua.ua.VariantType.Int16,
numpy.uint16: asyncua.ua.VariantType.UInt16,
numpy.int32: asyncua.ua.VariantType.Int32,
numpy.uint32: asyncua.ua.VariantType.UInt32,
numpy.int64: asyncua.ua.VariantType.Int64,
numpy.uint64: asyncua.ua.VariantType.UInt64,
numpy.float32: asyncua.ua.VariantType.Float,
numpy.double: asyncua.ua.VariantType.Double,
numpy.float64: asyncua.ua.VariantType.Double,
numpy.str: asyncua.ua.VariantType.String
}
# <class 'numpy.bool_'>
class OPCUAConnection(CommClient):
class OPCUAConnection(AsyncCommClient):
"""
Connects to OPC-UA in the foreground or background, and sends HELLO
messages to keep a check on the connection. On connection failure, reconnects once.
"""
def start(self):
super().start()
def __init__(self, address, namespace, timeout, fault_func, streams, try_interval=2):
def __init__(self, address, namespace, timeout, fault_func, event_loop=None):
"""
Create the OPC ua client and connect() to it and get the object node
"""
super().__init__(fault_func, streams, try_interval)
self.client = Client(address, timeout)
self.client = Client(address, int(timeout))
self.namespace = namespace
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
# determine namespace used
if type(namespace) is str:
self.name_space_index = self.client.get_namespace_index(namespace)
elif type(namespace) is int:
self.name_space_index = namespace
else:
raise TypeError(f"namespace must be of type str or int, but is of type {type(namespace).__name__}")
self.obj = self.client.get_objects_node()
self.check_nodes()
super().__init__(fault_func, event_loop)
def _servername(self):
return self.client.server_url.geturl()
def connect(self):
async def connect(self):
"""
Try to connect to the client
"""
logger.debug(f"Connecting to server {self._servername()}")
try:
self.streams.debug_stream("Connecting to server %s", self._servername())
self.client.connect()
await self.client.connect()
self.connected = True
self.streams.debug_stream("Connected to %s. Initialising.", self._servername())
return True
except socket.error as e:
self.streams.error_stream("Could not connect to server %s: %s", self._servername(), e)
raise Exception("Could not connect to server %s", self._servername()) from e
except (socket.error, IOError, OSError) as e:
raise IOError(f"Could not connect to OPC-UA server {self._servername()}") from e
def check_nodes(self):
"""
function purely for debugging/development only. Simply lists all top level nodes and the nodes below that
"""
logger.debug(f"Connected to OPC-UA server {self._servername()}")
for i in self.obj.get_children():
print(i.get_browse_name())
for j in i.get_children():
try:
print(j.get_browse_name(), j.get_data_type_as_variant_type(), j.get_value())
except:
print(j.get_browse_name())
finally:
pass
# determine namespace used
if type(self.namespace) is str:
self.name_space_index = await self.client.get_namespace_index(self.namespace)
elif type(self.namespace) is int:
self.name_space_index = self.namespace
else:
raise TypeError(f"namespace must be of type str or int, but is of type {type(self.namespace).__name__}")
self.obj = self.client.get_objects_node()
def disconnect(self):
async def disconnect(self):
"""
disconnect from the client
"""
self.connected = False # always force a reconnect, regardless of a successful disconnect
try:
self.client.disconnect()
except Exception as e:
self.streams.error_stream("Disconnect from OPC-UA server %s failed: %s", self._servername(), e)
await self.client.disconnect()
def ping(self):
async def ping(self):
"""
ping the client to make sure the connection with the client is still functional.
"""
try:
#self.client.send_hello() # <-- this crashes when communicating with open62541 v1.2.2+
pass
await self.client.send_hello()
except Exception as e:
raise Exception("Lost connection to server %s: %s", self._servername(), e)
def _setup_annotation(self, annotation):
"""
This class's Implementation of the get_mapping function. returns the read and write functions
"""
raise IOError("Lost connection to server %s: %s", self._servername(), e)
async def _setup_annotation(self, annotation):
if isinstance(annotation, dict):
# check if required path inarg is present
if annotation.get('path') is None:
......@@ -136,36 +104,22 @@ class OPCUAConnection(CommClient):
path = [name if ':' in name else f'{self.name_space_index}:{name}' for name in path]
try:
node = self.obj.get_child(path)
node = await self.obj.get_child(path)
except Exception as e:
self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e)
logger.exception("Could not get node: %s on server %s", path, self._servername())
raise Exception("Could not get node: %s on server %s", path, self._servername()) from e
return node
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
the OPC ua read/write functions require the dimensionality and the type to be known
"""
async def setup_attribute(self, annotation, attribute):
# process the annotation
node = await self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x = attribute.dim_x
dim_y = attribute.dim_y
ua_type = numpy_to_OPCua_dict[attribute.numpy_type] # convert the numpy type to a corresponding UA type
return dim_x, dim_y, ua_type
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
# process the annotation
node = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x, dim_y, ua_type = self.setup_value_conversion(attribute)
# configure and return the read/write functions
prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type)
......@@ -175,8 +129,21 @@ class OPCUAConnection(CommClient):
self.streams.debug_stream("connected OPC ua node {} of type {} to attribute with dimensions: {} x {} ".format(str(node_name)[:len(node_name)-1], str(ua_type)[len("VariantType."):], dim_x, dim_y))
except:
pass
# Tango will call these from a separate polling thread.
def read_function():
return asyncio.run_coroutine_threadsafe(prot_attr.read_function(), self.event_loop).result()
def write_function(value):
asyncio.run_coroutine_threadsafe(prot_attr.write_function(value), self.event_loop).result()
# return the read/write functions
return prot_attr.read_function, prot_attr.write_function
return read_function, write_function
async def call_method(self, method_path, *args):
node = await self.obj.get_child(method_path[:-1])
return await node.call_method(method_path[-1], *args)
def call_method(self, method_path, *args):
......@@ -194,27 +161,31 @@ class ProtocolAttribute:
self.dim_x = dim_x
self.ua_type = ua_type
def read_function(self):
async def read_function(self):
"""
Read_R function
"""
value = self.node.get_value()
if self.dim_y + self.dim_x == 1:
# scalar
return value
elif self.dim_y != 0:
# 2D array
value = numpy.array(numpy.split(numpy.array(value), indices_or_sections=self.dim_y))
else:
# 1D array
value = numpy.array(value)
value = await self.node.get_value()
return value
try:
if self.dim_y + self.dim_x == 1:
# scalar
return value
elif self.dim_y != 0:
# 2D array
value = numpy.array(numpy.split(numpy.array(value), indices_or_sections=self.dim_y))
else:
# 1D array
value = numpy.array(value)
return value
except Exception as e:
# Log "value" that gave us this issue
raise ValueError(f"Failed to parse atribute value retrieved from OPC-UA: {value}") from e
def write_function(self, value):
async def write_function(self, value):
"""
write_RW function
"""
......@@ -227,8 +198,8 @@ class ProtocolAttribute:
value = value.tolist() if type(value) == numpy.ndarray else value
try:
self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type))
except (TypeError, opcua.ua.uaerrors.BadTypeMismatch) as e:
await self.node.set_data_value(asyncua.ua.uatypes.Variant(Value=value, VariantType=self.ua_type))
except (TypeError, asyncua.ua.uaerrors.BadTypeMismatch) as e:
# A type conversion went wrong or there is a type mismatch.
#
# This is either the conversion us -> opcua in our client, or client -> server.
......@@ -252,10 +223,10 @@ class ProtocolAttribute:
dim_x=self.dim_x,
dim_y=self.dim_y)
actual_server_type = "{dtype} {dimensions}".format(
dtype=self.node.get_data_type_as_variant_type(),
dimensions=(self.node.get_array_dimensions() or "???"))
actual_server_type = "{dtype} x {dimensions}".format(
dtype=await self.node.read_data_type_as_variant_type(),
dimensions=(await self.node.read_array_dimensions()) or "(dimensions unknown)")
attribute_name = self.node.get_display_name().to_string()
attribute_name = (await self.node.read_display_name()).to_string()
raise TypeError(f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}") from e
......@@ -2,7 +2,7 @@ from queue import Queue
import logging
import numpy
from .comms_client import CommClient
from .comms_client import AsyncCommClient
from .tcp_replicator import TCPReplicator
from .udp_receiver import UDPReceiver
......@@ -11,16 +11,13 @@ from devices.sdp.statistics_collector import StatisticsConsumer
logger = logging.getLogger()
class StatisticsClient(CommClient):
class StatisticsClient(AsyncCommClient):
"""
Collects statistics packets over UDP, forwards them to a StatisticsCollector,
and provides a CommClient interface to expose points to a Device Server.
"""
def start(self):
super().start()
def __init__(self, collector, udp_options, tcp_options, fault_func, streams, try_interval=2, queuesize=1024):
def __init__(self, collector, udp_options, tcp_options, fault_func, event_loop=None, queuesize=1024):
"""
Create the statistics client and connect() to it and get the object node.
......@@ -34,13 +31,7 @@ class StatisticsClient(CommClient):
self.queuesize = queuesize
self.collector = collector
super().__init__(fault_func, streams, try_interval)
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
super().__init__(fault_func, event_loop)
@staticmethod
def _queue_fill_percentage(queue: Queue):
......@@ -50,22 +41,19 @@ class StatisticsClient(CommClient):
# some platforms don't have qsize(), nothing we can do here
return 0
def connect(self):
async def connect(self):
"""
Function used to connect to the client.
"""
if not self.connected:
self.collector_queue = Queue(maxsize=self.queuesize)
self.collector_queue = Queue(maxsize=self.queuesize)
self.tcp = TCPReplicator(self.tcp_options, self.queuesize)
self.statistics = StatisticsConsumer(self.collector_queue, self.collector)
self.tcp = TCPReplicator(self.tcp_options, self.queuesize)
self.statistics = StatisticsConsumer(self.collector_queue, self.collector)
self.udp = UDPReceiver([self.collector_queue, self.tcp],
self.udp_options)
self.udp = UDPReceiver([self.collector_queue, self.tcp],
self.udp_options)
return super().connect()
def ping(self):
async def ping(self):
if not self.statistics.is_alive():
raise Exception("Statistics processing thread died unexpectedly")
......@@ -75,7 +63,7 @@ class StatisticsClient(CommClient):
if not self.tcp.is_alive():
raise Exception("TCPReplicator thread died unexpectedly")
def disconnect(self):
async def disconnect(self):
# explicit disconnect, instead of waiting for the GC to kick in after "del" below
try:
self.statistics.disconnect()
......@@ -99,25 +87,13 @@ class StatisticsClient(CommClient):
del self.statistics
del self.collector_queue
return super().disconnect()
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
the OPC ua read/write functions require the dimensionality and the type to be known
"""
return
def setup_attribute(self, annotation, attribute):
async def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
parameter = annotation["parameter"]
# get all the necessary data to set up the read/write functions from the attribute_wrapper
self.setup_value_conversion(attribute)
# redirect to right object. this works as long as the parameter names are unique among them.
if annotation["type"] == "statistics":
def read_function():
......
......@@ -30,7 +30,13 @@ class TangoLoggingHandler(logging.Handler):
stream = self.level_to_device_stream[record.levelno]
# send the log message to Tango
stream(record.tango_device, record.msg, *record.args)
try:
record_msg = record.msg % record.args
stream(record.tango_device, record.msg, *record.args)
except TypeError:
# Tango's logger barfs on mal-formed log lines, f.e. if msg % args is not possible
record_msg = f"{record.msg} {record.args}".replace("%","%%")
stream(record.tango_device, record_msg)
self.flush()
......@@ -112,7 +118,7 @@ def configure_logger(logger: logging.Logger=None, log_extra=None, debug=False):
logger.setLevel(logging.DEBUG)
# remove spam from the OPC-UA client connection
logging.getLogger("opcua").setLevel(logging.WARN)
logging.getLogger("asyncua").setLevel(logging.WARN)
# don't spam errors for git, as we use it in our log handler, which would result in an infinite loop
logging.getLogger("git").setLevel(logging.ERROR)
......
......@@ -23,6 +23,7 @@ from tango.server import run, command
from tango.server import device_property, attribute
from tango import AttrWriteType
import numpy
import asyncio
# Additional import
from device_decorators import *
......@@ -96,7 +97,7 @@ class Docker(hardware_device):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
try:
self.docker_client.stop()
self.docker_client.sync_stop()
except Exception as e:
self.warn_stream("Exception while stopping docker client in configure_for_off function: {}. Exception ignored".format(e))
......@@ -105,13 +106,18 @@ class Docker(hardware_device):
""" user code here. is called when the state is set to INIT """
# set up the Docker client
self.docker_client = DockerClient(self.Docker_Base_URL, self.Fault, self)
self.docker_client = DockerClient(self.Docker_Base_URL, self.Fault)
# schedule the docker initialisation, and wait for it to finish
future = asyncio.run_coroutine_threadsafe(self._connect_docker(), self.docker_client.event_loop)
_ = future.result()
async def _connect_docker(self):
# tie attributes to client
for i in self.attr_list():
i.set_comm_client(self.docker_client)
await i.async_set_comm_client(self.docker_client)
self.docker_client.start()
await self.docker_client.start()
# --------
# Commands
......
......@@ -26,15 +26,11 @@ from devices.device_decorators import only_in_states, fault_on_error
import time
import math
import logging
__all__ = ["hardware_device"]
import logging
logger = logging.getLogger()
#@log_exceptions()
class hardware_device(Device, metaclass=AbstractDeviceMetas):
"""
......@@ -60,6 +56,10 @@ class hardware_device(Device, metaclass=AbstractDeviceMetas):
The user triggers their transitions by the commands reflecting the target state (Initialise(), On(), Fault()).
"""
# ----------
# Attributes
# ----------
version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version())
# list of property names too be set first by set_defaults
......
......@@ -22,6 +22,7 @@ from tango import DebugIt
from tango.server import device_property, attribute
from tango import AttrWriteType
import numpy
import asyncio
# Additional import
from devices.device_decorators import *
......@@ -86,14 +87,22 @@ class opcua_device(hardware_device):
""" user code here. is called when the state is set to INIT """
# set up the OPC ua client
self.opcua_connection = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_namespace, self.OPC_Time_Out, self.Fault, self)
self.opcua_connection = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_namespace, self.OPC_Time_Out, self.Fault)
self.opcua_missing_attributes = []
# schedule the opc-ua initialisation, and wait for it to finish
future = asyncio.run_coroutine_threadsafe(self._connect_opcua(), self.opcua_connection.event_loop)
_ = future.result()
async def _connect_opcua(self):
# connect
await self.opcua_connection.start()
# map an access helper class
for i in self.attr_list():
try:
if not i.comms_id or i.comms_id == OPCUAConnection:
i.set_comm_client(self.opcua_connection)
await i.async_set_comm_client(self.opcua_connection)
except Exception as e:
# use the pass function instead of setting read/write fails
i.set_pass_func()
......@@ -101,14 +110,11 @@ class opcua_device(hardware_device):
self.warn_stream("error while setting the attribute {} read/write function. {}".format(i, e))
self.opcua_connection.start()
@log_exceptions()
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
try:
# disconnect
self.opcua_connection.stop()
self.opcua_connection.sync_stop()
except Exception as e:
self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e))
......@@ -25,8 +25,8 @@ from tango import AttrWriteType
# Additional import
from clients.attribute_wrapper import attribute_wrapper
from clients.opcua_client import OPCUAConnection
from clients.statistics_client import StatisticsClient
from clients.opcua_client import OPCUAConnection
from devices.sdp.statistics import Statistics
from devices.sdp.statistics_collector import SSTCollector
......
......@@ -24,9 +24,9 @@ from abc import ABCMeta, abstractmethod
from tango.server import device_property, attribute
from tango import AttrWriteType
# Additional import
import asyncio
from clients.statistics_client import StatisticsClient
from clients.opcua_client import OPCUAConnection
from clients.attribute_wrapper import attribute_wrapper
from devices.opcua_device import opcua_device
......@@ -100,9 +100,8 @@ class Statistics(opcua_device, metaclass=ABCMeta):
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
try:
self.statistics_client.stop()
self.statistics_client.sync_stop()
except Exception as e:
logger.exception("Exception while stopping statistics_client in configure_for_off. Exception ignored")
......@@ -128,13 +127,24 @@ class Statistics(opcua_device, metaclass=ABCMeta):
}
self.statistics_collector = self.STATISTICS_COLLECTOR_CLASS()
self.statistics_client = StatisticsClient(self.statistics_collector, udp_options, tcp_options, self.Fault, self)
self.statistics_client.start()
self.statistics_client = StatisticsClient(self.statistics_collector, udp_options, tcp_options, self.Fault, self.opcua_connection.event_loop) # can share event loop
# tie attributes to client
# schedule the opc-ua initialisation, and wait for it to finish
future = asyncio.run_coroutine_threadsafe(self._connect_statistics(), self.statistics_client.event_loop)
_ = future.result()
async def _connect_statistics(self):
# map an access helper class
for i in self.attr_list():
if i.comms_id == StatisticsClient:
i.set_comm_client(self.statistics_client)
try:
if i.comms_id == StatisticsClient:
await i.async_set_comm_client(self.statistics_client)
except Exception as e:
# use the pass function instead of setting read/write fails
i.set_pass_func()
self.warn_stream("error while setting the sst attribute {} read/write function. {}. using pass function instead".format(i, e))
await self.statistics_client.start()
# --------
# Commands
......
......@@ -25,12 +25,9 @@ from tango import AttrWriteType
# Additional import
from clients.attribute_wrapper import attribute_wrapper
from clients.opcua_client import OPCUAConnection
from clients.statistics_client import StatisticsClient
from clients.opcua_client import OPCUAConnection
from devices.hardware_device import hardware_device
from common.lofar_git import get_version
from common.lofar_logging import device_logging_to_python, log_exceptions
from devices.sdp.statistics import Statistics
......
......@@ -10,6 +10,7 @@
from common.lofar_logging import configure_logger
import unittest
import asynctest
import testscenarios
"""Setup logging for integration tests"""
......@@ -28,3 +29,9 @@ class IntegrationTestCase(BaseIntegrationTestCase):
def setUp(self):
super().setUp()
class IntegrationAsyncTestCase(testscenarios.WithScenarios, asynctest.TestCase):
"""Integration test case base class for all asyncio unit tests."""
def setUp(self):
super().setUp()
......@@ -7,26 +7,26 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from opcua import Client
from asyncua import Client
from integration_test import base
class TestSDPTRSim(base.IntegrationTestCase):
class TestSDPTRSim(base.IntegrationAsyncTestCase):
def setUp(self):
super(TestSDPTRSim, self).setUp()
def test_opcua_connection(self):
async def test_opcua_connection(self):
"""Check if we can connect to sdptr-sim"""
client = Client("opc.tcp://sdptr-sim:4840")
root_node = None
try:
client.connect()
await client.connect()
root_node = client.get_root_node()
finally:
client.disconnect()
await client.disconnect()
self.assertNotEqual(None, root_node)
......@@ -7,27 +7,27 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from opcua import Client
from asyncua import Client
from integration_test import base
class TestUNB2Sim(base.IntegrationTestCase):
class TestUNB2Sim(base.IntegrationAsyncTestCase):
def setUp(self):
super(TestUNB2Sim, self).setUp()
def test_opcua_connection(self):
async def test_opcua_connection(self):
"""Check if we can connect to unb2-sim"""
client = Client("opc.tcp://unb2-sim:4844")
root_node = None
client.connect()
await client.connect()
try:
root_node = client.get_root_node()
finally:
client.disconnect()
await client.disconnect()
self.assertNotEqual(None, root_node)
......@@ -57,3 +57,14 @@ class TestDeviceSDP(base.IntegrationTestCase):
d.on()
self.assertEqual(DevState.ON, d.state())
def test_device_sdp_read_attribute(self):
"""Test if we can read an attribute obtained over OPC-UA"""
d = DeviceProxy("LTS/SDP/1")
d.initialise()
d.on()
self.assertListEqual([True]*16, list(d.TR_fpga_communication_error_R))
......@@ -2,6 +2,7 @@
# order of appearance. Changing the order has an impact on the overall
# integration process, which may cause wedges in the gate later.
asynctest>=0.13.0 # Apache-2.0
bandit>=1.6.0 # Apache-2.0
coverage>=5.2.0 # Apache-2.0
doc8>=0.8.0 # Apache-2.0
......
......@@ -11,6 +11,7 @@ from common.lofar_logging import configure_logger
import unittest
import testscenarios
import asynctest
"""Setup logging for unit tests"""
configure_logger(debug=True)
......@@ -28,3 +29,10 @@ class TestCase(BaseTestCase):
def setUp(self):
super().setUp()
class AsyncTestCase(BaseTestCase):
"""Test case base class for all asyncio unit tests."""
def setUp(self):
super().setUp()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment