diff --git a/CDB/LOFAR_ConfigDb.json b/CDB/LOFAR_ConfigDb.json index 705f701556224fa4936e35916993aa2d4d05107e..7a736405ec10708d46268183dcce670f23f2c4d7 100644 --- a/CDB/LOFAR_ConfigDb.json +++ b/CDB/LOFAR_ConfigDb.json @@ -741,6 +741,28 @@ } } }, + "XST": { + "LTS": { + "XST": { + "LTS/XST/1": { + "properties": { + "Statistics_Client_Port": [ + "5002" + ], + "OPC_Server_Name": [ + "dop36.astron.nl" + ], + "OPC_Server_Port": [ + "4840" + ], + "OPC_Time_Out": [ + "5.0" + ] + } + } + } + } + }, "StatsCrosslet": { "CS997": { "StatsCrosslet": { diff --git a/devices/devices/sdp/sst.py b/devices/devices/sdp/sst.py index 1a62a4edcf28c84f7be865d38f7d5312417b497e..49be882d62703f0ba8293561f2d281681faf2fef 100644 --- a/devices/devices/sdp/sst.py +++ b/devices/devices/sdp/sst.py @@ -65,15 +65,17 @@ class SST(Statistics): FPGA_sst_offload_selector_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_R"], datatype=numpy.bool_, dims=(16,)) # number of packets with valid payloads - nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64) + nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64) # number of packets with invalid payloads - nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64) + nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64) # latest SSTs sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(SSTCollector.MAX_SUBBANDS, SSTCollector.MAX_INPUTS), datatype=numpy.uint64) # reported timestamp for each row in the latest SSTs sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64) # integration interval for each row in the latest SSTs integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.float32) + # whether the subband data was calibrated by the SDP (that is, were subband weights applied) + subbands_calibrated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "subbands_calibrated"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.bool_) # -------- # Overloaded functions diff --git a/devices/devices/sdp/statistics_collector.py b/devices/devices/sdp/statistics_collector.py index f3aac3c1982b03b169eaddedce52b50c939ddc45..6c4215f529e9d7ebef82f9a41d84434a361a1a5f 100644 --- a/devices/devices/sdp/statistics_collector.py +++ b/devices/devices/sdp/statistics_collector.py @@ -10,15 +10,12 @@ logger = logging.getLogger() class StatisticsCollector(Thread): """ Base class to process statistics packets from a queue, asynchronously. """ - # Maximum number of antenna inputs we support (used to determine array sizes) - MAX_INPUTS = 192 - - # Maximum number of subbands we support (used to determine array sizes) - MAX_SUBBANDS = 512 - # Maximum time to wait for the Thread to get unstuck, if we want to stop DISCONNECT_TIMEOUT = 10.0 + # Maximum number of FPGAs we receive data from (used for diagnostics) + MAX_FPGAS = 16 + def __init__(self, queue: Queue): self.queue = queue self.last_packet = None @@ -100,15 +97,16 @@ class SSTCollector(StatisticsCollector): defaults.update({ # Number of packets received so far that we could parse correctly and do not have a payload error - "nof_valid_payloads": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), + "nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), # Packets that reported a payload error - "nof_payload_errors": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64), + "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), # Last value array we've constructed out of the packets "sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64), "sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64), "integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32), + "subbands_calibrated": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.bool_), }) return defaults @@ -125,13 +123,105 @@ class SSTCollector(StatisticsCollector): if fields.payload_error: # cannot trust the data if a payload error is reported - self.parameters["nof_payload_errors"][input_index] += numpy.uint64(1) + self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1) # don't raise, as packet is valid return # process the packet - self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1) + self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1) self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) self.parameters["integration_intervals"][input_index] = fields.integration_interval() + self.parameters["subbands_calibrated"][input_index] = fields.subband_calibrated_flagt + +def nr_baselines(nr_inputs: int) -> int: + return nr_inputs * (nr_inputs + 1) // 2 + +def baseline_index(major: int, minor: int) -> int: + if major < minor: + raise ValueError(f"major < minor: {major} < {minor}. Since we do not store the conjugates this will lead to processing errors.") + + return major * (major + 1) // 2 + minor + + +class XSTCollector(StatisticsCollector): + """ Class to process XST statistics packets. """ + + # Maximum number of antenna inputs we support (used to determine array sizes) + MAX_INPUTS = 192 + + # Maximum number of baselines we can receive + MAX_BASELINES = nr_baselines(MAX_INPUTS) + + # Expected block size is BLOCK_LENGTH x BLOCK_LENGTH + BLOCK_LENGTH = 12 + + # Expected number of blocks: enough to cover all baselines without the conjugates (that is, the top-left triangle of the matrix). + MAX_BLOCKS = nr_baselines(MAX_INPUTS // BLOCK_LENGTH) + + # Maximum number of subbands we support (used to determine array sizes) + MAX_SUBBANDS = 512 + + # Complex values are (real, imag). A bit silly, but we don't want magical constants. + VALUES_PER_COMPLEX = 2 + + def _default_parameters(self): + defaults = super()._default_parameters() + + defaults.update({ + # Number of packets received so far that we could parse correctly and do not have a payload error + "nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), + + # Packets that reported a payload error + "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), + + # Last value array we've constructed out of the packets + "xst_values": numpy.zeros((self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.float64), + "xst_timestamps": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float64), + "xst_subbands": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.uint16), + "integration_intervals": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float32), + }) + + return defaults + + def process_packet(self, packet): + fields = XSTPacket(packet) + + if fields.payload_error: + # cannot trust the data if a payload error is reported + self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1) + + # don't raise, as packet is valid + return + + # the blocks must be of size BLOCK_LENGTH x BLOCK_LENGTH + if fields.nof_signal_inputs != self.BLOCK_LENGTH: + raise ValueError("Packet describes a block of {0} x {0} baselines, but we can only parse blocks of {1} x {1} baselines".format(fields.nof_signal_inputs, self.BLOCK_LENGTH)) + + # check whether set of baselines in this packet are not out of bounds + for antenna in (0,1): + if fields.first_baseline[antenna] + nof_signal_inputs >= self.MAX_INPUTS: + # packet describes an input that is out of bounds for us + raise ValueError("Packet describes {0} x {0} baselines starting at {1}, but we are limited to describing MAX_INPUTS={2}".format(fields.nof_signal_inputs, fields.first_baseline, self.MAX_INPUTS)) + + # the blocks of baselines need to be tightly packed, and thus be provided at exact intervals + if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0: + raise ValueError("Packet describes baselines starting at %s, but we require a multiple of BLOCK_LENGTH=%d" % (fields.first_baseline, self.MAX_INPUTS)) + + # the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH + # starting at baseline first_baseline. + # + # we honour this format, as we want to keep the metadata together with these blocks. we do need to put the blocks in a linear + # and tight order, however, so we calculate a block index. + block_index = baseline_index(fields.first_baseline[0] // self.BLOCK_LENGTH, fields.first_baseline[1] // self.BLOCK_LENGTH) + + # process the packet + self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1) + + block_index = baseline_index(fields.first_baseline[0], fields.first_baseline[1]) + + self.parameters["xst_values"][block_index][:fields.nof_statistics_per_packet] = fields.payload + self.parameters["xst_timestamps"][block_index] = numpy.float64(fields.timestamp().timestamp()) + self.parameters["xst_subbands"][block_index] = numpy.uint16(fields.subband_index) + self.parameters["integration_intervals"][block_index] = fields.integration_interval() diff --git a/devices/devices/sdp/statistics_packet.py b/devices/devices/sdp/statistics_packet.py index 6843c99e62c79b2c9afa119aaf0b3b51709269f7..7b52d9bc79b872f8ec3a785f09476d548918f1cf 100644 --- a/devices/devices/sdp/statistics_packet.py +++ b/devices/devices/sdp/statistics_packet.py @@ -117,9 +117,9 @@ class StatisticsPacket(object): self.nyquist_zone_index = get_bit_value(self.source_info, 13, 14) self.t_adc = get_bit_value(self.source_info, 12) self.fsub_type = get_bit_value(self.source_info, 11) - self.payload_error = get_bit_value(self.source_info, 10) - self.beam_repositioning_flag = get_bit_value(self.source_info, 9) - self.subband_calibrated_flag = get_bit_value(self.source_info, 8) + self.payload_error = (get_bit_value(self.source_info, 10) != 0) + self.beam_repositioning_flag = (get_bit_value(self.source_info, 9) != 0) + self.subband_calibrated_flag = (get_bit_value(self.source_info, 8) != 0) # self.source_info 5-7 are reserved self.gn_index = get_bit_value(self.source_info, 0, 4) @@ -263,9 +263,10 @@ class XSTPacket(StatisticsPacket): The following fields are exposed as properties & functions. - subband_index: subband number for which this packet contains statistics. - baseline: antenna pair for which this packet contains statistics. + first_baseline: first antenna pair for which this packet contains statistics. + + payload[nof_signal_inputs][nof_signal_inputs] the baselines, starting from first_baseline """ def __init__(self, packet): @@ -281,13 +282,13 @@ class XSTPacket(StatisticsPacket): super().unpack_data_id() self.subband_index = get_bit_value(self.data_id, 16, 24) - self.baseline = (get_bit_value(self.data_id, 8, 15), get_bit_value(self.data_id, 0, 7)) + self.first_baseline = (get_bit_value(self.data_id, 8, 15), get_bit_value(self.data_id, 0, 7)) def header(self): header = super().header() header["data_id"]["subband_index"] = self.subband_index - header["data_id"]["baseline"] = self.baseline + header["data_id"]["first_baseline"] = self.first_baseline return header diff --git a/devices/devices/sdp/xst.py b/devices/devices/sdp/xst.py new file mode 100644 index 0000000000000000000000000000000000000000..ed28bb1a8db84ceb279760c7249e3f7037a2b7b4 --- /dev/null +++ b/devices/devices/sdp/xst.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the XST project +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" XST Device Server for LOFAR2.0 + +""" + +# TODO(Corne): Remove sys.path.append hack once packaging is in place! +import os, sys +currentdir = os.path.dirname(os.path.realpath(__file__)) +parentdir = os.path.dirname(currentdir) +parentdir = os.path.dirname(parentdir) +sys.path.append(parentdir) + +# PyTango imports +from tango.server import run +from tango.server import device_property, attribute +from tango import AttrWriteType +# Additional import + +from clients.attribute_wrapper import attribute_wrapper +from clients.opcua_client import OPCUAConnection +from clients.statistics_client import StatisticsClient + +from devices.hardware_device import hardware_device + +from common.lofar_git import get_version +from common.lofar_logging import device_logging_to_python, log_exceptions + +from devices.sdp.statistics import Statistics +from devices.sdp.statistics_collector import XSTCollector + +import numpy + +__all__ = ["XST", "main"] + +class XST(Statistics): + + STATISTICS_COLLECTOR_CLASS = XSTCollector + + # ----------------- + # Device Properties + # ----------------- + + # ---------- + # Attributes + # ---------- + + # FPGA control points for XSTs + FPGA_xst_integration_interval_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_integration_interval_RW"], datatype=numpy.double, dims=(8,16), access=AttrWriteType.READ_WRITE) + FPGA_xst_integration_interval_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_integration_interval_R"], datatype=numpy.double, dims=(8,16)) + FPGA_xst_offload_enable_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_enable_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_xst_offload_enable_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_enable_R"], datatype=numpy.bool_, dims=(16,)) + FPGA_xst_offload_hdr_eth_destination_mac_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_eth_destination_mac_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_xst_offload_hdr_eth_destination_mac_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_eth_destination_mac_R"], datatype=numpy.str_, dims=(16,)) + FPGA_xst_offload_hdr_ip_destination_address_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_ip_destination_address_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_xst_offload_hdr_ip_destination_address_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_ip_destination_address_R"], datatype=numpy.str_, dims=(16,)) + FPGA_xst_offload_hdr_udp_destination_port_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_udp_destination_port_RW"], datatype=numpy.uint16, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_xst_offload_hdr_udp_destination_port_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_udp_destination_port_R"], datatype=numpy.uint16, dims=(16,)) + FPGA_xst_offload_subband_select_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_subband_select_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_xst_offload_subband_select_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_subband_select_R"], datatype=numpy.uint32, dims=(16,)) + + # number of packets with valid payloads + nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(XSTCollector.MAX_FPGAS,), datatype=numpy.uint64) + # number of packets with invalid payloads + nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(XSTCollector.MAX_FPGAS,), datatype=numpy.uint64) + # latest XSTs + xst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_values"}, dims=(XSTCollector.BLOCK_LENGTH * XSTCollector.BLOCK_LENGTH * XSTCollector.VALUES_PER_COMPLEX, XSTCollector.MAX_BLOCKS), datatype=numpy.float64) + # reported timestamp for each row in the latest XSTs + xst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_timestamps"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint64) + # which subband the XSTs describe + xst_subbands_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_subbands"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint16) + # integration interval for each row in the latest XSTs + integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.float32) + + # -------- + # Overloaded functions + # -------- + + # -------- + # Commands + # -------- + +# ---------- +# Run server +# ---------- +def main(args=None, **kwargs): + """Main function of the XST Device module.""" + + from common.lofar_logging import configure_logger + configure_logger() + + return run((XST,), args=args, **kwargs) + + +if __name__ == '__main__': + main() diff --git a/docker-compose/device-xst.yml b/docker-compose/device-xst.yml new file mode 100644 index 0000000000000000000000000000000000000000..9dcd8cea93f94d2cf25953ae933d9ba159696ade --- /dev/null +++ b/docker-compose/device-xst.yml @@ -0,0 +1,44 @@ +# +# Docker compose file that launches an interactive iTango session. +# +# Connect to the interactive session with 'docker attach itango'. +# Disconnect with the Docker deattach sequence: <CTRL>+<P> <CTRL>+<Q> +# +# Defines: +# - itango: iTango interactive session +# +# Requires: +# - lofar-device-base.yml +# +version: '2' + +services: + device-sst: + image: device-xst + # build explicitly, as docker-compose does not understand a local image + # being shared among services. + build: + context: lofar-device-base + args: + SOURCE_IMAGE: ${DOCKER_REGISTRY_HOST}/${DOCKER_REGISTRY_USER}-tango-itango:${TANGO_ITANGO_VERSION} + container_name: ${CONTAINER_NAME_PREFIX}device-xst + networks: + - control + - data + ports: + - "5002:5002/udp" # port to receive XST UDP packets on + - "5704:5704" # unique port for this DS + volumes: + - ${TANGO_LOFAR_CONTAINER_MOUNT} + environment: + - TANGO_HOST=${TANGO_HOST} + entrypoint: + - /usr/local/bin/wait-for-it.sh + - ${TANGO_HOST} + - --timeout=30 + - --strict + - -- + # configure CORBA to _listen_ on 0:port, but tell others we're _reachable_ through ${HOSTNAME}:port, since CORBA + # can't know about our Docker port forwarding + - python3 -u ${TANGO_LOFAR_CONTAINER_DIR}/devices/devices/sdp/xst.py LTS -v -ORBendPoint giop:tcp:0:5704 -ORBendPointPublish giop:tcp:${HOSTNAME}:5704 + restart: on-failure diff --git a/docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py b/docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py index bc56f8b05de5e90804562bcf77378ae8798100a2..3960c59843f3ad3bec063bd39489e2f0064c1bd6 100644 --- a/docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py +++ b/docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py @@ -2,6 +2,7 @@ pcc = DeviceProxy("LTS/PCC/1") sdp = DeviceProxy("LTS/SDP/1") sst = DeviceProxy("LTS/SST/1") +xst = DeviceProxy("LTS/XST/1") # Put them in a list in case one wants to iterate -devices = [pcc, sdp, sst] +devices = [pcc, sdp, sst, xst]