diff --git a/devices/clients/sst_client.py b/devices/clients/sst_client.py new file mode 100644 index 0000000000000000000000000000000000000000..ef05253af4d93c2fa0eb470dfd9db6026ecc82b5 --- /dev/null +++ b/devices/clients/sst_client.py @@ -0,0 +1,168 @@ +from threading import Thread +import socket +from util.comms_client import CommClient + +from collections import deque + +import numpy +import logging +import socket +from datetime import datetime +from multiprocessing import Value, Array +import time + + +__all__ = ["sst_client"] + + +# <class 'numpy.bool_'> + +class sst_client(CommClient): + """ + Connects to OPC-UA in the foreground or background, and sends HELLO + messages to keep a check on the connection. On connection failure, reconnects once. + """ + + def start(self): + super().start() + + def __init__(self, host, port, fault_func, streams, try_interval=2): + """ + Create the sst client and connect() to it and get the object node + """ + self.host = host + self.port = port + self.timeout = 0.1 + self.buffersize = 9000 + + super().__init__(fault_func, streams, try_interval) + + # Explicitly connect + if not self.connect(): + # hardware or infra is down -- needs fixing first + fault_func() + return + + def connect(self): + """ + Function used to connect to the client. + """ + self.deque = deque(maxlen=1024) + self.udp = UDP_Receiver(self.host, self.port, self.deque, self.buffersize, self.timeout) + self.sst = SST(self.deque) + return super().connect() + + def disconnect(self): + del self.udp + del self.sst + del self.deque + return super().disconnect() + + def _setup_annotation(self, annotation): + """ + This class's Implementation of the get_mapping function. returns the read and write functions + """ + parameter = annotation.get('parameter', None) + + if parameter is None: + raise Exception("No SST parameter was given in the annotation: %s", annotation) + + return parameter + + def setup_value_conversion(self, attribute): + """ + gives the client access to the attribute_wrapper object in order to access all data it could potentially need. + the OPC ua read/write functions require the dimensionality and the type to be known + """ + return + + def setup_attribute(self, annotation, attribute): + """ + MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions + """ + + # process the annotation + SST_param = self._setup_annotation(annotation) + + # get all the necessary data to set up the read/write functions from the attribute_wrapper + self.setup_value_conversion(attribute) + + + def read_function(): + return self.sst.parameters[SST_param] + + def write_function(value): + """ + Not used here + """ + pass + + return read_function, write_function + +class UDP_Receiver(Thread): + """ + This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code + """ + + def __init__(self, host, port, deque, buffersize=9000, timeout=0.1): + + self.deque = deque + self.host = host + self.port = port + self.buffersize = buffersize + + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock.settimeout(timeout) + self.stream_on = True + + super().__init__() + + self.start() + + def run(self): + # all variables are manually defined and are updated each time + while self.stream_on: + try: + + packet = Array('B', self.buffersize) + self.sock.recvmsg_into(packet[0:self.buffersize]) + self.deque.append(packet) + + except socket.timeout: + pass + + def __del__(self): + self.stream_on = False + self.join() + + +class SST(Thread): + def __init__(self, deque): + + self.deque = deque + self.last_packet = None + + self.parameters = { + "packet_count": 0, + "timestamp": 0 + } + + super().__init__() + self.start() + + def run(self): + while True: + packet = self.deque.pop() + + if packet is None: + break + + self.decode(packet) + + def __del__(self): + self.deque.appendleft(None) + self.join() + + def decode(self, packet): + self.parameters["packet_count"] += 1 + self.parameters["timestamp"] = time.time() diff --git a/devices/statistics_device.py b/devices/statistics_device.py new file mode 100644 index 0000000000000000000000000000000000000000..1acdce228c51abbcfe998e0707b92911fb3d4af1 --- /dev/null +++ b/devices/statistics_device.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the SDP project +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" SDP Device Server for LOFAR2.0 + +""" + +# PyTango imports +from tango.server import run +from tango.server import device_property +from tango import AttrWriteType +# Additional import + +from clients.sst_client import sst_client + +from util.attribute_wrapper import attribute_wrapper +from util.hardware_device import hardware_device + +from util.lofar_logging import device_logging_to_python, log_exceptions + +import numpy + +__all__ = ["SST", "main"] + +@device_logging_to_python({"device": "SST"}) +class SST(hardware_device): + + # ----------------- + # Device Properties + # ----------------- + + SST_Port = device_property( + dtype='DevULong', + mandatory=True + ) + + # ---------- + # Attributes + # ---------- + # SDP will switch from fpga_mask_RW to tr_fpga_mask_RW, offer both for now as its a critical flag + + packet_count_R = attribute_wrapper(comms_annotation={"parameter": "packet_count"}, datatype=numpy.int64) + timestamp_R = attribute_wrapper(comms_annotation={"parameter": "timestamp"}, datatype=numpy.int64) + + # -------- + # overloaded functions + # -------- + @log_exceptions() + def configure_for_off(self): + """ user code here. is called when the state is set to OFF """ + + # Stop keep-alive + try: + self.set_comm_client.stop() + except Exception as e: + self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) + + @log_exceptions() + def configure_for_initialise(self): + """ user code here. is called when the sate is set to INIT """ + """Initialises the attributes and properties of the SDP.""" + + self.sst_client = sst_client("0.0.0.0", self.SST_Port, self.Fault, self.streams) + + # map an access helper class + for i in self.attr_list(): + try: + i.set_comm_client(self.sst_client) + except Exception as e: + # use the pass function instead of setting read/write fails + i.set_pass_func() + self.warn_stream("error while setting the SDP attribute {} read/write function. {}".format(i, e)) + pass + + self.sst_client.start() + + # -------- + # Commands + # -------- + +# ---------- +# Run server +# ---------- +def main(args=None, **kwargs): + """Main function of the SDP module.""" + return run((SST,), args=args, **kwargs) + + +if __name__ == '__main__': + main()