Skip to content
Snippets Groups Projects
Commit b5b85e37 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-412: Moved docker client to AsyncCommClient, moved event loop into AsyncCommClient.

parent 09358d62
No related branches found
No related tags found
1 merge request!142L2SS-412: Use asyncio for opcua and other clients
...@@ -150,13 +150,42 @@ class AsyncCommClient(object): ...@@ -150,13 +150,42 @@ class AsyncCommClient(object):
asyncio version of the CommClient. Also does not reconnect if the connection is lost. asyncio version of the CommClient. Also does not reconnect if the connection is lost.
""" """
def __init__(self, fault_func): def __init__(self, fault_func, event_loop=None):
""" """
""" """
self.fault_func = fault_func self.fault_func = fault_func
self.running = False self.running = False
if event_loop is None:
# Run a dedicated event loop for communications
#
# All co-routines need to be called through this event loop,
# for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop).
def run_loop(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()
self.event_loop = asyncio.new_event_loop()
self.event_loop_thread = Thread(target=run_loop, args=(self.event_loop,), name=f"AsyncCommClient {self.name()} event loop")
self.event_loop_thread.start()
else:
self.event_loop = event_loop
self.event_loop_thread = None
def __del__(self):
if self.event_loop_thread is not None:
# signal our event loop thread to stop
self.event_loop.stop()
# reap our event loop thread once it is done processing tasks
self.event_loop_thread.join()
def name(self):
""" The name of this CommClient, for use in logs. """
return self.__class__.__name__
@abstractmethod @abstractmethod
async def connect(self): async def connect(self):
""" """
...@@ -174,14 +203,14 @@ class AsyncCommClient(object): ...@@ -174,14 +203,14 @@ class AsyncCommClient(object):
""" Notice when the connection goes down. """ """ Notice when the connection goes down. """
try: try:
logger.info(f"Start watching OPC-UA connection to {self._servername()}") logger.info(f"[AsyncCommClient {self.name()}] Start watching")
while self.running: while self.running:
# ping will throw in case of connection issues # ping will throw in case of connection issues
try: try:
await self.ping() await self.ping()
except Exception as e: except Exception as e:
logger.exception("Ping failed: connection considered lost") logger.exception(f"[AsyncCommClient {self.name()}] Ping failed: connection considered lost")
# connection error, go to fault # connection error, go to fault
self.fault_func() self.fault_func()
...@@ -198,11 +227,11 @@ class AsyncCommClient(object): ...@@ -198,11 +227,11 @@ class AsyncCommClient(object):
pass pass
except Exception as e: except Exception as e:
# log immediately, or the exception will only be printed once this task is awaited # log immediately, or the exception will only be printed once this task is awaited
logger.exception(f"Exception raised while watching OPC-UA connection to {self._servername()}") logger.exception(f"[AsyncCommClient {self.name()}] Exception raised while watching")
raise raise
finally: finally:
logger.info(f"Stop watching OPC-UA connection to {self._servername()}") logger.info(f"[AsyncCommClient {self.name()}] Stop watching")
async def ping(self): async def ping(self):
return return
...@@ -230,7 +259,7 @@ class AsyncCommClient(object): ...@@ -230,7 +259,7 @@ class AsyncCommClient(object):
try: try:
await self.watch_connection_task await self.watch_connection_task
except Exception as e: except Exception as e:
logger.exception(f"Watcher thread raised exception") logger.exception(f"[AsyncCommClient {self.name()}] Watcher thread raised exception")
# the task stopped eithr way, so no need to bother our caller with this # the task stopped eithr way, so no need to bother our caller with this
......
import logging import logging
import docker import docker
from .comms_client import CommClient from .comms_client import AsyncCommClient
logger = logging.getLogger() logger = logging.getLogger()
class DockerClient(CommClient): class DockerClient(AsyncCommClient):
""" """
Controls & queries running docker containers. Controls & queries running docker containers.
""" """
def start(self): def __init__(self, base_url, fault_func, event_loop=None):
super().start() super().__init__(fault_func, event_loop)
def __init__(self, base_url, fault_func, streams):
super().__init__(fault_func, streams)
self.base_url = base_url self.base_url = base_url
def connect(self): async def connect(self):
"""
Function used to connect to the client.
"""
if not self.connected:
self.client = docker.DockerClient(self.base_url) self.client = docker.DockerClient(self.base_url)
return super().connect() async def ping(self):
# Raises if the server is unresponsive
self.client.ping()
def ping(self): async def disconnect(self):
return True
def disconnect(self):
self.client = None self.client = None
return super().disconnect() async def setup_attribute(self, annotation, attribute):
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
the OPC ua read/write functions require the dimensionality and the type to be known
"""
return
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
container_name = annotation["container"] container_name = annotation["container"]
# get all the necessary data to set up the read/write functions from the attribute_wrapper
self.setup_value_conversion(attribute)
def read_function(): def read_function():
try: try:
container = self.client.containers.get(container_name) container = self.client.containers.get(container_name)
except docker.errors.NotFound: except docker.errors.NotFound:
return False return False
# expected values: running, restarting, paused, exited, created
return container.status == 'running' return container.status == 'running'
def write_function(value): def write_function(value):
......
...@@ -28,19 +28,6 @@ numpy_to_OPCua_dict = { ...@@ -28,19 +28,6 @@ numpy_to_OPCua_dict = {
numpy.str: asyncua.ua.VariantType.String numpy.str: asyncua.ua.VariantType.String
} }
# Run a dedicated event loop for OPC-UA communications
#
# All co-routines need to be called through this event loop,
# for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop).
def run_loop(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()
event_loop = asyncio.new_event_loop()
event_loop_thread = Thread(target=run_loop, args=(event_loop,), name="OPC-UA asyncio event loop", daemon=True)
event_loop_thread.start()
class OPCUAConnection(AsyncCommClient): class OPCUAConnection(AsyncCommClient):
""" """
Connects to OPC-UA in the foreground or background, and sends HELLO Connects to OPC-UA in the foreground or background, and sends HELLO
...@@ -55,7 +42,7 @@ class OPCUAConnection(AsyncCommClient): ...@@ -55,7 +42,7 @@ class OPCUAConnection(AsyncCommClient):
self.client = Client(address, int(timeout)) self.client = Client(address, int(timeout))
self.namespace = namespace self.namespace = namespace
super().__init__(fault_func) super().__init__(fault_func, opcua_event_loop)
async def start(self): async def start(self):
# connect # connect
...@@ -151,7 +138,7 @@ class OPCUAConnection(AsyncCommClient): ...@@ -151,7 +138,7 @@ class OPCUAConnection(AsyncCommClient):
ua_type = numpy_to_OPCua_dict[attribute.numpy_type] # convert the numpy type to a corresponding UA type ua_type = numpy_to_OPCua_dict[attribute.numpy_type] # convert the numpy type to a corresponding UA type
# configure and return the read/write functions # configure and return the read/write functions
prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type) prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type, self.event_loop)
try: try:
# NOTE: debug statement tries to get the qualified name, this may not always work. in that case forgo the name and just print the path # NOTE: debug statement tries to get the qualified name, this may not always work. in that case forgo the name and just print the path
...@@ -173,12 +160,14 @@ class ProtocolAttribute: ...@@ -173,12 +160,14 @@ class ProtocolAttribute:
This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
""" """
def __init__(self, node, dim_x, dim_y, ua_type): def __init__(self, node, dim_x, dim_y, ua_type, event_loop):
self.node = node self.node = node
self.dim_y = dim_y self.dim_y = dim_y
self.dim_x = dim_x self.dim_x = dim_x
self.ua_type = ua_type self.ua_type = ua_type
self.event_loop = event_loop
async def _read_value(self): async def _read_value(self):
return await self.node.get_value() return await self.node.get_value()
...@@ -186,7 +175,7 @@ class ProtocolAttribute: ...@@ -186,7 +175,7 @@ class ProtocolAttribute:
""" """
Read_R function Read_R function
""" """
future = asyncio.run_coroutine_threadsafe(self._read_value(), event_loop) future = asyncio.run_coroutine_threadsafe(self._read_value(), self.event_loop)
value = future.result() value = future.result()
try: try:
...@@ -254,5 +243,5 @@ class ProtocolAttribute: ...@@ -254,5 +243,5 @@ class ProtocolAttribute:
# make sure it is a python array # make sure it is a python array
value = value.tolist() if type(value) == numpy.ndarray else value value = value.tolist() if type(value) == numpy.ndarray else value
future = asyncio.run_coroutine_threadsafe(self._write_value(value), event_loop) future = asyncio.run_coroutine_threadsafe(self._write_value(value), self.event_loop)
_ = future.result() _ = future.result()
...@@ -23,6 +23,7 @@ from tango.server import run, command ...@@ -23,6 +23,7 @@ from tango.server import run, command
from tango.server import device_property, attribute from tango.server import device_property, attribute
from tango import AttrWriteType from tango import AttrWriteType
import numpy import numpy
import asyncio
# Additional import # Additional import
from device_decorators import * from device_decorators import *
...@@ -131,18 +132,23 @@ class Docker(hardware_device): ...@@ -131,18 +132,23 @@ class Docker(hardware_device):
""" user code here. is called when the state is set to INIT """ """ user code here. is called when the state is set to INIT """
# set up the Docker client # set up the Docker client
self.docker_client = DockerClient(self.Docker_Base_URL, self.Fault, self) self.docker_client = DockerClient(self.Docker_Base_URL, self.Fault)
# schedule the docker initialisation, and wait for it to finish
future = asyncio.run_coroutine_threadsafe(self._connect_docker(), self.docker_client.event_loop)
_ = future.result()
async def _connect_docker(self):
# map an access helper class # map an access helper class
for i in self.attr_list(): for i in self.attr_list():
try: try:
i.set_comm_client(self.docker_client) await i.async_set_comm_client(self.docker_client)
except Exception as e: except Exception as e:
# use the pass function instead of setting read/write fails # use the pass function instead of setting read/write fails
i.set_pass_func() i.set_pass_func()
self.warn_stream("error while setting the attribute {} read/write function. {}".format(i, e)) self.warn_stream("error while setting the attribute {} read/write function. {}".format(i, e))
self.docker_client.start() await self.docker_client.start()
# -------- # --------
# Commands # Commands
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment