Commit 17fc70e1 authored by Corné Lukken

L2SS-868: Partial migration to lofar client

parent b763f757
Merge request !394: Resolve L2SS-868
@@ -2,6 +2,7 @@
# order of appearance. Changing the order has an impact on the overall
# integration process, which may cause wedges in the gate later.
+lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client
asyncua >= 0.9.90 # LGPLv3
PyMySQL[rsa] >= 1.0.2 # MIT
psycopg2-binary >= 2.9.2 # LGPL
......
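The line added above tracks the head of the lofar-station-client repository. pip's direct-reference syntax also accepts a pinned revision, which keeps station builds reproducible; a hedged example (the tag below is hypothetical, not part of this commit):

lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client@v0.1.0  # pinned ref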
@@ -14,14 +14,14 @@
# PyTango imports
from tango.server import device_property, attribute
from tango import AttrWriteType
# Additional import
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.clients.opcua_client import OPCUAConnection
from tangostationcontrol.clients.statistics.client import StatisticsClient
from tangostationcontrol.devices.sdp.statistics import Statistics
-from tangostationcontrol.statistics.collector import SSTCollector
+from tangostationcontrol.statistics.collector import StationSSTCollector
import numpy
@@ -30,7 +30,7 @@ __all__ = ["SST", "main"]
class SST(Statistics):
-    STATISTICS_COLLECTOR_CLASS = SSTCollector
+    STATISTICS_COLLECTOR_CLASS = StationSSTCollector

    # -----------------
    # Device Properties
@@ -95,18 +95,18 @@ class SST(Statistics):
    FPGA_sst_offload_nof_valid_R = attribute_wrapper(comms_annotation=["FPGA_sst_offload_nof_valid_R"], datatype=numpy.int32, dims=(16,))

    # number of packets with valid payloads
-    nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
+    nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(StationSSTCollector.MAX_FPGAS,), datatype=numpy.uint64)

    # number of packets with invalid payloads
-    nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
+    nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(StationSSTCollector.MAX_FPGAS,), datatype=numpy.uint64)

    # latest SSTs
-    sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(SSTCollector.MAX_INPUTS, SSTCollector.MAX_SUBBANDS), datatype=numpy.uint64)
+    sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(StationSSTCollector.MAX_INPUTS, StationSSTCollector.MAX_SUBBANDS), datatype=numpy.uint64)

    # reported timestamp
    # for each row in the latest SSTs
-    sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64)
+    sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=numpy.uint64)

    # integration interval for each row in the latest SSTs
-    integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.float32)
+    integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=numpy.float32)

    # whether the subband data was calibrated by the SDP (that is, were subband weights applied)
-    subbands_calibrated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "subbands_calibrated"}, dims=(SSTCollector.MAX_INPUTS,), datatype=bool)
+    subbands_calibrated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "subbands_calibrated"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=bool)

    # ----------
    # Summarising Attributes
......
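A note on the pattern above: each attribute_wrapper takes its dims from class constants on the collector, so the Tango attribute shapes and the collector's parameter arrays stay in sync; that is why every declaration must move from SSTCollector to StationSSTCollector along with the import. A minimal sketch of that contract (illustrative only):

# StationSSTCollector inherits its sizing constants from the library collector,
# e.g. sst_R is published as a MAX_INPUTS x MAX_SUBBANDS (192 x 512) matrix
assert StationSSTCollector.MAX_INPUTS == 192
assert StationSSTCollector.MAX_SUBBANDS == 512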
@@ -9,7 +9,7 @@
from tangostationcontrol.integration_test.base import BaseIntegrationTestCase
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
-from tangostationcontrol.statistics.collector import SSTCollector
+from tangostationcontrol.statistics.collector import StationSSTCollector
from tangostationcontrol.statistics_writer import statistics_reader
from tangostationcontrol.statistics import writer
@@ -45,7 +45,7 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
    def test_insert_tango_SST_statistics(self):
        self.setup_recv_proxy()
        self.assertEqual(DevState.ON, self.recv_proxy.state())
-        collector = SSTCollector()
+        collector = StationSSTCollector()

        # Test attribute values retrieval
        collector.parse_device_attributes(self.recv_proxy)
......
import logging
import numpy
import datetime

from tangostationcontrol.statistics.packets import SSTPacket, XSTPacket, BSTPacket
from tangostationcontrol.common.baselines import nr_baselines, baseline_index, baseline_from_index

from tango import DeviceProxy, DevFailed, DevState

logger = logging.getLogger()


class StatisticsCollector:
    """ Base class to process statistics packets into parameter matrices. """

    # Maximum number of FPGAs we receive data from (used for diagnostics)
    MAX_FPGAS = 16

    def __init__(self):
        self.parameters = self._default_parameters()

    def _default_parameters(self):
        return {
            "nof_packets": numpy.uint64(0),

            # Packet count for packets that could not be parsed
            "nof_invalid_packets": numpy.uint64(0),

            # Full contents of the latest packet we deemed invalid.
            "last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8),
        }

    def process_packet(self, packet, device=None):
        self.parameters["nof_packets"] += numpy.uint64(1)

        try:
            self.parse_packet(packet, device)
        except Exception as e:
            self.parameters["last_invalid_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
            self.parameters["nof_invalid_packets"] += numpy.uint64(1)

            raise ValueError("Could not parse statistics packet") from e

    def parse_packet(self, packet, device):
        """ Update any information based on this packet. """

        raise NotImplementedError

    def parse_device_attributes(self):
        """ Update information based on device attributes """

        raise NotImplementedError
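The split above is the whole extension contract: process_packet() does uniform bookkeeping (total count, invalid count, last bad packet) and delegates to parse_packet(), so subclasses only implement parsing and let errors propagate. A minimal illustrative subclass (names invented, not part of the commit), relying on the StatisticsCollector above:

class PacketByteCounter(StatisticsCollector):
    """ Illustrative only: count bytes across correctly parsed packets. """

    def _default_parameters(self):
        defaults = super()._default_parameters()
        defaults.update({"nof_bytes": numpy.uint64(0)})
        return defaults

    def parse_packet(self, packet, device=None):
        # raising here is enough: process_packet() records the packet as
        # invalid and re-raises the error as a ValueError
        self.parameters["nof_bytes"] += numpy.uint64(len(packet))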
class SSTCollector(StatisticsCollector):
    """ Class to process SST statistics packets. """

    # Maximum number of antenna inputs we support (used to determine array sizes)
    MAX_INPUTS = 192

    # Maximum number of subbands we support (used to determine array sizes)
    MAX_SUBBANDS = 512

    def _default_parameters(self):
        defaults = super()._default_parameters()

        defaults.update({
            # Number of packets received so far that we could parse correctly and do not have a payload error
            "nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),

            # Packets that reported a payload error
            "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),

            # Last value array we've constructed out of the packets
            "sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64),
            "sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64),
            "integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32),
            "subbands_calibrated": numpy.zeros((self.MAX_INPUTS,), dtype=bool),

            # RECV attribute values to monitor the station configuration
            "rcu_attenuator_dB": numpy.zeros((32, 3), dtype=numpy.int64),
            "rcu_band_select": numpy.zeros((32, 3), dtype=numpy.int64),
            "rcu_dth_on": numpy.full((32, 3), False),
        })

        return defaults

    def parse_packet(self, packet, device):
        fields = SSTPacket(packet)

        # determine which input this packet contains data for
        if fields.signal_input_index >= self.MAX_INPUTS:
            # packet describes an input that is out of bounds for us
            raise ValueError("Packet describes input %d, but we are limited to describing MAX_INPUTS=%d" % (fields.signal_input_index, self.MAX_INPUTS))
        input_index = fields.signal_input_index

        if fields.payload_error:
            # cannot trust the data if a payload error is reported
            self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1)

            # don't raise, as packet is valid
            return

        # process the packet
        self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1)
        self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload
        self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp())
        self.parameters["integration_intervals"][input_index] = fields.integration_interval()
        self.parameters["subbands_calibrated"][input_index] = fields.subband_calibrated_flag

        self.parse_device_attributes(device)

# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.

from lofar_station_client.statistics import SSTCollector

from tango import DevFailed
from tango import DeviceProxy
from tango import DevState


class StationSSTCollector(SSTCollector):
    def parse_packet(self, packet, obj):
        super(StationSSTCollector, self).parse_packet(packet, obj)

        # add tango values to packet
        self.parse_device_attributes(obj)

    def parse_device_attributes(self, device: DeviceProxy):
@@ -129,229 +39,3 @@ class SSTCollector(StatisticsCollector):
self.parameters["rcu_attenuator_dB"] = None
self.parameters["rcu_band_select"] = None
self.parameters["rcu_dth_on"] = None
class XSTCollector(StatisticsCollector):
    """ Class to process XST statistics packets.

    XSTs are received for up to MAX_PARALLEL_SUBBANDS simultaneously, and only the values of the last
    MAX_PARALLEL_SUBBANDS are kept. Raw data are collected for each subband in parameters["xst_blocks"],
    and overwritten if newer (younger) data is received for the same subband. As such, the data represent
    a rolling view on the XSTs.

    The xst_values() function is a user-friendly way to read the xst_blocks.

    The hardware can be configured to emit different and/or fewer subbands, causing some of the XSTs
    to become stale. It is therefore advised to inspect parameters["xst_timestamps"] as well.
    """

    # Maximum number of subbands for which we collect XSTs simultaneously
    MAX_PARALLEL_SUBBANDS = 8

    # Maximum number of antenna inputs we support (used to determine array sizes)
    MAX_INPUTS = 192

    # Maximum number of baselines we can receive
    MAX_BASELINES = nr_baselines(MAX_INPUTS)

    # Expected block size is BLOCK_LENGTH x BLOCK_LENGTH
    BLOCK_LENGTH = 12

    # Expected number of blocks: enough to cover all baselines without the conjugates (that is, the top-left triangle of the matrix).
    MAX_BLOCKS = nr_baselines(MAX_INPUTS // BLOCK_LENGTH)

    # Maximum number of subbands we support (used to determine array sizes)
    MAX_SUBBANDS = 512

    # Complex values are (real, imag). A bit silly, but we don't want magical constants.
    VALUES_PER_COMPLEX = 2

    def _default_parameters(self):
        defaults = super()._default_parameters()

        defaults.update({
            # Number of packets received so far that we could parse correctly and do not have a payload error
            "nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),

            # Packets that reported a payload error
            "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),

            # Last value array we've constructed out of the packets
            "xst_blocks": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.int64),

            # Whether the values are actually conjugated and transposed
            "xst_conjugated": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS,), dtype=bool),

            # When the youngest data for each subband was received
            "xst_timestamps": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float64),
            "xst_subbands": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.uint16),
            "xst_integration_intervals": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float32),
        })

        return defaults

    def select_subband_slot(self, subband):
        """ Return which subband slot (0..MAX_PARALLEL_SUBBANDS) to use when confronted with a new subband.

        Keep recording the same subband if we're already tracking it, but allocate or replace a slot if not. """

        indices = numpy.where(self.parameters["xst_subbands"] == subband)[0]

        if len(indices) > 0:
            # subband already being recorded, use same spot
            return indices[0]
        else:
            # a new subband, kick out the oldest
            oldest_timestamp = self.parameters["xst_timestamps"].min()

            # prefer the first one in case of multiple minima
            return numpy.where(self.parameters["xst_timestamps"] == oldest_timestamp)[0][0]
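The policy above keeps a subband's slot stable while it is being emitted and evicts the least-recently-updated slot otherwise, preferring the lowest index on ties. A worked example (slot contents invented for illustration):

# xst_subbands   = [102, 208, 0, ...]    subbands currently assigned to slots
# xst_timestamps = [9.0, 4.0, 0.0, ...]  last update time per slot
#
# select_subband_slot(208) -> 1  (already tracked: keep its slot)
# select_subband_slot(300) -> 2  (new subband: slot 2 holds the oldest
#                                 timestamp, so it is evicted)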
    def parse_packet(self, packet, device=None):
        fields = XSTPacket(packet)

        if fields.payload_error:
            # cannot trust the data if a payload error is reported
            self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1)

            # don't raise, as packet is valid
            return

        # the blocks must be of size BLOCK_LENGTH x BLOCK_LENGTH
        if fields.nof_signal_inputs != self.BLOCK_LENGTH:
            raise ValueError("Packet describes a block of {0} x {0} baselines, but we can only parse blocks of {1} x {1} baselines".format(fields.nof_signal_inputs, self.BLOCK_LENGTH))

        # check whether set of baselines in this packet are not out of bounds
        for antenna in (0, 1):
            if fields.first_baseline[antenna] + fields.nof_signal_inputs > self.MAX_INPUTS:
                # packet describes an input that is out of bounds for us
                raise ValueError(f"Packet describes {fields.nof_signal_inputs} x {fields.nof_signal_inputs} baselines starting at {fields.first_baseline}, but we are limited to describing MAX_INPUTS={self.MAX_INPUTS}")

            # the blocks of baselines need to be tightly packed, and thus be provided at exact intervals
            if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0:
                raise ValueError(f"Packet describes baselines starting at {fields.first_baseline}, but we require a multiple of BLOCK_LENGTH={self.BLOCK_LENGTH}")

        # Make sure we always have a baseline (a,b) with a>=b. If not, we swap the indices and mark that the data must be conjugated and transposed when processed.
        first_baseline = fields.first_baseline
        if first_baseline[0] < first_baseline[1]:
            conjugated = True
            first_baseline = (first_baseline[1], first_baseline[0])
        else:
            conjugated = False

        # we keep track of multiple subbands. select slot for this one
        subband_slot = self.select_subband_slot(fields.subband_index)

        assert 0 <= subband_slot < self.MAX_PARALLEL_SUBBANDS, f"Selected slot {subband_slot}, but only have room for {self.MAX_PARALLEL_SUBBANDS}. Existing slots are {self.parameters['xst_subbands']}, processing subband {fields.subband_index}."

        # log if we're replacing a subband we were once recording
        self._log_replacing_subband(subband_slot, fields)

        # the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH
        # starting at baseline first_baseline.
        #
        # we honour this format, as we want to keep the metadata together with these blocks. we do need to put the blocks in a linear
        # and tight order, however, so we calculate a block index.
        block_index = baseline_index(first_baseline[0] // self.BLOCK_LENGTH, first_baseline[1] // self.BLOCK_LENGTH)

        # We did enough checks on first_baseline for this to be a logic error in our code
        assert 0 <= block_index < self.MAX_BLOCKS, f"Received block {block_index}, but have only room for {self.MAX_BLOCKS}. Block starts at baseline {first_baseline}."

        # process the packet
        self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1)
        self.parameters["xst_blocks"][subband_slot, block_index, :fields.nof_statistics_per_packet] = fields.payload
        self.parameters["xst_timestamps"][subband_slot] = numpy.float64(fields.timestamp().timestamp())
        self.parameters["xst_conjugated"][subband_slot, block_index] = conjugated
        self.parameters["xst_subbands"][subband_slot] = numpy.uint16(fields.subband_index)
        self.parameters["xst_integration_intervals"][subband_slot] = fields.integration_interval()

    def _log_replacing_subband(self, subband_slot, fields):
        # log if we're replacing a subband we were once recording
        previous_subband_in_slot = self.parameters["xst_subbands"][subband_slot]

        if previous_subband_in_slot != fields.subband_index:
            if self.parameters["xst_timestamps"][subband_slot] > 0:
                previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot])
                logger.info(f"Stopped recording XSTs for subband {previous_subband_in_slot}. Last data for this subband was received at {previous_subband_timestamp}.")

    def xst_values(self, subband_indices=None):
        """ xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS] of complex values.

        The subband indices must be in [0..MAX_PARALLEL_SUBBANDS). By default, all recorded XSTs are returned.
        """

        if subband_indices is None:
            subband_indices = range(self.MAX_PARALLEL_SUBBANDS)

        matrix = numpy.zeros((len(subband_indices), self.MAX_INPUTS, self.MAX_INPUTS), dtype=numpy.complex64)
        xst_blocks = self.parameters["xst_blocks"]
        xst_conjugated = self.parameters["xst_conjugated"]

        for matrix_idx, subband_index in enumerate(subband_indices):
            for block_index in range(self.MAX_BLOCKS):
                # convert real/imag int to complex float values. this works as real/imag come in pairs
                block = xst_blocks[subband_index][block_index].astype(numpy.float32).view(numpy.complex64)

                if xst_conjugated[subband_index][block_index]:
                    # block is conjugated and transposed. process.
                    block = block.conjugate().transpose()

                # reshape into [a][b]
                block = block.reshape(self.BLOCK_LENGTH, self.BLOCK_LENGTH)

                # compute destination in matrix
                first_baseline = baseline_from_index(block_index)
                first_baseline = (first_baseline[0] * self.BLOCK_LENGTH, first_baseline[1] * self.BLOCK_LENGTH)

                # copy block into matrix
                matrix[matrix_idx][first_baseline[0]:first_baseline[0]+self.BLOCK_LENGTH, first_baseline[1]:first_baseline[1]+self.BLOCK_LENGTH] = block

        return matrix
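xst_values() is the intended read path for the rolling view above: it reinterprets the packed int64 (real, imag) pairs as complex64, undoes the conjugate/transpose marking, and scatters each 12 x 12 block into the full baseline matrix. A hedged usage sketch:

collector = XSTCollector()
# ... feed packets via collector.process_packet(packet) ...

matrix = collector.xst_values()  # one MAX_INPUTS x MAX_INPUTS matrix per slot
assert matrix.shape == (XSTCollector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS, XSTCollector.MAX_INPUTS)

# as the docstring warns, check parameters["xst_timestamps"] per slot:
# subbands the hardware stopped emitting go stale rather than empty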
class BSTCollector(StatisticsCollector):
    """ Class to process BST statistics packets. """

    # beamlets = 488 * 2 for the x and y polarisations
    MAX_BEAMLETS = 976
    MAX_BLOCKS = 2

    def _default_parameters(self):
        defaults = super()._default_parameters()

        defaults.update({
            # Number of packets received so far that we could parse correctly and do not have a payload error
            "nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),

            # Packets that reported a payload error
            "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),

            # Last value array we've constructed out of the packets
            "bst_values": numpy.zeros((self.MAX_BLOCKS, self.MAX_BEAMLETS), dtype=numpy.uint64),
            "bst_timestamps": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float64),
            "integration_intervals": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float32),
        })

        return defaults

    def parse_packet(self, packet, device=None):
        fields = BSTPacket(packet)

        # To get the block_index we floor divide this beamlet_index by the max amount of beamlets per block
        block_index = fields.beamlet_index // self.MAX_BEAMLETS

        # determine which block this packet contains data for
        if block_index >= self.MAX_BLOCKS:
            # packet describes a beamlet that is out of bounds for us
            raise ValueError("Packet describes beamlet %d, but we are limited to describing MAX_BEAMLETS=%d" % (fields.beamlet_index, self.MAX_BEAMLETS))

        if fields.payload_error:
            # cannot trust the data if a payload error is reported
            self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1)

            # don't raise, as packet is valid
            return

        # process the packet
        self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1)
        self.parameters["bst_values"][block_index][:self.MAX_BEAMLETS] = fields.payload
        self.parameters["bst_timestamps"][block_index] = numpy.float64(fields.timestamp().timestamp())
        self.parameters["integration_intervals"][block_index] = fields.integration_interval()
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import sys
import pprint

from lofar_station_client.statistics.packet import SDPPacket
from lofar_station_client.statistics.packet import PACKET_CLASS_FOR_MARKER


def read_packet(read_func) -> SDPPacket:
    """Read a packet using the given read function, with signature

    ```read_func(num_bytes: int) -> bytes```

    and return it. The packet type is sensed from the data and
    the correct subclass of SDPPacket is returned.

    If read_func() returns None, this function will as well.
    """

    # read just the marker
    marker = read_func(1)
    if not marker:
        return None

    # read the packet header based on type
    packetClass = PACKET_CLASS_FOR_MARKER[marker]

    # read the rest of the header
    header = read_func(packetClass.HEADER_SIZE - len(marker))
    if not header:
        return None

    header = marker + header

    # parse the packet header size
    packet = packetClass(header)

    # read the payload
    payload_size = packet.expected_size() - len(header)
    payload = read_func(payload_size)
    if not payload:
        return None

    # return full packet
    return packetClass(header + payload)
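Because read_packet() only requires a read(num_bytes) callable, the same function parses live streams and raw capture files alike. A short sketch (the file name is hypothetical):

# parse every packet in a raw capture file
with open("packets.raw", "rb") as f:
    while True:
        packet = read_packet(f.read)  # file.read matches the read_func signature
        if packet is None:
            break
        print(packet.__class__.__name__, packet.size())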
def main(args=None, **kwargs):
    # parse packets from stdin

    # packet counter
    nr = 0

    # byte offset in the stream
    offset = 0

    while True:
        # read the packet from input
        packet = read_packet(sys.stdin.buffer.read)

        if not packet:
            break

        # print header
        print(
            f"# Packet {nr} of class {packet.__class__.__name__} starting at "
            f"offset {offset} with length {packet.size()}"
        )
        pprint.pprint(packet.header())

        # increment counters
        nr += 1
        offset += packet.size()


# this file is very useful to have stand alone to parse raw packet files, so make it
# work as such
if __name__ == "__main__":
    main(sys.argv)
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import argparse
import time
import sys
......
@@ -229,7 +229,7 @@ class sst_hdf5_writer(hdf5_writer):
        return SSTPacket(packet)

    def new_collector(self):
-        return statistics_collector.SSTCollector()
+        return statistics_collector.StationSSTCollector()

    def write_values_matrix(self, current_group):
        # store the SST values
......