diff --git a/CDB/LOFAR_ConfigDb.json b/CDB/LOFAR_ConfigDb.json index 197686104afa47aeb018b0c5caade4d286a4fe9b..f8bfe42070e2a430bd160c9459c256f11df4b4e0 100644 --- a/CDB/LOFAR_ConfigDb.json +++ b/CDB/LOFAR_ConfigDb.json @@ -755,22 +755,22 @@ "5.0" ], "FPGA_sst_offload_hdr_eth_destination_mac_RW_default": [ - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de", - "6c:2b:59:97:cb:de" + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd" ], "FPGA_sst_offload_hdr_ip_destination_address_RW_default": [ "10.99.250.250", @@ -813,6 +813,82 @@ } } }, + "XST": { + "LTS": { + "XST": { + "LTS/XST/1": { + "properties": { + "Statistics_Client_Port": [ + "5002" + ], + "OPC_Server_Name": [ + "dop36.astron.nl" + ], + "OPC_Server_Port": [ + "4840" + ], + "OPC_Time_Out": [ + "5.0" + ], + "FPGA_xst_offload_hdr_eth_destination_mac_RW_default": [ + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd", + "6c:2b:59:97:cb:dd" + ], + "FPGA_xst_offload_hdr_ip_destination_address_RW_default": [ + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250", + "10.99.250.250" + ], + "FPGA_xst_offload_hdr_udp_destination_port_RW_default": [ + "5002", + "5002", + "5002", + "5002", + "5002", + "5002", + "5002", + "5002", + "5002", + "5002", + "5002", + "5002", + "5002", + "5002", + "5002", + "5002" + ] + } + } + } + } + }, "StatsCrosslet": { "CS997": { "StatsCrosslet": { diff --git a/devices/common/baselines.py b/devices/common/baselines.py new file mode 100644 index 0000000000000000000000000000000000000000..b9b0ca8038c0d881d602df37f99203d733f283fc --- /dev/null +++ b/devices/common/baselines.py @@ -0,0 +1,59 @@ +""" + Baseline calculation functions. +""" + +import math + +def nr_baselines(nr_inputs: int) -> int: + """ Return the number of baselines (unique pairs) that exist between a given number of inputs. """ + return nr_inputs * (nr_inputs + 1) // 2 + +""" + + Baselines are ordered like: + 0-0, 1-0, 1-1, 2-0, 2-1, 2-2, ... + + if + b = baseline + x = stat1 (major) + y = stat2 (minor) + x >= y + then + b_xy = x * (x + 1) / 2 + y + let + u := b_x0 + then + u = x * (x + 1) / 2 + 8u = 4x^2 + 4x + 8u + 1 = 4x^2 + 4x + 1 = (2x + 1)^2 + sqrt(8u + 1) = 2x + 1 + x = (sqrt(8u + 1) - 1) / 2 + + Let us define + x'(b) = (sqrt(8b + 1) - 1) / 2 + which increases monotonically and is a continuation of y(b). + + Because y simply increases by 1 when b increases enough, we + can just take the floor function to obtain the discrete y(b): + x(b) = floor(x'(b)) + = floor(sqrt(8b + 1) - 1) / 2) + +""" + +def baseline_index(major: int, minor: int) -> int: + """ Provide a total ordering of baselines: give the unique array index for the baseline (major,minor), + with major >= minor. """ + + if major < minor: + raise ValueError(f"major < minor: {major} < {minor}. Since we do not store the conjugates this will lead to processing errors.") + + return major * (major + 1) // 2 + minor + +def baseline_from_index(index: int) -> tuple: + """ Return the (major,minor) input pair given a baseline index. """ + + major = int((math.sqrt(float(8 * index + 1)) - 0.99999) / 2) + minor = index - baseline_index(major,0) + + return (major,minor) + diff --git a/devices/devices/sdp/sst.py b/devices/devices/sdp/sst.py index 0cabbbf8c74c06bbb475e80ac2af68a5f67da70f..c10141aed93bcdbd2161b18e2eb08cf65236779f 100644 --- a/devices/devices/sdp/sst.py +++ b/devices/devices/sdp/sst.py @@ -95,15 +95,17 @@ class SST(Statistics): FPGA_sst_offload_selector_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_R"], datatype=numpy.bool_, dims=(16,)) # number of packets with valid payloads - nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64) + nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64) # number of packets with invalid payloads - nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64) + nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64) # latest SSTs sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(SSTCollector.MAX_SUBBANDS, SSTCollector.MAX_INPUTS), datatype=numpy.uint64) # reported timestamp for each row in the latest SSTs sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64) # integration interval for each row in the latest SSTs integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.float32) + # whether the subband data was calibrated by the SDP (that is, were subband weights applied) + subbands_calibrated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "subbands_calibrated"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.bool_) # -------- # Overloaded functions diff --git a/devices/devices/sdp/statistics_collector.py b/devices/devices/sdp/statistics_collector.py index f3aac3c1982b03b169eaddedce52b50c939ddc45..4108ef1184537348f281d10f6db1588817d0884d 100644 --- a/devices/devices/sdp/statistics_collector.py +++ b/devices/devices/sdp/statistics_collector.py @@ -3,22 +3,20 @@ from threading import Thread import logging import numpy -from .statistics_packet import SSTPacket +from .statistics_packet import SSTPacket, XSTPacket +from common.baselines import nr_baselines, baseline_index, baseline_from_index logger = logging.getLogger() class StatisticsCollector(Thread): """ Base class to process statistics packets from a queue, asynchronously. """ - # Maximum number of antenna inputs we support (used to determine array sizes) - MAX_INPUTS = 192 - - # Maximum number of subbands we support (used to determine array sizes) - MAX_SUBBANDS = 512 - # Maximum time to wait for the Thread to get unstuck, if we want to stop DISCONNECT_TIMEOUT = 10.0 + # Maximum number of FPGAs we receive data from (used for diagnostics) + MAX_FPGAS = 16 + def __init__(self, queue: Queue): self.queue = queue self.last_packet = None @@ -100,15 +98,16 @@ class SSTCollector(StatisticsCollector): defaults.update({ # Number of packets received so far that we could parse correctly and do not have a payload error - "nof_valid_payloads": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), + "nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), # Packets that reported a payload error - "nof_payload_errors": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), + "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), # Last value array we've constructed out of the packets "sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64), "sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64), "integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32), + "subbands_calibrated": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.bool_), }) return defaults @@ -125,13 +124,118 @@ class SSTCollector(StatisticsCollector): if fields.payload_error: # cannot trust the data if a payload error is reported - self.parameters["nof_payload_errors"][input_index] += numpy.uint64(1) + self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1) # don't raise, as packet is valid return # process the packet - self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1) + self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1) self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) self.parameters["integration_intervals"][input_index] = fields.integration_interval() + self.parameters["subbands_calibrated"][input_index] = fields.subband_calibrated_flagt + +class XSTCollector(StatisticsCollector): + """ Class to process XST statistics packets. """ + + # Maximum number of antenna inputs we support (used to determine array sizes) + MAX_INPUTS = 192 + + # Maximum number of baselines we can receive + MAX_BASELINES = nr_baselines(MAX_INPUTS) + + # Expected block size is BLOCK_LENGTH x BLOCK_LENGTH + BLOCK_LENGTH = 12 + + # Expected number of blocks: enough to cover all baselines without the conjugates (that is, the top-left triangle of the matrix). + MAX_BLOCKS = nr_baselines(MAX_INPUTS // BLOCK_LENGTH) + + # Maximum number of subbands we support (used to determine array sizes) + MAX_SUBBANDS = 512 + + # Complex values are (real, imag). A bit silly, but we don't want magical constants. + VALUES_PER_COMPLEX = 2 + + def _default_parameters(self): + defaults = super()._default_parameters() + + defaults.update({ + # Number of packets received so far that we could parse correctly and do not have a payload error + "nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), + + # Packets that reported a payload error + "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), + + # Last value array we've constructed out of the packets + "xst_blocks": numpy.zeros((self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.int64), + "xst_timestamps": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float64), + "xst_subbands": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.uint16), + "integration_intervals": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float32), + }) + + return defaults + + def process_packet(self, packet): + fields = XSTPacket(packet) + + if fields.payload_error: + # cannot trust the data if a payload error is reported + self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1) + + # don't raise, as packet is valid + return + + # the blocks must be of size BLOCK_LENGTH x BLOCK_LENGTH + if fields.nof_signal_inputs != self.BLOCK_LENGTH: + raise ValueError("Packet describes a block of {0} x {0} baselines, but we can only parse blocks of {1} x {1} baselines".format(fields.nof_signal_inputs, self.BLOCK_LENGTH)) + + # check whether set of baselines in this packet are not out of bounds + for antenna in (0,1): + if fields.first_baseline[antenna] + fields.nof_signal_inputs >= self.MAX_INPUTS: + # packet describes an input that is out of bounds for us + raise ValueError("Packet describes {0} x {0} baselines starting at {1}, but we are limited to describing MAX_INPUTS={2}".format(fields.nof_signal_inputs, fields.first_baseline, self.MAX_INPUTS)) + + # the blocks of baselines need to be tightly packed, and thus be provided at exact intervals + if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0: + raise ValueError("Packet describes baselines starting at %s, but we require a multiple of BLOCK_LENGTH=%d" % (fields.first_baseline, self.MAX_INPUTS)) + + # the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH + # starting at baseline first_baseline. + # + # we honour this format, as we want to keep the metadata together with these blocks. we do need to put the blocks in a linear + # and tight order, however, so we calculate a block index. + block_index = baseline_index(fields.first_baseline[0] // self.BLOCK_LENGTH, fields.first_baseline[1] // self.BLOCK_LENGTH) + + # process the packet + self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1) + + block_index = baseline_index(fields.first_baseline[0], fields.first_baseline[1]) + + self.parameters["xst_blocks"][block_index][:fields.nof_statistics_per_packet] = fields.payload + self.parameters["xst_timestamps"][block_index] = numpy.float64(fields.timestamp().timestamp()) + self.parameters["xst_subbands"][block_index] = numpy.uint16(fields.subband_index) + self.parameters["integration_intervals"][block_index] = fields.integration_interval() + + def xst_values(self): + """ xst_blocks, but as a matrix[MAX_INPUTS][MAX_INPUTS] of complex values. """ + + matrix = numpy.zeros((self.MAX_INPUTS, self.MAX_INPUTS), dtype=numpy.complex) + xst_blocks = self.parameters["xst_blocks"] + + for block_index in range(self.MAX_BLOCKS): + block = xst_blocks[block_index] + + first_baseline = baseline_from_index(block_index) + first_baseline = (first_baseline[0] * self.BLOCK_LENGTH, first_baseline[1] * self.BLOCK_LENGTH) + + for local_ant_a in range(self.BLOCK_LENGTH): + for local_ant_b in range(self.BLOCK_LENGTH): + offset = local_ant_a * self.BLOCK_LENGTH + local_ant_b + real_val, imag_val = block[offset:offset+2] + + ant_a = first_baseline[0] + local_ant_a + ant_b = first_baseline[1] + local_ant_b + + matrix[ant_a][ant_b] = real_val + imag_val * 1j + return matrix diff --git a/devices/devices/sdp/statistics_packet.py b/devices/devices/sdp/statistics_packet.py index 6843c99e62c79b2c9afa119aaf0b3b51709269f7..e93f18ce1c9af1c5b4f467ef88b54316bd848905 100644 --- a/devices/devices/sdp/statistics_packet.py +++ b/devices/devices/sdp/statistics_packet.py @@ -117,9 +117,9 @@ class StatisticsPacket(object): self.nyquist_zone_index = get_bit_value(self.source_info, 13, 14) self.t_adc = get_bit_value(self.source_info, 12) self.fsub_type = get_bit_value(self.source_info, 11) - self.payload_error = get_bit_value(self.source_info, 10) - self.beam_repositioning_flag = get_bit_value(self.source_info, 9) - self.subband_calibrated_flag = get_bit_value(self.source_info, 8) + self.payload_error = (get_bit_value(self.source_info, 10) != 0) + self.beam_repositioning_flag = (get_bit_value(self.source_info, 9) != 0) + self.subband_calibrated_flag = (get_bit_value(self.source_info, 8) != 0) # self.source_info 5-7 are reserved self.gn_index = get_bit_value(self.source_info, 0, 4) @@ -210,6 +210,18 @@ class StatisticsPacket(object): return header + @property + def payload(self) -> numpy.array: + """ The payload of this packet, as a linear array. """ + + # derive which and how many elements to read from the packet header + bytecount_to_unsigned_struct_type = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'} + format_str = ">{}{}".format(self.nof_statistics_per_packet, + bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic]) + + return numpy.array( + struct.unpack(format_str, self.packet[self.header_size:self.header_size + struct.calcsize(format_str)])) + class SSTPacket(StatisticsPacket): """ @@ -244,18 +256,6 @@ class SSTPacket(StatisticsPacket): return header - @property - def payload(self) -> numpy.array: - """ The payload of this packet, interpreted as SST data. """ - - # derive which and how many elements to read from the packet header - bytecount_to_unsigned_struct_type = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'} - format_str = ">{}{}".format(self.nof_statistics_per_packet, - bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic]) - - return numpy.array( - struct.unpack(format_str, self.packet[self.header_size:self.header_size + struct.calcsize(format_str)])) - class XSTPacket(StatisticsPacket): """ @@ -263,9 +263,10 @@ class XSTPacket(StatisticsPacket): The following fields are exposed as properties & functions. - subband_index: subband number for which this packet contains statistics. - baseline: antenna pair for which this packet contains statistics. + first_baseline: first antenna pair for which this packet contains statistics. + + payload[nof_signal_inputs][nof_signal_inputs] the baselines, starting from first_baseline """ def __init__(self, packet): @@ -281,13 +282,13 @@ class XSTPacket(StatisticsPacket): super().unpack_data_id() self.subband_index = get_bit_value(self.data_id, 16, 24) - self.baseline = (get_bit_value(self.data_id, 8, 15), get_bit_value(self.data_id, 0, 7)) + self.first_baseline = (get_bit_value(self.data_id, 8, 15), get_bit_value(self.data_id, 0, 7)) def header(self): header = super().header() header["data_id"]["subband_index"] = self.subband_index - header["data_id"]["baseline"] = self.baseline + header["data_id"]["first_baseline"] = self.first_baseline return header diff --git a/devices/devices/sdp/xst.py b/devices/devices/sdp/xst.py new file mode 100644 index 0000000000000000000000000000000000000000..088498651881c703d05b8f83540e798519263321 --- /dev/null +++ b/devices/devices/sdp/xst.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the XST project +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" XST Device Server for LOFAR2.0 + +""" + +# TODO(Corne): Remove sys.path.append hack once packaging is in place! +import os, sys +currentdir = os.path.dirname(os.path.realpath(__file__)) +parentdir = os.path.dirname(currentdir) +parentdir = os.path.dirname(parentdir) +sys.path.append(parentdir) + +# PyTango imports +from tango.server import run +from tango.server import device_property, attribute +from tango import AttrWriteType +# Additional import + +from clients.attribute_wrapper import attribute_wrapper +from clients.opcua_client import OPCUAConnection +from clients.statistics_client import StatisticsClient + +from devices.hardware_device import hardware_device + +from common.lofar_git import get_version +from common.lofar_logging import device_logging_to_python, log_exceptions + +from devices.sdp.statistics import Statistics +from devices.sdp.statistics_collector import XSTCollector + +import numpy + +__all__ = ["XST", "main"] + +class XST(Statistics): + + STATISTICS_COLLECTOR_CLASS = XSTCollector + + # ----------------- + # Device Properties + # ----------------- + + FPGA_xst_offload_hdr_eth_destination_mac_RW_default = device_property( + dtype='DevVarStringArray', + mandatory=True + ) + + FPGA_xst_offload_hdr_ip_destination_address_RW_default = device_property( + dtype='DevVarStringArray', + mandatory=True + ) + + FPGA_xst_offload_hdr_udp_destination_port_RW_default = device_property( + dtype='DevVarUShortArray', + mandatory=True + ) + + FPGA_xst_processing_enable_RW_default = device_property( + dtype='DevVarBooleanArray', + mandatory=False, + default_value=[True] * 16 + ) + + FPGA_xst_subband_select_RW_default = device_property( + dtype='DevVarULongArray', + mandatory=False, + default_value=[[0,102,0,0,0,0,0,0]] * 16 + ) + + # ---------- + # Attributes + # ---------- + + # FPGA control points for XSTs + FPGA_xst_integration_interval_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_integration_interval_RW"], datatype=numpy.double, dims=(8,16), access=AttrWriteType.READ_WRITE) + FPGA_xst_integration_interval_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_integration_interval_R"], datatype=numpy.double, dims=(8,16)) + FPGA_xst_offload_enable_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_enable_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_xst_offload_enable_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_enable_R"], datatype=numpy.bool_, dims=(16,)) + FPGA_xst_offload_hdr_eth_destination_mac_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_eth_destination_mac_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_xst_offload_hdr_eth_destination_mac_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_eth_destination_mac_R"], datatype=numpy.str_, dims=(16,)) + FPGA_xst_offload_hdr_ip_destination_address_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_ip_destination_address_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_xst_offload_hdr_ip_destination_address_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_ip_destination_address_R"], datatype=numpy.str_, dims=(16,)) + FPGA_xst_offload_hdr_udp_destination_port_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_udp_destination_port_RW"], datatype=numpy.uint16, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_xst_offload_hdr_udp_destination_port_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_udp_destination_port_R"], datatype=numpy.uint16, dims=(16,)) + FPGA_xst_processing_enable_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_processing_enable_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_xst_processing_enable_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_processing_enable_R"], datatype=numpy.bool_, dims=(16,)) + FPGA_xst_subband_select_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_subband_select_RW"], datatype=numpy.uint32, dims=(8,16), access=AttrWriteType.READ_WRITE) + FPGA_xst_subband_select_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_subband_select_R"], datatype=numpy.uint32, dims=(8,16)) + + # number of packets with valid payloads + nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(XSTCollector.MAX_FPGAS,), datatype=numpy.uint64) + # number of packets with invalid payloads + nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(XSTCollector.MAX_FPGAS,), datatype=numpy.uint64) + # latest XSTs + xst_blocks_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_blocks"}, dims=(XSTCollector.BLOCK_LENGTH * XSTCollector.BLOCK_LENGTH * XSTCollector.VALUES_PER_COMPLEX, XSTCollector.MAX_BLOCKS), datatype=numpy.int64) + # reported timestamp for each row in the latest XSTs + xst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_timestamps"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint64) + # which subband the XSTs describe + xst_subbands_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_subbands"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint16) + # integration interval for each row in the latest XSTs + integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.float32) + + # xst_R, but as a matrix of input x input + xst_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) + xst_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) + xst_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) + xst_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) + + def read_xst_real_R(self): + return numpy.real(self.statistics_client.statistics.xst_values()) + + def read_xst_imag_R(self): + return numpy.imag(self.statistics_client.statistics.xst_values()) + + def read_xst_power_R(self): + return numpy.abs(self.statistics_client.statistics.xst_values()) + + def read_xst_phase_R(self): + return numpy.angle(self.statistics_client.statistics.xst_values()) + + # -------- + # Overloaded functions + # -------- + + # -------- + # Commands + # -------- + +# ---------- +# Run server +# ---------- +def main(args=None, **kwargs): + """Main function of the XST Device module.""" + + from common.lofar_logging import configure_logger + configure_logger() + + return run((XST,), args=args, **kwargs) + + +if __name__ == '__main__': + main() diff --git a/docker-compose/device-xst.yml b/docker-compose/device-xst.yml new file mode 100644 index 0000000000000000000000000000000000000000..925709236d9bba8e89734754e4388b8157c9c5f4 --- /dev/null +++ b/docker-compose/device-xst.yml @@ -0,0 +1,44 @@ +# +# Docker compose file that launches an interactive iTango session. +# +# Connect to the interactive session with 'docker attach itango'. +# Disconnect with the Docker deattach sequence: <CTRL>+<P> <CTRL>+<Q> +# +# Defines: +# - itango: iTango interactive session +# +# Requires: +# - lofar-device-base.yml +# +version: '2' + +services: + device-xst: + image: device-xst + # build explicitly, as docker-compose does not understand a local image + # being shared among services. + build: + context: lofar-device-base + args: + SOURCE_IMAGE: ${DOCKER_REGISTRY_HOST}/${DOCKER_REGISTRY_USER}-tango-itango:${TANGO_ITANGO_VERSION} + container_name: ${CONTAINER_NAME_PREFIX}device-xst + networks: + - control + - data + ports: + - "5002:5002/udp" # port to receive XST UDP packets on + - "5704:5704" # unique port for this DS + volumes: + - ${TANGO_LOFAR_CONTAINER_MOUNT} + environment: + - TANGO_HOST=${TANGO_HOST} + entrypoint: + - /usr/local/bin/wait-for-it.sh + - ${TANGO_HOST} + - --timeout=30 + - --strict + - -- + # configure CORBA to _listen_ on 0:port, but tell others we're _reachable_ through ${HOSTNAME}:port, since CORBA + # can't know about our Docker port forwarding + - python3 -u ${TANGO_LOFAR_CONTAINER_DIR}/devices/devices/sdp/xst.py LTS -v -ORBendPoint giop:tcp:0:5704 -ORBendPointPublish giop:tcp:${HOSTNAME}:5704 + restart: on-failure diff --git a/docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py b/docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py index bc56f8b05de5e90804562bcf77378ae8798100a2..3960c59843f3ad3bec063bd39489e2f0064c1bd6 100644 --- a/docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py +++ b/docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py @@ -2,6 +2,7 @@ pcc = DeviceProxy("LTS/PCC/1") sdp = DeviceProxy("LTS/SDP/1") sst = DeviceProxy("LTS/SST/1") +xst = DeviceProxy("LTS/XST/1") # Put them in a list in case one wants to iterate -devices = [pcc, sdp, sst] +devices = [pcc, sdp, sst, xst]