diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics_client.py b/tangostationcontrol/tangostationcontrol/clients/statistics_client.py index 16d46bc71bbeba33c66aa8aa590f301cd0de0fa8..e602b410ab202906f2953ec1a722b74a2ff64734 100644 --- a/tangostationcontrol/tangostationcontrol/clients/statistics_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/statistics_client.py @@ -97,7 +97,14 @@ class StatisticsClient(AsyncCommClient): # redirect to right object. this works as long as the parameter names are unique among them. if annotation["type"] == "statistics": def read_function(): - return self.collector.parameters[parameter] + if annotation.get("reshape", False): + # force array into the shape of the attribute + if attribute.max_dim_y > 1: + return self.collector.parameters[parameter].reshape(attribute.max_dim_y, attribute.max_dim_x) + else: + return self.collector.parameters[parameter].reshape(attribute.max_dim_x) + else: + return self.collector.parameters[parameter] elif annotation["type"] == "udp": def read_function(): return self.udp.parameters[parameter] diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py index 5c00c90b1dff70e5aa281ee84093454c44b4ef96..05a76d9b39d0245251c1ddeb2ee67eb9f74870a9 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py @@ -2,6 +2,7 @@ from queue import Queue from threading import Thread import logging import numpy +import datetime from .statistics_packet import SSTPacket, XSTPacket from tangostationcontrol.common.baselines import nr_baselines, baseline_index, baseline_from_index @@ -99,7 +100,21 @@ class SSTCollector(StatisticsCollector): self.parameters["subbands_calibrated"][input_index] = fields.subband_calibrated_flag class XSTCollector(StatisticsCollector): - """ Class to process XST statistics packets. """ + """ Class to process XST statistics packets. + + XSTs are received for up to MAX_PARALLEL_SUBBANDS simultaneously, and only the values of the last + MAX_PARALLEL_SUBBANDS are kept. Raw data are collected for each subband in parameters["xst_blocks"], + and overwritten if newer (younger) data is received for the same subband. As such, the data represent + a rolling view on the XSTs. + + The xst_values() function is a user-friendly way to read the xst_blocks. + + The hardware can be configured to emit different and/or fewer subbands, causing some of the XSTs + to become stale. It is therefor advised to inspect parameters["xst_timestamps"] as well. + """ + + # Maximum number of subbands for which we collect XSTs simultaneously + MAX_PARALLEL_SUBBANDS = 8 # Maximum number of antenna inputs we support (used to determine array sizes) MAX_INPUTS = 192 @@ -130,16 +145,33 @@ class XSTCollector(StatisticsCollector): "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), # Last value array we've constructed out of the packets - "xst_blocks": numpy.zeros((self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.int64), + "xst_blocks": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.int64), # Whether the values are actually conjugated and transposed - "xst_conjugated": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.bool_), - "xst_timestamps": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float64), - "xst_subbands": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.uint16), - "integration_intervals": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float32), + "xst_conjugated": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS,), dtype=numpy.bool_), + # When the youngest data for each subband was received + "xst_timestamps": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float64), + "xst_subbands": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.uint16), + "integration_intervals": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float32), }) return defaults + def select_subband_slot(self, subband): + """ Return which subband slot (0..MAX_PARALLEL_SUBBANDS) to use when confronted with a new subband. + Keep recording the same subband if we're already tracking it, but allocate or replace a slot if not. """ + + indices = numpy.where(self.parameters["xst_subbands"] == subband)[0] + + if len(indices) > 0: + # subband already being recorded, use same spot + return indices[0] + else: + # a new subband, kick out the oldest + oldest_timestamp = self.parameters["xst_timestamps"].min() + + # prefer the first one in case of multiple minima + return numpy.where(self.parameters["xst_timestamps"] == oldest_timestamp)[0][0] + def parse_packet(self, packet): fields = XSTPacket(packet) @@ -172,6 +204,19 @@ class XSTCollector(StatisticsCollector): else: conjugated = False + # we keep track of multiple subbands. select slot for this one + subband_slot = self.select_subband_slot(fields.subband_index) + + assert 0 <= subband_slot < self.MAX_PARALLEL_SUBBANDS, f"Selected slot {subband_slot}, but only have room for {self.MAX_PARALLEL_SUBBANDS}. Existing slots are {self.parameters['xst_subbands']}, processing subband {fields.subband_index}." + + # log if we're replacing a subband we were once recording + previous_subband_in_slot = self.parameters["xst_subbands"][subband_slot] + if previous_subband_in_slot != fields.subband_index: + previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot]) + + if previous_subband_timestamp.timestamp() > 0: + logger.info(f"Stopped recording XSTs for subband {previous_subband_in_slot}. Last data for this subband was received at {previous_subband_timestamp}.") + # the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH # starting at baseline first_baseline. # @@ -185,36 +230,40 @@ class XSTCollector(StatisticsCollector): # process the packet self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1) - self.parameters["xst_blocks"][block_index][:fields.nof_statistics_per_packet] = fields.payload - self.parameters["xst_timestamps"][block_index] = numpy.float64(fields.timestamp().timestamp()) - self.parameters["xst_conjugated"][block_index] = conjugated - self.parameters["xst_subbands"][block_index] = numpy.uint16(fields.subband_index) - self.parameters["integration_intervals"][block_index] = fields.integration_interval() + self.parameters["xst_blocks"][subband_slot, block_index, :fields.nof_statistics_per_packet] = fields.payload + self.parameters["xst_timestamps"][subband_slot] = numpy.float64(fields.timestamp().timestamp()) + self.parameters["xst_conjugated"][subband_slot, block_index] = conjugated + self.parameters["xst_subbands"][subband_slot] = numpy.uint16(fields.subband_index) + self.parameters["integration_intervals"][subband_slot] = fields.integration_interval() - def xst_values(self): - """ xst_blocks, but as a matrix[MAX_INPUTS][MAX_INPUTS] of complex values. """ + def xst_values(self, subband_indices=range(MAX_PARALLEL_SUBBANDS)): + """ xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS] of complex values. + + The subband indices must be in [0..MAX_PARALLEL_SUBBANDS). By default, all recorded XSTs are returned. + """ - matrix = numpy.zeros((self.MAX_INPUTS, self.MAX_INPUTS), dtype=numpy.complex64) + matrix = numpy.zeros((len(subband_indices), self.MAX_INPUTS, self.MAX_INPUTS), dtype=numpy.complex64) xst_blocks = self.parameters["xst_blocks"] xst_conjugated = self.parameters["xst_conjugated"] - for block_index in range(self.MAX_BLOCKS): - # convert real/imag int to complex float values. this works as real/imag come in pairs - block = xst_blocks[block_index].astype(numpy.float32).view(numpy.complex64) + for subband_index in subband_indices: + for block_index in range(self.MAX_BLOCKS): + # convert real/imag int to complex float values. this works as real/imag come in pairs + block = xst_blocks[subband_index][block_index].astype(numpy.float32).view(numpy.complex64) - if xst_conjugated[block_index]: - # block is conjugated and transposed. process. - block = block.conjugate().transpose() + if xst_conjugated[subband_index][block_index]: + # block is conjugated and transposed. process. + block = block.conjugate().transpose() - # reshape into [a][b] - block = block.reshape(self.BLOCK_LENGTH, self.BLOCK_LENGTH) + # reshape into [a][b] + block = block.reshape(self.BLOCK_LENGTH, self.BLOCK_LENGTH) - # compute destination in matrix - first_baseline = baseline_from_index(block_index) - first_baseline = (first_baseline[0] * self.BLOCK_LENGTH, first_baseline[1] * self.BLOCK_LENGTH) + # compute destination in matrix + first_baseline = baseline_from_index(block_index) + first_baseline = (first_baseline[0] * self.BLOCK_LENGTH, first_baseline[1] * self.BLOCK_LENGTH) - # copy block into matrix - matrix[first_baseline[0]:first_baseline[0]+self.BLOCK_LENGTH, first_baseline[1]:first_baseline[1]+self.BLOCK_LENGTH] = block + # copy block into matrix + matrix[subband_index][first_baseline[0]:first_baseline[0]+self.BLOCK_LENGTH, first_baseline[1]:first_baseline[1]+self.BLOCK_LENGTH] = block return matrix diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py index 89da8dddcf19dc9195db6bc6c88c61475c61c2f6..ec93b11f0d765aa06e8324babe027b043ae93541 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py @@ -116,33 +116,86 @@ class XST(Statistics): # number of packets with invalid payloads nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(XSTCollector.MAX_FPGAS,), datatype=numpy.uint64) # latest XSTs - xst_blocks_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_blocks"}, dims=(XSTCollector.BLOCK_LENGTH * XSTCollector.BLOCK_LENGTH * XSTCollector.VALUES_PER_COMPLEX, XSTCollector.MAX_BLOCKS), datatype=numpy.int64) + xst_blocks_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_blocks", "reshape": True}, dims=(XSTCollector.BLOCK_LENGTH * XSTCollector.BLOCK_LENGTH * XSTCollector.VALUES_PER_COMPLEX, XSTCollector.MAX_BLOCKS), datatype=numpy.int64) # whether the values in the block are conjugated and transposed - xst_conjugated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_conjugated"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.bool_) - # reported timestamp for each row in the latest XSTs - xst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_timestamps"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint64) + xst_conjugated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_conjugated", "reshape": True}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.bool_) + # reported timestamp for each subband in the latest XSTs + xst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_timestamps"}, dims=(XSTCollector.PARALLEL_SUBBANDS,), datatype=numpy.uint64) # which subband the XSTs describe - xst_subbands_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_subbands"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint16) - # integration interval for each row in the latest XSTs - integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.float32) + xst_subbands_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_subbands"}, dims=(XSTCollector.MAX_PARALLEL_SUBBANDS,), datatype=numpy.uint16) + # integration interval for each subband in the latest XSTs + integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(XSTCollector.MAX_PARALLEL_SUBBANDS,), datatype=numpy.float32) - # xst_R, but as a matrix of input x input - xst_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) - xst_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) - xst_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) - xst_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) + # xst_R, but as a matrix of subband x (input x input) + xst_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),)) + xst_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),)) + xst_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),)) + xst_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),)) def read_xst_real_R(self): - return numpy.real(self.statistics_client.collector.xst_values()) + return numpy.real(self.statistics_client.collector.xst_values()).reshape(XSTCollector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS) def read_xst_imag_R(self): - return numpy.imag(self.statistics_client.collector.xst_values()) + return numpy.imag(self.statistics_client.collector.xst_values()).reshape(XSTCollector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS) def read_xst_power_R(self): - return numpy.abs(self.statistics_client.collector.xst_values()) + return numpy.abs(self.statistics_client.collector.xst_values()).reshape(XSTCo llector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS) def read_xst_phase_R(self): - return numpy.angle(self.statistics_client.collector.xst_values()) + return numpy.angle(self.statistics_client.collector.xst_values()).reshape(XSTCllector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS) + + # xst_R, but as a matrix of input x input, for each specific subband index + xst_0_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(0)) + xst_0_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(0)) + xst_0_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(0)) + xst_0_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(0)) + + xst_1_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(1)) + xst_1_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(1)) + xst_1_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(1)) + xst_1_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(1)) + + xst_2_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(2)) + xst_2_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(2)) + xst_2_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(2)) + xst_2_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(2)) + + xst_3_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(3)) + xst_3_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(3)) + xst_3_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(3)) + xst_3_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(3)) + + xst_4_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(4)) + xst_4_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(4)) + xst_4_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(4)) + xst_4_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(4)) + + xst_5_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(5)) + xst_5_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(5)) + xst_5_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(5)) + xst_5_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(5)) + + xst_6_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(6)) + xst_6_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(6)) + xst_6_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(6)) + xst_6_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(6)) + + xst_7_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(7)) + xst_7_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(7)) + xst_7_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(7)) + xst_7_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(7)) + + def read_xst_N_real_R(self, subband_idx): + return numpy.real(self.statistics_client.collector.xst_values(subband_idx)[0]) + + def read_xst_N_imag_R(self, subband_idx): + return numpy.imag(self.statistics_client.collector.xst_values(subband_idx)[0]) + + def read_xst_N_power_R(self, subband_idx): + return numpy.abs(self.statistics_client.collector.xst_values(subband_idx)[0]) + + def read_xst_N_phase_R(self, subband_idx): + return numpy.angle(self.statistics_client.collector.xst_values(subband_idx)[0]) # ---------- # Summarising Attributes diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py index 30710a871e157909aa9e4d42169ac686fc23e889..d2bdbad1c02eae894ec100b63477a92ce1e84994 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py @@ -7,6 +7,7 @@ import h5py import numpy import logging +from abc import ABC, abstractmethod # import statistics classes with workaround import sys @@ -17,9 +18,10 @@ import tangostationcontrol.devices.sdp.statistics_collector as statistics_collec logger = logging.getLogger("statistics_writer") -__all__ = ["hdf5_writer"] +__all__ = ["hdf5_writer", "parallel_xst_hdf5_writer", "xst_hdf5_writer", "sst_hdf5_writer"] -class hdf5_writer: + +class hdf5_writer(ABC): SST_MODE = "SST" XST_MODE = "XST" @@ -39,18 +41,22 @@ class hdf5_writer: self.statistics_header = None # file handing - self.file_location = file_location + self.file_location = file_location or '.' self.decimation_factor = decimation_factor self.new_file_time_interval = timedelta(seconds=new_file_time_interval) self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC) self.file = None # parameters that are configured depending on the mode the statistics writer is in (SST,XST,BST) - self.decoder = None - self.collector = None - self.store_function = None self.mode = statistics_mode.upper() - self.config_mode() + + @abstractmethod + def decoder(self): + pass + + @abstractmethod + def new_collector(self): + pass def next_packet(self, packet): """ @@ -123,7 +129,7 @@ class hdf5_writer: self.start_new_hdf5(timestamp) # create a new and empty current_matrix - self.current_matrix = self.collector() + self.current_matrix = self.new_collector() self.statistics_header = None def write_matrix(self): @@ -136,7 +142,7 @@ class hdf5_writer: current_group = self.file.create_group("{}_{}".format(self.mode, self.current_timestamp.isoformat(timespec="milliseconds"))) # store the statistics values for the current group - self.store_function(current_group) + self.write_values_matrix(current_group) # might be optional, but they're easy to add. current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"]) @@ -145,6 +151,10 @@ class hdf5_writer: # get the statistics header header = self.statistics_header + if not header: + # edge case: no valid packet received at all + return + # can't store datetime objects, convert to string instead header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds") @@ -156,17 +166,13 @@ class hdf5_writer: else: current_group.attrs[k] = v - def write_sst_matrix(self, current_group): - # store the SST values - current_group.create_dataset(name="values", data=self.current_matrix.parameters["sst_values"].astype(numpy.float32), compression="gzip") - - def write_xst_matrix(self, current_group): - # requires a function call to transform the xst_blocks in to the right structure - current_group.create_dataset(name="values", data=self.current_matrix.xst_values().astype(numpy.cfloat), compression="gzip") - - def write_bst_matrix(self, current_group): - raise NotImplementedError("BST values not implemented") + @abstractmethod + def write_values_matrix(self, current_group): + pass + def next_filename(self, timestamp, suffix=".h5"): + time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) + return f"{self.file_location}/{self.mode}_{time_str}{suffix}" def process_packet(self, packet): """ @@ -186,44 +192,17 @@ class hdf5_writer: except Exception as e: logger.exception(f"Error while attempting to close hdf5 file to disk. file {self.file} likely empty, please verify integrity.") - current_time = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) - logger.info(f"creating new file: {self.file_location}/{self.mode}_{current_time}.h5") + filename = self.next_filename(timestamp) + logger.info(f"creating new file: {filename}") try: - self.file = h5py.File(f"{self.file_location}/{self.mode}_{current_time}.h5", 'w') + self.file = h5py.File(filename, 'w') except Exception as e: logger.exception(f"Error while creating new file") raise e self.last_file_time = timestamp - def config_mode(self): - logger.debug(f"attempting to configure {self.mode} mode") - - """ - Configures the object for the correct statistics type to be used. - decoder: the class to decode a single packet - collector: the class to collect statistics packets - store_function: the function to write the mode specific data to file - """ - - if self.mode == self.SST_MODE: - self.decoder = SSTPacket - self.collector = statistics_collector.SSTCollector - self.store_function = self.write_sst_matrix - - elif self.mode == self.XST_MODE: - self.decoder = XSTPacket - self.collector = statistics_collector.XSTCollector - self.store_function = self.write_xst_matrix - - elif self.mode == self.BST_MODE: - self.store_function = self.write_bst_matrix - raise NotImplementedError("BST collector has not yet been implemented") - - else: - raise ValueError("invalid statistics mode specified '{}', please use 'SST', 'XST' or 'BST' ".format(self.mode)) - def close_writer(self): """ Function that can be used to stop the writer without data loss. @@ -240,3 +219,79 @@ class hdf5_writer: self.file.close() logger.debug(f"{filename} closed") logger.debug(f"Received a total of {self.statistics_counter} statistics while running. With {int(self.statistics_counter/self.decimation_factor)} written to disk ") + + +class sst_hdf5_writer(hdf5_writer): + def __init__(self, new_file_time_interval, file_location, decimation_factor): + super().__init__(new_file_time_interval, file_location, hdf5_writer.SST_MODE, decimation_factor) + + def decoder(self, packet): + return SSTPacket(packet) + + def new_collector(self): + return statistics_collector.SSTCollector() + + def write_values_matrix(self, current_group): + # store the SST values + current_group.create_dataset(name="values", data=self.current_matrix.parameters["sst_values"].astype(numpy.float32), compression="gzip") + + +class xst_hdf5_writer(hdf5_writer): + def __init__(self, new_file_time_interval, file_location, decimation_factor, subband_index): + super().__init__(new_file_time_interval, file_location, hdf5_writer.XST_MODE, decimation_factor) + self.subband_index = subband_index + + def decoder(self, packet): + return XSTPacket(packet) + + def new_collector(self): + return statistics_collector.XSTCollector() + + def next_filename(self, timestamp): + time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) + return f"{self.file_location}/{self.mode}_SB{self.subband_index}_{time_str}.h5" + + def write_values_matrix(self, current_group): + # requires a function call to transform the xst_blocks in to the right structure + current_group.create_dataset(name="values", data=self.current_matrix.xst_values([self.subband_index])[0].astype(numpy.cfloat), compression="gzip") + + +class parallel_xst_hdf5_writer: + """ Writes multiple subbands in parallel. Each subband gets written to its own HDF5 file(s). """ + + def __init__(self, new_file_time_interval, file_location, decimation_factor): + # maintain a dedicated hdf5_writer per subband + self.writers = {} + + # function to create a new writer, to avoid having to store + # all the init parameters just for this purpose. + # + def new_writer(subband): + # Since we use a dedicated writer per subband, the data will end + # up at subband_index == 0 in each of them. + return xst_hdf5_writer( + new_file_time_interval, + file_location, + decimation_factor, + 0) + + self.new_writer = new_writer + + def next_packet(self, packet): + # decode to get subband of this packet + fields = XSTPacket(packet) + subband = fields.subband_index + + # make sure there is a writer for it + if subband not in self.writers: + self.writers[subband] = self.new_writer(subband) + + # demux packet to the correct writer + self.writers[subband].next_packet(packet) + + def close_writer(self): + for writer in self.writers.values(): + writer.close_writer() + + self.writers = {} + diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py b/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py index 1a1ecb671159e1b3ca143ecbf860000d6cdbe0c5..52747fff71d436d62bcbe40844aa0a3a45a2ab25 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py @@ -3,7 +3,7 @@ import time import sys from tangostationcontrol.statistics_writer.receiver import tcp_receiver, file_receiver -from tangostationcontrol.statistics_writer.hdf5_writer import hdf5_writer +from tangostationcontrol.statistics_writer.hdf5_writer import sst_hdf5_writer, parallel_xst_hdf5_writer import logging logging.basicConfig(level=logging.INFO, format = '%(asctime)s:%(levelname)s: %(message)s') @@ -13,13 +13,13 @@ def main(): parser = argparse.ArgumentParser( description='Converts a stream of statistics packets into HDF5 files.') parser.add_argument( - '-a', '--host', type=str, required=True, help='the host to connect to') + '-a', '--host', type=str, required=False, help='the host to connect to') parser.add_argument( '-p', '--port', type=int, default=0, help='the port to connect to, or 0 to use default port for the ' 'selected mode (default: %(default)s)') parser.add_argument( - '-f', '--file', type=str, required=True, help='the file to read from') + '-f', '--file', type=str, required=False, help='the file to read from') parser.add_argument( '-m', '--mode', type=str, choices=['SST', 'XST', 'BST'], default='SST', help='sets the statistics type to be decoded options (default: ' @@ -57,6 +57,9 @@ def main(): debug = args.debug reconnect = args.reconnect + if not filename and not host: + raise ValueError("Supply either a filename (--file) or a hostname (--host)") + if decimation < 1: raise ValueError("Please use an integer --Decimation value 1 or higher to only store one every n statistics' ") @@ -78,7 +81,16 @@ def main(): sys.exit(1) # create the writer - writer = hdf5_writer(new_file_time_interval=interval, file_location=output_dir, statistics_mode=mode, decimation_factor=decimation) + if mode == "XST": + writer = parallel_xst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation) + elif mode == "SST": + writer = sst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation) + elif mode == "BST": + logger.fatal(f"BST mode not supported") + sys.exit(1) + else: + logger.fatal(f"Invalid mode: {mode}") + sys.exit(1) # start looping try: diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py b/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py index 4b58141c06d9b09d68dba295b007b41081ca3618..7113ee837631789e99218f3356a602637ccac116 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py @@ -3,6 +3,45 @@ from tangostationcontrol.devices.sdp.statistics_packet import XSTPacket from tangostationcontrol.test import base +class TestSelectSubbandSlot(base.TestCase): + def test_first_entry(self): + collector = XSTCollector() + + # on start, any subband should map on the first entry + self.assertEqual(0, collector.select_subband_slot(102)) + + def test_subsequent_entries(self): + collector = XSTCollector() + + # assign some subbands + collector.parameters["xst_subbands"][0] = 102 + collector.parameters["xst_subbands"][2] = 103 + collector.parameters["xst_subbands"][3] = 104 + + # give them non-zero timestamps to make them newer than the other entries + collector.parameters["xst_timestamps"][0] = 1 + collector.parameters["xst_timestamps"][2] = 1 + collector.parameters["xst_timestamps"][3] = 1 + + # these should be reported back when looking them up again + self.assertEqual(0, collector.select_subband_slot(102)) + self.assertEqual(2, collector.select_subband_slot(103)) + self.assertEqual(3, collector.select_subband_slot(104)) + + # a place for another subband should be the lowest + self.assertEqual(1, collector.select_subband_slot(101)) + + def test_spilling(self): + collector = XSTCollector() + + # assign all subbands, in decreasing age + for n in range(XSTCollector.MAX_PARALLEL_SUBBANDS): + collector.parameters["xst_subbands"][n] = 100 + n + collector.parameters["xst_timestamps"][n] = 100 - n + + # check where a new subband replaces the oldest + self.assertEqual(XSTCollector.MAX_PARALLEL_SUBBANDS - 1, collector.select_subband_slot(200)) + class TestXSTCollector(base.TestCase): def test_valid_packet(self): collector = XSTCollector() @@ -17,6 +56,9 @@ class TestXSTCollector(base.TestCase): # baseline indeed should be (12,0) self.assertEqual((12,0), fields.first_baseline) + # subband should indeed be 102 + self.assertEqual(102, fields.subband_index) + # this should not throw collector.process_packet(packet) @@ -27,8 +69,10 @@ class TestXSTCollector(base.TestCase): self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index]) self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index]) + self.assertListEqual([102,0,0,0,0,0,0,0], list(collector.parameters["xst_subbands"])) + # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values() + xst_values = collector.xst_values()[0] for baseline_a in range(collector.MAX_INPUTS): for baseline_b in range(collector.MAX_INPUTS): @@ -67,7 +111,7 @@ class TestXSTCollector(base.TestCase): self.assertEqual(0, collector.parameters["nof_invalid_packets"]) # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values() + xst_values = collector.xst_values()[0] for baseline_a in range(collector.MAX_INPUTS): for baseline_b in range(collector.MAX_INPUTS): @@ -84,6 +128,48 @@ class TestXSTCollector(base.TestCase): else: self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') + def test_multiple_subbands(self): + collector = XSTCollector() + + # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0) + packet_subband_102 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' + packet_subband_103 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00g\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x02' + + # make sure the subband_indices are indeed what we claim they are + fields = XSTPacket(packet_subband_102) + self.assertEqual(102, fields.subband_index) + + fields = XSTPacket(packet_subband_103) + self.assertEqual(103, fields.subband_index) + + # process our packets + collector.process_packet(packet_subband_102) + collector.process_packet(packet_subband_103) + + # counters should now be updated + self.assertListEqual([102,103,0,0,0,0,0,0], list(collector.parameters["xst_subbands"])) + + # check whether the data ended up in the right block, and the rest is still zero + xst_values = collector.xst_values() + + for subband_idx in range(collector.MAX_PARALLEL_SUBBANDS): + for baseline_a in range(collector.MAX_INPUTS): + for baseline_b in range(collector.MAX_INPUTS): + if baseline_b > baseline_a: + # only scan top-left triangle + continue + + baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs) + baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs) + + if baseline_a_was_in_packet and baseline_b_was_in_packet and subband_idx == 0: + self.assertEqual(1+1j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') + elif baseline_a_was_in_packet and baseline_b_was_in_packet and subband_idx == 1: + self.assertEqual(2+2j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') + else: + self.assertEqual(0+0j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') + + def test_invalid_packet(self): collector = XSTCollector() diff --git a/tangostationcontrol/tangostationcontrol/test/test_statistics_writer.py b/tangostationcontrol/tangostationcontrol/test/test_statistics_writer.py new file mode 100644 index 0000000000000000000000000000000000000000..4b10230c8fca8c0773d085c6e42719b74dd7b2c3 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/test/test_statistics_writer.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +from tangostationcontrol.test import base +from tangostationcontrol.statistics_writer import statistics_writer +import sys +from os.path import dirname +from tempfile import TemporaryDirectory +from unittest import mock + +class TestStatisticsWriter(base.TestCase): + def test_sst(self): + with TemporaryDirectory() as tmpdir: + new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", dirname(__file__) + "/SDP_SST_statistics_packets.bin", "--output_dir", tmpdir] + with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv): + with self.assertRaises(SystemExit): + statistics_writer.main() + + def test_xst(self): + with TemporaryDirectory() as tmpdir: + new_sys_argv = [sys.argv[0], "--mode", "XST", "--file", dirname(__file__) + "/SDP_XST_statistics_packets.bin", "--output_dir", tmpdir] + with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv): + with self.assertRaises(SystemExit): + statistics_writer.main()