"""SST client: receives packets over UDP and exposes simple reception statistics."""

from collections import deque
from datetime import datetime
import logging
from multiprocessing import Value, Array
import socket
from threading import Thread, current_thread
import time

import numpy

from util.comms_client import CommClient

__all__ = ["sst_client"]

logger = logging.getLogger(__name__)


class sst_client(CommClient):
    """
      Receives SST packets over UDP and exposes the statistics derived from them.

      A UDP_Receiver thread pushes raw packets onto a bounded deque and an SST
      thread consumes them, maintaining a dict of parameters that attributes can
      read through the functions returned by setup_attribute().

      NOTE(review): any keep-alive / reconnect behaviour comes from CommClient,
      which is not visible in this file -- confirm there.
    """

    def start(self):
        super().start()

    def __init__(self, host, port, fault_func, streams, try_interval=2):
        """
        Create the sst client and connect() it.

        host, port:   local address the UDP socket is bound to.
        fault_func:   callable invoked (without arguments) if the initial
                      connect() fails.
        streams, try_interval: passed through to CommClient unchanged.
        """
        self.host = host
        self.port = port
        self.timeout = 0.1       # socket poll timeout (seconds) of the receiver
        self.buffersize = 9000   # maximum number of bytes read per datagram

        super().__init__(fault_func, streams, try_interval)

        # Explicitly connect
        if not self.connect():
            # hardware or infra is down -- needs fixing first
            fault_func()
            return

    def connect(self):
        """
        Start the UDP receiver and the SST processing thread.

        Workers left over from an earlier connect() are stopped first, so that
        reconnecting neither leaks threads nor fails to bind the port again.

        Returns False if the UDP socket could not be opened or bound, otherwise
        whatever CommClient.connect() returns.
        """
        self._stop_receivers()

        self.deque = deque(maxlen=1024)
        try:
            self.udp = UDP_Receiver(self.host, self.port, self.deque, self.buffersize, self.timeout)
        except OSError:
            logger.exception("Could not open the SST UDP socket on %s:%s", self.host, self.port)
            self._stop_receivers()
            return False
        self.sst = SST(self.deque)

        return super().connect()

    def disconnect(self):
        """
        Stop both worker threads, drop them and the shared deque, and disconnect
        the base class. Safe to call when connect() never succeeded.
        """
        self._stop_receivers()

        return super().disconnect()

    def _stop_receivers(self):
        """ Stop and remove the udp/sst workers and the deque, where present. """
        # The producer (udp) is stopped before the consumer (sst).
        for name in ("udp", "sst"):
            worker = getattr(self, name, None)
            if worker is not None:
                # Threads must be stopped explicitly: a running Thread is kept
                # alive by the threading module, so merely deleting the
                # attribute would never trigger its __del__.
                worker.stop()
                delattr(self, name)

        if hasattr(self, "deque"):
            del self.deque

    def _setup_annotation(self, annotation):
        """
        Extract the SST parameter name from the annotation.

        Returns the value stored under the 'parameter' key.
        Raises Exception if that key is missing or None.
        """
        parameter = annotation.get('parameter', None)

        if parameter is None:
            raise Exception("No SST parameter was given in the annotation: %s" % (annotation,))

        return parameter

    def setup_value_conversion(self, attribute):
        """
        Hook that gives the client access to the attribute_wrapper object.

        Nothing is set up here: the values are returned exactly as stored in
        the SST parameters.
        """
        return

    def setup_attribute(self, annotation, attribute):
        """
        MANDATORY function: is used by the attribute wrapper to get read/write functions.
        must return the read and write functions

        The read function returns the current value of the SST parameter named
        in the annotation; the write function does nothing.
        """

        # process the annotation
        SST_param = self._setup_annotation(annotation)

        # get all the necessary data to set up the read/write functions from the attribute_wrapper
        self.setup_value_conversion(attribute)

        def read_function():
            # self.sst is looked up on every call, so a reconnect is followed
            return self.sst.parameters[SST_param]

        def write_function(value):
            """
            Not used here
            """
            pass

        return read_function, write_function


class UDP_Receiver(Thread):
    """
      Thread that receives UDP datagrams and appends each one, as a bytes
      object, to the given deque. The thread is started on construction.
    """

    def __init__(self, host, port, deque, buffersize=9000, timeout=0.1):
        """
        host, port:  local address to bind the socket to.
        deque:       container the received packets are appended to.
        buffersize:  maximum number of bytes read per datagram.
        timeout:     socket timeout in seconds; bounds how long stop() has to
                     wait for the receive loop to notice the stop request.

        Raises OSError if the socket cannot be created or bound.
        """
        super().__init__()

        self.deque = deque
        self.host = host
        self.port = port
        self.buffersize = buffersize
        self.stream_on = True
        self.sock = None  # defined up front so stop()/__del__ work after a failed setup

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # without bind() no datagram is ever delivered to this socket
            self.sock.bind((self.host, self.port))
            self.sock.settimeout(timeout)
        except OSError:
            self.sock.close()
            raise

        self.start()

    def run(self):
        """ Receive packets until stop() is called or the socket fails. """
        while self.stream_on:
            try:
                packet = self.sock.recv(self.buffersize)
            except socket.timeout:
                # nothing arrived in time; loop to re-check stream_on
                continue
            except OSError:
                # a socket closed by stop() is expected, anything else is not
                if self.stream_on:
                    logger.exception("Receiving UDP packets on %s:%s failed", self.host, self.port)
                break

            self.deque.append(packet)

    def stop(self):
        """ Stop the receive loop, wait for the thread to end, close the socket. """
        self.stream_on = False

        # join() raises on a thread that was never started or on the calling thread
        if self.is_alive() and current_thread() is not self:
            self.join()

        if self.sock is not None:
            self.sock.close()

    def __del__(self):
        # Last resort only; disconnect() stops the thread explicitly.
        self.stop()


class SST(Thread):
    """
      Thread that takes packets from the deque, oldest first, and updates
      self.parameters for each of them. The thread is started on construction.
    """

    def __init__(self, deque, poll_interval=0.01):
        """
        deque:          container the packets are taken from.
        poll_interval:  seconds to sleep when the deque is empty.
        """
        super().__init__()

        self.deque = deque
        self.poll_interval = poll_interval
        self.keep_running = True
        self.last_packet = None

        self.parameters = {
            "packet_count": 0,
            "timestamp": 0
        }

        self.start()

    def run(self):
        """ Process packets until stop() is called or a None packet is read. """
        while self.keep_running:
            try:
                packet = self.deque.popleft()
            except IndexError:
                # an empty deque is the normal idle state, not an error
                time.sleep(self.poll_interval)
                continue

            if packet is None:
                # None is honoured as an in-band stop marker
                break

            self.decode(packet)

    def stop(self):
        """ Ask the processing loop to end and wait for the thread to finish. """
        self.keep_running = False

        # join() raises on a thread that was never started or on the calling thread
        if self.is_alive() and current_thread() is not self:
            self.join()

    def __del__(self):
        # Last resort only; disconnect() stops the thread explicitly.
        self.stop()

    def decode(self, packet):
        """ Count the packet and record the time at which it was processed. """
        self.parameters["packet_count"] += 1
        self.parameters["timestamp"] = time.time()