Skip to content
Snippets Groups Projects
sst_client.py 4.34 KiB
Newer Older
from threading import Thread
import socket
from util.comms_client import CommClient

from collections import deque

import numpy
import logging
import socket
from datetime import datetime
from multiprocessing import Value, Array
import time


# Public API of this module: only the sst_client class is exported by `from <module> import *`.
__all__ = ["sst_client"]


# <class 'numpy.bool_'>

class sst_client(CommClient):
    """
      Client that receives SST packets over UDP and exposes values derived from them.

      connect() creates a shared deque plus two worker threads: a UDP_Receiver that appends
      incoming packets to the deque, and an SST that consumes the deque and maintains the
      parameters returned by the attribute read functions. disconnect() stops both workers.
    """

    def start(self):
        """
        Start the client; delegates to CommClient.start().
        """
        super().start()

    def __init__(self, host, port, fault_func, streams, try_interval=2):
        """
        Create the SST client and immediately try to connect().

        host, port:             address handed to the UDP_Receiver.
        fault_func:             callable invoked without arguments if the initial connect() fails.
        streams, try_interval:  passed on unchanged to CommClient.
        """
        self.host = host
        self.port = port
        self.timeout = 0.1       # socket timeout (seconds) handed to the UDP_Receiver
        self.buffersize = 9000   # receive buffer size (bytes) handed to the UDP_Receiver

        super().__init__(fault_func, streams, try_interval)

        # Explicitly connect
        if not self.connect():
            # hardware or infra is down -- needs fixing first
            fault_func()
            return

    def connect(self):
        """
        Create the packet deque and start the receiver and decoder threads.

        Returns whatever CommClient.connect() returns.
        """
        self.deque = deque(maxlen=1024)
        self.udp = UDP_Receiver(self.host, self.port, self.deque, self.buffersize, self.timeout)
        self.sst = SST(self.deque)
        return super().connect()

    def disconnect(self):
        """
        Stop the worker threads, drop the references to them, and disconnect.

        Merely deleting the attributes is not enough: a running thread is kept alive by the
        threading machinery, so its __del__ would never run. The threads are therefore
        signalled and joined explicitly. Safe to call when not (fully) connected.

        Returns whatever CommClient.disconnect() returns.
        """
        # stop the producer first, so the stop marker for the consumer cannot be pushed out
        # of the bounded deque by newly received packets
        udp = getattr(self, "udp", None)
        if udp is not None:
            udp.stream_on = False
            udp.join()
            del self.udp

        sst = getattr(self, "sst", None)
        if sst is not None:
            # None is the marker on which the SST thread leaves its loop
            sst.deque.appendleft(None)
            sst.join()
            del self.sst

        if hasattr(self, "deque"):
            del self.deque

        return super().disconnect()

    def _setup_annotation(self, annotation):
        """
        Extract the name of the SST parameter from the annotation.

        annotation: mapping that must contain a 'parameter' entry.

        Returns the value of annotation['parameter'].
        Raises Exception if the entry is missing or None.
        """
        parameter = annotation.get('parameter', None)

        if parameter is None:
            raise Exception("No SST parameter was given in the annotation: {}".format(annotation))

        return parameter

    def setup_value_conversion(self, attribute):
        """
        Hook that receives the attribute_wrapper object. Nothing needs to be converted
        for this client, so this is intentionally a no-op.
        """
        return

    def setup_attribute(self, annotation, attribute):
        """
        MANDATORY function: is used by the attribute wrapper to get read/write functions.

        annotation: mapping with a 'parameter' entry naming the SST parameter to expose.
        attribute:  the attribute_wrapper object; only passed to setup_value_conversion().

        Returns the tuple (read_function, write_function).
        """

        # process the annotation
        SST_param = self._setup_annotation(annotation)

        # get all the necessary data to set up the read/write functions from the attribute_wrapper
        self.setup_value_conversion(attribute)

        def read_function():
            """
            Return the current value of the selected SST parameter.
            """
            return self.sst.parameters[SST_param]

        def write_function(value):
            """
            Not used here
            """
            pass

        return read_function, write_function

class UDP_Receiver(Thread):
    """
    Background thread that receives UDP datagrams and appends each one to a shared deque.
    """

    def __init__(self, host, port, deque, buffersize=9000, timeout=0.1):
        """
        Bind a UDP socket and start receiving right away.

        host, port:  local address to bind to and receive packets on.
        deque:       container to which every received packet (a bytes object) is appended.
        buffersize:  maximum number of bytes read per packet.
        timeout:     socket timeout in seconds; it bounds how long stop() has to wait
                     for the receive loop to notice the stop request.

        Raises OSError if the socket cannot be created or bound.
        """
        # initialise the Thread first, so is_alive()/stop()/__del__ work even if the
        # socket setup below fails
        super().__init__()

        self.deque = deque
        self.host = host
        self.port = port
        self.buffersize = buffersize
        self.stream_on = True

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # without a bind the socket has no known local port, so nothing can be sent to it
            self.sock.bind((self.host, self.port))
            self.sock.settimeout(timeout)
        except Exception:
            self.sock.close()
            raise

        self.start()

    def run(self):
        """
        Receive packets until stream_on is cleared; closes the socket on exit.
        """
        try:
            while self.stream_on:
                try:
                    packet = self.sock.recv(self.buffersize)
                except socket.timeout:
                    # nothing arrived in time; loop to re-check stream_on
                    continue

                self.deque.append(packet)
        finally:
            self.sock.close()

    def stop(self):
        """
        Ask the receive loop to stop, wait for the thread to finish and close the socket.

        Safe to call more than once. Must not be called from the receiver thread itself.
        """
        self.stream_on = False

        # joining a thread that was never started would raise
        if self.is_alive():
            self.join()

        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def __del__(self):
        self.stop()


class SST(Thread):
    """
    Background thread that consumes packets from a shared deque and keeps
    bookkeeping values about them in the `parameters` dict.
    """

    def __init__(self, deque, poll_interval=0.01):
        """
        Set up the bookkeeping and start consuming right away.

        deque:          container from which packets are popped; a None entry tells
                        the thread to stop.
        poll_interval:  seconds to wait before looking again when the deque is empty.
        """
        # initialise the Thread first, so is_alive()/stop()/__del__ are always usable
        super().__init__()

        self.deque = deque
        self.poll_interval = poll_interval
        self.stream_on = True
        self.last_packet = None  # not updated anywhere in this class

        self.parameters = {
            "packet_count": 0,
            "timestamp": 0
        }

        self.start()

    def run(self):
        """
        Pop and decode packets until a None entry is popped or stop() is called.
        """
        while self.stream_on:
            try:
                packet = self.deque.pop()
            except IndexError:
                # an empty deque is the normal idle state, not an error: wait and retry
                time.sleep(self.poll_interval)
                continue

            if packet is None:
                break

            self.decode(packet)

    def stop(self):
        """
        Ask the consumer loop to stop and wait for the thread to finish.

        Safe to call more than once. Must not be called from the SST thread itself.
        """
        self.stream_on = False

        # joining a thread that was never started would raise
        if self.is_alive():
            self.join()

    def __del__(self):
        self.stop()

    def decode(self, packet):
        """
        Account for one packet: increment the packet counter and record the current time.

        The content of `packet` is not inspected.
        """
        self.parameters["packet_count"] += 1
        self.parameters["timestamp"] = time.time()