diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 46d29fe4d27e1cff7c99aa7904a0c40ced90e718..8e8499ec2d3b7d3e7a7c91b965a816996ab006e1 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -518,6 +518,7 @@ unit_test: script: - cd tangostationcontrol - tox -e cover + coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/' artifacts: reports: coverage_report: diff --git a/tangostationcontrol/__init__.py b/docker-compose/tango-prometheus-exporter/code/__init__.py similarity index 100% rename from tangostationcontrol/__init__.py rename to docker-compose/tango-prometheus-exporter/code/__init__.py diff --git a/docker-compose/tango-prometheus-exporter/lofar2-policy.json b/docker-compose/tango-prometheus-exporter/lofar2-policy.json index 994d9dd1877b87ab7ccecbcfe325c97333dd7f92..e7da467993c3fed8f4fef44c107bbc8f0608d072 100644 --- a/docker-compose/tango-prometheus-exporter/lofar2-policy.json +++ b/docker-compose/tango-prometheus-exporter/lofar2-policy.json @@ -5,7 +5,7 @@ }, "devices": { - "STAT/AntennaField/1": { + "stat/antennafield/1": { "include": [ "HBAT_ANT_mask_RW" ], @@ -13,31 +13,31 @@ "HBAT_BF_delay_steps_*" ] }, - "STAT/APSCT/1": { + "stat/apsct/1": { }, - "STAT/APSPU/1": { + "stat/apspu/1": { }, - "STAT/Beamlet/1": { + "stat/beamlet/1": { "exclude": [ "FPGA_beamlet_subband_select_*", "FPGA_bf_weights_*" ] }, - "STAT/Boot/1": { + "stat/boot/1": { }, - "STAT/DigitalBeam/1": { + "stat/digitalbeam/1": { }, - "STAT/Docker/1": { + "stat/docker/1": { }, - "STAT/Observation/*":{ + "stat/observation/*":{ }, - "STAT/ObservationControl/1":{ + "stat/observationcontrol/1":{ }, - "STAT/PSOC/1": { + "stat/psoc/1": { }, - "STAT/PCON/1": { + "stat/pcon/1": { }, - "STAT/RECV/1": { + "stat/recv/1": { "include": [ "ANT_mask_RW", "RCU_mask_RW" @@ -51,7 +51,7 @@ "*_ITRF_offsets_R" ] }, - "STAT/SDP/1": { + "stat/sdp/1": { "include": [ "TR_fpga_mask_RW" ], @@ -65,7 +65,7 @@ "FPGA_wg_phase_*" ] }, - "STAT/SST/1": { + "stat/sst/1": { "exclude": [ "sst_R", "sst_timestamp_R", @@ -74,14 +74,14 @@ "subbands_calibrated_R" ] }, - "STAT/TileBeam/1": { + "stat/tilebeam/1": { }, - "STAT/UNB2/1": { + "stat/unb2/1": { "include": [ "UNB2_mask_RW" ] }, - "STAT/XST/1": { + "stat/xst/1": { "exclude": [ "FPGA_xst_ring_*", "FPGA_xst_rx_align_*", diff --git a/tangostationcontrol/requirements.txt b/tangostationcontrol/requirements.txt index aa00dc4a54c8c90861ef1ac5756a8166b2029504..21a4422bbb65100a0e3b9d3c10136b2f3345e9bb 100644 --- a/tangostationcontrol/requirements.txt +++ b/tangostationcontrol/requirements.txt @@ -2,6 +2,7 @@ # order of appearance. Changing the order has an impact on the overall # integration process, which may cause wedges in the gate later. +lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client@0.3.0 asyncua >= 0.9.90 # LGPLv3 PyMySQL[rsa] >= 1.0.2 # MIT psycopg2-binary >= 2.9.2 # LGPL diff --git a/tangostationcontrol/setup.cfg b/tangostationcontrol/setup.cfg index 82d6efb049aa4dae8f7842518c20aea2f4ae1ba1..0fe48b128f13c2e09ccca534cf0b5fa609aee02b 100644 --- a/tangostationcontrol/setup.cfg +++ b/tangostationcontrol/setup.cfg @@ -51,7 +51,7 @@ console_scripts = l2ss-bst = tangostationcontrol.devices.sdp.bst:main l2ss-sst = tangostationcontrol.devices.sdp.sst:main l2ss-statistics-reader = tangostationcontrol.statistics_writer.statistics_reader:main - l2ss-statistics-writer = tangostationcontrol.statistics_writer.statistics_writer:main + l2ss-statistics-writer = tangostationcontrol.statistics.writer:main l2ss-unb2 = tangostationcontrol.devices.unb2:main l2ss-xst = tangostationcontrol.devices.sdp.xst:main l2ss-temperature-manager = tangostationcontrol.devices.temperature_manager:main diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics/client.py b/tangostationcontrol/tangostationcontrol/clients/statistics/client.py index 869cb78dab4b013b7cb6634e517f4dad73f187c5..afbf93840499f0ddd8c85c8dbbda4fa626705402 100644 --- a/tangostationcontrol/tangostationcontrol/clients/statistics/client.py +++ b/tangostationcontrol/tangostationcontrol/clients/statistics/client.py @@ -1,3 +1,12 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + from queue import Queue import logging import numpy diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics/client_thread.py b/tangostationcontrol/tangostationcontrol/clients/statistics/client_thread.py index 3da8f76ac135fd4fb631f1de98518ff74f9ec2f9..04a288de676bfe02ec8e442951827d563f029438 100644 --- a/tangostationcontrol/tangostationcontrol/clients/statistics/client_thread.py +++ b/tangostationcontrol/tangostationcontrol/clients/statistics/client_thread.py @@ -1,3 +1,12 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + from abc import ABC from abc import abstractmethod import logging diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py b/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py index 066839a293f61aa65b0da741800b8cdac2e31f34..8930cdb371cd4ac23b299d479e19dcd28dd08d38 100644 --- a/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py +++ b/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py @@ -1,9 +1,19 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + import logging from threading import Thread from queue import Queue +from lofar_station_client.statistics.collector import StatisticsCollector + from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread -from tangostationcontrol.devices.sdp.statistics_collector import StatisticsCollector logger = logging.getLogger() diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py index e39782b48b68d773f968a89b3b568849f8d183ef..1ec2a46f1442d11295b916be55167559e51f2af3 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py @@ -11,17 +11,19 @@ """ +import numpy + from tango.server import device_property, attribute from tango import AttrWriteType + +from lofar_station_client.statistics.collector import BSTCollector + # Own imports from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.clients.opcua_client import OPCUAConnection from tangostationcontrol.clients.statistics.client import StatisticsClient from tangostationcontrol.devices.sdp.statistics import Statistics -from tangostationcontrol.devices.sdp.statistics_collector import BSTCollector - -import numpy __all__ = ["BST", "main"] diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/packet.py b/tangostationcontrol/tangostationcontrol/devices/sdp/packet.py deleted file mode 100644 index f36bff3199759422c16596adc9cdc3482295d8ac..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/packet.py +++ /dev/null @@ -1,570 +0,0 @@ -import struct -from datetime import datetime, timezone -import numpy - -__all__ = ["StatisticsPacket", "SDPPacket", "SSTPacket", "XSTPacket", "BSTPacket", "read_packet", "PACKET_CLASS_FOR_MARKER"] - -N_POL = 2 -N_COMPLEX = 2 - -def get_bit_value(value: bytes, first_bit: int, last_bit: int = None) -> int: - """ Return bits [first_bit:last_bit] from value, and return their integer value. Bit 0 = LSB. - - For example, extracting bits 2-3 from b'01100' returns 11 binary = 3 decimal: - get_bit_value(b'01100', 2, 3) == 3 - - If 'last_bit' is not given, just the value of bit 'first_bit' is returned. """ - - # default last_bit to first_bit - if last_bit is None: - last_bit = first_bit - - return value >> first_bit & ((1 << (last_bit - first_bit + 1)) - 1) - - -class SDPPacket(object): - """ - Models a UDP packet from SDP. - - Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). - - The following fields are exposed as properties & functions. The _raw fields come directly - from the packet, and have more user-friendly alternatives for intepretation: - - marker_raw packet marker as byte. - marker() packet marker as character. 'S' = SST, 'X' = XST, 'B' = BST - - version_id packet format version. - observation_id observation identifier. - station_id station identifier. - - source_info: bit field with input information, encoding several other properties. - antenna_band_index: antenna type. 0 = low band, 1 = high band. - nyquist_zone_index: nyquist zone of filter: - 0 = 0 -- 1/2 * t_adc Hz (low band), - 1 = 1/2 * t_adc -- t_adc Hz (high band), - 2 = t_adc -- 3/2 * t_adc Hz (high band). - t_adc: sampling clock. 0 = 160 MHz, 1 = 200 MHz. - fsub_type: sampling method. 0 = critically sampled, 1 = oversampled. - payload_error: 0 = data is ok, 1 = data is corrupted (a fault was encountered). - beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only). - gn_index: global index of FPGA that emitted this packet. - - block_period_raw: block period, in nanoseconds. - block_period(): block period, in seconds. - block_serial_number: timestamp of the data, in block periods since 1970. - timestamp(): timestamp of the data, as a datetime object. - - """ - - def __init__(self, packet: bytes): - self.packet = packet - - self.unpack() - - # Only parse valid statistics packets from SDP, reject everything else - if self.marker_raw not in self.valid_markers(): - raise ValueError( - f"Invalid packet of type {self.__class__.__name__}: packet marker (first byte) is {self.marker}, not one of {self.valid_markers()}.") - - # format string for the header, see unpack below - HEADER_FORMAT = ">cBL HH xxxxx xxxxxxx HQ" - HEADER_SIZE = struct.calcsize(HEADER_FORMAT) - - @classmethod - def valid_markers(cls): - """ Valid values for the 'marker_raw' header field for this class. - - Each new packet class that introduces a new marker should be added - to the PACKET_CLASS_FOR_MARKER registry, which holds the mapping - marker -> packet class. - """ - - # return all markers registered in PACKET_CLASS_FOR_MARKER which are for this class or any of its specialisations - return [marker for marker, klass in PACKET_CLASS_FOR_MARKER.items() if issubclass(klass, cls)] - - def unpack(self): - """ Unpack the packet into properties of this object. """ - - # unpack fields - try: - (self.marker_raw, - self.version_id, - self.observation_id, - self.station_id, - self.source_info, - self.block_period_raw, - self.block_serial_number) = struct.unpack(self.HEADER_FORMAT, self.packet[:self.HEADER_SIZE]) - except struct.error as e: - raise ValueError("Error parsing statistics packet") from e - - # unpack the fields we just updated - self.unpack_source_info() - - def unpack_source_info(self): - """ Unpack the source_info field into properties of this object. """ - - self.antenna_band_index = get_bit_value(self.source_info, 15) - self.nyquist_zone_index = get_bit_value(self.source_info, 13, 14) - self.t_adc = get_bit_value(self.source_info, 12) - self.fsub_type = get_bit_value(self.source_info, 11) - self.payload_error = (get_bit_value(self.source_info, 10) != 0) - self.beam_repositioning_flag = (get_bit_value(self.source_info, 9) != 0) - # self.source_info 5-8 are reserved - self.gn_index = get_bit_value(self.source_info, 0, 4) - - def expected_size(self) -> int: - """ The size this packet should be (header + payload), according to the header. """ - - # the generic header does not contain enough information to determine the payload size - raise NotImplementedError - - def size(self) -> int: - """ The actual size of this packet. """ - - return len(self.packet) - - @property - def marker(self) -> str: - """ Return the type of statistic: - - 'S' = SST - 'B' = BST - 'X' = XST - 'b' = beamlet - """ - - try: - return self.marker_raw.decode('ascii') - except UnicodeDecodeError: - # non-ascii (>127) character, return as binary - # - # this is typically not visible to the user, as these packets are not SDP statistics packets, - # which the constructor will refuse to accept. - return self.marker_raw - - def block_period(self) -> float: - """ Return the block period, in seconds. """ - - return self.block_period_raw / 1e9 - - def timestamp(self) -> datetime: - """ Returns the timestamp of the data in this packet. - - Returns datetime.min if the block_serial_number in the packet is not set (0), - Returns datetime.max if the timestamp cannot be represented in python (likely because it is too large). """ - - try: - return datetime.fromtimestamp(self.block_serial_number * self.block_period(), timezone.utc) - except ValueError: - # Let's not barf anytime we want to print a header - return datetime.max - - def header(self) -> dict: - """ Return all the header fields as a dict. """ - - header = { - "marker": self.marker, - "version_id": self.version_id, - "observation_id": self.observation_id, - "station_id": self.station_id, - "source_info": { - "_raw": self.source_info, - "antenna_band_index": self.antenna_band_index, - "nyquist_zone_index": self.nyquist_zone_index, - "t_adc": self.t_adc, - "fsub_type": self.fsub_type, - "payload_error": self.payload_error, - "beam_repositioning_flag": self.beam_repositioning_flag, - "gn_index": self.gn_index, - }, - "block_period_raw": self.block_period_raw, - "block_period": self.block_period(), - "block_serial_number": self.block_serial_number, - "timestamp": self.timestamp(), - } - - return header - - def payload(self, signed=False) -> numpy.array: - """ The payload of this packet, as a linear array. """ - - # derive which and how many elements to read from the packet header - bytecount_to_unsigned_struct_type = {1: 'b', 2: 'h', 4: 'i', 8: 'q'} if signed else {1: 'B', 2: 'H', 4: 'I', 8: 'Q'} - format_str = ">{}{}".format(self.nof_statistics_per_packet, - bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic]) - - return numpy.array( - struct.unpack(format_str, self.packet[self.HEADER_SIZE:self.HEADER_SIZE + struct.calcsize(format_str)])) - - -class BeamletPacket(SDPPacket): - """ - Models a beamlet UDP packet from SDP. - - Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). - - The following additional fields are exposed as properties & functions. - - beamlet_width: bits/beamlet (4, 8, 16). - - beamlet_scale: scaling of beamlets prior to quantisation. - beamlet_index: index of the first beamlet in the payload. - nof_blocks_per_packet: number of blocks (timestamps) of data in the payload. - nof_beamlets_per_block: number of beamlets in the payload. - """ - # format string for the header, see unpack below - HEADER_FORMAT = ">cBL HH xxxxx HHBHHQ" - HEADER_SIZE = struct.calcsize(HEADER_FORMAT) - - def unpack(self): - """ Unpack the packet into properties of this object. """ - - # unpack fields - try: - (self.marker_raw, - self.version_id, - self.observation_id, - self.station_id, - self.source_info, - self.beamlet_scale, - self.beamlet_index, - self.nof_blocks_per_packet, - self.nof_beamlets_per_block, - self.block_period_raw, - self.block_serial_number) = struct.unpack(self.HEADER_FORMAT, self.packet[:self.HEADER_SIZE]) - except struct.error as e: - raise ValueError("Error parsing beamlet packet") from e - - # unpack the fields we just updated - self.unpack_source_info() - - def unpack_source_info(self): - """ Unpack the source_info field into properties of this object. """ - - super().unpack_source_info() - - self.beamlet_width = get_bit_value(self.source_info, 5, 8) or 16 # 0 -> 16 - - def expected_size(self) -> int: - """ The size this packet should be (header + payload), according to the header. """ - - return self.HEADER_SIZE + self.nof_statistics_per_packet * self.nof_bytes_per_statistic - - @property - def nof_statistics_per_packet(self) -> int: - return self.nof_blocks_per_packet * self.nof_beamlets_per_block * N_POL * N_COMPLEX * self.beamlet_width // 8 - - @property - def nof_bytes_per_statistic(self) -> int: - if self.beamlet_width == 8: - # cint8 data [-127, 127] - return 1 - elif self.beamlet_width == 16: - # cint16 data [-32767, 32767] - return 2 - elif self.beamlet_width == 4: - # cint4 data [-7, 7], packet in 1 byte - return 1 - - def header(self) -> dict: - """ Return all the header fields as a dict. """ - - header = super().header() - - header["source_info"]["beamlet_width"] = self.beamlet_width - - header.update({ - "beamlet_scale": self.beamlet_scale, - "beamlet_index": self.beamlet_index, - "nof_blocks_per_packet": self.nof_blocks_per_packet, - "nof_beamlets_per_block": self.nof_beamlets_per_block - }) - - return header - - def payload_raw(self): - if self.beamlet_width == 4: - return super().payload(signed=True).reshape(self.nof_blocks_per_packet, self.nof_beamlets_per_block, N_POL) - else: - return super().payload(signed=True).reshape(self.nof_blocks_per_packet, self.nof_beamlets_per_block, N_POL, N_COMPLEX) - - @property - def payload(self): - if self.beamlet_width == 4: - # real in low 4 bits, imag in high 4 bits (both signed!) - payload_raw = self.payload_raw() - - # shift extends sign, so we prefer it over bitwise and - return (payload_raw << 4 >> 4) + (payload_raw >> 4) * 1j - else: - return self.payload_raw().astype(float).view(numpy.complex64) - - -class StatisticsPacket(SDPPacket): - """ - Models a statistics UDP packet from SDP. - - Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). - - The following additional fields are exposed as properties & functions. - - subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not. - - data_id: bit field with payload information, encoding several other properties. - - nof_signal_inputs: number of inputs that contributed to data in this packet. - nof_bytes_per_statistics: word size of each statistic. - nof_statistics_per_packet: number of statistic data points in the payload. - - integration_interval_raw: integration interval, in block periods. - integration_interval(): integration interval, in seconds. - """ - - # statistics format string for the header, see unpack below - HEADER_FORMAT = ">cBL HHB BHL BBH HQ" - HEADER_SIZE = struct.calcsize(HEADER_FORMAT) - - def unpack(self): - """ Unpack the packet into properties of this object. """ - - # unpack fields - try: - (self.marker_raw, - self.version_id, - self.observation_id, - self.station_id, - self.source_info, - # reserved byte - _, - # integration interval, in block periods. This field is 3 bytes, big endian -- combine later - integration_interval_hi, - integration_interval_lo, - self.data_id, - self.nof_signal_inputs, - self.nof_bytes_per_statistic, - self.nof_statistics_per_packet, - self.block_period_raw, - self.block_serial_number) = struct.unpack(self.HEADER_FORMAT, self.packet[:self.HEADER_SIZE]) - - self.integration_interval_raw = (integration_interval_hi << 16) + integration_interval_lo - except struct.error as e: - raise ValueError("Error parsing statistics packet") from e - - # unpack the fields we just updated - self.unpack_source_info() - self.unpack_data_id() - - def expected_size(self) -> int: - """ The size this packet should be (header + payload), according to the header. """ - - return self.HEADER_SIZE + self.nof_statistics_per_packet * self.nof_bytes_per_statistic - - def unpack_source_info(self): - """ Unpack the source_info field into properties of this object. """ - - super().unpack_source_info() - self.subband_calibrated_flag = (get_bit_value(self.source_info, 8) != 0) - - def unpack_data_id(self): - """ Unpack the data_id field into properties of this object. """ - - # only useful in specialisations (XST/SST/BST) - pass - - def integration_interval(self) -> float: - """ Returns the integration interval, in seconds. """ - - # Translate to seconds using the block period - return self.integration_interval_raw * self.block_period() - - def header(self) -> dict: - """ Return all the header fields as a dict. """ - - header = super().header() - - header["source_info"]["subband_calibrated_flag"] = self.subband_calibrated_flag - - header.update({ - "data_id": { - "_raw": self.data_id, - }, - "integration_interval_raw": self.integration_interval_raw, - "integration_interval": self.integration_interval(), - "nof_signal_inputs": self.nof_signal_inputs, - "nof_bytes_per_statistic": self.nof_bytes_per_statistic, - "nof_statistics_per_packet": self.nof_statistics_per_packet - }) - - return header - - -class SSTPacket(StatisticsPacket): - """ - Models an SST statistics UDP packet from SDP. - - The following fields are exposed as properties & functions. - - - signal_input_index: input (antenna polarisation) index for which this packet contains statistics - - payload[nof_statistics_per_packet]: SST statistics, an array of amplitudes per subband. - """ - - def unpack_data_id(self): - super().unpack_data_id() - - self.signal_input_index = get_bit_value(self.data_id, 0, 7) - - def header(self): - header = super().header() - - header["data_id"]["signal_input_index"] = self.signal_input_index - - return header - - @property - def payload(self): - return super().payload(signed=False) - - -class XSTPacket(StatisticsPacket): - """ - Models an XST statistics UDP packet from SDP. - - The following fields are exposed as properties & functions. - - subband_index: subband number for which this packet contains statistics. - first_baseline: first antenna pair for which this packet contains statistics. - - payload[nof_signal_inputs][nof_signal_inputs] the baselines, starting from first_baseline - """ - - def unpack_data_id(self): - super().unpack_data_id() - - self.subband_index = get_bit_value(self.data_id, 16, 24) - self.first_baseline = (get_bit_value(self.data_id, 8, 15), get_bit_value(self.data_id, 0, 7)) - - def header(self): - header = super().header() - - header["data_id"]["subband_index"] = self.subband_index - header["data_id"]["first_baseline"] = self.first_baseline - - return header - - @property - def payload(self): - return super().payload(signed=True) - - -class BSTPacket(StatisticsPacket): - """ - Models an BST statistics UDP packet from SDP. - - The following fields are exposed as properties & functions. - - beamlet_index: the number of the beamlet for which this packet holds statistics. - """ - - def unpack_data_id(self): - super().unpack_data_id() - - self.beamlet_index = get_bit_value(self.data_id, 0, 15) - - def header(self): - header = super().header() - - header["data_id"]["beamlet_index"] = self.beamlet_index - - return header - - @property - def payload(self): - return super().payload(signed=True) - - -# Which class to use for which marker. -# -# NB: Python does not allow us to register from inside the class, -# as we cannot reference the class during its construction. -PACKET_CLASS_FOR_MARKER = { - b'b': BeamletPacket, - b'S': SSTPacket, - b'B': BSTPacket, - b'X': XSTPacket, -} - - -def read_packet(read_func) -> SDPPacket: - """ Read a packet using the given read function, with signature - - read_func(num_bytes: int) -> bytes - - and return it. The packet type is sensed from the data and - the correct subclass of SDPPacket is returned. - - If read_func() returns None, this function will as well. - """ - - # read just the marker - marker = read_func(1) - if not marker: - return None - - # read the packet header based on type - packetClass = PACKET_CLASS_FOR_MARKER[marker] - - # read the rest of the header - header = read_func(packetClass.HEADER_SIZE - len(marker)) - if not header: - return None - - header = marker + header - - # parse the packet header size - packet = packetClass(header) - - # read the payload - payload_size = packet.expected_size() - len(header) - payload = read_func(payload_size) - - if not payload: - return None - - # return full packet - return packetClass(header + payload) - - -def main(args=None, **kwargs): - # parse one packet from stdin - import sys - import pprint - - # packet counter - nr = 0 - - # byte offset in the stream - offset = 0 - - while True: - # read the packet from input - packet = read_packet(sys.stdin.buffer.read) - - if not packet: - break - - # print header - print(f"# Packet {nr} of class {packet.__class__.__name__} starting at offset {offset} with length {packet.size()}") - pprint.pprint(packet.header()) - - # increment counters - nr += 1 - offset += packet.size() - - -# this file is very useful to have stand alone to parse raw packet files, so make it work as such -if __name__ == "__main__": - import sys - main(sys.argv) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py index 30946bdcda6c504d08d6c30a6e4e8490e6253c75..73a8054882d5dd29c4fecc971c3605880cd2a866 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py @@ -14,14 +14,14 @@ # PyTango imports from tango.server import device_property, attribute from tango import AttrWriteType -# Additional import +# Additional import from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.clients.opcua_client import OPCUAConnection from tangostationcontrol.clients.statistics.client import StatisticsClient from tangostationcontrol.devices.sdp.statistics import Statistics -from tangostationcontrol.devices.sdp.statistics_collector import SSTCollector +from tangostationcontrol.statistics.collector import StationSSTCollector import numpy @@ -30,7 +30,7 @@ __all__ = ["SST", "main"] class SST(Statistics): - STATISTICS_COLLECTOR_CLASS = SSTCollector + STATISTICS_COLLECTOR_CLASS = StationSSTCollector # ----------------- # Device Properties @@ -95,18 +95,18 @@ class SST(Statistics): FPGA_sst_offload_nof_valid_R = attribute_wrapper(comms_annotation=["FPGA_sst_offload_nof_valid_R"], datatype=numpy.int32, dims=(16,)) # number of packets with valid payloads - nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64) + nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(StationSSTCollector.MAX_FPGAS,), datatype=numpy.uint64) # number of packets with invalid payloads - nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64) + nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(StationSSTCollector.MAX_FPGAS,), datatype=numpy.uint64) # latest SSTs - sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(SSTCollector.MAX_INPUTS, SSTCollector.MAX_SUBBANDS), datatype=numpy.uint64) + sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(StationSSTCollector.MAX_INPUTS, StationSSTCollector.MAX_SUBBANDS), datatype=numpy.uint64) # reported timestamp # for each row in the latest SSTs - sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64) + sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=numpy.uint64) # integration interval for each row in the latest SSTs - integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.float32) + integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=numpy.float32) # whether the subband data was calibrated by the SDP (that is, were subband weights applied) - subbands_calibrated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "subbands_calibrated"}, dims=(SSTCollector.MAX_INPUTS,), datatype=bool) + subbands_calibrated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "subbands_calibrated"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=bool) # ---------- # Summarising Attributes diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py deleted file mode 100644 index b330a120488de8acdab62e7b3ae8d88f4f341811..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py +++ /dev/null @@ -1,355 +0,0 @@ -import logging -import numpy -import datetime - -from .packet import SSTPacket, XSTPacket, BSTPacket -from tangostationcontrol.common.baselines import nr_baselines, baseline_index, baseline_from_index - -from tango import DeviceProxy, DevFailed, DevState - -logger = logging.getLogger() - - -class StatisticsCollector: - """ Base class to process statistics packets into parameters matrices. """ - - # Maximum number of FPGAs we receive data from (used for diagnostics) - MAX_FPGAS = 16 - - def __init__(self): - self.parameters = self._default_parameters() - - def _default_parameters(self): - return { - "nof_packets": numpy.uint64(0), - - # Packet count for packets that could not be parsed - "nof_invalid_packets": numpy.uint64(0), - - # Full contents of the latest packet we deemed invalid. - "last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8), - } - - def process_packet(self, packet, device=None): - self.parameters["nof_packets"] += numpy.uint64(1) - - try: - self.parse_packet(packet, device) - except Exception as e: - self.parameters["last_invalid_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8) - self.parameters["nof_invalid_packets"] += numpy.uint64(1) - - raise ValueError("Could not parse statistics packet") from e - - def parse_packet(self, packet, device): - """ Update any information based on this packet. """ - - raise NotImplementedError - - def parse_device_attributes(self): - """ Update information based on device attributes """ - raise NotImplementedError - -class SSTCollector(StatisticsCollector): - """ Class to process SST statistics packets. """ - - # Maximum number of antenna inputs we support (used to determine array sizes) - MAX_INPUTS = 192 - - # Maximum number of subbands we support (used to determine array sizes) - MAX_SUBBANDS = 512 - - def _default_parameters(self): - defaults = super()._default_parameters() - - defaults.update({ - # Number of packets received so far that we could parse correctly and do not have a payload error - "nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), - - # Packets that reported a payload error - "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), - - # Last value array we've constructed out of the packets - "sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64), - "sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64), - "integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32), - "subbands_calibrated": numpy.zeros((self.MAX_INPUTS,), dtype=bool), - - # RECV attribute values to monitor the station configuration - "rcu_attenuator_dB": numpy.zeros((32,3), dtype=numpy.int64), - "rcu_band_select": numpy.zeros((32,3), dtype=numpy.int64), - "rcu_dth_on": numpy.full((32,3), False), - }) - - return defaults - - def parse_packet(self, packet, device): - fields = SSTPacket(packet) - - # determine which input this packet contains data for - if fields.signal_input_index >= self.MAX_INPUTS: - # packet describes an input that is out of bounds for us - raise ValueError("Packet describes input %d, but we are limited to describing MAX_INPUTS=%d" % (fields.signal_input_index, self.MAX_INPUTS)) - - input_index = fields.signal_input_index - - if fields.payload_error: - # cannot trust the data if a payload error is reported - self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1) - - # don't raise, as packet is valid - return - - # process the packet - self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1) - self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload - self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) - self.parameters["integration_intervals"][input_index] = fields.integration_interval() - self.parameters["subbands_calibrated"][input_index] = fields.subband_calibrated_flag - - # add tango values to packet - self.parse_device_attributes(device) - - def parse_device_attributes(self, device: DeviceProxy): - - #If Tango connection has been disabled, set explicitly to None, - # because otherwise default_values are inserted - if device is None or device.state() != DevState.ON: - self.parameters["rcu_attenuator_dB"] = None - self.parameters["rcu_band_select"] = None - self.parameters["rcu_dth_on"] = None - else: - try: - self.parameters["rcu_attenuator_dB"] = device.RCU_Attenuator_dB_R - self.parameters["rcu_band_select"] = device.RCU_Band_Select_R - self.parameters["rcu_dth_on"] = device.RCU_DTH_on_R - except DevFailed as e: - logger.warning(f"Device {device.name()} not responding.") - self.parameters["rcu_attenuator_dB"] = None - self.parameters["rcu_band_select"] = None - self.parameters["rcu_dth_on"] = None - -class XSTCollector(StatisticsCollector): - """ Class to process XST statistics packets. - - XSTs are received for up to MAX_PARALLEL_SUBBANDS simultaneously, and only the values of the last - MAX_PARALLEL_SUBBANDS are kept. Raw data are collected for each subband in parameters["xst_blocks"], - and overwritten if newer (younger) data is received for the same subband. As such, the data represent - a rolling view on the XSTs. - - The xst_values() function is a user-friendly way to read the xst_blocks. - - The hardware can be configured to emit different and/or fewer subbands, causing some of the XSTs - to become stale. It is therefor advised to inspect parameters["xst_timestamps"] as well. - """ - - # Maximum number of subbands for which we collect XSTs simultaneously - MAX_PARALLEL_SUBBANDS = 8 - - # Maximum number of antenna inputs we support (used to determine array sizes) - MAX_INPUTS = 192 - - # Maximum number of baselines we can receive - MAX_BASELINES = nr_baselines(MAX_INPUTS) - - # Expected block size is BLOCK_LENGTH x BLOCK_LENGTH - BLOCK_LENGTH = 12 - - # Expected number of blocks: enough to cover all baselines without the conjugates (that is, the top-left triangle of the matrix). - MAX_BLOCKS = nr_baselines(MAX_INPUTS // BLOCK_LENGTH) - - # Maximum number of subbands we support (used to determine array sizes) - MAX_SUBBANDS = 512 - - # Complex values are (real, imag). A bit silly, but we don't want magical constants. - VALUES_PER_COMPLEX = 2 - - def _default_parameters(self): - defaults = super()._default_parameters() - - defaults.update({ - # Number of packets received so far that we could parse correctly and do not have a payload error - "nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), - - # Packets that reported a payload error - "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), - - # Last value array we've constructed out of the packets - "xst_blocks": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.int64), - # Whether the values are actually conjugated and transposed - "xst_conjugated": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS,), dtype=bool), - # When the youngest data for each subband was received - "xst_timestamps": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float64), - "xst_subbands": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.uint16), - "xst_integration_intervals": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float32), - }) - - return defaults - - def select_subband_slot(self, subband): - """ Return which subband slot (0..MAX_PARALLEL_SUBBANDS) to use when confronted with a new subband. - Keep recording the same subband if we're already tracking it, but allocate or replace a slot if not. """ - - indices = numpy.where(self.parameters["xst_subbands"] == subband)[0] - - if len(indices) > 0: - # subband already being recorded, use same spot - return indices[0] - else: - # a new subband, kick out the oldest - oldest_timestamp = self.parameters["xst_timestamps"].min() - - # prefer the first one in case of multiple minima - return numpy.where(self.parameters["xst_timestamps"] == oldest_timestamp)[0][0] - - def parse_packet(self, packet, device=None): - fields = XSTPacket(packet) - - if fields.payload_error: - # cannot trust the data if a payload error is reported - self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1) - - # don't raise, as packet is valid - return - - # the blocks must be of size BLOCK_LENGTH x BLOCK_LENGTH - if fields.nof_signal_inputs != self.BLOCK_LENGTH: - raise ValueError("Packet describes a block of {0} x {0} baselines, but we can only parse blocks of {1} x {1} baselines".format(fields.nof_signal_inputs, self.BLOCK_LENGTH)) - - # check whether set of baselines in this packet are not out of bounds - for antenna in (0,1): - if fields.first_baseline[antenna] + fields.nof_signal_inputs > self.MAX_INPUTS: - # packet describes an input that is out of bounds for us - raise ValueError(f"Packet describes {fields.nof_signal_inputs} x {fields.nof_signal_inputs} baselines starting at {fields.first_baseline}, but we are limited to describing MAX_INPUTS={self.MAX_INPUTS}") - - # the blocks of baselines need to be tightly packed, and thus be provided at exact intervals - if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0: - raise ValueError(f"Packet describes baselines starting at {fields.first_baseline}, but we require a multiple of BLOCK_LENGTH={self.MAX_INPUTS}") - - # Make sure we always have a baseline (a,b) with a>=b. If not, we swap the indices and mark that the data must be conjugated and transposed when processed. - first_baseline = fields.first_baseline - if first_baseline[0] < first_baseline[1]: - conjugated = True - first_baseline = (first_baseline[1], first_baseline[0]) - else: - conjugated = False - - # we keep track of multiple subbands. select slot for this one - subband_slot = self.select_subband_slot(fields.subband_index) - - assert 0 <= subband_slot < self.MAX_PARALLEL_SUBBANDS, f"Selected slot {subband_slot}, but only have room for {self.MAX_PARALLEL_SUBBANDS}. Existing slots are {self.parameters['xst_subbands']}, processing subband {fields.subband_index}." - - # log if we're replacing a subband we were once recording - self._log_replacing_subband(subband_slot, fields) - - # the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH - # starting at baseline first_baseline. - # - # we honour this format, as we want to keep the metadata together with these blocks. we do need to put the blocks in a linear - # and tight order, however, so we calculate a block index. - block_index = baseline_index(first_baseline[0] // self.BLOCK_LENGTH, first_baseline[1] // self.BLOCK_LENGTH) - - # We did enough checks on first_baseline for this to be a logic error in our code - assert 0 <= block_index < self.MAX_BLOCKS, f"Received block {block_index}, but have only room for {self.MAX_BLOCKS}. Block starts at baseline {first_baseline}." - - # process the packet - self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1) - - self.parameters["xst_blocks"][subband_slot, block_index, :fields.nof_statistics_per_packet] = fields.payload - self.parameters["xst_timestamps"][subband_slot] = numpy.float64(fields.timestamp().timestamp()) - self.parameters["xst_conjugated"][subband_slot, block_index] = conjugated - self.parameters["xst_subbands"][subband_slot] = numpy.uint16(fields.subband_index) - self.parameters["xst_integration_intervals"][subband_slot] = fields.integration_interval() - - def _log_replacing_subband(self, subband_slot, fields): - # log if we're replacing a subband we were once recording - previous_subband_in_slot = self.parameters["xst_subbands"][subband_slot] - if previous_subband_in_slot != fields.subband_index: - if self.parameters["xst_timestamps"][subband_slot] > 0: - previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot]) - logger.info(f"Stopped recording XSTs for subband {previous_subband_in_slot}. Last data for this subband was received at {previous_subband_timestamp}.") - - def xst_values(self, subband_indices = None): - """ xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS] of complex values. - - The subband indices must be in [0..MAX_PARALLEL_SUBBANDS). By default, all recorded XSTs are returned. - """ - - if subband_indices is None: - subband_indices = range(self.MAX_PARALLEL_SUBBANDS) - - matrix = numpy.zeros((len(subband_indices), self.MAX_INPUTS, self.MAX_INPUTS), dtype=numpy.complex64) - xst_blocks = self.parameters["xst_blocks"] - xst_conjugated = self.parameters["xst_conjugated"] - - for matrix_idx, subband_index in enumerate(subband_indices): - for block_index in range(self.MAX_BLOCKS): - # convert real/imag int to complex float values. this works as real/imag come in pairs - block = xst_blocks[subband_index][block_index].astype(numpy.float32).view(numpy.complex64) - - if xst_conjugated[subband_index][block_index]: - # block is conjugated and transposed. process. - block = block.conjugate().transpose() - - # reshape into [a][b] - block = block.reshape(self.BLOCK_LENGTH, self.BLOCK_LENGTH) - - # compute destination in matrix - first_baseline = baseline_from_index(block_index) - first_baseline = (first_baseline[0] * self.BLOCK_LENGTH, first_baseline[1] * self.BLOCK_LENGTH) - - # copy block into matrix - matrix[matrix_idx][first_baseline[0]:first_baseline[0]+self.BLOCK_LENGTH, first_baseline[1]:first_baseline[1]+self.BLOCK_LENGTH] = block - - return matrix - - -class BSTCollector(StatisticsCollector): - """ Class to process SST statistics packets. """ - - # beamlets = 488 * 2 for the x and y polorisations - MAX_BEAMLETS = 976 - - MAX_BLOCKS = 2 - - def _default_parameters(self): - defaults = super()._default_parameters() - - defaults.update({ - # Number of packets received so far that we could parse correctly and do not have a payload error - "nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), - - # Packets that reported a payload error - "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), - - # Last value array we've constructed out of the packets - "bst_values": numpy.zeros((self.MAX_BLOCKS, self.MAX_BEAMLETS), dtype=numpy.uint64), - "bst_timestamps": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float64), - "integration_intervals": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float32), - }) - - return defaults - - def parse_packet(self, packet, device=None): - fields = BSTPacket(packet) - - # To get the block_index we floor divide this beamlet_index by the max amount of beamlets per block - block_index = fields.beamlet_index // self.MAX_BEAMLETS - - # determine which input this packet contains data for - if block_index >= self.MAX_BLOCKS: - # packet describes an input that is out of bounds for us - raise ValueError("Packet describes beamlet %d, but we are limited to describing MAX_BEAMLETS=%d" % (fields.beamlet_index, self.MAX_BEAMLETS)) - - if fields.payload_error: - # cannot trust the data if a payload error is reported - self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1) - - # don't raise, as packet is valid - return - - # process the packet - self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1) - self.parameters["bst_values"][block_index][:self.MAX_BEAMLETS] = fields.payload - self.parameters["bst_timestamps"][block_index] = numpy.float64(fields.timestamp().timestamp()) - self.parameters["integration_intervals"][block_index] = fields.integration_interval() diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py index ccb39ccf6214e638c0eb47cea4aa8b53afd088f9..dec4acfb4d5ea9837a133db3474a008bd57983be 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py @@ -15,6 +15,8 @@ from tango.server import device_property, attribute from tango import AttrWriteType +from lofar_station_client.statistics.collector import XSTCollector + # Additional import from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper @@ -22,7 +24,6 @@ from tangostationcontrol.clients.opcua_client import OPCUAConnection from tangostationcontrol.clients.statistics.client import StatisticsClient from tangostationcontrol.devices.sdp.statistics import Statistics -from tangostationcontrol.devices.sdp.statistics_collector import XSTCollector import numpy diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/statistics_writer/__init__.py b/tangostationcontrol/tangostationcontrol/integration_test/default/prometheus/__init__.py similarity index 100% rename from tangostationcontrol/tangostationcontrol/integration_test/default/statistics_writer/__init__.py rename to tangostationcontrol/tangostationcontrol/integration_test/default/prometheus/__init__.py diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/prometheus/test_tango_prometheus_client.py b/tangostationcontrol/tangostationcontrol/integration_test/default/prometheus/test_tango_prometheus_client.py new file mode 100644 index 0000000000000000000000000000000000000000..f0d1354f6ce9779d3585bd1119f2820652651810 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/prometheus/test_tango_prometheus_client.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +import importlib +import sys, os +import numpy + +from tango import Database + +from tangostationcontrol.integration_test.base import BaseIntegrationTestCase +from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy + +module_name = 'ArchiverPolicy' +file_path = os.path.join(os.path.realpath('..'), 'docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py') +spec = importlib.util.spec_from_file_location(module_name, file_path) +tpc_policy = importlib.util.module_from_spec(spec) +sys.modules[module_name] = tpc_policy +spec.loader.exec_module(tpc_policy) + +module_name = 'CustomCollector' +spec = importlib.util.spec_from_file_location(module_name, file_path) +tpc_cc = importlib.util.module_from_spec(spec) +sys.modules[module_name] = tpc_cc +spec.loader.exec_module(tpc_cc) + +class TestPrometheusClient(BaseIntegrationTestCase): + + config_path = os.path.join(os.path.realpath('..'), 'docker-compose/tango-prometheus-exporter/lofar2-policy.json') + CONFIG = tpc_policy.ArchiverPolicy.load_config(config_path) + + def setUp(self): + super().setUp() + + def initialise_collector(self): + db = Database() + station = db.get_property("station","name")["name"][0] + custom_collector = tpc_cc.CustomCollector(self.CONFIG, station) + self.assertIsNotNone(custom_collector) + return custom_collector + + def setup_recv_proxy(self, device_name='stat/recv/1'): + # setup RECV + recv_proxy = TestDeviceProxy(device_name) + recv_proxy.off() + recv_proxy.warm_boot() + recv_proxy.set_defaults() + return recv_proxy + + def test_tango_db_devices(self): + """ Test if device names are correctly retrieved from Tango DB """ + policy = tpc_policy.ArchiverPolicy(self.CONFIG) + db_devices = policy.device_list() + self.assertNotEqual(len(db_devices), 0) + + def test_policy_devices(self): + """ Test if device names are correctly filtered with policy file """ + policy = tpc_policy.ArchiverPolicy(self.CONFIG) + db_devices = policy.device_list() + policy_devices = policy.devices() + self.assertLessEqual(len(policy_devices), len(db_devices)) + config_retrieved_devices = [*policy.config['devices'].keys()] # list of device names from policy file + for d in config_retrieved_devices: + if '*' not in d: # filter out wildcards + self.assertIn(d, policy_devices) + + def test_archiver_policy_attribute_list(self): + """ Test if the full set of archiving policy for the given device is retrieved """ + device_name = 'stat/recv/1' + recv_proxy = self.setup_recv_proxy(device_name) + policy = tpc_policy.ArchiverPolicy(self.CONFIG) + attribute_list = policy.attribute_list(device_name, recv_proxy.get_attribute_list()) + include = policy.config['devices']['stat/recv/1']['include'] # attribute that must be included + for i in include: + if '*' not in i: # exclude wildcard + self.assertIn(i, attribute_list) + exclude = policy.config['devices']['stat/recv/1']['exclude'] # attribute that must be excluded + for e in exclude: + if '*' not in e: # exclude wildcard + self.assertNotIn(e, attribute_list) + + def test_label_metric_list(self): + """ Test whether the metric label list matches up with the ones defined in the GaugeMetricFamily constructor""" + collector = self.initialise_collector() + attribute_metrics, scraping_metrics = collector.collect() + expected_attribute_labels = ['station', 'device', 'name', 'str_value', 'type', 'x', 'y', 'idx'] + expected_scraping_labels = ['station', 'device'] + numpy.testing.assert_equal([*attribute_metrics.samples[0].labels.keys()], expected_attribute_labels) + numpy.testing.assert_equal([*scraping_metrics.samples[0].labels.keys()], expected_scraping_labels) + + def test_collector_metrics_with_devices_in_off(self): + """ Test if the metrics are exposed even if devices are in OFF state """ + device_name = 'stat/recv/1' + recv_proxy = TestDeviceProxy(device_name) + recv_proxy.off() + collector = self.initialise_collector() + expected_attrs = ['State', 'Status'] # only state attributes are scraped when device is in OFF + metrics = collector.device_metrics(device_name) + actual_attrs = [metrics[0][0][2], metrics[1][0][2]] + numpy.testing.assert_equal(sorted(actual_attrs), expected_attrs) + + def test_collector_metrics(self): + """ Test if the metrics are correctly exposed """ + device_name = 'stat/recv/1' + recv_proxy = self.setup_recv_proxy(device_name) + collector = self.initialise_collector() + expected_attr_values = recv_proxy.ANT_error_R + numpy.testing.assert_equal(expected_attr_values, numpy.array([True] * 96)) + attribute_metrics, scraping_metrics = collector.collect() + metric_samples = attribute_metrics.samples + # Test attribute metrics ANT_error_R + samples_values = [] + for s in metric_samples: + if (s.labels['name'] == 'ANT_error_R'): + samples_values.append(numpy.bool(s.value)) + numpy.testing.assert_equal(samples_values, expected_attr_values) + # Test scraping metrics + total_scraping_time = scraping_metrics.samples[-1].value + self.assertLess(total_scraping_time, 10) # Set acceptable scraping time ? diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/statistics/__init__.py b/tangostationcontrol/tangostationcontrol/integration_test/default/statistics/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/statistics_writer/test_statistics_writer_sst.py b/tangostationcontrol/tangostationcontrol/integration_test/default/statistics/test_writer_sst.py similarity index 82% rename from tangostationcontrol/tangostationcontrol/integration_test/default/statistics_writer/test_statistics_writer_sst.py rename to tangostationcontrol/tangostationcontrol/integration_test/default/statistics/test_writer_sst.py index f7f9c913c6ca6ba49a486a2afb4066be082ec7ca..88382175385a01e3493984fd1a53d026a19bbb02 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/statistics_writer/test_statistics_writer_sst.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/statistics/test_writer_sst.py @@ -9,8 +9,10 @@ from tangostationcontrol.integration_test.base import BaseIntegrationTestCase from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy -from tangostationcontrol.devices.sdp.statistics_collector import SSTCollector -from tangostationcontrol.statistics_writer import statistics_writer, statistics_reader +from tangostationcontrol.statistics.collector import StationSSTCollector +from tangostationcontrol.statistics_writer import statistics_reader +from tangostationcontrol.statistics import writer + import sys from os.path import dirname, isfile, join from tempfile import TemporaryDirectory @@ -18,6 +20,7 @@ from unittest import mock from tango import DevState + class TestStatisticsWriterSST(BaseIntegrationTestCase): def setUp(self): @@ -42,7 +45,7 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase): def test_insert_tango_SST_statistics(self): self.setup_recv_proxy() self.assertEqual(DevState.ON, self.recv_proxy.state()) - collector = SSTCollector() + collector = StationSSTCollector() # Test attribute values retrieval collector.parse_device_attributes(self.recv_proxy) @@ -51,10 +54,10 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase): self.assertListEqual(collector.parameters["rcu_dth_on"].tolist(), self.recv_proxy.rcu_dth_on_r.tolist()) with TemporaryDirectory() as tmpdir: - new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir] - with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv): + new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir] + with mock.patch.object(writer.sys, 'argv', new_sys_argv): with self.assertRaises(SystemExit): - statistics_writer.main() + writer.main() # check if file was written self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5")) @@ -68,18 +71,18 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase): stat = stat_parser.get_statistic('2021-09-20T12:17:40.000+00:00') # same as stat_parser.statistics[0] self.assertIsNotNone(stat) self.assertEqual(121, stat.data_id_signal_input_index) - # RECV attributes are not present since the stats-writer is not connecting to any host - self.assertEqual(stat.rcu_attenuator_dB.tolist(), None) - self.assertEqual(stat.rcu_band_select.tolist(), None) - self.assertEqual(stat.rcu_dth_on.tolist(), None) - + # Test RECV attributes + self.assertListEqual(stat.rcu_attenuator_dB.tolist(), [0] * 96) + self.assertListEqual(stat.rcu_band_select.tolist(), [0] * 96) + self.assertListEqual(stat.rcu_dth_on.tolist(), [False] * 96) + def test_no_tango_SST_statistics(self): with TemporaryDirectory() as tmpdir: - new_sys_argv = [sys.argv[0], "--mode", "SST", "--no-tango", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir] - with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv): + new_sys_argv = [sys.argv[0], "--mode", "SST", "--no-tango", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir] + with mock.patch.object(writer.sys, 'argv', new_sys_argv): with self.assertRaises(SystemExit): - statistics_writer.main() + writer.main() # check if file was written self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5")) @@ -104,10 +107,10 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase): self.assertEqual(DevState.OFF, self.recv_proxy.state()) with TemporaryDirectory() as tmpdir: - new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir] - with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv): + new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir] + with mock.patch.object(writer.sys, 'argv', new_sys_argv): with self.assertRaises(SystemExit): - statistics_writer.main() + writer.main() # check if file was written self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5")) diff --git a/tangostationcontrol/tangostationcontrol/statistics/__init__.py b/tangostationcontrol/tangostationcontrol/statistics/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tangostationcontrol/tangostationcontrol/statistics/collector.py b/tangostationcontrol/tangostationcontrol/statistics/collector.py new file mode 100644 index 0000000000000000000000000000000000000000..7befd038d108ba3c147126082447a8e813838e42 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/statistics/collector.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +import abc +import logging + +from lofar_station_client.statistics.collector import SSTCollector + +from tango import DevFailed +from tango import DeviceProxy +from tango import DevState + +logger = logging.getLogger() + + +class StationStatisticsCollectorInterface(abc.ABC): + + @abc.abstractmethod + def parse_device_attributes(self, device: DeviceProxy): + """Update information based on device attributes""" + raise NotImplementedError + + +class StationSSTCollector(StationStatisticsCollectorInterface, SSTCollector): + + def parse_packet(self, packet, obj): + super(StationSSTCollector, self).parse_packet(packet, obj) + + # add tango values to packet + self.parse_device_attributes(obj) + + def parse_device_attributes(self, device: DeviceProxy): + + # If Tango connection has been disabled, set explicitly to None, + # because otherwise default_values are inserted + if device is None or device.state() != DevState.ON: + self.parameters["rcu_attenuator_dB"] = None + self.parameters["rcu_band_select"] = None + self.parameters["rcu_dth_on"] = None + else: + try: + self.parameters["rcu_attenuator_dB"] = device.RCU_Attenuator_dB_R + self.parameters["rcu_band_select"] = device.RCU_Band_Select_R + self.parameters["rcu_dth_on"] = device.RCU_DTH_on_R + except DevFailed as e: + logger.warning(f"Device {device.name()} not responding.") + self.parameters["rcu_attenuator_dB"] = None + self.parameters["rcu_band_select"] = None + self.parameters["rcu_dth_on"] = None diff --git a/tangostationcontrol/tangostationcontrol/statistics/packet.py b/tangostationcontrol/tangostationcontrol/statistics/packet.py new file mode 100644 index 0000000000000000000000000000000000000000..87383704daf73fc77e2a34d7d329c6e0246725e2 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/statistics/packet.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +import sys +import pprint + +from lofar_station_client.statistics.packet import SDPPacket +from lofar_station_client.statistics.packet import PACKET_CLASS_FOR_MARKER + + +def read_packet(read_func) -> SDPPacket: + """Read a packet using the given read function, with signature + + ```read_func(num_bytes: int) -> bytes``` + + and return it. The packet type is sensed from the data and + the correct subclass of SDPPacket is returned. + + If read_func() returns None, this function will as well. + """ + + # read just the marker + marker = read_func(1) + if not marker: + return None + + # read the packet header based on type + packetClass = PACKET_CLASS_FOR_MARKER[marker] + + # read the rest of the header + header = read_func(packetClass.HEADER_SIZE - len(marker)) + if not header: + return None + + header = marker + header + + # parse the packet header size + packet = packetClass(header) + + # read the payload + payload_size = packet.expected_size() - len(header) + payload = read_func(payload_size) + + if not payload: + return None + + # return full packet + return packetClass(header + payload) + + +def main(args=None, **kwargs): + # parse one packet from stdin + + # packet counter + nr = 0 + + # byte offset in the stream + offset = 0 + + while True: + # read the packet from input + packet = read_packet(sys.stdin.buffer.read) + + if not packet: + break + + # print header + print( + f"# Packet {nr} of class {packet.__class__.__name__} starting at " + f"offset {offset} with length {packet.size()}" + ) + pprint.pprint(packet.header()) + + # increment counters + nr += 1 + offset += packet.size() + + +# this file is very useful to have stand alone to parse raw packet files, so make it +# work as such +if __name__ == "__main__": + main(sys.argv) diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py b/tangostationcontrol/tangostationcontrol/statistics/writer.py similarity index 94% rename from tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py rename to tangostationcontrol/tangostationcontrol/statistics/writer.py index 16d2290685017f0637e9a67761498d6c2186f4ab..23d3869959ccad60d0fa2d9c098aef259b6b9f88 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics/writer.py @@ -1,3 +1,12 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + import argparse import time import sys @@ -10,6 +19,7 @@ import logging logging.basicConfig(level=logging.INFO, format = '%(asctime)s:%(levelname)s: %(message)s') logger = logging.getLogger("statistics_writer") + def _create_parser(): """Define the parser""" parser = argparse.ArgumentParser( @@ -55,6 +65,7 @@ def _create_parser(): ) return parser + def _create_receiver(filename, host, port): """ creates the TCP receiver that is given to the writer """ if filename: @@ -65,6 +76,7 @@ def _create_receiver(filename, host, port): logger.fatal("Must provide either a host and port, or a file to receive input from") sys.exit(1) + def _create_writer(mode, interval, output_dir, decimation): """Create the writer""" if mode == "XST": @@ -77,6 +89,7 @@ def _create_writer(mode, interval, output_dir, decimation): logger.fatal(f"Invalid mode: {mode}") sys.exit(1) + def _start_loop(receiver, writer, reconnect, filename, device): """Main loop""" try: @@ -88,6 +101,7 @@ def _start_loop(receiver, writer, reconnect, filename, device): finally: writer.close_writer() + def _receive_packets(receiver, writer, reconnect, filename, device): try: packet = receiver.get_packet() @@ -107,7 +121,8 @@ def _receive_packets(receiver, writer, reconnect, filename, device): else: logger.info("End of input.") raise SystemExit - + + def main(): parser = _create_parser() @@ -141,11 +156,13 @@ def main(): logger.debug("Setting loglevel to DEBUG") # sets the Tango connection in order to retrieve attribute values - if tango_disabled or not host: + if tango_disabled: logger.warning("Tango connection is DISABLED") - device = None + device = None + elif host: + device = DeviceProxy(f"tango://{host}:10000/{args.device}".lower()) if mode == 'SST' else None else: - device = DeviceProxy(f"tango://{host}:10000/{args.device}".lower()) if mode=='SST' else None + device = DeviceProxy(args.device) if mode == 'SST' else None # creates the TCP receiver that is given to the writer receiver = _create_receiver(filename, host, port) diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py index c81a4288d6574d7181d0a2a9dccda3f8e119aa7b..846621208075c644d00eb8260f4d70b4cd4fd122 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py @@ -1,19 +1,26 @@ -# imports for working with datetime objects +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +from abc import ABC, abstractmethod from datetime import datetime, timedelta -import pytz +import logging # python hdf5 import h5py - import numpy -import logging -from abc import ABC, abstractmethod +import pytz + +from lofar_station_client.statistics.collector import BSTCollector +from lofar_station_client.statistics.collector import XSTCollector +from tangostationcontrol.statistics.collector import StationSSTCollector -# import statistics classes with workaround -import sys -sys.path.append("..") -from tangostationcontrol.devices.sdp.packet import SSTPacket, XSTPacket, BSTPacket -import tangostationcontrol.devices.sdp.statistics_collector as statistics_collector +from lofar_station_client.statistics.packet import SSTPacket, XSTPacket, BSTPacket logger = logging.getLogger("statistics_writer") @@ -229,7 +236,7 @@ class sst_hdf5_writer(hdf5_writer): return SSTPacket(packet) def new_collector(self): - return statistics_collector.SSTCollector() + return StationSSTCollector() def write_values_matrix(self, current_group): # store the SST values @@ -251,7 +258,7 @@ class bst_hdf5_writer(hdf5_writer): return BSTPacket(packet) def new_collector(self): - return statistics_collector.BSTCollector() + return BSTCollector() def write_values_matrix(self, current_group): # store the BST values @@ -267,7 +274,7 @@ class xst_hdf5_writer(hdf5_writer): return XSTPacket(packet) def new_collector(self): - return statistics_collector.XSTCollector() + return XSTCollector() def next_filename(self, timestamp): time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py b/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py index b70446c30d21714b46d1e96f8cf62ac65a509103..233b819c5173fcfefc010f9025563ddcaef3e65c 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py @@ -1,9 +1,16 @@ -import socket +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. -import sys -sys.path.append("..") -from tangostationcontrol.devices.sdp.packet import StatisticsPacket import os +import socket + +from lofar_station_client.statistics.packet import StatisticsPacket class receiver: """ Reads data from a file descriptor. """ diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/udp_dev/udp_client.py b/tangostationcontrol/tangostationcontrol/statistics_writer/udp_dev/udp_client.py index cef6a079d17dc0fb45d71f181ee2be908e9bd091..ea5a644f10985257452b4d93249cea36c37708e9 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/udp_dev/udp_client.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/udp_dev/udp_client.py @@ -30,7 +30,7 @@ class UDP_Client: time.sleep(1) - f = open("../../test/SDP_SST_statistics_packet.bin", "rb") + f = open("../../test/statistics/SDP_SST_statistics_packet.bin", "rb") send_data = f.read() s.sendto(send_data, (self.server_ip, self.server_port)) print("\n\n 1. Client Sent SST Packet at: ", datetime.now()) diff --git a/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py b/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py index a3b5779650217572dfeddb73eff324657acead2b..12d015774c933322495fc3aa7abe8d5c57052628 100644 --- a/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py +++ b/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py @@ -144,7 +144,6 @@ class TestDelays(base.TestCase): self.assertAlmostEqual(0.1, delays[0], 6, f"delays[0] = {delays[0]}") - def test_convert_bulk(self): d = delay_calculator([0, 0, 0]) timestamp = datetime.datetime(2022, 3, 1, 0, 0, 0) # timestamp does not actually matter, but casacore doesn't know that. diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py b/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py deleted file mode 100644 index a53bd10a02080991f132ab86232ac874519a1fa6..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py +++ /dev/null @@ -1,297 +0,0 @@ -from tangostationcontrol.devices.sdp.statistics_collector import XSTCollector, BSTCollector -from tangostationcontrol.devices.sdp.packet import XSTPacket, BSTPacket - -from tangostationcontrol.test import base - -import numpy - -class TestSelectSubbandSlot(base.TestCase): - def test_first_entry(self): - collector = XSTCollector() - - # on start, any subband should map on the first entry - self.assertEqual(0, collector.select_subband_slot(102)) - - def test_subsequent_entries(self): - collector = XSTCollector() - - # assign some subbands - collector.parameters["xst_subbands"][0] = 102 - collector.parameters["xst_subbands"][2] = 103 - collector.parameters["xst_subbands"][3] = 104 - - # give them non-zero timestamps to make them newer than the other entries - collector.parameters["xst_timestamps"][0] = 1 - collector.parameters["xst_timestamps"][2] = 1 - collector.parameters["xst_timestamps"][3] = 1 - - # these should be reported back when looking them up again - self.assertEqual(0, collector.select_subband_slot(102)) - self.assertEqual(2, collector.select_subband_slot(103)) - self.assertEqual(3, collector.select_subband_slot(104)) - - # a place for another subband should be the lowest - self.assertEqual(1, collector.select_subband_slot(101)) - - def test_spilling(self): - collector = XSTCollector() - - # assign all subbands, in decreasing age - for n in range(XSTCollector.MAX_PARALLEL_SUBBANDS): - collector.parameters["xst_subbands"][n] = 100 + n - collector.parameters["xst_timestamps"][n] = 100 - n - - # check where a new subband replaces the oldest - self.assertEqual(XSTCollector.MAX_PARALLEL_SUBBANDS - 1, collector.select_subband_slot(200)) - - -class TestXSTCollector(base.TestCase): - def test_valid_packet(self): - collector = XSTCollector() - - # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0) - packet = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # parse it ourselves to extract info nicely - fields = XSTPacket(packet) - fpga_index = fields.gn_index - - # baseline indeed should be (12,0) - self.assertEqual((12,0), fields.first_baseline) - - # subband should indeed be 102 - self.assertEqual(102, fields.subband_index) - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index]) - - self.assertListEqual([102,0,0,0,0,0,0,0], list(collector.parameters["xst_subbands"])) - - # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values()[0] - - for baseline_a in range(collector.MAX_INPUTS): - for baseline_b in range(collector.MAX_INPUTS): - if baseline_b > baseline_a: - # only scan top-left triangle - continue - - baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs) - baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs) - - if baseline_a_was_in_packet and baseline_b_was_in_packet: - self.assertEqual(1+1j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') - else: - self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') - - def test_conjugated_packet(self): - """ Test whether a packet with a baseline (a,b) with a<b will get its payload conjugated. """ - - collector = XSTCollector() - - # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload, at baseline (0,12) - # VV VV - packet = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x00\x0c\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # parse it ourselves to extract info nicely - fields = XSTPacket(packet) - - # baseline indeed should be (0,12) - self.assertEqual((0,12), fields.first_baseline) - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values()[0] - - for baseline_a in range(collector.MAX_INPUTS): - for baseline_b in range(collector.MAX_INPUTS): - if baseline_b > baseline_a: - # only scan top-left triangle - continue - - # use swapped indices! - baseline_a_was_in_packet = (fields.first_baseline[1] <= baseline_a < fields.first_baseline[1] + fields.nof_signal_inputs) - baseline_b_was_in_packet = (fields.first_baseline[0] <= baseline_b < fields.first_baseline[0] + fields.nof_signal_inputs) - - if baseline_a_was_in_packet and baseline_b_was_in_packet: - self.assertEqual(1-1j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up conjugated in XST matrix.') - else: - self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') - - def test_multiple_subbands(self): - collector = XSTCollector() - - # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0) - packet_subband_102 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - packet_subband_103 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00g\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x02' - - # make sure the subband_indices are indeed what we claim they are - fields = XSTPacket(packet_subband_102) - self.assertEqual(102, fields.subband_index) - - fields = XSTPacket(packet_subband_103) - self.assertEqual(103, fields.subband_index) - - # process our packets - collector.process_packet(packet_subband_102) - collector.process_packet(packet_subband_103) - - # counters should now be updated - self.assertListEqual([102,103,0,0,0,0,0,0], list(collector.parameters["xst_subbands"])) - - # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values() - - for subband_idx in range(collector.MAX_PARALLEL_SUBBANDS): - for baseline_a in range(collector.MAX_INPUTS): - for baseline_b in range(collector.MAX_INPUTS): - if baseline_b > baseline_a: - # only scan top-left triangle - continue - - baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs) - baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs) - - baseline_in_pk = baseline_a_was_in_packet and baseline_b_was_in_packet - - if baseline_in_pk and subband_idx == 0: - self.assertEqual(1+1j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') - elif baseline_in_pk and subband_idx == 1: - self.assertEqual(2+2j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') - else: - self.assertEqual(0+0j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') - - - def test_invalid_packet(self): - collector = XSTCollector() - - # an invalid packet - # V - packet = b'S\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x00\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # this should throw - with self.assertRaises(ValueError): - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(1, collector.parameters["nof_invalid_packets"]) - - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_valid_payloads"])) - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_payload_errors"])) - - def test_payload_error(self): - collector = XSTCollector() - - # an valid packet with a payload error - # V - packet = b'X\x05\x00\x00\x00\x00\x00\x00\x14\x08\x00\x02\xfa\xef\x00f\x00\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # parse it ourselves to extract info nicely - fields = XSTPacket(packet) - fpga_index = fields.gn_index - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(0, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(1, collector.parameters["nof_payload_errors"][fpga_index]) - -class TestBSTCollector(base.TestCase): - def test_valid_packet(self): - collector = BSTCollector() - - # a valid packet as obtained from DTS outside - packet = b'B\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # parse it ourselves to extract info nicely - fields = BSTPacket(packet) - fpga_index = fields.gn_index - - # beamlet_index should be zero - self.assertEqual(0, fields.beamlet_index) - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index]) - - self.assertEqual(1653044924.999997, collector.parameters["bst_timestamps"][fpga_index]) - self.assertAlmostEqual(0.99999744, collector.parameters["integration_intervals"][fpga_index], 8) - - numpy.testing.assert_array_equal(numpy.zeros((2,976), dtype=numpy.uint64), collector.parameters["bst_values"]) - - def test_invalid_packet(self): - collector = BSTCollector() - - # an invalid packet - # V - packet = b'S\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # this should throw - with self.assertRaises(ValueError): - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(1, collector.parameters["nof_invalid_packets"]) - - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_valid_payloads"])) - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_payload_errors"])) - - def test_payload_error(self): - collector = BSTCollector() - - # an valid packet with a payload error - # V - packet = b'B\x05\x00\x00\x00\x00\x03\x87\x14\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # parse it ourselves to extract info nicely - fields = BSTPacket(packet) - fpga_index = fields.gn_index - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(0, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(1, collector.parameters["nof_payload_errors"][fpga_index]) - - def test_index_error(self): - collector = BSTCollector() - - # An otherwise valid packet that has its beamlet_index set to 4096. This should throw an error as the max value is 962 * 3 - # V - packet = b'B\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x10\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # parse it ourselves to extract info nicely - fields = BSTPacket(packet) - fpga_index = fields.gn_index - - with self.assertRaises(ValueError): - collector.process_packet(packet) diff --git a/tangostationcontrol/tangostationcontrol/test/prometheus/__init__.py b/tangostationcontrol/tangostationcontrol/test/prometheus/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tangostationcontrol/tangostationcontrol/test/prometheus/test_archiver_policy.py b/tangostationcontrol/tangostationcontrol/test/prometheus/test_archiver_policy.py new file mode 100644 index 0000000000000000000000000000000000000000..9b8f53478227e5390eea98c4a9436b8363eaa7c3 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/test/prometheus/test_archiver_policy.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +import sys, os +import importlib.util + +from tangostationcontrol.test import base + +module_name = 'ArchiverPolicy' +file_path = os.path.join(os.path.realpath('..'), 'docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py') +spec = importlib.util.spec_from_file_location(module_name, file_path) +tpc = importlib.util.module_from_spec(spec) +sys.modules[module_name] = tpc +spec.loader.exec_module(tpc) + +class TestArchiverPolicy(base.TestCase): + + config_path = os.path.join(os.path.realpath('..'), 'docker-compose/tango-prometheus-exporter/lofar2-policy.json') + CONFIG = tpc.ArchiverPolicy.load_config(config_path) + + def test_config_file(self): + """ Test if policy config file is correctly retrieved """ + empty_policy = tpc.ArchiverPolicy() # empty config file + expected_config = {'default': {}, 'devices': {}} + self.assertEqual(empty_policy.config, expected_config) + policy = tpc.ArchiverPolicy(self.CONFIG) + self.assertEqual([*policy.config], ['default', 'devices']) # dict keys + diff --git a/tangostationcontrol/tangostationcontrol/test/SDP_BST_statistics_packets.bin b/tangostationcontrol/tangostationcontrol/test/statistics/SDP_BST_statistics_packets.bin similarity index 100% rename from tangostationcontrol/tangostationcontrol/test/SDP_BST_statistics_packets.bin rename to tangostationcontrol/tangostationcontrol/test/statistics/SDP_BST_statistics_packets.bin diff --git a/tangostationcontrol/tangostationcontrol/test/SDP_SST_statistics_packet.bin b/tangostationcontrol/tangostationcontrol/test/statistics/SDP_SST_statistics_packet.bin similarity index 100% rename from tangostationcontrol/tangostationcontrol/test/SDP_SST_statistics_packet.bin rename to tangostationcontrol/tangostationcontrol/test/statistics/SDP_SST_statistics_packet.bin diff --git a/tangostationcontrol/tangostationcontrol/test/SDP_SST_statistics_packets.bin b/tangostationcontrol/tangostationcontrol/test/statistics/SDP_SST_statistics_packets.bin similarity index 100% rename from tangostationcontrol/tangostationcontrol/test/SDP_SST_statistics_packets.bin rename to tangostationcontrol/tangostationcontrol/test/statistics/SDP_SST_statistics_packets.bin diff --git a/tangostationcontrol/tangostationcontrol/test/SDP_XST_statistics_packets.bin b/tangostationcontrol/tangostationcontrol/test/statistics/SDP_XST_statistics_packets.bin similarity index 100% rename from tangostationcontrol/tangostationcontrol/test/SDP_XST_statistics_packets.bin rename to tangostationcontrol/tangostationcontrol/test/statistics/SDP_XST_statistics_packets.bin diff --git a/tangostationcontrol/tangostationcontrol/test/SDP_XST_statistics_packets_multiple_subbands.bin b/tangostationcontrol/tangostationcontrol/test/statistics/SDP_XST_statistics_packets_multiple_subbands.bin similarity index 100% rename from tangostationcontrol/tangostationcontrol/test/SDP_XST_statistics_packets_multiple_subbands.bin rename to tangostationcontrol/tangostationcontrol/test/statistics/SDP_XST_statistics_packets_multiple_subbands.bin diff --git a/tangostationcontrol/tangostationcontrol/test/statistics/__init__.py b/tangostationcontrol/tangostationcontrol/test/statistics/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tangostationcontrol/tangostationcontrol/test/test_statistics_writer.py b/tangostationcontrol/tangostationcontrol/test/statistics/test_writer.py similarity index 79% rename from tangostationcontrol/tangostationcontrol/test/test_statistics_writer.py rename to tangostationcontrol/tangostationcontrol/test/statistics/test_writer.py index 37f3c2f1d96f2fa39d60241d406a1324f549b4ad..56d6d85e0d83c6b9ac1e35e981ade193ab90d317 100644 --- a/tangostationcontrol/tangostationcontrol/test/test_statistics_writer.py +++ b/tangostationcontrol/tangostationcontrol/test/statistics/test_writer.py @@ -8,12 +8,13 @@ # See LICENSE.txt for more info. from tangostationcontrol.test import base -from tangostationcontrol.statistics_writer import statistics_writer +from tangostationcontrol.statistics import writer import sys from os.path import dirname, isfile from tempfile import TemporaryDirectory from unittest import mock + class TestStatisticsWriter(base.TestCase): # test_sst --> moved to integration_test @@ -21,9 +22,9 @@ class TestStatisticsWriter(base.TestCase): def test_xst(self): with TemporaryDirectory() as tmpdir: new_sys_argv = [sys.argv[0], "--mode", "XST", "--file", dirname(__file__) + "/SDP_XST_statistics_packets.bin", "--output_dir", tmpdir] - with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv): + with mock.patch.object(writer.sys, 'argv', new_sys_argv): with self.assertRaises(SystemExit): - statistics_writer.main() + writer.main() # check if file was written self.assertTrue(isfile(f"{tmpdir}/XST_SB102_2021-09-13-13-21-32.h5")) @@ -31,9 +32,9 @@ class TestStatisticsWriter(base.TestCase): def test_xst_multiple_subbands(self): with TemporaryDirectory() as tmpdir: new_sys_argv = [sys.argv[0], "--mode", "XST", "--file", dirname(__file__) + "/SDP_XST_statistics_packets_multiple_subbands.bin", "--output_dir", tmpdir] - with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv): + with mock.patch.object(writer.sys, 'argv', new_sys_argv): with self.assertRaises(SystemExit): - statistics_writer.main() + writer.main() # check if files were written self.assertTrue(isfile(f"{tmpdir}/XST_SB102_2021-09-13-13-21-32.h5")) @@ -42,9 +43,9 @@ class TestStatisticsWriter(base.TestCase): def test_bst(self): with TemporaryDirectory() as tmpdir: new_sys_argv = [sys.argv[0], "--mode", "BST", "--file", dirname(__file__) + "/SDP_BST_statistics_packets.bin", "--output_dir", tmpdir] - with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv): + with mock.patch.object(writer.sys, 'argv', new_sys_argv): with self.assertRaises(SystemExit): - statistics_writer.main() + writer.main() # check if file was written self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44.h5")) diff --git a/tangostationcontrol/test-requirements.txt b/tangostationcontrol/test-requirements.txt index b838488f5b1c37e796e6c9a0a7dc0a5c2e0ba341..b8fe7099dac8cd826106e9f74136dd2c78f41bc3 100644 --- a/tangostationcontrol/test-requirements.txt +++ b/tangostationcontrol/test-requirements.txt @@ -19,3 +19,5 @@ testscenarios>=0.5.0 # Apache-2.0/BSD testtools>=2.4.0 # MIT timeout-decorator>=0.5 # MIT xenon>=0.8.0 # MIT +prometheus_client # Apache-2.0 +python-logstash-async # MIT diff --git a/tangostationcontrol/tox.ini b/tangostationcontrol/tox.ini index 74e93462b8e97325c12e81b53f372b34961a05e1..46de7e82971df8e253affc1f8c7b6f47cd409f9f 100644 --- a/tangostationcontrol/tox.ini +++ b/tangostationcontrol/tox.ini @@ -21,6 +21,7 @@ setenv = VIRTUAL_ENV={envdir} PYTHONWARNINGS=default::DeprecationWarning deps = + -r{toxinidir}/requirements.txt -r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt -r{toxinidir}/test-requirements.txt commands = {envpython} -m stestr run {posargs} @@ -40,6 +41,7 @@ setenv = VIRTUAL_ENV={envdir} PYTHON={envpython} -m coverage run --source tangostationcontrol --parallel-mode deps = + -r{toxinidir}/requirements.txt -r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt -r{toxinidir}/test-requirements.txt commands =