diff --git a/devices/clients/comms_client.py b/devices/clients/comms_client.py index 5e5f459f1fa40c5b18cb4585caa71f2c6d2092c7..40985e65328c6a2f1f520a71f3c6d39ef3937d6d 100644 --- a/devices/clients/comms_client.py +++ b/devices/clients/comms_client.py @@ -150,13 +150,42 @@ class AsyncCommClient(object): asyncio version of the CommClient. Also does not reconnect if the connection is lost. """ - def __init__(self, fault_func): + def __init__(self, fault_func, event_loop=None): """ """ self.fault_func = fault_func self.running = False + if event_loop is None: + # Run a dedicated event loop for communications + # + # All co-routines need to be called through this event loop, + # for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop). + + def run_loop(loop: asyncio.AbstractEventLoop) -> None: + asyncio.set_event_loop(loop) + loop.run_forever() + + self.event_loop = asyncio.new_event_loop() + self.event_loop_thread = Thread(target=run_loop, args=(self.event_loop,), name=f"AsyncCommClient {self.name()} event loop") + self.event_loop_thread.start() + else: + self.event_loop = event_loop + self.event_loop_thread = None + + def __del__(self): + if self.event_loop_thread is not None: + # signal our event loop thread to stop + self.event_loop.stop() + + # reap our event loop thread once it is done processing tasks + self.event_loop_thread.join() + + def name(self): + """ The name of this CommClient, for use in logs. """ + return self.__class__.__name__ + @abstractmethod async def connect(self): """ @@ -174,14 +203,14 @@ class AsyncCommClient(object): """ Notice when the connection goes down. """ try: - logger.info(f"Start watching OPC-UA connection to {self._servername()}") + logger.info(f"[AsyncCommClient {self.name()}] Start watching") while self.running: # ping will throw in case of connection issues try: await self.ping() except Exception as e: - logger.exception("Ping failed: connection considered lost") + logger.exception(f"[AsyncCommClient {self.name()}] Ping failed: connection considered lost") # connection error, go to fault self.fault_func() @@ -198,11 +227,11 @@ class AsyncCommClient(object): pass except Exception as e: # log immediately, or the exception will only be printed once this task is awaited - logger.exception(f"Exception raised while watching OPC-UA connection to {self._servername()}") + logger.exception(f"[AsyncCommClient {self.name()}] Exception raised while watching") raise finally: - logger.info(f"Stop watching OPC-UA connection to {self._servername()}") + logger.info(f"[AsyncCommClient {self.name()}] Stop watching") async def ping(self): return @@ -230,7 +259,7 @@ class AsyncCommClient(object): try: await self.watch_connection_task except Exception as e: - logger.exception(f"Watcher thread raised exception") + logger.exception(f"[AsyncCommClient {self.name()}] Watcher thread raised exception") # the task stopped eithr way, so no need to bother our caller with this diff --git a/devices/clients/docker_client.py b/devices/clients/docker_client.py index c5b0e8b81f69e7f83ae381468b6bcd738f9ec296..66a782f9c44833f1ee52d1822991936e5bcf1f47 100644 --- a/devices/clients/docker_client.py +++ b/devices/clients/docker_client.py @@ -1,63 +1,40 @@ import logging import docker -from .comms_client import CommClient +from .comms_client import AsyncCommClient logger = logging.getLogger() -class DockerClient(CommClient): +class DockerClient(AsyncCommClient): """ Controls & queries running docker containers. """ - def start(self): - super().start() - - def __init__(self, base_url, fault_func, streams): - super().__init__(fault_func, streams) + def __init__(self, base_url, fault_func, event_loop=None): + super().__init__(fault_func, event_loop) self.base_url = base_url - def connect(self): - """ - Function used to connect to the client. - """ - if not self.connected: - self.client = docker.DockerClient(self.base_url) - - return super().connect() + async def connect(self): + self.client = docker.DockerClient(self.base_url) - def ping(self): - return True + async def ping(self): + # Raises if the server is unresponsive + self.client.ping() - def disconnect(self): + async def disconnect(self): self.client = None - return super().disconnect() - - def setup_value_conversion(self, attribute): - """ - gives the client access to the attribute_wrapper object in order to access all data it could potentially need. - the OPC ua read/write functions require the dimensionality and the type to be known - """ - return - - def setup_attribute(self, annotation, attribute): - """ - MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions - """ - + async def setup_attribute(self, annotation, attribute): container_name = annotation["container"] - # get all the necessary data to set up the read/write functions from the attribute_wrapper - self.setup_value_conversion(attribute) - def read_function(): try: container = self.client.containers.get(container_name) except docker.errors.NotFound: return False + # expected values: running, restarting, paused, exited, created return container.status == 'running' def write_function(value): diff --git a/devices/clients/opcua_client.py b/devices/clients/opcua_client.py index b3c45bc1aa24d632eee25142afd2f0200fe75ff8..13fd0dacc0bdea0bbd0014c2ea22fd7b0cd1220a 100644 --- a/devices/clients/opcua_client.py +++ b/devices/clients/opcua_client.py @@ -28,19 +28,6 @@ numpy_to_OPCua_dict = { numpy.str: asyncua.ua.VariantType.String } -# Run a dedicated event loop for OPC-UA communications -# -# All co-routines need to be called through this event loop, -# for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop). - -def run_loop(loop: asyncio.AbstractEventLoop) -> None: - asyncio.set_event_loop(loop) - loop.run_forever() - -event_loop = asyncio.new_event_loop() -event_loop_thread = Thread(target=run_loop, args=(event_loop,), name="OPC-UA asyncio event loop", daemon=True) -event_loop_thread.start() - class OPCUAConnection(AsyncCommClient): """ Connects to OPC-UA in the foreground or background, and sends HELLO @@ -55,7 +42,7 @@ class OPCUAConnection(AsyncCommClient): self.client = Client(address, int(timeout)) self.namespace = namespace - super().__init__(fault_func) + super().__init__(fault_func, opcua_event_loop) async def start(self): # connect @@ -151,7 +138,7 @@ class OPCUAConnection(AsyncCommClient): ua_type = numpy_to_OPCua_dict[attribute.numpy_type] # convert the numpy type to a corresponding UA type # configure and return the read/write functions - prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type) + prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type, self.event_loop) try: # NOTE: debug statement tries to get the qualified name, this may not always work. in that case forgo the name and just print the path @@ -173,12 +160,14 @@ class ProtocolAttribute: This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code """ - def __init__(self, node, dim_x, dim_y, ua_type): + def __init__(self, node, dim_x, dim_y, ua_type, event_loop): self.node = node self.dim_y = dim_y self.dim_x = dim_x self.ua_type = ua_type + self.event_loop = event_loop + async def _read_value(self): return await self.node.get_value() @@ -186,7 +175,7 @@ class ProtocolAttribute: """ Read_R function """ - future = asyncio.run_coroutine_threadsafe(self._read_value(), event_loop) + future = asyncio.run_coroutine_threadsafe(self._read_value(), self.event_loop) value = future.result() try: @@ -254,5 +243,5 @@ class ProtocolAttribute: # make sure it is a python array value = value.tolist() if type(value) == numpy.ndarray else value - future = asyncio.run_coroutine_threadsafe(self._write_value(value), event_loop) + future = asyncio.run_coroutine_threadsafe(self._write_value(value), self.event_loop) _ = future.result() diff --git a/devices/devices/docker_device.py b/devices/devices/docker_device.py index 01d36236d8d04678035748a34cae213e6b7b18ae..72f544037e6ce7f704535731a8ce78fa214994d4 100644 --- a/devices/devices/docker_device.py +++ b/devices/devices/docker_device.py @@ -23,6 +23,7 @@ from tango.server import run, command from tango.server import device_property, attribute from tango import AttrWriteType import numpy +import asyncio # Additional import from device_decorators import * @@ -131,18 +132,23 @@ class Docker(hardware_device): """ user code here. is called when the state is set to INIT """ # set up the Docker client - self.docker_client = DockerClient(self.Docker_Base_URL, self.Fault, self) + self.docker_client = DockerClient(self.Docker_Base_URL, self.Fault) + # schedule the docker initialisation, and wait for it to finish + future = asyncio.run_coroutine_threadsafe(self._connect_docker(), self.docker_client.event_loop) + _ = future.result() + + async def _connect_docker(self): # map an access helper class for i in self.attr_list(): try: - i.set_comm_client(self.docker_client) + await i.async_set_comm_client(self.docker_client) except Exception as e: # use the pass function instead of setting read/write fails i.set_pass_func() self.warn_stream("error while setting the attribute {} read/write function. {}".format(i, e)) - self.docker_client.start() + await self.docker_client.start() # -------- # Commands