-
Taya Snijder authoredTaya Snijder authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
comms_client.py 8.86 KiB
from threading import Thread
import time
import asyncio
from abc import ABC, abstractmethod
import logging
logger = logging.getLogger()
class AbstractCommClient(ABC):
@abstractmethod
def start(self):
""" Start communication with the client. """
@abstractmethod
def stop(self):
""" Stop communication with the client. """
def ping(self):
""" Check whether the connection is still alive.
Clients that override this method must raise an Exception if the
connection died. """
pass
@abstractmethod
def setup_attribute(self, annotation, attribute):
"""
This function returns a (read_function, write_function) tuple for the provided attribute with the provided annotation.
The setup-attribute has access to the comms_annotation provided to the attribute wrapper to pass along to the comms client
as well as a reference to the attribute itself.
The read_function must return a single value, representing the current value of the attribute.
The write_function must take a single value, write it, and return None.
Examples:
- File system: get_mapping returns functions that read/write a fixed
number of bytes at a fixed location in a file. (SEEK)
- OPC-UA: traverse the OPC-UA tree until the node is found.
Then return the read/write functions for that node which automatically
convert values between Python and OPC-UA.
"""
class CommClient(AbstractCommClient, Thread):
"""
Abstracts communication with a client, for instance, over the network, by handling connect(), disconnect(), and ping()
primitives.
"""
def __init__(self, fault_func, try_interval=2):
"""
"""
self.fault_func = fault_func
self.try_interval = try_interval
self.stopping = False
self.connected = False
super().__init__(daemon=True)
def connect(self):
"""
Function used to connect to the client.
Throws an Exception if the connection cannot be established.
"""
self.connected = True
def disconnect(self):
"""
Function used to connect to the client.
"""
self.connected = False
def run(self):
self.stopping = False
while not self.stopping:
if not self.connected:
# we (re)try only once, to catch exotic network issues. if the infra or hardware is down,
# our device cannot help, and must be reinitialised after the infra or hardware is fixed.
try:
self.connect()
except Exception as e:
logger.exception("Fault condition in communication detected.")
self.fault_func()
return
# keep checking if the connection is still alive
try:
while not self.stopping:
self.ping()
time.sleep(self.try_interval)
except Exception as e:
logger.exception("Fault condition in communication detected.")
# technically, we may not have dropped the connection, but encounter a different error. so explicitly disconnect.
self.disconnect()
# signal that we're disconnected
self.fault_func()
# don't enter a spam-connect loop if faults immediately occur
time.sleep(self.try_interval)
def ping(self):
""" Check whether the connection is still alive.
Clients that override this method must raise an Exception if the
connection died. """
pass
def stop(self):
"""
Stop connecting & disconnect. Can take a few seconds for the timeouts to hit.
"""
if not self.ident:
# have not yet been started, so nothing to do
return
self.stopping = True
self.join()
self.disconnect()
class AsyncCommClient(object):
"""
Abstracts communication with a client, for instance, over the network, by handling connect(), disconnect(), and ping()
primitives.
asyncio version of the CommClient. Also does not reconnect if the connection is lost.
"""
def __init__(self, fault_func=lambda: None, event_loop=None):
"""
Create an Asynchronous communication client.
fault_func: Function to call to put the device to FAULT if an error is detected.
event_loop: Aysncio event loop to use. If None, a new event loop is created and
run in a separate thread. Only share event loops if any of the functions
executed doesn't stall, as asyncio used a cooperative multitasking model.
If the executed functions can stall (for a bit), use a dedicated loop to avoid
interfering with other users of the event loop.
All coroutines need to be executed in this loop, which wil also be stored
as the `event_loop` member of this object.
"""
self.fault_func = fault_func
self.running = False
if event_loop is None:
# Run a dedicated event loop for communications
#
# All co-routines need to be called through this event loop,
# for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop).
def run_loop(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()
self.event_loop = asyncio.new_event_loop()
self.event_loop_thread = Thread(target=run_loop, args=(self.event_loop,), name=f"AsyncCommClient {self.name()} event loop", daemon=True)
self.event_loop_thread.start()
else:
self.event_loop = event_loop
self.event_loop_thread = None
def __del__(self):
if self.event_loop_thread is not None:
# signal our event loop thread to stop
self.event_loop.call_soon_threadsafe(self.event_loop.stop)
# reap our event loop thread once it is done processing tasks
self.event_loop_thread.join()
def name(self):
""" The name of this CommClient, for use in logs. """
return self.__class__.__name__
@abstractmethod
async def connect(self):
"""
Function used to connect to the client, and any
post init.
"""
@abstractmethod
async def disconnect(self):
"""
Function used to disconnect from the client.
"""
async def watch_connection(self):
""" Notice when the connection goes down. """
try:
logger.info(f"[AsyncCommClient {self.name()}] Start watching")
while self.running:
# ping will throw in case of connection issues
try:
await self.ping()
except Exception as e:
logger.exception(f"[AsyncCommClient {self.name()}] Ping failed: connection considered lost")
# connection error, go to fault
self.fault_func()
# disconnect will cancel us
await self.disconnect()
# always have a backup plan
return
# don't spin, sleep for a while
await asyncio.sleep(2)
except asyncio.CancelledError as e:
pass
except Exception as e:
# log immediately, or the exception will only be printed once this task is awaited
logger.exception(f"[AsyncCommClient {self.name()}] Exception raised while watching")
raise
finally:
logger.info(f"[AsyncCommClient {self.name()}] Stop watching")
async def ping(self):
return
async def start(self):
if self.running:
# already running
return
await self.connect()
self.running = True
# watch connection
self.watch_connection_task = asyncio.create_task(self.watch_connection())
async def stop(self):
if not self.running:
# already stopped
return
self.running = False
# cancel & reap watcher
self.watch_connection_task.cancel()
try:
await self.watch_connection_task
except asyncio.CancelledError as e:
pass
except Exception as e:
logger.exception(f"[AsyncCommClient {self.name()}] Watcher thread raised exception")
# the task stopped eithr way, so no need to bother our caller with this
await self.disconnect()
def sync_stop(self):
""" Synchronous version of stop(). """
future = asyncio.run_coroutine_threadsafe(self.stop(), self.event_loop)
return future.result()