Skip to content
Snippets Groups Projects
Commit 0b1948b3 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-577-support-multiple-xst-streams' into 'master'

L2SS-577: Support multiple XST streams

Closes L2SS-577

See merge request !249
parents 0cc9f221 1ec527b3
No related branches found
No related tags found
1 merge request!249L2SS-577: Support multiple XST streams
...@@ -97,6 +97,13 @@ class StatisticsClient(AsyncCommClient): ...@@ -97,6 +97,13 @@ class StatisticsClient(AsyncCommClient):
# redirect to right object. this works as long as the parameter names are unique among them. # redirect to right object. this works as long as the parameter names are unique among them.
if annotation["type"] == "statistics": if annotation["type"] == "statistics":
def read_function(): def read_function():
if annotation.get("reshape", False):
# force array into the shape of the attribute
if attribute.max_dim_y > 1:
return self.collector.parameters[parameter].reshape(attribute.max_dim_y, attribute.max_dim_x)
else:
return self.collector.parameters[parameter].reshape(attribute.max_dim_x)
else:
return self.collector.parameters[parameter] return self.collector.parameters[parameter]
elif annotation["type"] == "udp": elif annotation["type"] == "udp":
def read_function(): def read_function():
......
...@@ -2,6 +2,7 @@ from queue import Queue ...@@ -2,6 +2,7 @@ from queue import Queue
from threading import Thread from threading import Thread
import logging import logging
import numpy import numpy
import datetime
from .statistics_packet import SSTPacket, XSTPacket from .statistics_packet import SSTPacket, XSTPacket
from tangostationcontrol.common.baselines import nr_baselines, baseline_index, baseline_from_index from tangostationcontrol.common.baselines import nr_baselines, baseline_index, baseline_from_index
...@@ -99,7 +100,21 @@ class SSTCollector(StatisticsCollector): ...@@ -99,7 +100,21 @@ class SSTCollector(StatisticsCollector):
self.parameters["subbands_calibrated"][input_index] = fields.subband_calibrated_flag self.parameters["subbands_calibrated"][input_index] = fields.subband_calibrated_flag
class XSTCollector(StatisticsCollector): class XSTCollector(StatisticsCollector):
""" Class to process XST statistics packets. """ """ Class to process XST statistics packets.
XSTs are received for up to MAX_PARALLEL_SUBBANDS simultaneously, and only the values of the last
MAX_PARALLEL_SUBBANDS are kept. Raw data are collected for each subband in parameters["xst_blocks"],
and overwritten if newer (younger) data is received for the same subband. As such, the data represent
a rolling view on the XSTs.
The xst_values() function is a user-friendly way to read the xst_blocks.
The hardware can be configured to emit different and/or fewer subbands, causing some of the XSTs
to become stale. It is therefor advised to inspect parameters["xst_timestamps"] as well.
"""
# Maximum number of subbands for which we collect XSTs simultaneously
MAX_PARALLEL_SUBBANDS = 8
# Maximum number of antenna inputs we support (used to determine array sizes) # Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192 MAX_INPUTS = 192
...@@ -130,16 +145,33 @@ class XSTCollector(StatisticsCollector): ...@@ -130,16 +145,33 @@ class XSTCollector(StatisticsCollector):
"nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Last value array we've constructed out of the packets # Last value array we've constructed out of the packets
"xst_blocks": numpy.zeros((self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.int64), "xst_blocks": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.int64),
# Whether the values are actually conjugated and transposed # Whether the values are actually conjugated and transposed
"xst_conjugated": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.bool_), "xst_conjugated": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS,), dtype=numpy.bool_),
"xst_timestamps": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float64), # When the youngest data for each subband was received
"xst_subbands": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.uint16), "xst_timestamps": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float64),
"integration_intervals": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float32), "xst_subbands": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.uint16),
"integration_intervals": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float32),
}) })
return defaults return defaults
def select_subband_slot(self, subband):
""" Return which subband slot (0..MAX_PARALLEL_SUBBANDS) to use when confronted with a new subband.
Keep recording the same subband if we're already tracking it, but allocate or replace a slot if not. """
indices = numpy.where(self.parameters["xst_subbands"] == subband)[0]
if len(indices) > 0:
# subband already being recorded, use same spot
return indices[0]
else:
# a new subband, kick out the oldest
oldest_timestamp = self.parameters["xst_timestamps"].min()
# prefer the first one in case of multiple minima
return numpy.where(self.parameters["xst_timestamps"] == oldest_timestamp)[0][0]
def parse_packet(self, packet): def parse_packet(self, packet):
fields = XSTPacket(packet) fields = XSTPacket(packet)
...@@ -172,6 +204,19 @@ class XSTCollector(StatisticsCollector): ...@@ -172,6 +204,19 @@ class XSTCollector(StatisticsCollector):
else: else:
conjugated = False conjugated = False
# we keep track of multiple subbands. select slot for this one
subband_slot = self.select_subband_slot(fields.subband_index)
assert 0 <= subband_slot < self.MAX_PARALLEL_SUBBANDS, f"Selected slot {subband_slot}, but only have room for {self.MAX_PARALLEL_SUBBANDS}. Existing slots are {self.parameters['xst_subbands']}, processing subband {fields.subband_index}."
# log if we're replacing a subband we were once recording
previous_subband_in_slot = self.parameters["xst_subbands"][subband_slot]
if previous_subband_in_slot != fields.subband_index:
previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot])
if previous_subband_timestamp.timestamp() > 0:
logger.info(f"Stopped recording XSTs for subband {previous_subband_in_slot}. Last data for this subband was received at {previous_subband_timestamp}.")
# the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH # the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH
# starting at baseline first_baseline. # starting at baseline first_baseline.
# #
...@@ -185,24 +230,28 @@ class XSTCollector(StatisticsCollector): ...@@ -185,24 +230,28 @@ class XSTCollector(StatisticsCollector):
# process the packet # process the packet
self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1) self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1)
self.parameters["xst_blocks"][block_index][:fields.nof_statistics_per_packet] = fields.payload self.parameters["xst_blocks"][subband_slot, block_index, :fields.nof_statistics_per_packet] = fields.payload
self.parameters["xst_timestamps"][block_index] = numpy.float64(fields.timestamp().timestamp()) self.parameters["xst_timestamps"][subband_slot] = numpy.float64(fields.timestamp().timestamp())
self.parameters["xst_conjugated"][block_index] = conjugated self.parameters["xst_conjugated"][subband_slot, block_index] = conjugated
self.parameters["xst_subbands"][block_index] = numpy.uint16(fields.subband_index) self.parameters["xst_subbands"][subband_slot] = numpy.uint16(fields.subband_index)
self.parameters["integration_intervals"][block_index] = fields.integration_interval() self.parameters["integration_intervals"][subband_slot] = fields.integration_interval()
def xst_values(self, subband_indices=range(MAX_PARALLEL_SUBBANDS)):
""" xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS] of complex values.
def xst_values(self): The subband indices must be in [0..MAX_PARALLEL_SUBBANDS). By default, all recorded XSTs are returned.
""" xst_blocks, but as a matrix[MAX_INPUTS][MAX_INPUTS] of complex values. """ """
matrix = numpy.zeros((self.MAX_INPUTS, self.MAX_INPUTS), dtype=numpy.complex64) matrix = numpy.zeros((len(subband_indices), self.MAX_INPUTS, self.MAX_INPUTS), dtype=numpy.complex64)
xst_blocks = self.parameters["xst_blocks"] xst_blocks = self.parameters["xst_blocks"]
xst_conjugated = self.parameters["xst_conjugated"] xst_conjugated = self.parameters["xst_conjugated"]
for subband_index in subband_indices:
for block_index in range(self.MAX_BLOCKS): for block_index in range(self.MAX_BLOCKS):
# convert real/imag int to complex float values. this works as real/imag come in pairs # convert real/imag int to complex float values. this works as real/imag come in pairs
block = xst_blocks[block_index].astype(numpy.float32).view(numpy.complex64) block = xst_blocks[subband_index][block_index].astype(numpy.float32).view(numpy.complex64)
if xst_conjugated[block_index]: if xst_conjugated[subband_index][block_index]:
# block is conjugated and transposed. process. # block is conjugated and transposed. process.
block = block.conjugate().transpose() block = block.conjugate().transpose()
...@@ -214,7 +263,7 @@ class XSTCollector(StatisticsCollector): ...@@ -214,7 +263,7 @@ class XSTCollector(StatisticsCollector):
first_baseline = (first_baseline[0] * self.BLOCK_LENGTH, first_baseline[1] * self.BLOCK_LENGTH) first_baseline = (first_baseline[0] * self.BLOCK_LENGTH, first_baseline[1] * self.BLOCK_LENGTH)
# copy block into matrix # copy block into matrix
matrix[first_baseline[0]:first_baseline[0]+self.BLOCK_LENGTH, first_baseline[1]:first_baseline[1]+self.BLOCK_LENGTH] = block matrix[subband_index][first_baseline[0]:first_baseline[0]+self.BLOCK_LENGTH, first_baseline[1]:first_baseline[1]+self.BLOCK_LENGTH] = block
return matrix return matrix
......
...@@ -116,33 +116,86 @@ class XST(Statistics): ...@@ -116,33 +116,86 @@ class XST(Statistics):
# number of packets with invalid payloads # number of packets with invalid payloads
nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(XSTCollector.MAX_FPGAS,), datatype=numpy.uint64) nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(XSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
# latest XSTs # latest XSTs
xst_blocks_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_blocks"}, dims=(XSTCollector.BLOCK_LENGTH * XSTCollector.BLOCK_LENGTH * XSTCollector.VALUES_PER_COMPLEX, XSTCollector.MAX_BLOCKS), datatype=numpy.int64) xst_blocks_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_blocks", "reshape": True}, dims=(XSTCollector.BLOCK_LENGTH * XSTCollector.BLOCK_LENGTH * XSTCollector.VALUES_PER_COMPLEX, XSTCollector.MAX_BLOCKS), datatype=numpy.int64)
# whether the values in the block are conjugated and transposed # whether the values in the block are conjugated and transposed
xst_conjugated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_conjugated"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.bool_) xst_conjugated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_conjugated", "reshape": True}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.bool_)
# reported timestamp for each row in the latest XSTs # reported timestamp for each subband in the latest XSTs
xst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_timestamps"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint64) xst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_timestamps"}, dims=(XSTCollector.PARALLEL_SUBBANDS,), datatype=numpy.uint64)
# which subband the XSTs describe # which subband the XSTs describe
xst_subbands_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_subbands"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint16) xst_subbands_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_subbands"}, dims=(XSTCollector.MAX_PARALLEL_SUBBANDS,), datatype=numpy.uint16)
# integration interval for each row in the latest XSTs # integration interval for each subband in the latest XSTs
integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.float32) integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(XSTCollector.MAX_PARALLEL_SUBBANDS,), datatype=numpy.float32)
# xst_R, but as a matrix of input x input # xst_R, but as a matrix of subband x (input x input)
xst_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) xst_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),))
xst_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) xst_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),))
xst_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) xst_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),))
xst_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) xst_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),))
def read_xst_real_R(self): def read_xst_real_R(self):
return numpy.real(self.statistics_client.collector.xst_values()) return numpy.real(self.statistics_client.collector.xst_values()).reshape(XSTCollector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS)
def read_xst_imag_R(self): def read_xst_imag_R(self):
return numpy.imag(self.statistics_client.collector.xst_values()) return numpy.imag(self.statistics_client.collector.xst_values()).reshape(XSTCollector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS)
def read_xst_power_R(self): def read_xst_power_R(self):
return numpy.abs(self.statistics_client.collector.xst_values()) return numpy.abs(self.statistics_client.collector.xst_values()).reshape(XSTCo llector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS)
def read_xst_phase_R(self): def read_xst_phase_R(self):
return numpy.angle(self.statistics_client.collector.xst_values()) return numpy.angle(self.statistics_client.collector.xst_values()).reshape(XSTCllector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS)
# xst_R, but as a matrix of input x input, for each specific subband index
xst_0_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(0))
xst_0_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(0))
xst_0_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(0))
xst_0_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(0))
xst_1_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(1))
xst_1_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(1))
xst_1_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(1))
xst_1_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(1))
xst_2_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(2))
xst_2_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(2))
xst_2_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(2))
xst_2_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(2))
xst_3_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(3))
xst_3_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(3))
xst_3_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(3))
xst_3_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(3))
xst_4_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(4))
xst_4_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(4))
xst_4_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(4))
xst_4_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(4))
xst_5_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(5))
xst_5_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(5))
xst_5_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(5))
xst_5_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(5))
xst_6_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(6))
xst_6_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(6))
xst_6_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(6))
xst_6_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(6))
xst_7_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(7))
xst_7_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(7))
xst_7_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(7))
xst_7_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(7))
def read_xst_N_real_R(self, subband_idx):
return numpy.real(self.statistics_client.collector.xst_values(subband_idx)[0])
def read_xst_N_imag_R(self, subband_idx):
return numpy.imag(self.statistics_client.collector.xst_values(subband_idx)[0])
def read_xst_N_power_R(self, subband_idx):
return numpy.abs(self.statistics_client.collector.xst_values(subband_idx)[0])
def read_xst_N_phase_R(self, subband_idx):
return numpy.angle(self.statistics_client.collector.xst_values(subband_idx)[0])
# ---------- # ----------
# Summarising Attributes # Summarising Attributes
......
...@@ -7,6 +7,7 @@ import h5py ...@@ -7,6 +7,7 @@ import h5py
import numpy import numpy
import logging import logging
from abc import ABC, abstractmethod
# import statistics classes with workaround # import statistics classes with workaround
import sys import sys
...@@ -17,9 +18,10 @@ import tangostationcontrol.devices.sdp.statistics_collector as statistics_collec ...@@ -17,9 +18,10 @@ import tangostationcontrol.devices.sdp.statistics_collector as statistics_collec
logger = logging.getLogger("statistics_writer") logger = logging.getLogger("statistics_writer")
__all__ = ["hdf5_writer"] __all__ = ["hdf5_writer", "parallel_xst_hdf5_writer", "xst_hdf5_writer", "sst_hdf5_writer"]
class hdf5_writer:
class hdf5_writer(ABC):
SST_MODE = "SST" SST_MODE = "SST"
XST_MODE = "XST" XST_MODE = "XST"
...@@ -39,18 +41,22 @@ class hdf5_writer: ...@@ -39,18 +41,22 @@ class hdf5_writer:
self.statistics_header = None self.statistics_header = None
# file handing # file handing
self.file_location = file_location self.file_location = file_location or '.'
self.decimation_factor = decimation_factor self.decimation_factor = decimation_factor
self.new_file_time_interval = timedelta(seconds=new_file_time_interval) self.new_file_time_interval = timedelta(seconds=new_file_time_interval)
self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC) self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC)
self.file = None self.file = None
# parameters that are configured depending on the mode the statistics writer is in (SST,XST,BST) # parameters that are configured depending on the mode the statistics writer is in (SST,XST,BST)
self.decoder = None
self.collector = None
self.store_function = None
self.mode = statistics_mode.upper() self.mode = statistics_mode.upper()
self.config_mode()
@abstractmethod
def decoder(self):
pass
@abstractmethod
def new_collector(self):
pass
def next_packet(self, packet): def next_packet(self, packet):
""" """
...@@ -123,7 +129,7 @@ class hdf5_writer: ...@@ -123,7 +129,7 @@ class hdf5_writer:
self.start_new_hdf5(timestamp) self.start_new_hdf5(timestamp)
# create a new and empty current_matrix # create a new and empty current_matrix
self.current_matrix = self.collector() self.current_matrix = self.new_collector()
self.statistics_header = None self.statistics_header = None
def write_matrix(self): def write_matrix(self):
...@@ -136,7 +142,7 @@ class hdf5_writer: ...@@ -136,7 +142,7 @@ class hdf5_writer:
current_group = self.file.create_group("{}_{}".format(self.mode, self.current_timestamp.isoformat(timespec="milliseconds"))) current_group = self.file.create_group("{}_{}".format(self.mode, self.current_timestamp.isoformat(timespec="milliseconds")))
# store the statistics values for the current group # store the statistics values for the current group
self.store_function(current_group) self.write_values_matrix(current_group)
# might be optional, but they're easy to add. # might be optional, but they're easy to add.
current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"]) current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"])
...@@ -145,6 +151,10 @@ class hdf5_writer: ...@@ -145,6 +151,10 @@ class hdf5_writer:
# get the statistics header # get the statistics header
header = self.statistics_header header = self.statistics_header
if not header:
# edge case: no valid packet received at all
return
# can't store datetime objects, convert to string instead # can't store datetime objects, convert to string instead
header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds") header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds")
...@@ -156,17 +166,13 @@ class hdf5_writer: ...@@ -156,17 +166,13 @@ class hdf5_writer:
else: else:
current_group.attrs[k] = v current_group.attrs[k] = v
def write_sst_matrix(self, current_group): @abstractmethod
# store the SST values def write_values_matrix(self, current_group):
current_group.create_dataset(name="values", data=self.current_matrix.parameters["sst_values"].astype(numpy.float32), compression="gzip") pass
def write_xst_matrix(self, current_group):
# requires a function call to transform the xst_blocks in to the right structure
current_group.create_dataset(name="values", data=self.current_matrix.xst_values().astype(numpy.cfloat), compression="gzip")
def write_bst_matrix(self, current_group):
raise NotImplementedError("BST values not implemented")
def next_filename(self, timestamp, suffix=".h5"):
time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
return f"{self.file_location}/{self.mode}_{time_str}{suffix}"
def process_packet(self, packet): def process_packet(self, packet):
""" """
...@@ -186,44 +192,17 @@ class hdf5_writer: ...@@ -186,44 +192,17 @@ class hdf5_writer:
except Exception as e: except Exception as e:
logger.exception(f"Error while attempting to close hdf5 file to disk. file {self.file} likely empty, please verify integrity.") logger.exception(f"Error while attempting to close hdf5 file to disk. file {self.file} likely empty, please verify integrity.")
current_time = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) filename = self.next_filename(timestamp)
logger.info(f"creating new file: {self.file_location}/{self.mode}_{current_time}.h5") logger.info(f"creating new file: {filename}")
try: try:
self.file = h5py.File(f"{self.file_location}/{self.mode}_{current_time}.h5", 'w') self.file = h5py.File(filename, 'w')
except Exception as e: except Exception as e:
logger.exception(f"Error while creating new file") logger.exception(f"Error while creating new file")
raise e raise e
self.last_file_time = timestamp self.last_file_time = timestamp
def config_mode(self):
logger.debug(f"attempting to configure {self.mode} mode")
"""
Configures the object for the correct statistics type to be used.
decoder: the class to decode a single packet
collector: the class to collect statistics packets
store_function: the function to write the mode specific data to file
"""
if self.mode == self.SST_MODE:
self.decoder = SSTPacket
self.collector = statistics_collector.SSTCollector
self.store_function = self.write_sst_matrix
elif self.mode == self.XST_MODE:
self.decoder = XSTPacket
self.collector = statistics_collector.XSTCollector
self.store_function = self.write_xst_matrix
elif self.mode == self.BST_MODE:
self.store_function = self.write_bst_matrix
raise NotImplementedError("BST collector has not yet been implemented")
else:
raise ValueError("invalid statistics mode specified '{}', please use 'SST', 'XST' or 'BST' ".format(self.mode))
def close_writer(self): def close_writer(self):
""" """
Function that can be used to stop the writer without data loss. Function that can be used to stop the writer without data loss.
...@@ -240,3 +219,79 @@ class hdf5_writer: ...@@ -240,3 +219,79 @@ class hdf5_writer:
self.file.close() self.file.close()
logger.debug(f"{filename} closed") logger.debug(f"{filename} closed")
logger.debug(f"Received a total of {self.statistics_counter} statistics while running. With {int(self.statistics_counter/self.decimation_factor)} written to disk ") logger.debug(f"Received a total of {self.statistics_counter} statistics while running. With {int(self.statistics_counter/self.decimation_factor)} written to disk ")
class sst_hdf5_writer(hdf5_writer):
def __init__(self, new_file_time_interval, file_location, decimation_factor):
super().__init__(new_file_time_interval, file_location, hdf5_writer.SST_MODE, decimation_factor)
def decoder(self, packet):
return SSTPacket(packet)
def new_collector(self):
return statistics_collector.SSTCollector()
def write_values_matrix(self, current_group):
# store the SST values
current_group.create_dataset(name="values", data=self.current_matrix.parameters["sst_values"].astype(numpy.float32), compression="gzip")
class xst_hdf5_writer(hdf5_writer):
def __init__(self, new_file_time_interval, file_location, decimation_factor, subband_index):
super().__init__(new_file_time_interval, file_location, hdf5_writer.XST_MODE, decimation_factor)
self.subband_index = subband_index
def decoder(self, packet):
return XSTPacket(packet)
def new_collector(self):
return statistics_collector.XSTCollector()
def next_filename(self, timestamp):
time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
return f"{self.file_location}/{self.mode}_SB{self.subband_index}_{time_str}.h5"
def write_values_matrix(self, current_group):
# requires a function call to transform the xst_blocks in to the right structure
current_group.create_dataset(name="values", data=self.current_matrix.xst_values([self.subband_index])[0].astype(numpy.cfloat), compression="gzip")
class parallel_xst_hdf5_writer:
""" Writes multiple subbands in parallel. Each subband gets written to its own HDF5 file(s). """
def __init__(self, new_file_time_interval, file_location, decimation_factor):
# maintain a dedicated hdf5_writer per subband
self.writers = {}
# function to create a new writer, to avoid having to store
# all the init parameters just for this purpose.
#
def new_writer(subband):
# Since we use a dedicated writer per subband, the data will end
# up at subband_index == 0 in each of them.
return xst_hdf5_writer(
new_file_time_interval,
file_location,
decimation_factor,
0)
self.new_writer = new_writer
def next_packet(self, packet):
# decode to get subband of this packet
fields = XSTPacket(packet)
subband = fields.subband_index
# make sure there is a writer for it
if subband not in self.writers:
self.writers[subband] = self.new_writer(subband)
# demux packet to the correct writer
self.writers[subband].next_packet(packet)
def close_writer(self):
for writer in self.writers.values():
writer.close_writer()
self.writers = {}
...@@ -3,7 +3,7 @@ import time ...@@ -3,7 +3,7 @@ import time
import sys import sys
from tangostationcontrol.statistics_writer.receiver import tcp_receiver, file_receiver from tangostationcontrol.statistics_writer.receiver import tcp_receiver, file_receiver
from tangostationcontrol.statistics_writer.hdf5_writer import hdf5_writer from tangostationcontrol.statistics_writer.hdf5_writer import sst_hdf5_writer, parallel_xst_hdf5_writer
import logging import logging
logging.basicConfig(level=logging.INFO, format = '%(asctime)s:%(levelname)s: %(message)s') logging.basicConfig(level=logging.INFO, format = '%(asctime)s:%(levelname)s: %(message)s')
...@@ -13,13 +13,13 @@ def main(): ...@@ -13,13 +13,13 @@ def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Converts a stream of statistics packets into HDF5 files.') description='Converts a stream of statistics packets into HDF5 files.')
parser.add_argument( parser.add_argument(
'-a', '--host', type=str, required=True, help='the host to connect to') '-a', '--host', type=str, required=False, help='the host to connect to')
parser.add_argument( parser.add_argument(
'-p', '--port', type=int, default=0, '-p', '--port', type=int, default=0,
help='the port to connect to, or 0 to use default port for the ' help='the port to connect to, or 0 to use default port for the '
'selected mode (default: %(default)s)') 'selected mode (default: %(default)s)')
parser.add_argument( parser.add_argument(
'-f', '--file', type=str, required=True, help='the file to read from') '-f', '--file', type=str, required=False, help='the file to read from')
parser.add_argument( parser.add_argument(
'-m', '--mode', type=str, choices=['SST', 'XST', 'BST'], default='SST', '-m', '--mode', type=str, choices=['SST', 'XST', 'BST'], default='SST',
help='sets the statistics type to be decoded options (default: ' help='sets the statistics type to be decoded options (default: '
...@@ -57,6 +57,9 @@ def main(): ...@@ -57,6 +57,9 @@ def main():
debug = args.debug debug = args.debug
reconnect = args.reconnect reconnect = args.reconnect
if not filename and not host:
raise ValueError("Supply either a filename (--file) or a hostname (--host)")
if decimation < 1: if decimation < 1:
raise ValueError("Please use an integer --Decimation value 1 or higher to only store one every n statistics' ") raise ValueError("Please use an integer --Decimation value 1 or higher to only store one every n statistics' ")
...@@ -78,7 +81,16 @@ def main(): ...@@ -78,7 +81,16 @@ def main():
sys.exit(1) sys.exit(1)
# create the writer # create the writer
writer = hdf5_writer(new_file_time_interval=interval, file_location=output_dir, statistics_mode=mode, decimation_factor=decimation) if mode == "XST":
writer = parallel_xst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation)
elif mode == "SST":
writer = sst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation)
elif mode == "BST":
logger.fatal(f"BST mode not supported")
sys.exit(1)
else:
logger.fatal(f"Invalid mode: {mode}")
sys.exit(1)
# start looping # start looping
try: try:
......
...@@ -3,6 +3,45 @@ from tangostationcontrol.devices.sdp.statistics_packet import XSTPacket ...@@ -3,6 +3,45 @@ from tangostationcontrol.devices.sdp.statistics_packet import XSTPacket
from tangostationcontrol.test import base from tangostationcontrol.test import base
class TestSelectSubbandSlot(base.TestCase):
def test_first_entry(self):
collector = XSTCollector()
# on start, any subband should map on the first entry
self.assertEqual(0, collector.select_subband_slot(102))
def test_subsequent_entries(self):
collector = XSTCollector()
# assign some subbands
collector.parameters["xst_subbands"][0] = 102
collector.parameters["xst_subbands"][2] = 103
collector.parameters["xst_subbands"][3] = 104
# give them non-zero timestamps to make them newer than the other entries
collector.parameters["xst_timestamps"][0] = 1
collector.parameters["xst_timestamps"][2] = 1
collector.parameters["xst_timestamps"][3] = 1
# these should be reported back when looking them up again
self.assertEqual(0, collector.select_subband_slot(102))
self.assertEqual(2, collector.select_subband_slot(103))
self.assertEqual(3, collector.select_subband_slot(104))
# a place for another subband should be the lowest
self.assertEqual(1, collector.select_subband_slot(101))
def test_spilling(self):
collector = XSTCollector()
# assign all subbands, in decreasing age
for n in range(XSTCollector.MAX_PARALLEL_SUBBANDS):
collector.parameters["xst_subbands"][n] = 100 + n
collector.parameters["xst_timestamps"][n] = 100 - n
# check where a new subband replaces the oldest
self.assertEqual(XSTCollector.MAX_PARALLEL_SUBBANDS - 1, collector.select_subband_slot(200))
class TestXSTCollector(base.TestCase): class TestXSTCollector(base.TestCase):
def test_valid_packet(self): def test_valid_packet(self):
collector = XSTCollector() collector = XSTCollector()
...@@ -17,6 +56,9 @@ class TestXSTCollector(base.TestCase): ...@@ -17,6 +56,9 @@ class TestXSTCollector(base.TestCase):
# baseline indeed should be (12,0) # baseline indeed should be (12,0)
self.assertEqual((12,0), fields.first_baseline) self.assertEqual((12,0), fields.first_baseline)
# subband should indeed be 102
self.assertEqual(102, fields.subband_index)
# this should not throw # this should not throw
collector.process_packet(packet) collector.process_packet(packet)
...@@ -27,8 +69,10 @@ class TestXSTCollector(base.TestCase): ...@@ -27,8 +69,10 @@ class TestXSTCollector(base.TestCase):
self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index]) self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index])
self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index]) self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index])
self.assertListEqual([102,0,0,0,0,0,0,0], list(collector.parameters["xst_subbands"]))
# check whether the data ended up in the right block, and the rest is still zero # check whether the data ended up in the right block, and the rest is still zero
xst_values = collector.xst_values() xst_values = collector.xst_values()[0]
for baseline_a in range(collector.MAX_INPUTS): for baseline_a in range(collector.MAX_INPUTS):
for baseline_b in range(collector.MAX_INPUTS): for baseline_b in range(collector.MAX_INPUTS):
...@@ -67,7 +111,7 @@ class TestXSTCollector(base.TestCase): ...@@ -67,7 +111,7 @@ class TestXSTCollector(base.TestCase):
self.assertEqual(0, collector.parameters["nof_invalid_packets"]) self.assertEqual(0, collector.parameters["nof_invalid_packets"])
# check whether the data ended up in the right block, and the rest is still zero # check whether the data ended up in the right block, and the rest is still zero
xst_values = collector.xst_values() xst_values = collector.xst_values()[0]
for baseline_a in range(collector.MAX_INPUTS): for baseline_a in range(collector.MAX_INPUTS):
for baseline_b in range(collector.MAX_INPUTS): for baseline_b in range(collector.MAX_INPUTS):
...@@ -84,6 +128,48 @@ class TestXSTCollector(base.TestCase): ...@@ -84,6 +128,48 @@ class TestXSTCollector(base.TestCase):
else: else:
self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.')
def test_multiple_subbands(self):
collector = XSTCollector()
# a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0)
packet_subband_102 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01'
packet_subband_103 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00g\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x02'
# make sure the subband_indices are indeed what we claim they are
fields = XSTPacket(packet_subband_102)
self.assertEqual(102, fields.subband_index)
fields = XSTPacket(packet_subband_103)
self.assertEqual(103, fields.subband_index)
# process our packets
collector.process_packet(packet_subband_102)
collector.process_packet(packet_subband_103)
# counters should now be updated
self.assertListEqual([102,103,0,0,0,0,0,0], list(collector.parameters["xst_subbands"]))
# check whether the data ended up in the right block, and the rest is still zero
xst_values = collector.xst_values()
for subband_idx in range(collector.MAX_PARALLEL_SUBBANDS):
for baseline_a in range(collector.MAX_INPUTS):
for baseline_b in range(collector.MAX_INPUTS):
if baseline_b > baseline_a:
# only scan top-left triangle
continue
baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs)
baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs)
if baseline_a_was_in_packet and baseline_b_was_in_packet and subband_idx == 0:
self.assertEqual(1+1j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.')
elif baseline_a_was_in_packet and baseline_b_was_in_packet and subband_idx == 1:
self.assertEqual(2+2j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.')
else:
self.assertEqual(0+0j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.')
def test_invalid_packet(self): def test_invalid_packet(self):
collector = XSTCollector() collector = XSTCollector()
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tangostationcontrol.test import base
from tangostationcontrol.statistics_writer import statistics_writer
import sys
from os.path import dirname
from tempfile import TemporaryDirectory
from unittest import mock
class TestStatisticsWriter(base.TestCase):
def test_sst(self):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", dirname(__file__) + "/SDP_SST_statistics_packets.bin", "--output_dir", tmpdir]
with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv):
with self.assertRaises(SystemExit):
statistics_writer.main()
def test_xst(self):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [sys.argv[0], "--mode", "XST", "--file", dirname(__file__) + "/SDP_XST_statistics_packets.bin", "--output_dir", tmpdir]
with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv):
with self.assertRaises(SystemExit):
statistics_writer.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment