Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
comms_client.py 8.86 KiB
from threading import Thread
import time
import asyncio
from abc import ABC, abstractmethod

import logging
# Logger shared by every client in this module. getLogger() is called without a
# name, so this is the root logger.
logger = logging.getLogger()

class AbstractCommClient(ABC):
    """ Interface that every communication client has to provide. """

    @abstractmethod
    def setup_attribute(self, annotation, attribute):
        """
        Build the accessors for one attribute.

        Receives the comms annotation that was given to the attribute wrapper,
        plus a reference to the attribute itself, and returns a
        (read_function, write_function) tuple for that attribute.

        read_function:  returns a single value, the current value of the attribute.
        write_function: takes a single value, writes it, and returns None.

        Examples:
        - File system:  the returned functions read/write a fixed number of
        bytes at a fixed location in a file (seek).
        - OPC-UA:  traverse the OPC-UA tree until the node is found, then
        return the read/write functions for that node, which automatically
        convert values between Python and OPC-UA.
        """

    @abstractmethod
    def stop(self):
        """ Stop communication with the client. """

    @abstractmethod
    def start(self):
        """ Start communication with the client. """

    def ping(self):
        """ Check whether the connection is still alive.

            The default implementation does nothing. Clients that override
            this method must raise an Exception if the connection died. """
        return None

class CommClient(AbstractCommClient, Thread):
    """
    Abstracts communication with a client, for instance, over the network, by handling connect(), disconnect(), and ping()
    primitives.

    The client is a daemon thread: once started, run() connects and then pings
    the connection every `try_interval` seconds until stop() is called.
    """

    def __init__(self, fault_func, try_interval=2):
        """
        Create a threaded communication client. The thread is not started here.

        fault_func:   Function called (without arguments, from the communication
                      thread) whenever a connection fault is detected.
        try_interval: Seconds to sleep between pings, and before reconnecting
                      after a failed ping.
        """
        self.fault_func = fault_func
        self.try_interval = try_interval
        self.stopping = False
        self.connected = False

        super().__init__(daemon=True)

    def connect(self):
        """
        Function used to connect to the client.

        Throws an Exception if the connection cannot be established.
        """
        self.connected = True

    def disconnect(self):
        """
        Function used to disconnect from the client.
        """
        self.connected = False

    def run(self):
        """
        Thread body: connect, then keep pinging until stop() is requested.

        A failed connect() is reported through fault_func and ends the thread.
        A failed ping() causes a disconnect(), a call to fault_func, a pause of
        try_interval seconds, and then a new connect().
        """
        # NOTE: `stopping` is deliberately NOT reset here. It is initialised in __init__, and
        # Thread.start() returns before this method begins executing. Resetting the flag here
        # could overwrite a stop() request made in that window and leave stop() blocked in
        # join() forever.
        while not self.stopping:
            if not self.connected:
                # we (re)try only once, to catch exotic network issues. if the infra or hardware is down,
                # our device cannot help, and must be reinitialised after the infra or hardware is fixed.
                try:
                    self.connect()
                except Exception:
                    logger.exception("Fault condition in communication detected.")
                    self.fault_func()
                    return

            # keep checking if the connection is still alive
            try:
                while not self.stopping:
                    self.ping()
                    time.sleep(self.try_interval)
            except Exception:
                logger.exception("Fault condition in communication detected.")

                # technically, we may not have dropped the connection, but encounter a different error. so explicitly disconnect.
                self.disconnect()

                # signal that we're disconnected
                self.fault_func()

                # don't enter a spam-connect loop if faults immediately occur
                time.sleep(self.try_interval)

    def ping(self):
        """ Check whether the connection is still alive.

            Clients that override this method must raise an Exception if the
            connection died. """
        pass

    def stop(self):
        """
          Stop connecting & disconnect. Can take a few seconds for the timeouts to hit.

          Does nothing if the thread was never started. Otherwise this blocks
          until the communication thread has finished, so it must not be called
          from that thread itself (for example from within fault_func): a
          thread cannot join itself.
        """

        if not self.ident:
            # have not yet been started, so nothing to do
            return

        self.stopping = True
        self.join()

        self.disconnect()

class AsyncCommClient(object):
    """
    Abstracts communication with a client, for instance, over the network, by handling connect(), disconnect(), and ping()
    primitives.

    asyncio version of the CommClient. Also does not reconnect if the connection is lost.
    """

    def __init__(self, fault_func=lambda: None, event_loop=None):
        """
          Create an Asynchronous communication client.

          fault_func: Function to call to put the device to FAULT if an error is detected.
          event_loop: Asyncio event loop to use. If None, a new event loop is created and
                      run in a separate thread. Only share event loops if none of the functions
                      executed stall, as asyncio uses a cooperative multitasking model.

                      If the executed functions can stall (for a bit), use a dedicated loop to avoid
                      interfering with other users of the event loop.

                      All coroutines need to be executed in this loop, which will also be stored
                      as the `event_loop` member of this object.
        """
        self.fault_func = fault_func
        self.running = False

        if event_loop is None:
            # Run a dedicated event loop for communications
            #
            # All co-routines need to be called through this event loop,
            # for example using asyncio.run_coroutine_threadsafe(coroutine, event_loop).

            def run_loop(loop: asyncio.AbstractEventLoop) -> None:
                asyncio.set_event_loop(loop)
                loop.run_forever()

            self.event_loop = asyncio.new_event_loop()
            self.event_loop_thread = Thread(target=run_loop, args=(self.event_loop,), name=f"AsyncCommClient {self.name()} event loop", daemon=True)
            self.event_loop_thread.start()
        else:
            # shared loop: its owner is responsible for running and closing it
            self.event_loop = event_loop
            self.event_loop_thread = None

    def __del__(self):
        """ Stop, reap and close the dedicated event loop, if this object created one. """

        # __init__ may have raised before this attribute was assigned
        loop_thread = getattr(self, "event_loop_thread", None)
        if loop_thread is None:
            return

        # make a repeated finalisation a no-op
        self.event_loop_thread = None

        # signal our event loop thread to stop
        self.event_loop.call_soon_threadsafe(self.event_loop.stop)

        # reap our event loop thread once it is done processing tasks
        try:
            loop_thread.join()
        except RuntimeError:
            # we are being finalised on the event loop thread itself (a thread cannot
            # join itself), or the thread never started. The stop request above still
            # stands; we just cannot wait for it or close the loop from here.
            return

        # the loop is no longer running, so release the resources it holds
        self.event_loop.close()

    def name(self):
        """ The name of this CommClient, for use in logs. """
        return self.__class__.__name__

    # NOTE: this class does not use ABCMeta, so @abstractmethod is not enforced here:
    # it only marks the methods subclasses are expected to override.
    @abstractmethod
    async def connect(self):
        """
        Function used to connect to the client, and any
        post init.
        """

    @abstractmethod
    async def disconnect(self):
        """
        Function used to disconnect from the client.
        """

    async def watch_connection(self):
        """ Notice when the connection goes down.

            Pings every 2 seconds while `running` is set. If ping() raises,
            fault_func is called, disconnect() is awaited, and watching ends. """

        try:
            logger.info(f"[AsyncCommClient {self.name()}] Start watching")

            while self.running:
                # ping will throw in case of connection issues
                try:
                    await self.ping()
                except Exception:
                    logger.exception(f"[AsyncCommClient {self.name()}] Ping failed: connection considered lost")

                    # connection error, go to fault
                    self.fault_func()

                    # disconnect will cancel us
                    await self.disconnect()

                    # always have a backup plan
                    return

                # don't spin, sleep for a while
                await asyncio.sleep(2)
        except asyncio.CancelledError:
            # cancelled (stop() does this): finish quietly
            pass
        except Exception:
            # log immediately, or the exception will only be printed once this task is awaited
            logger.exception(f"[AsyncCommClient {self.name()}] Exception raised while watching")

            raise
        finally:
            logger.info(f"[AsyncCommClient {self.name()}] Stop watching")

    async def ping(self):
        """ Check whether the connection is still alive.

            The default implementation does nothing. watch_connection() treats
            any exception raised here as a lost connection. """
        return

    async def start(self):
        """ Connect and start watching the connection. Does nothing if already running.

            Any exception raised by connect() propagates to the caller, in which
            case the client is not marked as running. """
        if self.running:
            # already running
            return

        await self.connect()
        self.running = True

        # watch connection
        self.watch_connection_task = asyncio.create_task(self.watch_connection())

    async def stop(self):
        """ Stop watching the connection and disconnect. Does nothing if not running. """
        if not self.running:
            # already stopped
            return

        self.running = False

        # cancel & reap watcher
        self.watch_connection_task.cancel()
        try:
            await self.watch_connection_task
        except asyncio.CancelledError:
            pass
        except Exception:
            logger.exception(f"[AsyncCommClient {self.name()}] Watcher thread raised exception")

            # the task stopped either way, so no need to bother our caller with this

        await self.disconnect()

    def sync_stop(self):
        """ Synchronous version of stop().

            Schedules stop() on `event_loop` and blocks until it has finished,
            so this must be called from a thread other than the one running
            that loop. """

        future = asyncio.run_coroutine_threadsafe(self.stop(), self.event_loop)
        return future.result()