Skip to content
Snippets Groups Projects
Commit 53194194 authored by Taya Snijder's avatar Taya Snijder
Browse files

added initial sst_client/udp receiver and statistics device

parent a198db3b
No related branches found
No related tags found
1 merge request!50Resolve L2SS-188 "2021 05 27 branched from master udp recv sst client"
from threading import Thread
import socket
from util.comms_client import CommClient
from collections import deque
import numpy
import logging
import socket
from datetime import datetime
from multiprocessing import Value, Array
import time
__all__ = ["sst_client"]
# <class 'numpy.bool_'>
class sst_client(CommClient):
"""
Connects to OPC-UA in the foreground or background, and sends HELLO
messages to keep a check on the connection. On connection failure, reconnects once.
"""
def start(self):
super().start()
def __init__(self, host, port, fault_func, streams, try_interval=2):
"""
Create the sst client and connect() to it and get the object node
"""
self.host = host
self.port = port
self.timeout = 0.1
self.buffersize = 9000
super().__init__(fault_func, streams, try_interval)
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
def connect(self):
"""
Function used to connect to the client.
"""
self.deque = deque(maxlen=1024)
self.udp = UDP_Receiver(self.host, self.port, self.deque, self.buffersize, self.timeout)
self.sst = SST(self.deque)
return super().connect()
def disconnect(self):
del self.udp
del self.sst
del self.deque
return super().disconnect()
def _setup_annotation(self, annotation):
"""
This class's Implementation of the get_mapping function. returns the read and write functions
"""
parameter = annotation.get('parameter', None)
if parameter is None:
raise Exception("No SST parameter was given in the annotation: %s", annotation)
return parameter
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
the OPC ua read/write functions require the dimensionality and the type to be known
"""
return
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
# process the annotation
SST_param = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
self.setup_value_conversion(attribute)
def read_function():
return self.sst.parameters[SST_param]
def write_function(value):
"""
Not used here
"""
pass
return read_function, write_function
class UDP_Receiver(Thread):
"""
This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
"""
def __init__(self, host, port, deque, buffersize=9000, timeout=0.1):
self.deque = deque
self.host = host
self.port = port
self.buffersize = buffersize
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.settimeout(timeout)
self.stream_on = True
super().__init__()
self.start()
def run(self):
# all variables are manually defined and are updated each time
while self.stream_on:
try:
packet = Array('B', self.buffersize)
self.sock.recvmsg_into(packet[0:self.buffersize])
self.deque.append(packet)
except socket.timeout:
pass
def __del__(self):
self.stream_on = False
self.join()
class SST(Thread):
def __init__(self, deque):
self.deque = deque
self.last_packet = None
self.parameters = {
"packet_count": 0,
"timestamp": 0
}
super().__init__()
self.start()
def run(self):
while True:
packet = self.deque.pop()
if packet is None:
break
self.decode(packet)
def __del__(self):
self.deque.appendleft(None)
self.join()
def decode(self, packet):
self.parameters["packet_count"] += 1
self.parameters["timestamp"] = time.time()
# -*- coding: utf-8 -*-
#
# This file is part of the SDP project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" SDP Device Server for LOFAR2.0
"""
# PyTango imports
from tango.server import run
from tango.server import device_property
from tango import AttrWriteType
# Additional import
from clients.sst_client import sst_client
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
from util.lofar_logging import device_logging_to_python, log_exceptions
import numpy
__all__ = ["SST", "main"]
@device_logging_to_python({"device": "SST"})
class SST(hardware_device):
# -----------------
# Device Properties
# -----------------
SST_Port = device_property(
dtype='DevULong',
mandatory=True
)
# ----------
# Attributes
# ----------
# SDP will switch from fpga_mask_RW to tr_fpga_mask_RW, offer both for now as its a critical flag
packet_count_R = attribute_wrapper(comms_annotation={"parameter": "packet_count"}, datatype=numpy.int64)
timestamp_R = attribute_wrapper(comms_annotation={"parameter": "timestamp"}, datatype=numpy.int64)
# --------
# overloaded functions
# --------
@log_exceptions()
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
try:
self.set_comm_client.stop()
except Exception as e:
self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e))
@log_exceptions()
def configure_for_initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the SDP."""
self.sst_client = sst_client("0.0.0.0", self.SST_Port, self.Fault, self.streams)
# map an access helper class
for i in self.attr_list():
try:
i.set_comm_client(self.sst_client)
except Exception as e:
# use the pass function instead of setting read/write fails
i.set_pass_func()
self.warn_stream("error while setting the SDP attribute {} read/write function. {}".format(i, e))
pass
self.sst_client.start()
# --------
# Commands
# --------
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the SDP module."""
return run((SST,), args=args, **kwargs)
if __name__ == '__main__':
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment