Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • lofar2.0/tango
  • mckenna/tango
2 results
Show changes
Commits on Source (61)
......@@ -710,6 +710,19 @@
}
}
},
"SST": {
"LTS": {
"SST": {
"LTS/SST/1": {
"properties": {
"SST_Port": [
"5001"
]
}
}
}
}
},
"StatsCrosslet": {
"CS997": {
"StatsCrosslet": {
......
......@@ -89,7 +89,7 @@
}
}
},
"Statistics": {
"SST": {
"1": {
"SST": {
"LTS/SST/1": {
......
from struct import unpack, calcsize
from datetime import datetime, timezone
import numpy
__all__ = ["StatisticsPacket"]
def get_bit_value(value: bytes, first_bit: int, last_bit:int=None) -> int:
""" Return bits [first_bit:last_bit] from value, and return their integer value. Bit 0 = LSB.
For example, extracting bits 2-3 from b'01100' returns 11 binary = 3 decimal:
get_bit_value(b'01100', 2, 3) == 3
If 'last_bit' is not given, just the value of bit 'first_bit' is returned. """
# default last_bit to first_bit
if last_bit is None:
last_bit = first_bit
return value >> first_bit & ((1 << (last_bit - first_bit + 1)) - 1)
class StatisticsPacket(object):
"""
Models a statistics UDP packet from SDP.
Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).
"""
def __init__(self, packet: bytes):
self.packet = packet
# Only parse valid packets
if self.marker not in 'SBX':
raise ValueError("Invalid SDP statistics packet: packet marker (first byte) is '{}', not one of 'SBX'.".format(self.marker))
@property
def marker(self) -> str:
""" Return the type of statistic:
'S' = SST
'B' = BST
'X' = XST
"""
raw_marker = unpack("c",self.packet[0:1])[0]
try:
return raw_marker.decode('ascii')
except UnicodeDecodeError:
# non-ascii (>127) character, return as binary
#
# this is typically not visible to the user, as these packets are not SDP statistics packets,
# which the constructor will refuse to accept.
return raw_marker
@property
def version_id(self) -> int:
""" Return the version of this packet. """
return unpack("B",self.packet[1:2])[0]
@property
def observation_id(self) -> int:
""" Return the ID of the observation running when this packet was generated. """
return unpack("<I",self.packet[2:6])[0]
@property
def station_id(self) -> int:
""" Return the number of the station this packet was generated on. """
return unpack("<H",self.packet[6:8])[0]
@property
def source_info(self) -> int:
""" Return a dict with the source_info flags. The dict contains the following fields:
_raw: raw value of the source_info field in the packet, as an integer.
antenna_band_index: antenna type. 0 = low band, 1 = high band.
nyquist_zone_index: nyquist zone of filter:
0 = 0 -- 1/2 * t_adc Hz (low band),
1 = 1/2 * t_adc -- t_adc Hz (high band),
2 = t_adc -- 3/2 * t_adc Hz (high band).
t_adc: sampling clock. 0 = 160 MHz, 1 = 200 MHz.
fsub_type: sampling method. 0 = critically sampled, 1 = oversampled.
payload_error: 0 = data is ok, 1 = data is corrupted (a fault was encountered).
beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only).
subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not
reserved: reserved bits
gn_index: global index of FPGA that emitted this packet. """
bits = unpack("<H",self.packet[8:10])[0]
return {
"_raw": bits,
"antenna_band_index": get_bit_value(bits, 15),
"nyquist_zone_index": get_bit_value(bits, 13, 14),
"t_adc": get_bit_value(bits, 12),
"fsub_type": get_bit_value(bits, 11),
"payload_error": get_bit_value(bits, 10),
"beam_repositioning_flag": get_bit_value(bits, 9),
"subband_calibrated_flag": get_bit_value(bits, 8),
"reserved": get_bit_value(bits, 5, 7),
"gn_index": get_bit_value(bits, 0, 4),
}
@property
def reserved(self) -> bytes:
""" Reserved bytes. """
return self.packet[10:11]
@property
def integration_interval_raw(self) -> int:
""" Returns the integration interval, in blocks. """
# This field is 3 bytes, little endian, so we need to append a 0 to parse it as a 32-bit integer.
return unpack("<I", self.packet[11:14] + b'0')[0]
def integration_interval(self) -> float:
""" Returns the integration interval, in seconds. """
# Translate to seconds using the block period
return self.integration_interval_raw * self.block_period()
@property
def data_id(self) -> int:
""" Returns the generic data identifier. """
return unpack("<I",self.packet[14:18])[0]
@property
def nof_signal_inputs(self) -> int:
""" Number of inputs that were used for constructing the payload. """
return unpack("<B",self.packet[18:19])[0]
@property
def nof_bytes_per_statistic(self) -> int:
""" Word size for the payload. """
return unpack("<B",self.packet[19:20])[0]
@property
def nof_statistics_per_packet(self) -> int:
""" Number of data points in the payload. """
return unpack("<H",self.packet[20:22])[0]
@property
def block_period_raw(self) -> int:
""" Return the block period, in nanoseconds. """
return unpack("<H",self.packet[22:24])[0]
def block_period(self) -> float:
""" Return the block period, in seconds. """
return self.block_period_raw / 1e9
@property
def block_serial_number(self) -> int:
""" Block index since epoch (1970). """
return unpack("<Q",self.packet[24:32])[0]
def timestamp(self) -> datetime:
""" Returns the timestamp of the data in this packet. """
return datetime.fromtimestamp(self.block_serial_number * self.block_period(), timezone.utc)
def header(self) -> dict:
""" Return all the header fields as a dict. """
return {
"marker": self.marker,
"version_id": self.version_id,
"observation_id": self.observation_id,
"station_id": self.station_id,
"source_info": self.source_info,
"reserved": self.reserved,
"integration_interval_raw": self.integration_interval_raw,
"integration_interval": self.integration_interval(),
"data_id": self.data_id,
"nof_signal_inputs": self.nof_signal_inputs,
"nof_bytes_per_statistic": self.nof_bytes_per_statistic,
"nof_statistics_per_packet": self.nof_statistics_per_packet,
"block_period_raw": self.block_period_raw,
"block_period": self.block_period(),
"block_serial_number": self.block_serial_number,
"timestamp": self.timestamp(),
}
@property
def payload_sst(self) -> numpy.array:
""" The payload of this packet, interpreted as SST data. """
if self.marker != 'S':
raise Exception("Payload of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(self.marker))
# derive which and how many elements to read from the packet header
bytecount_to_unsigned_struct_type = { 1: 'B', 2: 'H', 4: 'I', 8: 'Q' }
format_str = "<{}{}".format(self.nof_statistics_per_packet, bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic])
return numpy.array(unpack(format_str, self.packet[32:32+calcsize(format_str)]))
if __name__ == "__main__":
# parse one packet from stdin
import sys
import pprint
# read all of stdin, even though we only parse the first packet. we're too lazy to intelligently decide when
# the packet is complete and can stop reading.
data = sys.stdin.buffer.read()
packet = StatisticsPacket(data)
# print header & payload
pprint.pprint(packet.header())
pprint.pprint(packet.payload_sst)
# -*- coding: utf-8 -*-
#
# This file is part of the Statistics project
# This file is part of the SST project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" Statistics Device Server for LOFAR2.0
""" SST Device Server for LOFAR2.0
"""
# PyTango imports
from tango.server import run
from tango.server import device_property
from tango.server import device_property, attribute
from tango import AttrWriteType
# Additional import
from clients.sst_client import sst_client
from clients.sst_client import sst_client, SST_collector
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
from util.lofar_git import get_version
from util.lofar_logging import device_logging_to_python, log_exceptions
......@@ -43,13 +44,37 @@ class SST(hardware_device):
# ----------
# Attributes
# ----------
# --------
# SST client annotation consists of a dict that contains the parameter name that needs to be read.
# Example: comms_annotation={"parameter": "this_value_R"}
packet_count_R = attribute_wrapper(comms_annotation={"parameter": "packet_count_R"}, datatype=numpy.int64)
last_packet_timestamp_R = attribute_wrapper(comms_annotation={"parameter": "last_packet_timestamp_R"}, datatype=numpy.int64)
queue_percentage_used_R = attribute_wrapper(comms_annotation={"parameter": "queue_percentage_used_R"}, datatype=numpy.double)
version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version())
# number of UDP packets that were received
nof_packets_received_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64)
# number of UDP packets that were dropped because we couldn't keep up with processing
nof_packets_dropped_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "nof_packets_dropped"}, datatype=numpy.uint64)
# last packet we processed
last_packet_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8)
# when last packet was received
last_packet_timestamp_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64)
# number of UDP packets that were processed
nof_packets_processed_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_packets"}, datatype=numpy.uint64)
# queue fill percentage, as reported by the consumer
queue_fill_percentage_R = attribute_wrapper(comms_annotation={"type": "queue", "parameter": "fill_percentage"}, datatype=numpy.uint64)
# number of invalid (non-SST) packets received
nof_invalid_packets_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64)
# last packet that could not be parsed
last_invalid_packet_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "last_invalid_packet"}, dims=(9000,), datatype=numpy.uint8)
# number of packets with valid payloads
nof_valid_payloads_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
# number of packets with invalid payloads
nof_payload_errors_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
# latest SSTs
sst_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64)
# reported timestamp for each row in the latest SSTs
sst_timestamp_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
# integration interval for each row in the latest SSTs
integration_interval_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32)
# --------
# overloaded functions
......@@ -89,13 +114,13 @@ class SST(hardware_device):
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the Statistics Device module."""
"""Main function of the SST Device module."""
from util.lofar_logging import configure_logger
import logging
configure_logger(logging.getLogger())
return run((Statistics,), args=args, **kwargs)
return run((SST,), args=args, **kwargs)
if __name__ == '__main__':
......
import struct
from datetime import datetime, timezone
from typing import Tuple
import numpy
__all__ = ["StatisticsPacket", "SSTPacket", "XSTPacket", "BSTPacket"]
def get_bit_value(value: bytes, first_bit: int, last_bit:int=None) -> int:
""" Return bits [first_bit:last_bit] from value, and return their integer value. Bit 0 = LSB.
For example, extracting bits 2-3 from b'01100' returns 11 binary = 3 decimal:
get_bit_value(b'01100', 2, 3) == 3
If 'last_bit' is not given, just the value of bit 'first_bit' is returned. """
# default last_bit to first_bit
if last_bit is None:
last_bit = first_bit
return value >> first_bit & ((1 << (last_bit - first_bit + 1)) - 1)
class StatisticsPacket(object):
"""
Models a statistics UDP packet from SDP.
Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).
The following fields are exposed as properties & functions. The _raw fields come directly
from the packet, and have more user-friendly alternatives for intepretation:
marker_raw packet marker as byte.
marker() packet marker as character. 'S' = SST, 'X' = XST, 'B' = BST
version_id packet format version.
observation_id observation identifier.
station_id station identifier.
source_info: bit field with input information, encoding several other properties.
antenna_band_index: antenna type. 0 = low band, 1 = high band.
nyquist_zone_index: nyquist zone of filter:
0 = 0 -- 1/2 * t_adc Hz (low band),
1 = 1/2 * t_adc -- t_adc Hz (high band),
2 = t_adc -- 3/2 * t_adc Hz (high band).
t_adc: sampling clock. 0 = 160 MHz, 1 = 200 MHz.
fsub_type: sampling method. 0 = critically sampled, 1 = oversampled.
payload_error: 0 = data is ok, 1 = data is corrupted (a fault was encountered).
beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only).
subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not.
gn_index: global index of FPGA that emitted this packet.
data_id: bit field with payload information, encoding several other properties.
nof_signal_inputs: number of inputs that contributed to data in this packet.
nof_bytes_per_statistics: word size of each statistic.
nof_statistics_per_packet: number of statistic data points in the payload.
integration_interval_raw: integration interval, in block periods.
integration_interval(): integration interval, in seconds.
block_period_raw: block period, in nanoseconds.
block_period(): block period, in seconds.
block_serial_number: timestamp of the data, in block periods since 1970.
timestamp(): timestamp of the data, as a datetime object.
"""
def __init__(self, packet: bytes):
self.packet = packet
self.unpack()
# Only parse valid statistics packets from SDP, reject everything else
if self.marker_raw not in b'SBX':
raise ValueError("Invalid SDP statistics packet: packet marker (first byte) is {}, not one of 'SBX'.".format(self.marker))
def unpack(self):
""" Unpack the packet into properties of this object. """
# format string for the header, see unpack below
self.header_format = ">cBL HHB BHL BBH HQ"
self.header_size = struct.calcsize(self.header_format)
# unpack fields
try:
(self.marker_raw,
self.version_id,
self.observation_id,
self.station_id,
self.source_info,
# reserved byte
_,
# integration interval, in block periods. This field is 3 bytes, big endian -- combine later
integration_interval_hi,
integration_interval_lo,
self.data_id,
self.nof_signal_inputs,
self.nof_bytes_per_statistic,
self.nof_statistics_per_packet,
self.block_period_raw,
self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size])
self.integration_interval_raw = (integration_interval_hi << 16) + integration_interval_lo
except struct.error as e:
raise ValueError("Error parsing statistics packet") from e
# unpack the fields we just updated
self.unpack_source_info()
self.unpack_data_id()
def unpack_source_info(self):
""" Unpack the source_info field into properties of this object. """
self.antenna_band_index = get_bit_value(self.source_info, 15)
self.nyquist_zone_index = get_bit_value(self.source_info, 13, 14)
self.t_adc = get_bit_value(self.source_info, 12)
self.fsub_type = get_bit_value(self.source_info, 11)
self.payload_error = get_bit_value(self.source_info, 10)
self.beam_repositioning_flag = get_bit_value(self.source_info, 9)
self.subband_calibrated_flag = get_bit_value(self.source_info, 8)
# self.source_info 5-7 are reserved
self.gn_index = get_bit_value(self.source_info, 0, 4)
def unpack_data_id(self):
""" Unpack the data_id field into properties of this object. """
# only useful in specialisations (XST/SST/BST)
pass
def expected_size(self) -> int:
""" The size this packet should be (header + payload), according to the header. """
return self.header_size + self.nof_statistics_per_packet * self.nof_bytes_per_statistic
@property
def marker(self) -> str:
""" Return the type of statistic:
'S' = SST
'B' = BST
'X' = XST
"""
try:
return self.marker_raw.decode('ascii')
except UnicodeDecodeError:
# non-ascii (>127) character, return as binary
#
# this is typically not visible to the user, as these packets are not SDP statistics packets,
# which the constructor will refuse to accept.
return self.marker_raw
def integration_interval(self) -> float:
""" Returns the integration interval, in seconds. """
# Translate to seconds using the block period
return self.integration_interval_raw * self.block_period()
def block_period(self) -> float:
""" Return the block period, in seconds. """
return self.block_period_raw / 1e9
def timestamp(self) -> datetime:
""" Returns the timestamp of the data in this packet.
Returns datetime.min if the block_serial_number in the packet is not set (0),
Returns datetime.max if the timestamp cannot be represented in python (likely because it is too large). """
try:
return datetime.fromtimestamp(self.block_serial_number * self.block_period(), timezone.utc)
except ValueError:
# Let's not barf anytime we want to print a header
return datetime.max
def header(self) -> dict:
""" Return all the header fields as a dict. """
header = {
"marker": self.marker,
"version_id": self.version_id,
"observation_id": self.observation_id,
"station_id": self.station_id,
"source_info": {
"_raw": self.source_info,
"antenna_band_index": self.antenna_band_index,
"nyquist_zone_index": self.nyquist_zone_index,
"t_adc": self.t_adc,
"fsub_type": self.fsub_type,
"payload_error": self.payload_error,
"beam_repositioning_flag": self.beam_repositioning_flag,
"subband_calibrated_flag": self.subband_calibrated_flag,
"gn_index": self.gn_index,
},
"data_id": {
"_raw": self.data_id,
},
"integration_interval_raw": self.integration_interval_raw,
"integration_interval": self.integration_interval(),
"nof_signal_inputs": self.nof_signal_inputs,
"nof_bytes_per_statistic": self.nof_bytes_per_statistic,
"nof_statistics_per_packet": self.nof_statistics_per_packet,
"block_period_raw": self.block_period_raw,
"block_period": self.block_period(),
"block_serial_number": self.block_serial_number,
"timestamp": self.timestamp(),
}
return header
class SSTPacket(StatisticsPacket):
"""
Models an SST statistics UDP packet from SDP.
The following fields are exposed as properties & functions.
signal_input_index: input (antenna polarisation) index for which this packet contains statistics
payload[nof_statistics_per_packet]: SST statistics, an array of amplitudes per subband.
"""
def __init__(self, packet):
super().__init__(packet)
# We only parse SST packets
if self.marker != 'S':
raise Exception("Payload of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(self.marker))
def unpack_data_id(self):
super().unpack_data_id()
self.signal_input_index = get_bit_value(self.data_id, 0, 7)
def header(self):
header = super().header()
header["data_id"]["signal_input_index"] = self.signal_input_index
return header
@property
def payload(self) -> numpy.array:
""" The payload of this packet, interpreted as SST data. """
# derive which and how many elements to read from the packet header
bytecount_to_unsigned_struct_type = { 1: 'B', 2: 'H', 4: 'I', 8: 'Q' }
format_str = ">{}{}".format(self.nof_statistics_per_packet, bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic])
return numpy.array(struct.unpack(format_str, self.packet[self.header_size:self.header_size + struct.calcsize(format_str)]))
class XSTPacket(StatisticsPacket):
"""
Models an XST statistics UDP packet from SDP.
The following fields are exposed as properties & functions.
subband_index: subband number for which this packet contains statistics.
baseline: antenna pair for which this packet contains statistics.
"""
def __init__(self, packet):
super().__init__(packet)
# We only parse XST packets
if self.marker != 'X':
raise Exception("Payload of XST requested of a non-XST packet. Actual packet marker is '{}', but must be 'X'.".format(self.marker))
def unpack_data_id(self):
super().unpack_data_id()
self.subband_index = get_bit_value(self.data_id, 16, 24)
self.baseline = (get_bit_value(self.data_id, 8, 15), get_bit_value(self.data_id, 0, 7))
def header(self):
header = super().header()
header["data_id"]["subband_index"] = self.subband_index
header["data_id"]["baseline"] = self.baseline
return header
class BSTPacket(StatisticsPacket):
"""
Models an BST statistics UDP packet from SDP.
The following fields are exposed as properties & functions.
beamlet_index: the number of the beamlet for which this packet holds statistics.
"""
def __init__(self, packet):
super().__init__(packet)
# We only parse BST packets
if self.marker != 'B':
raise Exception("Payload of BST requested of a non-BST packet. Actual packet marker is '{}', but must be 'B'.".format(self.marker))
def unpack_data_id(self):
super().unpack_data_id()
self.beamlet_index = get_bit_value(self.data_id, 0, 15)
def header(self):
header = super().header()
header["data_id"]["beamlet_index"] = self.beamlet_index
return header
if __name__ == "__main__":
# parse one packet from stdin
import sys
import pprint
# read all of stdin, even though we only parse the first packet. we're too lazy to intelligently decide when
# the packet is complete and can stop reading.
data = sys.stdin.buffer.read()
packet = SSTPacket(data)
# print header & payload
pprint.pprint(packet.header())
pprint.pprint(packet.payload)
......@@ -2,6 +2,7 @@ import queue
from threading import Thread
import socket
from util.comms_client import CommClient
from clients.StatisticsPacket import SSTPacket
from queue import Queue
......@@ -11,9 +12,11 @@ import socket
from datetime import datetime
from multiprocessing import Value, Array
import time
from datetime import datetime
__all__ = ["sst_client", "SST_collector"]
__all__ = ["sst_client"]
logger = logging.getLogger()
class sst_client(CommClient):
"""
......@@ -24,14 +27,14 @@ class sst_client(CommClient):
def start(self):
super().start()
def __init__(self, host, port, fault_func, streams, try_interval=2, queuesize=1024, buffersize=9000):
def __init__(self, host, port, fault_func, streams, try_interval=2, queuesize=1024):
"""
Create the sst client and connect() to it and get the object node
"""
self.host = host
self.port = port
self.timeout = 0.1
self.buffersize = buffersize
self.poll_timeout = 0.1
self.disconnect_timeout = 10.0
self.queuesize = queuesize
super().__init__(fault_func, streams, try_interval)
......@@ -42,34 +45,42 @@ class sst_client(CommClient):
fault_func()
return
def queue_fill_percentage(self):
try:
return 100 * self.queue.qsize() / self.queue.maxsize if self.queue.maxsize else 0
except NotImplementedError:
# some platforms don't have qsize(), nothing we can do here
return 0
def connect(self):
"""
Function used to connect to the client.
"""
if not self.connected:
self.queue = Queue(maxsize=self.queuesize)
self.udp = UDP_Receiver(self.host, self.port, self.queue, self.streams, self.buffersize, self.timeout)
self.sst = SST(self.queue, self.streams)
return super().connect()
self.udp = UDP_Receiver(self.host, self.port, self.queue, self.poll_timeout, self.disconnect_timeout)
self.sst = SST_collector(self.queue, self.disconnect_timeout)
return super().connect()
def ping(self):
if not self.sst.isAlive():
raise Exception("SST thread died unexpectedly")
if not self.udp.isAlive():
raise Exception("UDP thread died unexpectedly")
def disconnect(self):
# explicit disconnect, instead of waiting for the GC to kick in after "del" below
self.sst.disconnect()
self.udp.disconnect()
del self.udp
del self.sst
del self.queue
return super().disconnect()
def _setup_annotation(self, annotation):
"""
This class's Implementation of the get_mapping function. returns the read and write functions
"""
parameter = annotation.get('parameter', None)
if parameter is None:
raise Exception("No SST parameter was given in the annotation: %s", annotation)
return parameter
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
......@@ -82,15 +93,24 @@ class sst_client(CommClient):
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
# process the annotation
SST_param = self._setup_annotation(annotation)
parameter = annotation["parameter"]
# get all the necessary data to set up the read/write functions from the attribute_wrapper
self.setup_value_conversion(attribute)
def read_function():
return [self.sst.parameters[SST_param]]
# redirect to right object. this works as long as the parameter names are unique among them.
if annotation["type"] == "sst":
def read_function():
return self.sst.parameters[parameter]
elif annotation["type"] == "udp":
def read_function():
return self.udp.parameters[parameter]
elif annotation["type"] == "queue":
if parameter == "fill_percentage":
def read_function():
return numpy.uint64(self.queue_fill_percentage())
else:
raise ValueError("Unknown queue parameter requested: %s" % parameter)
def write_function(value):
"""
......@@ -105,20 +125,38 @@ class UDP_Receiver(Thread):
This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
"""
def __init__(self, host, port, queue, streams, buffersize=9000, timeout=0.1):
self.streams = streams
def __init__(self, host, port, queue, poll_timeout=0.1, disconnect_timeout=10.0):
self.queue = queue
self.host = host
self.port = port
self.buffersize = buffersize
self.disconnect_timeout = disconnect_timeout
self.parameters = {
# Number of packets we received
"nof_packets_received": numpy.uint64(0),
# Number of packets we had to drop due to a full queue
"nof_packets_dropped": numpy.uint64(0),
# Packets are at most 9000 bytes, the largest payload (well, MTU) of an Ethernet Jumbo frame
"last_packet": numpy.zeros((9000,), dtype=numpy.uint8),
# Timestamp of when the last packet was received
"last_packet_timestamp": numpy.uint64(0),
}
logger.debug("binding a socket on UDP port {}:{}".format(self.host, self.port))
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Allow binding even if there are still lingering packets in the kernel for a
# previous listener that already died. If not, we get an "Address already in use".
# This is stock socket usage.
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# specify what host and port to listen on
self.sock.bind((self.host, self.port))
# Make sure we can stop receiving packets even if none arrive.
# Without this, the recvmsg() call blocks indefinitely if no packet arrives.
self.sock.settimeout(poll_timeout)
self.sock.settimeout(timeout)
self.stream_on = True
super().__init__()
......@@ -126,57 +164,147 @@ class UDP_Receiver(Thread):
def run(self):
# all variables are manually defined and are updated each time
self.streams.debug_stream("starting UDP thread with port {} and host {}".format(self.port, self.host))
logger.info("Starting UDP thread for {}:{}".format(self.host, self.port))
while self.stream_on:
try:
packet = [bytearray(self.buffersize)]
self.sock.recvmsg_into(packet[0:self.buffersize])
self.queue.put(packet)
packet, _, _, _ = self.sock.recvmsg(9000)
self.parameters["nof_packets_received"] += numpy.uint64(1)
self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
self.parameters["last_packet_timestamp"] = numpy.uint64(int(time.time()))
# Forward packet to processing thread
self.queue.put(packet)
except socket.timeout:
# timeout -- expected, allows us to check whether to stop
pass
except queue.Full:
pass
# overflow -- just discard
self.parameters["nof_packets_dropped"] += numpy.uint64(1)
def __del__(self):
logger.info("Stopping UDP thread for {}:{}".format(self.host, self.port))
def join(self, timeout=0):
self.stream_on = False
self.join()
logging.info("Sent shutdown to UDP thread for {}:{}".format(self.host, self.port))
self.join(timeout)
if self.isAlive():
# happens if timeout is hit
return
# shutdown the socket so that others can listen on this port
self.sock.shutdown(socket.SHUT_RDWR)
def disconnect(self):
if not self.isAlive():
return
# try to get the thread shutdown, but don't stall forever
self.join(self.disconnect_timeout)
class SST(Thread):
def __init__(self, queue, streams):
if self.isAlive():
logger.error("UDP thread not shutting down for {}:{}".format(self.host, self.port))
self.streams = streams
def __del__(self):
self.disconnect()
class SST_collector(Thread):
# Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192
# Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512
def __init__(self, queue, disconnect_timeout=10.0):
self.queue = queue
self.last_packet = None
self.disconnect_timeout = disconnect_timeout
self.parameters = {
"packet_count_R": numpy.int64(0),
"last_packet_timestamp_R": numpy.int64(0),
"queue_percentage_used_R": numpy.double(100 * self.queue.qsize() / self.queue.maxsize)
"nof_packets": numpy.uint64(0),
# Packet count for packets that could not be parsed as SSTs
"nof_invalid_packets": numpy.uint64(0),
# Full contents of the latest packet we deemed invalid.
"last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8),
# Number of packets received so far that we could parse correctly and do not have a payload error
"nof_valid_payloads": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64),
# Last value array we've constructed out of the packets
"sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64),
"sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64),
"integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32),
}
super().__init__()
self.start()
def run(self):
self.streams.debug_stream("starting SST thread")
logging.info("Starting SST thread")
while True:
packet = self.queue.get()
self.last_packet = self.queue.get()
if packet is None:
self.queue.clear()
# This is the exception/slow path, but python doesn't allow us to optimise that
if self.last_packet is None:
# None is the magic marker to stop processing
break
self.process_packet(packet)
self.process_packet(self.last_packet)
def __del__(self):
logging.info("Stopping SST thread")
def join(self, timeout=0):
# insert magic marker
self.queue.put(None)
self.join()
logging.info("Sent shutdown to SST thread")
super().join(timeout)
def disconnect(self):
if not self.isAlive():
return
# try to get the thread shutdown, but don't stall forever
self.join(self.disconnect_timeout)
if self.isAlive:
logger.error("SST thread not shutting down")
def process_packet(self, packet):
self.parameters["packet_count_R"] += 1
self.parameters["last_packet_timestamp_R"] = numpy.int64(int(time.time()))
self.parameters["queue_percentage_used_R"] = numpy.double(100 * self.queue.qsize() / self.queue.maxsize)
\ No newline at end of file
self.parameters["nof_packets"] += numpy.uint64(1)
try:
fields = SSTPacket(packet)
# determine which input this packet contains data for
if fields.signal_input_index >= self.MAX_INPUTS:
# packet describes an input that is out of bounds for us
raise ValueError("Packet describes input %d, but we are limited to describing MAX_INPUTS=%d" % (fields.signal_input_index, self.MAX_INPUTS))
input_index = fields.signal_input_index
if fields.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][input_index] += numpy.uint64(1)
return
# process the packet
self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1)
self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload
self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp())
self.parameters["integration_intervals"][input_index] = fields.integration_interval()
except Exception as e:
# This is unexpected, so print a stack trace
logging.exception("Could not parse SST UDP packet")
self.parameters["nof_invalid_packets"] += numpy.uint64(1)
self.parameters["last_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
......@@ -13,16 +13,18 @@
version: '2'
services:
device-statistics:
image: device-statistics
device-sst:
image: device-sst
# build explicitly, as docker-compose does not understand a local image
# being shared among services.
build:
context: lofar-device-base
args:
SOURCE_IMAGE: ${DOCKER_REGISTRY_HOST}/${DOCKER_REGISTRY_USER}/tango-itango:9.3.3.7
container_name: ${CONTAINER_NAME_PREFIX}device-statistics
container_name: ${CONTAINER_NAME_PREFIX}device-sst
network_mode: ${NETWORK_MODE}
ports:
- 5001:5001/udp
volumes:
- ${TANGO_LOFAR_CONTAINER_MOUNT}
environment:
......@@ -33,5 +35,5 @@ services:
- --timeout=30
- --strict
- --
- python3 -u ${TANGO_LOFAR_CONTAINER_DIR}/devices/Statistics.py LTS -v
- python3 -u ${TANGO_LOFAR_CONTAINER_DIR}/devices/SST.py LTS -v
restart: on-failure
# Create shortcuts for our devices
pcc = DeviceProxy("LTS/PCC/1")
sdp = DeviceProxy("LTS/SDP/1")
sst = DeviceProxy("LTS/SST/1")
# Put them in a list in case one wants to iterate
devices = [pcc, sdp]
devices = [pcc, sdp, sst]