Commit 3c81de67 authored by Jan David Mol

Merge branch 'L2SS-271-statistics-base-class' into 'master'

L2SS-271: Split statistics base classes off SSTs, cleaned those classes up a bit

Closes L2SS-271

See merge request !85
parents 362def1d 3f28d699
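The net effect of this split, as a minimal sketch (all names taken from the diff below): the generic plumbing now lives in StatisticsClient, UDPReceiver, StatisticsCollector and a Statistics base device, and a concrete device such as SST only has to name the collector that parses its packet flavour.

# Minimal sketch of the new layering; the names are as they appear in the diff below.
from devices.sdp.statistics import Statistics               # generic statistics device (OPC-UA + UDP + queue points)
from devices.sdp.statistics_collector import SSTCollector   # parses SST packets from the shared queue

class SST(Statistics):
    # The only flavour-specific wiring the base class needs:
    STATISTICS_COLLECTOR_CLASS = SSTCollector
    # ...plus the SST-specific attribute_wrapper points, see SST.py further down.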
......@@ -722,7 +722,7 @@
"SST": {
"LTS/SST/1": {
"properties": {
"SST_Client_Port": [
"Statistics_Client_Port": [
"5001"
],
"OPC_Server_Name": [
......
......@@ -24,7 +24,7 @@
"SST": {
"LTS/SST/1": {
"properties": {
"SST_Client_Port": [
"Statistics_Client_Port": [
"5001"
],
"OPC_Server_Name": [
......
......@@ -94,7 +94,7 @@
"SST": {
"LTS/SST/1": {
"properties": {
"SST_Client_Port": [
"Statistics_Client_Port": [
"5001"
],
"OPC_Server_Name": [
......
from queue import Queue
from threading import Thread
import logging
import numpy
import queue
import socket
import time
from clients.comms_client import CommClient
from devices.sdp.statistics_packet import SSTPacket
logger = logging.getLogger()
class sst_client(CommClient):
"""
Receives SST statistics packets over UDP in a background thread, hands them to an
SST_collector for processing, and exposes the results through the CommClient interface.
"""
def start(self):
super().start()
def __init__(self, host, port, fault_func, streams, try_interval=2, queuesize=1024):
"""
Create the sst client and connect() to it and get the object node
"""
self.host = host
self.port = port
self.poll_timeout = 0.1
self.disconnect_timeout = 10.0
self.queuesize = queuesize
super().__init__(fault_func, streams, try_interval)
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
def queue_fill_percentage(self):
try:
return 100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0
except NotImplementedError:
# some platforms don't have qsize(), nothing we can do here
return 0
def connect(self):
"""
Function used to connect to the client.
"""
if not self.connected:
self.queue = Queue(maxsize=self.queuesize)
self.udp = UDP_Receiver(self.host, self.port, self.queue, self.poll_timeout, self.disconnect_timeout)
self.sst = SST_collector(self.queue, self.disconnect_timeout)
return super().connect()
def ping(self):
if not self.sst.isAlive():
raise Exception("SST thread died unexpectedly")
if not self.udp.isAlive():
raise Exception("UDP thread died unexpectedly")
def disconnect(self):
# explicit disconnect, instead of waiting for the GC to kick in after "del" below
self.sst.disconnect()
self.udp.disconnect()
del self.udp
del self.sst
del self.queue
return super().disconnect()
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
the OPC ua read/write functions require the dimensionality and the type to be known
"""
return
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
parameter = annotation["parameter"]
# get all the necessary data to set up the read/write functions from the attribute_wrapper
self.setup_value_conversion(attribute)
# redirect to right object. this works as long as the parameter names are unique among them.
if annotation["type"] == "sst":
def read_function():
return self.sst.parameters[parameter]
elif annotation["type"] == "udp":
def read_function():
return self.udp.parameters[parameter]
elif annotation["type"] == "queue":
if parameter == "fill_percentage":
def read_function():
return numpy.uint64(self.queue_fill_percentage())
else:
raise ValueError("Unknown queue parameter requested: %s" % parameter)
def write_function(value):
"""
Not used here
"""
pass
return read_function, write_function
class UDP_Receiver(Thread):
"""
Thread that receives UDP packets on the given host and port, and forwards them to a Queue for asynchronous processing.
"""
def __init__(self, host, port, queue, poll_timeout=0.1, disconnect_timeout=10.0):
self.queue = queue
self.host = host
self.port = port
self.disconnect_timeout = disconnect_timeout
self.parameters = {
# Number of packets we received
"nof_packets_received": numpy.uint64(0),
# Number of packets we had to drop due to a full queue
"nof_packets_dropped": numpy.uint64(0),
# Packets are at most 9000 bytes, the largest payload (well, MTU) of an Ethernet Jumbo frame
"last_packet": numpy.zeros((9000,), dtype=numpy.uint8),
# Timestamp of when the last packet was received
"last_packet_timestamp": numpy.uint64(0),
}
logger.debug("binding a socket on UDP port {}:{}".format(self.host, self.port))
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Allow binding even if there are still lingering packets in the kernel for a
# previous listener that already died. If not, we get an "Address already in use".
# This is stock socket usage.
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# specify what host and port to listen on
self.sock.bind((self.host, self.port))
# Make sure we can stop receiving packets even if none arrive.
# Without this, the recvmsg() call blocks indefinitely if no packet arrives.
self.sock.settimeout(poll_timeout)
self.stream_on = True
super().__init__()
self.start()
def run(self):
# all variables are manually defined and are updated each time
logger.info("Starting UDP thread for {}:{}".format(self.host, self.port))
while self.stream_on:
try:
packet, _, _, _ = self.sock.recvmsg(9000)
self.parameters["nof_packets_received"] += numpy.uint64(1)
self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
self.parameters["last_packet_timestamp"] = numpy.uint64(int(time.time()))
# Forward packet to the processing thread. Use put_nowait() so a full
# queue raises queue.Full and we drop the packet instead of blocking reception.
self.queue.put_nowait(packet)
except socket.timeout:
# timeout -- expected, allows us to check whether to stop
pass
except queue.Full:
# overflow -- just discard
self.parameters["nof_packets_dropped"] += numpy.uint64(1)
logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port))
def join(self, timeout=0):
self.stream_on = False
logging.info("Sent shutdown to UDP thread for {}:{}".format(self.host, self.port))
super().join(timeout)
if self.isAlive():
# happens if timeout is hit
return
# shutdown the socket so that others can listen on this port
self.sock.shutdown(socket.SHUT_RDWR)
def disconnect(self):
if not self.isAlive():
return
# try to get the thread shutdown, but don't stall forever
self.join(self.disconnect_timeout)
if self.isAlive():
logger.error("UDP thread not shutting down for {}:{}".format(self.host, self.port))
def __del__(self):
self.disconnect()
class SST_collector(Thread):
# Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192
# Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512
def __init__(self, queue, disconnect_timeout=10.0):
self.queue = queue
self.last_packet = None
self.disconnect_timeout = disconnect_timeout
self.parameters = {
"nof_packets": numpy.uint64(0),
# Packet count for packets that could not be parsed as SSTs
"nof_invalid_packets": numpy.uint64(0),
# Full contents of the latest packet we deemed invalid.
"last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8),
# Number of packets received so far that we could parse correctly and do not have a payload error
"nof_valid_payloads": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64),
# Last value array we've constructed out of the packets
"sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64),
"sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64),
"integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32),
}
super().__init__()
self.start()
def run(self):
logging.info("Starting SST thread")
while True:
self.last_packet = self.queue.get()
# This is the exception/slow path, but python doesn't allow us to optimise that
if self.last_packet is None:
# None is the magic marker to stop processing
break
self.process_packet(self.last_packet)
logging.info("Stopping SST thread")
def join(self, timeout=0):
# insert magic marker
self.queue.put(None)
logging.info("Sent shutdown to SST thread")
super().join(timeout)
def disconnect(self):
if not self.isAlive():
return
# try to get the thread shutdown, but don't stall forever
self.join(self.disconnect_timeout)
if self.isAlive():
logger.error("SST thread not shutting down")
def process_packet(self, packet):
self.parameters["nof_packets"] += numpy.uint64(1)
try:
fields = SSTPacket(packet)
# determine which input this packet contains data for
if fields.signal_input_index >= self.MAX_INPUTS:
# packet describes an input that is out of bounds for us
raise ValueError("Packet describes input %d, but we are limited to describing MAX_INPUTS=%d" % (fields.signal_input_index, self.MAX_INPUTS))
input_index = fields.signal_input_index
if fields.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][input_index] += numpy.uint64(1)
return
# process the packet
self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1)
self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload
self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp())
self.parameters["integration_intervals"][input_index] = fields.integration_interval()
except Exception as e:
# This is unexpected, so print a stack trace
logging.exception("Could not parse SST UDP packet")
self.parameters["nof_invalid_packets"] += numpy.uint64(1)
self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
from queue import Queue
from threading import Thread
import logging
import numpy
import queue
from .comms_client import CommClient
from .udp_receiver import UDPReceiver
logger = logging.getLogger()
class StatisticsClient(CommClient):
"""
Collects statistics packets over UDP, forwards them to a StatisticsCollector,
and provides a CommClient interface to expose points to a Device Server.
"""
def start(self):
super().start()
def __init__(self, statistics_collector_class, host, port, fault_func, streams, try_interval=2, queuesize=1024):
"""
Create the statistics client and connect() to it and get the object node.
statistics_collector_class: a subclass of StatisticsCollector that specialises in processing the received packets.
host: hostname to listen on
port: port number to listen on
"""
self.host = host
self.port = port
self.poll_timeout = 0.1
self.queuesize = queuesize
self.statistics_collector_class = statistics_collector_class
super().__init__(fault_func, streams, try_interval)
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
def queue_fill_percentage(self):
try:
return 100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0
except NotImplementedError:
# some platforms don't have qsize(), nothing we can do here
return 0
def connect(self):
"""
Function used to connect to the client.
"""
if not self.connected:
self.queue = Queue(maxsize=self.queuesize)
self.udp = UDPReceiver(self.host, self.port, self.queue, self.poll_timeout)
self.statistics = self.statistics_collector_class(self.queue)
return super().connect()
def ping(self):
if not self.statistics.is_alive():
raise Exception("Statistics processing thread died unexpectedly")
if not self.udp.is_alive():
raise Exception("UDP thread died unexpectedly")
def disconnect(self):
# explicit disconnect, instead of waiting for the GC to kick in after "del" below
try:
self.statistics.disconnect()
except Exception:
# nothing we can do, but we should continue cleaning up
logger.exception("Could not disconnect statistics processing class")
try:
self.udp.disconnect()
except Exception:
# nothing we can do, but we should continue cleaning up
logger.exception("Could not disconnect UDP receiver class")
del self.udp
del self.statistics
del self.queue
return super().disconnect()
def setup_value_conversion(self, attribute):
"""
Gives the client access to the attribute_wrapper object so it can obtain any data it needs;
the read/write functions require the dimensionality and the type of the attribute to be known.
"""
return
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: used by the attribute_wrapper to obtain the read and write functions for an attribute; must return both.
"""
parameter = annotation["parameter"]
# get all the necessary data to set up the read/write functions from the attribute_wrapper
self.setup_value_conversion(attribute)
# redirect to right object. this works as long as the parameter names are unique among them.
if annotation["type"] == "statistics":
def read_function():
return self.statistics.parameters[parameter]
elif annotation["type"] == "udp":
def read_function():
return self.udp.parameters[parameter]
elif annotation["type"] == "queue":
if parameter == "fill_percentage":
def read_function():
return numpy.uint64(self.queue_fill_percentage())
else:
raise ValueError("Unknown queue parameter requested: %s" % parameter)
def write_function(value):
"""
Not used here
"""
pass
return read_function, write_function
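A short usage sketch of the annotation routing above, assuming `client` is a connected StatisticsClient instance: setup_attribute() only inspects the "type" and "parameter" keys and hands back a zero-argument read function bound to the matching parameter dictionary, plus a no-op write function.

# Sketch; the annotation dictionary has the same shape as the attribute_wrapper declarations further down in this diff.
read_fn, write_fn = client.setup_attribute(
    {"type": "udp", "parameter": "nof_packets_received"},   # routed to client.udp.parameters
    attribute=None)                                          # unused by setup_value_conversion here
print(read_fn())    # current packet counter, a numpy.uint64
write_fn(None)      # no-op: these points are read-only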
import queue
from threading import Thread
import numpy
import logging
import socket
import time
logger = logging.getLogger()
class UDPReceiver(Thread):
"""
Thread that receives UDP statistics packets on the given host and port, and forwards them to a Queue for asynchronous processing.
"""
# How long to wait for a stuck Thread
DISCONNECT_TIMEOUT = 10.0
def __init__(self, host, port, queue, poll_timeout=0.1):
self.queue = queue
self.host = host
self.port = port
self.parameters = {
# Number of packets we received
"nof_packets_received": numpy.uint64(0),
# Number of packets we had to drop due to a full queue
"nof_packets_dropped": numpy.uint64(0),
# Packets are at most 9000 bytes, the largest payload (well, MTU) of an Ethernet Jumbo frame
"last_packet": numpy.zeros((9000,), dtype=numpy.uint8),
# Timestamp of when the last packet was received
"last_packet_timestamp": numpy.uint64(0),
}
logger.debug("binding a socket on UDP port {}:{}".format(self.host, self.port))
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Allow binding even if there are still lingering packets in the kernel for a
# previous listener that already died. If not, we get an "Address already in use".
# This is stock socket usage.
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# specify what host and port to listen on
self.sock.bind((self.host, self.port))
# Make sure we can stop receiving packets even if none arrive.
# Without this, the recvmsg() call blocks indefinitely if no packet arrives.
self.sock.settimeout(poll_timeout)
self.stream_on = True
super().__init__()
self.start()
def run(self):
# all variables are manually defined and are updated each time
logger.info("Starting UDP thread for {}:{}".format(self.host, self.port))
while self.stream_on:
try:
packet, _, _, _ = self.sock.recvmsg(9000)
self.parameters["nof_packets_received"] += numpy.uint64(1)
self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
self.parameters["last_packet_timestamp"] = numpy.uint64(int(time.time()))
# Forward packet to the processing thread. Use put_nowait() so a full
# queue raises queue.Full and we drop the packet instead of blocking reception.
self.queue.put_nowait(packet)
except socket.timeout:
# timeout -- expected, allows us to check whether to stop
pass
except queue.Full:
# overflow -- just discard
self.parameters["nof_packets_dropped"] += numpy.uint64(1)
logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port))
def join(self, timeout=0):
self.stream_on = False
logging.info("Sent shutdown to UDP thread for {}:{}".format(self.host, self.port))
super().join(timeout)
if self.is_alive():
# happens if timeout is hit
return
# shutdown the socket so that others can listen on this port
self.sock.shutdown(socket.SHUT_RDWR)
def disconnect(self):
if not self.is_alive():
return
# try to get the thread shutdown, but don't stall forever
self.join(self.DISCONNECT_TIMEOUT)
if self.is_alive():
# there is nothing we can do except wait (stall) longer, which could be indefinitely.
logger.error(f"UDP thread for {self.host}:{self.port} did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.")
def __del__(self):
self.disconnect()
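A short usage sketch of the receiver on its own ("0.0.0.0" and port 5001 follow the defaults used elsewhere in this merge request; the consumer below stands in for a StatisticsCollector): the thread binds and starts itself from __init__, pushes raw datagrams onto the supplied queue, and disconnect() asks it to stop within DISCONNECT_TIMEOUT.

# Usage sketch only.
from queue import Queue, Empty

packets = Queue(maxsize=1024)
udp = UDPReceiver("0.0.0.0", 5001, packets, poll_timeout=0.1)   # binds the socket and starts receiving immediately

try:
    packet = packets.get(timeout=5.0)            # raw bytes of one statistics packet, if any arrived
    print(len(packet), udp.parameters["nof_packets_received"])
except Empty:
    pass                                         # no packets within 5 seconds
finally:
    udp.disconnect()                             # stop the thread and release the port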
......@@ -11,8 +11,10 @@
"""
from abc import ABCMeta, abstractmethod
# PyTango imports
from tango.server import Device, command
from tango.server import Device, command, DeviceMeta
from tango import DevState, DebugIt
# Additional import
......@@ -26,8 +28,12 @@ from devices.device_decorators import only_in_states, fault_on_error
import logging
logger = logging.getLogger()
class AbstractDeviceMetas(DeviceMeta, ABCMeta):
''' Combines the metaclasses of Device and ABC, so hardware_device can be both a Tango Device and an abstract base class. '''
pass
#@log_exceptions()
class hardware_device(Device):
class hardware_device(Device, metaclass=AbstractDeviceMetas):
"""
**Properties:**
......@@ -155,13 +161,18 @@ class hardware_device(Device):
self.set_state(DevState.FAULT)
# functions that can be overloaded
# functions that can or must be overloaded
def configure_for_fault(self):
pass
@abstractmethod
def configure_for_off(self):
pass
def configure_for_on(self):
pass
@abstractmethod
def configure_for_initialise(self):
pass
......
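The combined metaclass is needed because Device already uses DeviceMeta as its metaclass; deriving from Device while declaring abstract methods with ABCMeta directly would raise a metaclass conflict. A minimal sketch of the pattern with stand-in classes (FrameworkMeta and FrameworkBase are placeholders for tango's DeviceMeta and Device):

# Sketch of the metaclass-combination pattern, independent of Tango.
from abc import ABCMeta, abstractmethod

class FrameworkMeta(type):                       # stand-in for tango.server.DeviceMeta
    pass

class FrameworkBase(metaclass=FrameworkMeta):    # stand-in for tango.server.Device
    pass

class CombinedMeta(FrameworkMeta, ABCMeta):      # same trick as AbstractDeviceMetas
    pass

class AbstractBase(FrameworkBase, metaclass=CombinedMeta):
    @abstractmethod
    def configure_for_off(self):
        pass

class ConcreteDevice(AbstractBase):
    def configure_for_off(self):
        return "off"

# AbstractBase() raises TypeError (abstract method not implemented); ConcreteDevice() works.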
......@@ -24,52 +24,34 @@ from tango.server import device_property, attribute
from tango import AttrWriteType
# Additional import
from clients.sst_client import sst_client, SST_collector
from clients.opcua_client import OPCUAConnection
from clients.attribute_wrapper import attribute_wrapper
from clients.opcua_client import OPCUAConnection
from clients.StatisticsClient import StatisticsClient
from devices.hardware_device import hardware_device
from common.lofar_git import get_version
from common.lofar_logging import device_logging_to_python, log_exceptions
from devices.sdp.statistics import Statistics
from devices.sdp.statistics_collector import SSTCollector
import numpy
__all__ = ["SST", "main"]
@device_logging_to_python()
class SST(hardware_device):
class SST(Statistics):
STATISTICS_COLLECTOR_CLASS = SSTCollector
# -----------------
# Device Properties
# -----------------
OPC_Server_Name = device_property(
dtype='DevString',
mandatory=True
)
OPC_Server_Port = device_property(
dtype='DevULong',
mandatory=True
)
OPC_Time_Out = device_property(
dtype='DevDouble',
mandatory=True
)
SST_Client_Port = device_property(
dtype='DevUShort',
mandatory=True
)
# ----------
# Attributes
# ----------
version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version())
# FPGA control points for SSTs
FPGA_sst_offload_enable_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_enable_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_enable_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_enable_R"], datatype=numpy.bool_, dims=(16,))
......@@ -82,76 +64,20 @@ class SST(hardware_device):
FPGA_sst_offload_selector_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_selector_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_R"], datatype=numpy.bool_, dims=(16,))
# number of UDP packets that were received
nof_packets_received_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64)
# number of UDP packets that were dropped because we couldn't keep up with processing
nof_packets_dropped_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "nof_packets_dropped"}, datatype=numpy.uint64)
# last packet we processed
last_packet_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8)
# when last packet was received
last_packet_timestamp_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64)
# number of UDP packets that were processed
nof_packets_processed_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_packets"}, datatype=numpy.uint64)
# queue fill percentage, as reported by the consumer
queue_fill_percentage_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "queue", "parameter": "fill_percentage"}, datatype=numpy.uint64)
# number of invalid (non-SST) packets received
nof_invalid_packets_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64)
# last packet that could not be parsed
last_invalid_packet_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "last_invalid_packet"}, dims=(9000,), datatype=numpy.uint8)
# number of packets with valid payloads
nof_valid_payloads_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64)
# number of packets with invalid payloads
nof_payload_errors_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64)
# latest SSTs
sst_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64)
sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(SSTCollector.MAX_SUBBANDS, SSTCollector.MAX_INPUTS), datatype=numpy.uint64)
# reported timestamp for each row in the latest SSTs
sst_timestamp_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64)
# integration interval for each row in the latest SSTs
integration_interval_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32)
integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.float32)
# --------
# overloaded functions
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
try:
self.sst_client.stop()
except Exception as e:
self.warn_stream("Exception while stopping sst_client in configure_for_off function: {}. Exception ignored".format(e))
try:
self.opcua_connection.stop()
except Exception as e:
self.warn_stream("Exception while stopping OPC UA connection in configure_for_off function: {}. Exception ignored".format(e))
@log_exceptions()
def configure_for_initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the statistics device."""
self.sst_client = sst_client("0.0.0.0", self.SST_Client_Port, self.Fault, self)
self.OPCUA_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self)
# map an access helper class
for i in self.attr_list():
try:
if i.comms_id == sst_client:
i.set_comm_client(self.sst_client)
if i.comms_id == OPCUAConnection:
i.set_comm_client(self.OPCUA_client)
except Exception as e:
# use the pass function instead of setting read/write fails
i.set_pass_func()
self.warn_stream("error while setting the sst attribute {} read/write function. {}. using pass function instead".format(i, e))
pass
self.sst_client.start()
self.OPCUA_client.start()
# Overloaded functions
# --------
# --------
# Commands
......
# -*- coding: utf-8 -*-
#
# This file is part of the SST project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" Base device for Statistics (SST/BST/XST)
"""
# TODO(Corne): Remove sys.path.append hack once packaging is in place!
import os, sys
currentdir = os.path.dirname(os.path.realpath(__file__))
parentdir = os.path.dirname(currentdir)
parentdir = os.path.dirname(parentdir)
sys.path.append(parentdir)
from abc import ABCMeta, abstractmethod
# PyTango imports
from tango.server import run
from tango.server import device_property, attribute
from tango import AttrWriteType
# Additional import
from clients.statistics_client import StatisticsClient
from clients.opcua_client import OPCUAConnection
from clients.attribute_wrapper import attribute_wrapper
from devices.hardware_device import hardware_device
from common.lofar_git import get_version
from common.lofar_logging import device_logging_to_python, log_exceptions
import numpy
__all__ = ["Statistics"]
class Statistics(hardware_device, metaclass=ABCMeta):
# In derived classes, set this to a subclass of StatisticsCollector
@property
@abstractmethod
def STATISTICS_COLLECTOR_CLASS(self):
pass
# -----------------
# Device Properties
# -----------------
OPC_Server_Name = device_property(
dtype='DevString',
mandatory=True
)
OPC_Server_Port = device_property(
dtype='DevULong',
mandatory=True
)
OPC_Time_Out = device_property(
dtype='DevDouble',
mandatory=True
)
Statistics_Client_Port = device_property(
dtype='DevUShort',
mandatory=True
)
# ----------
# Attributes
# ----------
version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version())
# number of UDP packets that were received
nof_packets_received_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64)
# number of UDP packets that were dropped because we couldn't keep up with processing
nof_packets_dropped_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "nof_packets_dropped"}, datatype=numpy.uint64)
# last packet we processed
last_packet_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8)
# when last packet was received
last_packet_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64)
# queue fill percentage, as reported by the consumer
queue_fill_percentage_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "queue", "parameter": "fill_percentage"}, datatype=numpy.uint64)
# number of UDP packets that were processed
nof_packets_processed_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_packets"}, datatype=numpy.uint64)
# number of invalid (non-SST) packets received
nof_invalid_packets_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64)
# last packet that could not be parsed
last_invalid_packet_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "last_invalid_packet"}, dims=(9000,), datatype=numpy.uint8)
# --------
# Overloaded functions
# --------
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
try:
self.statistics_client.stop()
except Exception as e:
self.warn_stream("Exception while stopping statistics_client in configure_for_off function: {}. Exception ignored".format(e))
try:
self.OPCUA_client.stop()
except Exception as e:
self.warn_stream("Exception while stopping OPC UA connection in configure_for_off function: {}. Exception ignored".format(e))
@log_exceptions()
def configure_for_initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the statistics device."""
self.statistics_client = StasticsClient(self.STATISTICS_COLLECTOR_CLASS, "0.0.0.0", self.Statistics_Client_Port, self.Fault, self)
self.OPCUA_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self)
# map an access helper class
for i in self.attr_list():
try:
if i.comms_id == StatisticsClient:
i.set_comm_client(self.statistics_client)
elif i.comms_id == OPCUAConnection:
i.set_comm_client(self.OPCUA_client)
else:
raise ValueError("Cannot set comm client for attribute {}: Unknown comms_id {}".format(i, i.comms_id))
except Exception as e:
# use the pass function instead of setting read/write fails
i.set_pass_func()
self.warn_stream("error while setting the sst attribute {} read/write function. {}. using pass function instead".format(i, e))
pass
self.statistics_client.start()
self.OPCUA_client.start()
# --------
# Commands
# --------
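Because STATISTICS_COLLECTOR_CLASS is abstract, every statistics flavour (SST/BST/XST, per the module docstring) follows the same recipe. A purely hypothetical sketch of how a future flavour could plug in; BST, BSTCollector and the "bst_values" parameter do not exist in this merge request and are placeholders, while the imports mirror the ones used by statistics.py above:

# Hypothetical sketch only: BST and BSTCollector are illustrative placeholders.
from devices.sdp.statistics import Statistics
from devices.sdp.statistics_collector import StatisticsCollector
from clients.statistics_client import StatisticsClient
from clients.attribute_wrapper import attribute_wrapper
import numpy

class BSTCollector(StatisticsCollector):
    """ Hypothetical collector for BST packets. """

    def process_packet(self, packet):
        # parse the packet and update self.parameters here
        raise NotImplementedError

class BST(Statistics):
    STATISTICS_COLLECTOR_CLASS = BSTCollector    # must be a StatisticsCollector subclass

    # Flavour-specific points are declared like the generic ones in the base class, with
    # "type": "statistics" annotations resolved against the collector's parameters.
    bst_R = attribute_wrapper(comms_id=StatisticsClient,
                              comms_annotation={"type": "statistics", "parameter": "bst_values"},   # hypothetical parameter
                              dims=(BSTCollector.MAX_SUBBANDS, BSTCollector.MAX_INPUTS),
                              datatype=numpy.uint64)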
from queue import Queue
from threading import Thread
import logging
import numpy
from .statistics_packet import SSTPacket
logger = logging.getLogger()
class StatisticsCollector(Thread):
""" Base class to process statistics packets from a queue, asynchronously. """
# Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192
# Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512
# Maximum time to wait for the Thread to get unstuck, if we want to stop
DISCONNECT_TIMEOUT = 10.0
def __init__(self, queue: Queue):
self.queue = queue
self.last_packet = None
self.parameters = {
"nof_packets": numpy.uint64(0),
# Packet count for packets that could not be parsed
"nof_invalid_packets": numpy.uint64(0),
# Full contents of the latest packet we deemed invalid.
"last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8),
}
super().__init__()
self.start()
def run(self):
logger.info("Starting statistics thread")
while True:
self.last_packet = self.queue.get()
# This is the exception/slow path, but python doesn't allow us to optimise that
if self.last_packet is None:
# None is the magic marker to stop processing
break
self.parameters["nof_packets"] += numpy.uint64(1)
try:
self.process_packet(self.last_packet)
except Exception as e:
logger.exception("Could not parse statistics UDP packet")
self.parameters["last_invalid_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
self.parameters["nof_invalid_packets"] += numpy.uint64(1)
logger.info("Stopped statistics thread")
def join(self, timeout=0):
# insert magic marker
self.queue.put(None)
logger.info("Sent shutdown to statistics thread")
super().join(timeout)
def disconnect(self):
if not self.is_alive():
return
# try to get the thread shutdown, but don't stall forever
self.join(self.DISCONNECT_TIMEOUT)
if self.is_alive():
# there is nothing we can do except wait (stall) longer, which could be indefinitely.
logger.error(f"Statistics thread did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.")
def process_packet(self, packet):
""" Update any information based on this packet. """
raise NotImplementedError
class SSTCollector(StatisticsCollector):
""" Class to process SST statistics packets. """
# Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192
# Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512
def __init__(self, queue):
super().__init__(queue)
self.parameters.update({
# Number of packets received so far that we could parse correctly and do not have a payload error
"nof_valid_payloads": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64),
# Last value array we've constructed out of the packets
"sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64),
"sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64),
"integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32),
})
def process_packet(self, packet):
fields = SSTPacket(packet)
# determine which input this packet contains data for
if fields.signal_input_index >= self.MAX_INPUTS:
# packet describes an input that is out of bounds for us
raise ValueError("Packet describes input %d, but we are limited to describing MAX_INPUTS=%d" % (fields.signal_input_index, self.MAX_INPUTS))
input_index = fields.signal_input_index
if fields.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][input_index] += numpy.uint64(1)
# don't raise, as packet is valid
return
# process the packet
self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1)
self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload
self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp())
self.parameters["integration_intervals"][input_index] = fields.integration_interval()