diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 350b264d8891a58f8b268c36b7d0315962ff382e..0371a7f7f806a9a6e9ac5c22d8c5562c6880a4eb 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -43,11 +43,9 @@ unit_test: before_script: - sudo apt-get update - sudo apt-get install -y git - - pip3 install -r devices/test-requirements.txt - - pip3 install -r docker-compose/itango/lofar-requirements.txt script: - cd devices - - tox -e py37 + - tox --recreate -e py37 integration_test: stage: integration-tests allow_failure: true diff --git a/bootstrap/etc/requirements.txt b/bootstrap/etc/requirements.txt index d7d6026bc20c52b6255dc0563d0780dc63c7f3aa..5502737a6308c9939be7a2fa4981707f965918ac 100644 --- a/bootstrap/etc/requirements.txt +++ b/bootstrap/etc/requirements.txt @@ -5,5 +5,5 @@ numpy opcua-client pyqtgraph PyQt5 -opcua >= 0.98.13 +asyncua dataclasses diff --git a/devices/clients/attribute_wrapper.py b/devices/clients/attribute_wrapper.py index 4cb389824750cb9d01fc836e8d65caf3656d59a4..e55a662142cb89f62775fb7ac2189c063593df37 100644 --- a/devices/clients/attribute_wrapper.py +++ b/devices/clients/attribute_wrapper.py @@ -154,8 +154,15 @@ class attribute_wrapper(attribute): try: self.read_function, self.write_function = client.setup_attribute(self.comms_annotation, self) except Exception as e: + raise Exception("Exception while setting %s attribute with annotation: '%s'", client.__class__.__name__, self.comms_annotation) from e - logger.error("Exception while setting {} attribute with annotation: '{}' {}".format(client.__class__.__name__, self.comms_annotation, e)) + async def async_set_comm_client(self, client): + """ + Asynchronous version of set_comm_client. + """ + try: + self.read_function, self.write_function = await client.setup_attribute(self.comms_annotation, self) + except Exception as e: raise Exception("Exception while setting %s attribute with annotation: '%s'", client.__class__.__name__, self.comms_annotation) from e def set_pass_func(self): diff --git a/devices/clients/comms_client.py b/devices/clients/comms_client.py index 011e1e62180e85f6bc17d72a6ee31eb5871ecb50..ab5279562305c351d346105fc4e914abffa72f57 100644 --- a/devices/clients/comms_client.py +++ b/devices/clients/comms_client.py @@ -1,9 +1,68 @@ from threading import Thread import time +import asyncio +from abc import ABC, abstractmethod -class CommClient(Thread): +import logging +logger = logging.getLogger() + +class AbstractCommClient(ABC): + @abstractmethod + def start(self): + """ Start communication with the client. """ + + @abstractmethod + def stop(self): + """ Stop communication with the client. """ + + def ping(self): + """ Check whether the connection is still alive. + + Clients that override this method must raise an Exception if the + connection died. """ + pass + + @abstractmethod + def setup_attribute(self, annotation, attribute): + """ + This function is responsible for providing the attribute_wrapper with a read/write function + How this is done is implementation specific. + The setup-attribute has access to the comms_annotation provided to the attribute wrapper to pass along to the comms client + as well as a reference to the attribute itself. + + It should do this by first calling: _setup_annotation and setup_value_conversion to get all data necceacry to configure the read/write functions. + It should then return the read and write functions to the attribute. + + MANDATORY: + annotation_outputs = _setup_annotation(annotation) + attribute_outputs = _setup_annotation(attribute) + (note: outputs are up to the user) + + REQUIRED: provide read and write functions to return, there are no restrictions on how these should be provided, + except that the read function takes a single input value and the write function returns a single value + + MANDATORY: + return read_function, write_function + + Examples: + - File system: get_mapping returns functions that read/write a fixed + number of bytes at a fixed location in a file. (SEEK) + - OPC-UA: traverse the OPC-UA tree until the node is found. + Then return the read/write functions for that node which automatically + convert values between Python and OPC-UA. + """ + + def _setup_annotation(self, annotation): + """ + This function is responsible for handling the annotation data provided by the attribute to configure the read/write function the client must provide. + This function should be called by setup_attribute + """ + pass + +class CommClient(AbstractCommClient, Thread): """ - The ProtocolHandler class is the generic interface class between the tango attribute_wrapper and the outside world + Abstracts communication with a client, for instance, over the network, by handling connect(), disconnect(), and ping() + primitives. """ def __init__(self, fault_func, streams, try_interval=2): @@ -69,7 +128,11 @@ class CommClient(Thread): time.sleep(self.try_interval) def ping(self): - return + """ Check whether the connection is still alive. + + Clients that override this method must raise an Exception if the + connection died. """ + pass def stop(self): """ @@ -85,47 +148,145 @@ class CommClient(Thread): self.disconnect() - def setup_attribute(self, annotation, attribute): - """ - This function is responsible for providing the attribute_wrapper with a read/write function - How this is done is implementation specific. - The setup-attribute has access to the comms_annotation provided to the attribute wrapper to pass along to the comms client - as well as a reference to the attribute itself. - It should do this by first calling: _setup_annotation and setup_value_conversion to get all data necceacry to configure the read/write functions. - It should then return the read and write functions to the attribute. +class AsyncCommClient(object): + """ + Abstracts communication with a client, for instance, over the network, by handling connect(), disconnect(), and ping() + primitives. - MANDATORY: - annotation_outputs = _setup_annotation(annotation) - attribute_outputs = _setup_annotation(attribute) - (note: outputs are up to the user) + asyncio version of the CommClient. Also does not reconnect if the connection is lost. + """ - REQUIRED: provide read and write functions to return, there are no restrictions on how these should be provided, - except that the read function takes a single input value and the write function returns a single value + def __init__(self, fault_func=lambda: None, event_loop=None): + """ + Create an Asynchronous communication client. - MANDATORY: - return read_function, write_function + fault_func: Function to call to put the device to FAULT if an error is detected. + event_loop: Aysncio event loop to use. If None, a new event loop is created and + run in a separate thread. Only share event loops if any of the functions + executed doesn't stall, as asyncio used a cooperative multitasking model. - Examples: - - File system: get_mapping returns functions that read/write a fixed - number of bytes at a fixed location in a file. (SEEK) - - OPC-UA: traverse the OPC-UA tree until the node is found. - Then return the read/write functions for that node which automatically - convert values between Python and OPC-UA. + If the executed functions can stall (for a bit), use a dedicated loop to avoid + interfering with other users of the event loop. + + All coroutines need to be executed in this loop, which wil also be stored + as the `event_loop` member of this object. """ - raise NotImplementedError("the setup_attribute must be implemented and provide return a valid read/write function for the attribute") + self.fault_func = fault_func + self.running = False - def _setup_annotation(self, annotation): + if event_loop is None: + # Run a dedicated event loop for communications + # + # All co-routines need to be called through this event loop, + # for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop). + + def run_loop(loop: asyncio.AbstractEventLoop) -> None: + asyncio.set_event_loop(loop) + loop.run_forever() + + self.event_loop = asyncio.new_event_loop() + self.event_loop_thread = Thread(target=run_loop, args=(self.event_loop,), name=f"AsyncCommClient {self.name()} event loop", daemon=True) + self.event_loop_thread.start() + else: + self.event_loop = event_loop + self.event_loop_thread = None + + def __del__(self): + if self.event_loop_thread is not None: + # signal our event loop thread to stop + self.event_loop.call_soon_threadsafe(self.event_loop.stop) + + # reap our event loop thread once it is done processing tasks + self.event_loop_thread.join() + + def name(self): + """ The name of this CommClient, for use in logs. """ + return self.__class__.__name__ + + @abstractmethod + async def connect(self): """ - This function is responsible for handling the annotation data provided by the attribute to configure the read/write function the client must provide. - This function should be called by setup_attribute + Function used to connect to the client, and any + post init. """ - raise NotImplementedError("the _setup_annotation must be implemented, content and outputs are up to the user") - def setup_value_conversion(self, attribute): + @abstractmethod + async def disconnect(self): """ - this function is responsible for setting up the value conversion between the client and the attribute. - This function should be called by setup_attribute + Function used to disconnect from the client. """ - raise NotImplementedError("the setup_value_conversion must be implemented, content and outputs are up to the user") + async def watch_connection(self): + """ Notice when the connection goes down. """ + + try: + logger.info(f"[AsyncCommClient {self.name()}] Start watching") + + while self.running: + # ping will throw in case of connection issues + try: + await self.ping() + except Exception as e: + logger.exception(f"[AsyncCommClient {self.name()}] Ping failed: connection considered lost") + + # connection error, go to fault + self.fault_func() + + # disconnect will cancel us + await self.disconnect() + + # always have a backup plan + return + + # don't spin, sleep for a while + await asyncio.sleep(2) + except asyncio.CancelledError as e: + pass + except Exception as e: + # log immediately, or the exception will only be printed once this task is awaited + logger.exception(f"[AsyncCommClient {self.name()}] Exception raised while watching") + + raise + finally: + logger.info(f"[AsyncCommClient {self.name()}] Stop watching") + + async def ping(self): + return + + async def start(self): + if self.running: + # already running + return + + await self.connect() + self.running = True + + # watch connection + self.watch_connection_task = asyncio.create_task(self.watch_connection()) + + async def stop(self): + if not self.running: + # already stopped + return + + self.running = False + + # cancel & reap watcher + self.watch_connection_task.cancel() + try: + await self.watch_connection_task + except asyncio.CancelledError as e: + pass + except Exception as e: + logger.exception(f"[AsyncCommClient {self.name()}] Watcher thread raised exception") + + # the task stopped eithr way, so no need to bother our caller with this + + await self.disconnect() + + def sync_stop(self): + """ Synchronous version of stop(). """ + + future = asyncio.run_coroutine_threadsafe(self.stop(), self.event_loop) + return future.result() diff --git a/devices/clients/docker_client.py b/devices/clients/docker_client.py index c5b0e8b81f69e7f83ae381468b6bcd738f9ec296..66a782f9c44833f1ee52d1822991936e5bcf1f47 100644 --- a/devices/clients/docker_client.py +++ b/devices/clients/docker_client.py @@ -1,63 +1,40 @@ import logging import docker -from .comms_client import CommClient +from .comms_client import AsyncCommClient logger = logging.getLogger() -class DockerClient(CommClient): +class DockerClient(AsyncCommClient): """ Controls & queries running docker containers. """ - def start(self): - super().start() - - def __init__(self, base_url, fault_func, streams): - super().__init__(fault_func, streams) + def __init__(self, base_url, fault_func, event_loop=None): + super().__init__(fault_func, event_loop) self.base_url = base_url - def connect(self): - """ - Function used to connect to the client. - """ - if not self.connected: - self.client = docker.DockerClient(self.base_url) - - return super().connect() + async def connect(self): + self.client = docker.DockerClient(self.base_url) - def ping(self): - return True + async def ping(self): + # Raises if the server is unresponsive + self.client.ping() - def disconnect(self): + async def disconnect(self): self.client = None - return super().disconnect() - - def setup_value_conversion(self, attribute): - """ - gives the client access to the attribute_wrapper object in order to access all data it could potentially need. - the OPC ua read/write functions require the dimensionality and the type to be known - """ - return - - def setup_attribute(self, annotation, attribute): - """ - MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions - """ - + async def setup_attribute(self, annotation, attribute): container_name = annotation["container"] - # get all the necessary data to set up the read/write functions from the attribute_wrapper - self.setup_value_conversion(attribute) - def read_function(): try: container = self.client.containers.get(container_name) except docker.errors.NotFound: return False + # expected values: running, restarting, paused, exited, created return container.status == 'running' def write_function(value): diff --git a/devices/clients/opcua_client.py b/devices/clients/opcua_client.py index 11e4cac79dca4faf7acc04c97f4b4790490b4a3f..2bde56237f8f61dcdc3abc4bdb4d03f7741746fb 100644 --- a/devices/clients/opcua_client.py +++ b/devices/clients/opcua_client.py @@ -1,126 +1,94 @@ from threading import Thread import socket import numpy -import opcua -from opcua import Client +import asyncua +import asyncio +from asyncua import Client -from clients.comms_client import CommClient +from clients.comms_client import AsyncCommClient -__all__ = ["OPCUAConnection"] +import logging +logger = logging.getLogger() + +__all__ = ["OPCUAConnection", "event_loop"] numpy_to_OPCua_dict = { - numpy.bool_: opcua.ua.VariantType.Boolean, - numpy.int8: opcua.ua.VariantType.SByte, - numpy.uint8: opcua.ua.VariantType.Byte, - numpy.int16: opcua.ua.VariantType.Int16, - numpy.uint16: opcua.ua.VariantType.UInt16, - numpy.int32: opcua.ua.VariantType.Int32, - numpy.uint32: opcua.ua.VariantType.UInt32, - numpy.int64: opcua.ua.VariantType.Int64, - numpy.uint64: opcua.ua.VariantType.UInt64, - numpy.float32: opcua.ua.VariantType.Float, - numpy.double: opcua.ua.VariantType.Double, - numpy.float64: opcua.ua.VariantType.Double, - numpy.str: opcua.ua.VariantType.String + numpy.bool_: asyncua.ua.VariantType.Boolean, + numpy.int8: asyncua.ua.VariantType.SByte, + numpy.uint8: asyncua.ua.VariantType.Byte, + numpy.int16: asyncua.ua.VariantType.Int16, + numpy.uint16: asyncua.ua.VariantType.UInt16, + numpy.int32: asyncua.ua.VariantType.Int32, + numpy.uint32: asyncua.ua.VariantType.UInt32, + numpy.int64: asyncua.ua.VariantType.Int64, + numpy.uint64: asyncua.ua.VariantType.UInt64, + numpy.float32: asyncua.ua.VariantType.Float, + numpy.double: asyncua.ua.VariantType.Double, + numpy.float64: asyncua.ua.VariantType.Double, + numpy.str: asyncua.ua.VariantType.String } -# <class 'numpy.bool_'> - -class OPCUAConnection(CommClient): +class OPCUAConnection(AsyncCommClient): """ Connects to OPC-UA in the foreground or background, and sends HELLO messages to keep a check on the connection. On connection failure, reconnects once. """ - def start(self): - super().start() - - def __init__(self, address, namespace, timeout, fault_func, streams, try_interval=2): + def __init__(self, address, namespace, timeout, fault_func, event_loop=None): """ Create the OPC ua client and connect() to it and get the object node """ - super().__init__(fault_func, streams, try_interval) - self.client = Client(address, timeout) + self.client = Client(address, int(timeout)) + self.namespace = namespace - # Explicitly connect - if not self.connect(): - # hardware or infra is down -- needs fixing first - fault_func() - return - - - # determine namespace used - if type(namespace) is str: - self.name_space_index = self.client.get_namespace_index(namespace) - elif type(namespace) is int: - self.name_space_index = namespace - else: - raise TypeError(f"namespace must be of type str or int, but is of type {type(namespace).__name__}") - - self.obj = self.client.get_objects_node() - self.check_nodes() + super().__init__(fault_func, event_loop) def _servername(self): return self.client.server_url.geturl() - def connect(self): + async def connect(self): """ Try to connect to the client """ + logger.debug(f"Connecting to server {self._servername()}") + try: - self.streams.debug_stream("Connecting to server %s", self._servername()) - self.client.connect() + await self.client.connect() self.connected = True - self.streams.debug_stream("Connected to %s. Initialising.", self._servername()) - return True - except socket.error as e: - self.streams.error_stream("Could not connect to server %s: %s", self._servername(), e) - raise Exception("Could not connect to server %s", self._servername()) from e + except (socket.error, IOError, OSError) as e: + raise IOError(f"Could not connect to OPC-UA server {self._servername()}") from e - def check_nodes(self): - """ - function purely for debugging/development only. Simply lists all top level nodes and the nodes below that - """ + logger.debug(f"Connected to OPC-UA server {self._servername()}") - for i in self.obj.get_children(): - print(i.get_browse_name()) - for j in i.get_children(): - try: - print(j.get_browse_name(), j.get_data_type_as_variant_type(), j.get_value()) - except: - print(j.get_browse_name()) - finally: - pass + # determine namespace used + if type(self.namespace) is str: + self.name_space_index = await self.client.get_namespace_index(self.namespace) + elif type(self.namespace) is int: + self.name_space_index = self.namespace + else: + raise TypeError(f"namespace must be of type str or int, but is of type {type(self.namespace).__name__}") + self.obj = self.client.get_objects_node() - def disconnect(self): + async def disconnect(self): """ disconnect from the client """ - self.connected = False # always force a reconnect, regardless of a successful disconnect - try: - self.client.disconnect() - except Exception as e: - self.streams.error_stream("Disconnect from OPC-UA server %s failed: %s", self._servername(), e) + await self.client.disconnect() - def ping(self): + async def ping(self): """ ping the client to make sure the connection with the client is still functional. """ try: - #self.client.send_hello() # <-- this crashes when communicating with open62541 v1.2.2+ - pass + await self.client.send_hello() except Exception as e: - raise Exception("Lost connection to server %s: %s", self._servername(), e) - - def _setup_annotation(self, annotation): - """ - This class's Implementation of the get_mapping function. returns the read and write functions - """ + raise IOError("Lost connection to server %s: %s", self._servername(), e) + async def _setup_annotation(self, annotation): if isinstance(annotation, dict): # check if required path inarg is present if annotation.get('path') is None: @@ -136,36 +104,22 @@ class OPCUAConnection(CommClient): path = [name if ':' in name else f'{self.name_space_index}:{name}' for name in path] try: - node = self.obj.get_child(path) + node = await self.obj.get_child(path) except Exception as e: - self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e) + logger.exception("Could not get node: %s on server %s", path, self._servername()) raise Exception("Could not get node: %s on server %s", path, self._servername()) from e return node - def setup_value_conversion(self, attribute): - """ - gives the client access to the attribute_wrapper object in order to access all data it could potentially need. - the OPC ua read/write functions require the dimensionality and the type to be known - """ + async def setup_attribute(self, annotation, attribute): + # process the annotation + node = await self._setup_annotation(annotation) + # get all the necessary data to set up the read/write functions from the attribute_wrapper dim_x = attribute.dim_x dim_y = attribute.dim_y ua_type = numpy_to_OPCua_dict[attribute.numpy_type] # convert the numpy type to a corresponding UA type - return dim_x, dim_y, ua_type - - def setup_attribute(self, annotation, attribute): - """ - MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions - """ - - # process the annotation - node = self._setup_annotation(annotation) - - # get all the necessary data to set up the read/write functions from the attribute_wrapper - dim_x, dim_y, ua_type = self.setup_value_conversion(attribute) - # configure and return the read/write functions prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type) @@ -175,8 +129,21 @@ class OPCUAConnection(CommClient): self.streams.debug_stream("connected OPC ua node {} of type {} to attribute with dimensions: {} x {} ".format(str(node_name)[:len(node_name)-1], str(ua_type)[len("VariantType."):], dim_x, dim_y)) except: pass + + # Tango will call these from a separate polling thread. + def read_function(): + return asyncio.run_coroutine_threadsafe(prot_attr.read_function(), self.event_loop).result() + + def write_function(value): + asyncio.run_coroutine_threadsafe(prot_attr.write_function(value), self.event_loop).result() + # return the read/write functions - return prot_attr.read_function, prot_attr.write_function + return read_function, write_function + + + async def call_method(self, method_path, *args): + node = await self.obj.get_child(method_path[:-1]) + return await node.call_method(method_path[-1], *args) def call_method(self, method_path, *args): @@ -194,27 +161,31 @@ class ProtocolAttribute: self.dim_x = dim_x self.ua_type = ua_type - def read_function(self): + async def read_function(self): """ Read_R function """ - value = self.node.get_value() - if self.dim_y + self.dim_x == 1: - # scalar - return value - elif self.dim_y != 0: - # 2D array - value = numpy.array(numpy.split(numpy.array(value), indices_or_sections=self.dim_y)) - else: - # 1D array - value = numpy.array(value) + value = await self.node.get_value() - return value + try: + if self.dim_y + self.dim_x == 1: + # scalar + return value + elif self.dim_y != 0: + # 2D array + value = numpy.array(numpy.split(numpy.array(value), indices_or_sections=self.dim_y)) + else: + # 1D array + value = numpy.array(value) + return value + except Exception as e: + # Log "value" that gave us this issue + raise ValueError(f"Failed to parse atribute value retrieved from OPC-UA: {value}") from e - def write_function(self, value): + async def write_function(self, value): """ write_RW function """ @@ -227,8 +198,8 @@ class ProtocolAttribute: value = value.tolist() if type(value) == numpy.ndarray else value try: - self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type)) - except (TypeError, opcua.ua.uaerrors.BadTypeMismatch) as e: + await self.node.set_data_value(asyncua.ua.uatypes.Variant(Value=value, VariantType=self.ua_type)) + except (TypeError, asyncua.ua.uaerrors.BadTypeMismatch) as e: # A type conversion went wrong or there is a type mismatch. # # This is either the conversion us -> opcua in our client, or client -> server. @@ -252,10 +223,10 @@ class ProtocolAttribute: dim_x=self.dim_x, dim_y=self.dim_y) - actual_server_type = "{dtype} {dimensions}".format( - dtype=self.node.get_data_type_as_variant_type(), - dimensions=(self.node.get_array_dimensions() or "???")) + actual_server_type = "{dtype} x {dimensions}".format( + dtype=await self.node.read_data_type_as_variant_type(), + dimensions=(await self.node.read_array_dimensions()) or "(dimensions unknown)") - attribute_name = self.node.get_display_name().to_string() + attribute_name = (await self.node.read_display_name()).to_string() raise TypeError(f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}") from e diff --git a/devices/clients/statistics_client.py b/devices/clients/statistics_client.py index eb37e9dc24b7cc80e557d9c5b2b060d73e652564..3fd470fbf0319e45242abbc3a79362584628f844 100644 --- a/devices/clients/statistics_client.py +++ b/devices/clients/statistics_client.py @@ -2,7 +2,7 @@ from queue import Queue import logging import numpy -from .comms_client import CommClient +from .comms_client import AsyncCommClient from .tcp_replicator import TCPReplicator from .udp_receiver import UDPReceiver @@ -11,16 +11,13 @@ from devices.sdp.statistics_collector import StatisticsConsumer logger = logging.getLogger() -class StatisticsClient(CommClient): +class StatisticsClient(AsyncCommClient): """ Collects statistics packets over UDP, forwards them to a StatisticsCollector, and provides a CommClient interface to expose points to a Device Server. """ - def start(self): - super().start() - - def __init__(self, collector, udp_options, tcp_options, fault_func, streams, try_interval=2, queuesize=1024): + def __init__(self, collector, udp_options, tcp_options, fault_func, event_loop=None, queuesize=1024): """ Create the statistics client and connect() to it and get the object node. @@ -34,13 +31,7 @@ class StatisticsClient(CommClient): self.queuesize = queuesize self.collector = collector - super().__init__(fault_func, streams, try_interval) - - # Explicitly connect - if not self.connect(): - # hardware or infra is down -- needs fixing first - fault_func() - return + super().__init__(fault_func, event_loop) @staticmethod def _queue_fill_percentage(queue: Queue): @@ -50,22 +41,19 @@ class StatisticsClient(CommClient): # some platforms don't have qsize(), nothing we can do here return 0 - def connect(self): + async def connect(self): """ Function used to connect to the client. """ - if not self.connected: - self.collector_queue = Queue(maxsize=self.queuesize) + self.collector_queue = Queue(maxsize=self.queuesize) - self.tcp = TCPReplicator(self.tcp_options, self.queuesize) - self.statistics = StatisticsConsumer(self.collector_queue, self.collector) + self.tcp = TCPReplicator(self.tcp_options, self.queuesize) + self.statistics = StatisticsConsumer(self.collector_queue, self.collector) - self.udp = UDPReceiver([self.collector_queue, self.tcp], - self.udp_options) + self.udp = UDPReceiver([self.collector_queue, self.tcp], + self.udp_options) - return super().connect() - - def ping(self): + async def ping(self): if not self.statistics.is_alive(): raise Exception("Statistics processing thread died unexpectedly") @@ -75,7 +63,7 @@ class StatisticsClient(CommClient): if not self.tcp.is_alive(): raise Exception("TCPReplicator thread died unexpectedly") - def disconnect(self): + async def disconnect(self): # explicit disconnect, instead of waiting for the GC to kick in after "del" below try: self.statistics.disconnect() @@ -99,25 +87,13 @@ class StatisticsClient(CommClient): del self.statistics del self.collector_queue - return super().disconnect() - - def setup_value_conversion(self, attribute): - """ - gives the client access to the attribute_wrapper object in order to access all data it could potentially need. - the OPC ua read/write functions require the dimensionality and the type to be known - """ - return - - def setup_attribute(self, annotation, attribute): + async def setup_attribute(self, annotation, attribute): """ MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions """ parameter = annotation["parameter"] - # get all the necessary data to set up the read/write functions from the attribute_wrapper - self.setup_value_conversion(attribute) - # redirect to right object. this works as long as the parameter names are unique among them. if annotation["type"] == "statistics": def read_function(): diff --git a/devices/common/lofar_logging.py b/devices/common/lofar_logging.py index 4114d5dd342c4562313a1dd609db3522f0b5fe63..07f7f64e95819d2278ca0feb29ee2a28900b3012 100644 --- a/devices/common/lofar_logging.py +++ b/devices/common/lofar_logging.py @@ -30,7 +30,13 @@ class TangoLoggingHandler(logging.Handler): stream = self.level_to_device_stream[record.levelno] # send the log message to Tango - stream(record.tango_device, record.msg, *record.args) + try: + record_msg = record.msg % record.args + stream(record.tango_device, record.msg, *record.args) + except TypeError: + # Tango's logger barfs on mal-formed log lines, f.e. if msg % args is not possible + record_msg = f"{record.msg} {record.args}".replace("%","%%") + stream(record.tango_device, record_msg) self.flush() @@ -112,7 +118,7 @@ def configure_logger(logger: logging.Logger=None, log_extra=None, debug=False): logger.setLevel(logging.DEBUG) # remove spam from the OPC-UA client connection - logging.getLogger("opcua").setLevel(logging.WARN) + logging.getLogger("asyncua").setLevel(logging.WARN) # don't spam errors for git, as we use it in our log handler, which would result in an infinite loop logging.getLogger("git").setLevel(logging.ERROR) diff --git a/devices/devices/docker_device.py b/devices/devices/docker_device.py index 2acf7aeecf04755b71337d42f6a64946ef7cd54a..93fdb26ba76604ecdefccc56281b951a13ddccbc 100644 --- a/devices/devices/docker_device.py +++ b/devices/devices/docker_device.py @@ -23,6 +23,7 @@ from tango.server import run, command from tango.server import device_property, attribute from tango import AttrWriteType import numpy +import asyncio # Additional import from device_decorators import * @@ -96,7 +97,7 @@ class Docker(hardware_device): """ user code here. is called when the state is set to OFF """ # Stop keep-alive try: - self.docker_client.stop() + self.docker_client.sync_stop() except Exception as e: self.warn_stream("Exception while stopping docker client in configure_for_off function: {}. Exception ignored".format(e)) @@ -105,13 +106,18 @@ class Docker(hardware_device): """ user code here. is called when the state is set to INIT """ # set up the Docker client - self.docker_client = DockerClient(self.Docker_Base_URL, self.Fault, self) + self.docker_client = DockerClient(self.Docker_Base_URL, self.Fault) + # schedule the docker initialisation, and wait for it to finish + future = asyncio.run_coroutine_threadsafe(self._connect_docker(), self.docker_client.event_loop) + _ = future.result() + + async def _connect_docker(self): # tie attributes to client for i in self.attr_list(): - i.set_comm_client(self.docker_client) + await i.async_set_comm_client(self.docker_client) - self.docker_client.start() + await self.docker_client.start() # -------- # Commands diff --git a/devices/devices/hardware_device.py b/devices/devices/hardware_device.py index 84d8e4c2b9c6156c994715416bebf38f979903b6..7c7e6663cff7a68f8b0340d59f076bb946ea9ec5 100644 --- a/devices/devices/hardware_device.py +++ b/devices/devices/hardware_device.py @@ -26,15 +26,11 @@ from devices.device_decorators import only_in_states, fault_on_error import time import math -import logging - __all__ = ["hardware_device"] import logging logger = logging.getLogger() - -#@log_exceptions() class hardware_device(Device, metaclass=AbstractDeviceMetas): """ @@ -60,6 +56,10 @@ class hardware_device(Device, metaclass=AbstractDeviceMetas): The user triggers their transitions by the commands reflecting the target state (Initialise(), On(), Fault()). """ + # ---------- + # Attributes + # ---------- + version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version()) # list of property names too be set first by set_defaults diff --git a/devices/devices/opcua_device.py b/devices/devices/opcua_device.py index 698df95705b0dce00cb869ed880a29f97d472cb1..078e21b249bcec2ca7de5661ea40bf963ac97394 100644 --- a/devices/devices/opcua_device.py +++ b/devices/devices/opcua_device.py @@ -22,6 +22,7 @@ from tango import DebugIt from tango.server import device_property, attribute from tango import AttrWriteType import numpy +import asyncio # Additional import from devices.device_decorators import * @@ -86,14 +87,22 @@ class opcua_device(hardware_device): """ user code here. is called when the state is set to INIT """ # set up the OPC ua client - self.opcua_connection = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_namespace, self.OPC_Time_Out, self.Fault, self) + self.opcua_connection = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_namespace, self.OPC_Time_Out, self.Fault) self.opcua_missing_attributes = [] + # schedule the opc-ua initialisation, and wait for it to finish + future = asyncio.run_coroutine_threadsafe(self._connect_opcua(), self.opcua_connection.event_loop) + _ = future.result() + + async def _connect_opcua(self): + # connect + await self.opcua_connection.start() + # map an access helper class for i in self.attr_list(): try: if not i.comms_id or i.comms_id == OPCUAConnection: - i.set_comm_client(self.opcua_connection) + await i.async_set_comm_client(self.opcua_connection) except Exception as e: # use the pass function instead of setting read/write fails i.set_pass_func() @@ -101,14 +110,11 @@ class opcua_device(hardware_device): self.warn_stream("error while setting the attribute {} read/write function. {}".format(i, e)) - self.opcua_connection.start() - @log_exceptions() def configure_for_off(self): """ user code here. is called when the state is set to OFF """ try: # disconnect - self.opcua_connection.stop() + self.opcua_connection.sync_stop() except Exception as e: self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) - diff --git a/devices/devices/sdp/sst.py b/devices/devices/sdp/sst.py index fe1b353b17737d56f5566da9cc7913e16ff828a6..277714ab0b7ada6882a5ec1086690b3c29fb2382 100644 --- a/devices/devices/sdp/sst.py +++ b/devices/devices/sdp/sst.py @@ -25,8 +25,8 @@ from tango import AttrWriteType # Additional import from clients.attribute_wrapper import attribute_wrapper -from clients.opcua_client import OPCUAConnection from clients.statistics_client import StatisticsClient +from clients.opcua_client import OPCUAConnection from devices.sdp.statistics import Statistics from devices.sdp.statistics_collector import SSTCollector diff --git a/devices/devices/sdp/statistics.py b/devices/devices/sdp/statistics.py index 63f1cb0a7b1d2763fc51fa79abfa6317684bfd38..af1cf0201fd4dc244b8495730660b7c84398a518 100644 --- a/devices/devices/sdp/statistics.py +++ b/devices/devices/sdp/statistics.py @@ -24,9 +24,9 @@ from abc import ABCMeta, abstractmethod from tango.server import device_property, attribute from tango import AttrWriteType # Additional import +import asyncio from clients.statistics_client import StatisticsClient -from clients.opcua_client import OPCUAConnection from clients.attribute_wrapper import attribute_wrapper from devices.opcua_device import opcua_device @@ -100,9 +100,8 @@ class Statistics(opcua_device, metaclass=ABCMeta): def configure_for_off(self): """ user code here. is called when the state is set to OFF """ - # Stop keep-alive try: - self.statistics_client.stop() + self.statistics_client.sync_stop() except Exception as e: logger.exception("Exception while stopping statistics_client in configure_for_off. Exception ignored") @@ -128,13 +127,24 @@ class Statistics(opcua_device, metaclass=ABCMeta): } self.statistics_collector = self.STATISTICS_COLLECTOR_CLASS() - self.statistics_client = StatisticsClient(self.statistics_collector, udp_options, tcp_options, self.Fault, self) - self.statistics_client.start() + self.statistics_client = StatisticsClient(self.statistics_collector, udp_options, tcp_options, self.Fault, self.opcua_connection.event_loop) # can share event loop - # tie attributes to client + # schedule the opc-ua initialisation, and wait for it to finish + future = asyncio.run_coroutine_threadsafe(self._connect_statistics(), self.statistics_client.event_loop) + _ = future.result() + + async def _connect_statistics(self): + # map an access helper class for i in self.attr_list(): - if i.comms_id == StatisticsClient: - i.set_comm_client(self.statistics_client) + try: + if i.comms_id == StatisticsClient: + await i.async_set_comm_client(self.statistics_client) + except Exception as e: + # use the pass function instead of setting read/write fails + i.set_pass_func() + self.warn_stream("error while setting the sst attribute {} read/write function. {}. using pass function instead".format(i, e)) + + await self.statistics_client.start() # -------- # Commands diff --git a/devices/devices/sdp/xst.py b/devices/devices/sdp/xst.py index 7ecc937b9baa198b9aa3d8015204ff910d23f83b..c9883303b80425f0c142181994d43e477ec5431c 100644 --- a/devices/devices/sdp/xst.py +++ b/devices/devices/sdp/xst.py @@ -25,12 +25,9 @@ from tango import AttrWriteType # Additional import from clients.attribute_wrapper import attribute_wrapper -from clients.opcua_client import OPCUAConnection from clients.statistics_client import StatisticsClient +from clients.opcua_client import OPCUAConnection -from devices.hardware_device import hardware_device - -from common.lofar_git import get_version from common.lofar_logging import device_logging_to_python, log_exceptions from devices.sdp.statistics import Statistics diff --git a/devices/integration_test/base.py b/devices/integration_test/base.py index 085cbc540dba035969685c3a0fbfbef8c6c7e394..241f0ecd409fd16484d81e31f1e1f83dc1b9d81b 100644 --- a/devices/integration_test/base.py +++ b/devices/integration_test/base.py @@ -10,6 +10,7 @@ from common.lofar_logging import configure_logger import unittest +import asynctest import testscenarios """Setup logging for integration tests""" @@ -28,3 +29,9 @@ class IntegrationTestCase(BaseIntegrationTestCase): def setUp(self): super().setUp() + +class IntegrationAsyncTestCase(testscenarios.WithScenarios, asynctest.TestCase): + """Integration test case base class for all asyncio unit tests.""" + + def setUp(self): + super().setUp() diff --git a/devices/integration_test/client/test_sdptr_sim.py b/devices/integration_test/client/test_sdptr_sim.py index 3ba48e7d761c7ef366c8690e2d114c773de7311d..ab9288b727e515c19b07c99d1fe8a233d7032055 100644 --- a/devices/integration_test/client/test_sdptr_sim.py +++ b/devices/integration_test/client/test_sdptr_sim.py @@ -7,26 +7,26 @@ # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. -from opcua import Client +from asyncua import Client from integration_test import base -class TestSDPTRSim(base.IntegrationTestCase): +class TestSDPTRSim(base.IntegrationAsyncTestCase): def setUp(self): super(TestSDPTRSim, self).setUp() - def test_opcua_connection(self): + async def test_opcua_connection(self): """Check if we can connect to sdptr-sim""" client = Client("opc.tcp://sdptr-sim:4840") root_node = None try: - client.connect() + await client.connect() root_node = client.get_root_node() finally: - client.disconnect() + await client.disconnect() self.assertNotEqual(None, root_node) diff --git a/devices/integration_test/client/test_unb2_sim.py b/devices/integration_test/client/test_unb2_sim.py index 227e031e3651fdc1c0523e103b072762271b647a..1eb9972400f2121a6365b2fcb875ecbc2190cdff 100644 --- a/devices/integration_test/client/test_unb2_sim.py +++ b/devices/integration_test/client/test_unb2_sim.py @@ -7,27 +7,27 @@ # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. -from opcua import Client +from asyncua import Client from integration_test import base -class TestUNB2Sim(base.IntegrationTestCase): +class TestUNB2Sim(base.IntegrationAsyncTestCase): def setUp(self): super(TestUNB2Sim, self).setUp() - def test_opcua_connection(self): + async def test_opcua_connection(self): """Check if we can connect to unb2-sim""" client = Client("opc.tcp://unb2-sim:4844") root_node = None - client.connect() + await client.connect() try: root_node = client.get_root_node() finally: - client.disconnect() + await client.disconnect() self.assertNotEqual(None, root_node) diff --git a/devices/integration_test/devices/test_device_sdp.py b/devices/integration_test/devices/test_device_sdp.py index cfd656748054cb21e0e3bb2110ce60072d9fb28a..5f064128f858e0bd2c44768a4f13057e5dc20266 100644 --- a/devices/integration_test/devices/test_device_sdp.py +++ b/devices/integration_test/devices/test_device_sdp.py @@ -57,3 +57,14 @@ class TestDeviceSDP(base.IntegrationTestCase): d.on() self.assertEqual(DevState.ON, d.state()) + + def test_device_sdp_read_attribute(self): + """Test if we can read an attribute obtained over OPC-UA""" + + d = DeviceProxy("LTS/SDP/1") + + d.initialise() + + d.on() + + self.assertListEqual([True]*16, list(d.TR_fpga_communication_error_R)) diff --git a/devices/test-requirements.txt b/devices/test-requirements.txt index 20ed449cd8f17f9110ebe1b70774916abe8c00cb..1cd8ccb799fd1dc8b3b25db9051cb12d42d63bb3 100644 --- a/devices/test-requirements.txt +++ b/devices/test-requirements.txt @@ -2,6 +2,7 @@ # order of appearance. Changing the order has an impact on the overall # integration process, which may cause wedges in the gate later. +asynctest>=0.13.0 # Apache-2.0 bandit>=1.6.0 # Apache-2.0 coverage>=5.2.0 # Apache-2.0 doc8>=0.8.0 # Apache-2.0 diff --git a/devices/test/base.py b/devices/test/base.py index aecaaebc3b57909c49e0425d755f52f5028e0ded..66e64ea9a8669713f672db2088344d96a17f6e7c 100644 --- a/devices/test/base.py +++ b/devices/test/base.py @@ -11,6 +11,7 @@ from common.lofar_logging import configure_logger import unittest import testscenarios +import asynctest """Setup logging for unit tests""" configure_logger(debug=True) @@ -28,3 +29,10 @@ class TestCase(BaseTestCase): def setUp(self): super().setUp() + + +class AsyncTestCase(BaseTestCase): + """Test case base class for all asyncio unit tests.""" + + def setUp(self): + super().setUp() diff --git a/devices/test/clients/test_attr_wrapper.py b/devices/test/clients/test_attr_wrapper.py index 453e19c19d67b56eb339462cc1da7e0e8414451b..8711d989a67730667c10aed91de7c9929c500fcb 100644 --- a/devices/test/clients/test_attr_wrapper.py +++ b/devices/test/clients/test_attr_wrapper.py @@ -18,6 +18,8 @@ from devices.hardware_device import * from tango.test_context import DeviceTestContext from test import base +import asyncio + scalar_dims = (1,) spectrum_dims = (4,) image_dims = (3,2) @@ -31,7 +33,7 @@ def dev_init(device): device.set_state(DevState.INIT) device.test_client = test_client(device.Fault, device) for i in device.attr_list(): - i.set_comm_client(device.test_client) + asyncio.run(i.async_set_comm_client(device.test_client)) device.test_client.start() @@ -361,6 +363,9 @@ class TestAttributeTypes(base.TestCase): def read_RW_test(self, dev, dtype, test_type): '''Test device''' + expected = None + val = None + try: with DeviceTestContext(dev, process=True) as proxy: diff --git a/devices/test/clients/test_client.py b/devices/test/clients/test_client.py index 2c5a2df9c42431f28e6e8a8c3180b8902c4a4597..039974a1e34ae1a0c9779fd29c2c87f545bc38b7 100644 --- a/devices/test/clients/test_client.py +++ b/devices/test/clients/test_client.py @@ -91,7 +91,7 @@ class test_client(CommClient): self.streams.debug_stream("created and bound example_client read/write functions to attribute_wrapper object") return read_function, write_function - def setup_attribute(self, annotation=None, attribute=None): + async def setup_attribute(self, annotation=None, attribute=None): """ MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions diff --git a/devices/test/clients/test_opcua_client.py b/devices/test/clients/test_opcua_client.py index df9296c417857683955aa73ee3cbc0b7985ade76..6315abb20547cb35233b9b47f8f8c32414b1159e 100644 --- a/devices/test/clients/test_opcua_client.py +++ b/devices/test/clients/test_opcua_client.py @@ -2,13 +2,15 @@ import numpy from clients.opcua_client import OPCUAConnection from clients import opcua_client -import opcua +import asyncua import io +import asyncio from unittest import mock import unittest from test import base +import asynctest class attr_props: @@ -37,36 +39,39 @@ image_shape = (2, 3) dimension_tests = [scalar_shape, spectrum_shape, image_shape] -class TestOPCua(base.TestCase): - @mock.patch.object(OPCUAConnection, "check_nodes") - @mock.patch.object(OPCUAConnection, "connect") - @mock.patch.object(opcua_client, "Client") - def test_opcua_connection(self, m_opc_client, m_connect, m_check): +class TestOPCua(base.AsyncTestCase): + @asynctest.patch.object(OPCUAConnection, "ping") + @asynctest.patch.object(opcua_client, "Client") + async def test_opcua_connection(self, m_opc_client, m_ping): """ This tests verifies whether the correct connection steps happen. It checks whether we can init an OPCUAConnection object Whether we can set the namespace, and the OPCua client. """ - m_get_namespace = mock.Mock() - m_get_namespace.get_namespace_index.return_value = 42 - m_opc_client.return_value = m_get_namespace + m_opc_client_members = asynctest.asynctest.CoroutineMock() + m_opc_client_members.get_namespace_index = asynctest.asynctest.CoroutineMock(return_value=42) + m_opc_client_members.connect = asynctest.asynctest.CoroutineMock() + m_opc_client_members.disconnect = asynctest.asynctest.CoroutineMock() + m_opc_client_members.send_hello = asynctest.asynctest.CoroutineMock() + m_opc_client.return_value = m_opc_client_members - test_client = OPCUAConnection("opc.tcp://localhost:4874/freeopcua/server/", "http://lofar.eu", 5, mock.Mock(), mock.Mock()) + test_client = OPCUAConnection("opc.tcp://localhost:4874/freeopcua/server/", "http://lofar.eu", 5, mock.Mock(), self.loop) + try: + await test_client.start() - """Verify that construction of OPCUAConnection calls self.connect""" - m_connect.assert_called_once() # the connect function in the opcua client - m_check.assert_called_once() # debug function that prints out all nodes - m_opc_client.assert_called_once() # makes sure the actual freeOPCua client object is created only once + m_opc_client.assert_called_once() # makes sure the actual freeOPCua client object is created only once - m_get_namespace.get_namespace_index.assert_called_once_with("http://lofar.eu") - self.assertEqual(42, test_client.name_space_index) + # this also implies test_client.connect() is called + m_opc_client_members.get_namespace_index.assert_called_once_with("http://lofar.eu") + self.assertEqual(42, test_client.name_space_index) + finally: + await test_client.stop() - @mock.patch.object(OPCUAConnection, "check_nodes") - @mock.patch.object(OPCUAConnection, "connect") - @mock.patch.object(opcua_client, "Client") + @asynctest.patch.object(OPCUAConnection, "ping") + @asynctest.patch.object(opcua_client, "Client") @mock.patch.object(opcua_client, 'ProtocolAttribute') - def test_opcua_attr_setup(self, m_protocol_attr, m_opc_client, m_connect, m_check): + async def test_opcua_attr_setup(self, m_protocol_attr, m_opc_client, m_ping): """ This tests covers the correct creation of read/write functions. In normal circumstances called by he attribute wrapper. @@ -75,6 +80,16 @@ class TestOPCua(base.TestCase): Test succeeds if there are no errors. """ + m_opc_client_members = asynctest.asynctest.CoroutineMock() + m_opc_client_members.get_namespace_index = asynctest.asynctest.CoroutineMock(return_value=2) + m_opc_client_members.connect = asynctest.asynctest.CoroutineMock() + m_opc_client_members.disconnect = asynctest.asynctest.CoroutineMock() + m_opc_client_members.send_hello = asynctest.asynctest.CoroutineMock() + m_objects_node = asynctest.Mock() + m_objects_node.get_child = asynctest.asynctest.CoroutineMock() + m_opc_client_members.get_objects_node = asynctest.Mock(return_value=m_objects_node) + m_opc_client.return_value = m_opc_client_members + for i in attr_test_types: class mock_attr: def __init__(self, dtype, x, y): @@ -96,13 +111,15 @@ class TestOPCua(base.TestCase): # pretend like there is a running OPCua server with a node that has this name m_annotation = ["2:PCC", f"2:testNode_{str(i.numpy_type)}_{str(dim_x)}_{str(dim_y)}"] - test = OPCUAConnection("opc.tcp://localhost:4874/freeopcua/server/", "http://lofar.eu", 5, mock.Mock(), mock.Mock()) - test.setup_attribute(m_annotation, m_attribute) + test_client = OPCUAConnection("opc.tcp://localhost:4874/freeopcua/server/", "http://lofar.eu", 5, mock.Mock(), self.loop) + try: + await test_client.start() + await test_client.setup_attribute(m_annotation, m_attribute) + finally: + await test_client.stop() # success if there are no errors. - - def test_protocol_attr(self): """ This tests finding an OPCua node and returning a valid object with read/write functions. @@ -136,7 +153,7 @@ class TestOPCua(base.TestCase): self.assertTrue(hasattr(test, "write_function"), f"No write function found") self.assertTrue(hasattr(test, "read_function"), f"No read function found") - def test_read(self): + async def test_read(self): """ This tests the read functions. """ @@ -146,17 +163,17 @@ class TestOPCua(base.TestCase): def get_test_value(): return numpy.zeros(j, i.numpy_type) - def get_flat_value(): + async def get_flat_value(): return get_test_value().flatten() - m_node = mock.Mock() + m_node = asynctest.asynctest.CoroutineMock() if len(j) == 1: test = opcua_client.ProtocolAttribute(m_node, j[0], 0, opcua_client.numpy_to_OPCua_dict[i.numpy_type]) else: test = opcua_client.ProtocolAttribute(m_node, j[1], j[0], opcua_client.numpy_to_OPCua_dict[i.numpy_type]) m_node.get_value = get_flat_value - val = test.read_function() + val = await test.read_function() comp = val == get_test_value() self.assertTrue(comp.all(), "Read value unequal to expected value: \n\t{} \n\t{}".format(val, get_test_value())) @@ -175,15 +192,15 @@ class TestOPCua(base.TestCase): default_value = 42.25 # apply our mapping - v = opcua.ua.uatypes.Variant(value=numpy_type(default_value), varianttype=opcua_type) + v = asyncua.ua.uatypes.Variant(Value=numpy_type(default_value), VariantType=opcua_type) try: # try to convert it to binary to force opcua to parse the value as the type - binary = opcua.ua.ua_binary.variant_to_binary(v) + binary = asyncua.ua.ua_binary.variant_to_binary(v) # reinterpret the resulting binary to obtain what opcua made of our value binary_stream = io.BytesIO(binary) - reparsed_v = opcua.ua.ua_binary.variant_from_binary(binary_stream) + reparsed_v = asyncua.ua.ua_binary.variant_from_binary(binary_stream) except Exception as e: raise Exception(f"Conversion {numpy_type} -> {opcua_type} failed.") from e @@ -192,11 +209,11 @@ class TestOPCua(base.TestCase): # does the OPC-UA type have the same datasize (and thus, precision?) if numpy_type not in [str, numpy.str]: - self.assertEqual(numpy_type().itemsize, getattr(opcua.ua.ua_binary.Primitives, opcua_type.name).size, msg=f"Conversion {numpy_type} -> {opcua_type} failed: precision mismatch") + self.assertEqual(numpy_type().itemsize, getattr(asyncua.ua.ua_binary.Primitives, opcua_type.name).size, msg=f"Conversion {numpy_type} -> {opcua_type} failed: precision mismatch") - def test_write(self): + async def test_write(self): """ Test the writing of values by instantiating a ProtocolAttribute attribute, and calling the write function. but the opcua function that writes to the server has been changed to the compare_values function. @@ -215,24 +232,22 @@ class TestOPCua(base.TestCase): # get opcua Varianttype array of the test value def get_mock_value(value): - return opcua.ua.uatypes.Variant(value=value, varianttype=opcua_client.numpy_to_OPCua_dict[i.numpy_type]) + return asyncua.ua.uatypes.Variant(Value=value, VariantType=opcua_client.numpy_to_OPCua_dict[i.numpy_type]) - m_node = mock.Mock() + m_node = asynctest.asynctest.CoroutineMock() # create the protocolattribute if len(j) == 1: - test = opcua_client.ProtocolAttribute(m_node, j[0], 0, opcua_client.numpy_to_OPCua_dict[i.numpy_type]) + test = opcua_client.ProtocolAttribute(m_node, j[0], 0, opcua_client.numpy_to_OPCua_dict[i.numpy_type], self.loop) else: - test = opcua_client.ProtocolAttribute(m_node, j[1], j[0], opcua_client.numpy_to_OPCua_dict[i.numpy_type]) - - test.node.get_data_value = mock.Mock() + test = opcua_client.ProtocolAttribute(m_node, j[1], j[0], opcua_client.numpy_to_OPCua_dict[i.numpy_type], self.loop) # comparison function that replaces `set_data_value` inside the attributes write function - def compare_values(val): - # test values + async def compare_values(val): + # test valuest val = val.tolist() if type(val) == numpy.ndarray else val if j != dimension_tests[0]: - comp = val._value == get_mock_value(get_test_value().flatten())._value + comp = val.Value == get_mock_value(get_test_value().flatten()).Value self.assertTrue(comp.all(), "Array attempting to write unequal to expected array: \n\t got: {} \n\texpected: {}".format(val,get_mock_value(get_test_value()))) else: @@ -243,4 +258,4 @@ class TestOPCua(base.TestCase): m_node.set_data_value = compare_values # call the write function with the test values - test.write_function(get_test_value()) + await test.write_function(get_test_value()) diff --git a/docker-compose/integration-test.yml b/docker-compose/integration-test.yml index 239dce0235dcd2b2a6a2a731f373e84c49ea671b..e0d1c6baf58948cdbee5a71ff2f859ab429dcd4b 100644 --- a/docker-compose/integration-test.yml +++ b/docker-compose/integration-test.yml @@ -26,4 +26,4 @@ services: - --timeout=30 - --strict - -- - - tox -e integration + - tox --recreate -e integration diff --git a/docker-compose/itango/lofar-requirements.txt b/docker-compose/itango/lofar-requirements.txt index 29942e272353180f3622f4ad6d36fb7c31307eb1..1349c50ca993b51bb866a7880e3e7fb185049de8 100644 --- a/docker-compose/itango/lofar-requirements.txt +++ b/docker-compose/itango/lofar-requirements.txt @@ -1,9 +1,8 @@ parso == 0.7.1 jedi == 0.17.2 -opcua >= 0.98.13 +asyncua astropy python-logstash-async gitpython PyMySQL[rsa] sqlalchemy -timeout-decorator diff --git a/docker-compose/lofar-device-base/lofar-requirements.txt b/docker-compose/lofar-device-base/lofar-requirements.txt index 2214412a4365f4b804d6b20b0576c390482b1481..31b22c71689b481357cef56bf4940c1575a3b01d 100644 --- a/docker-compose/lofar-device-base/lofar-requirements.txt +++ b/docker-compose/lofar-device-base/lofar-requirements.txt @@ -1,4 +1,4 @@ -opcua >= 0.98.9 +asyncua astropy python-logstash-async gitpython