From ca09ecfe1e1698487b18133693c6ba9facf47ecd Mon Sep 17 00:00:00 2001 From: lukken <lukken@astron.nl> Date: Tue, 9 Aug 2022 11:52:37 +0000 Subject: [PATCH] L2SS-868: Use migrated components now part of station client --- tangostationcontrol/requirements.txt | 2 +- .../clients/statistics/consumer.py | 3 +- .../statistics/collector.py | 18 +- .../statistics_writer/hdf5_writer.py | 25 +- .../statistics_writer/receiver.py | 15 +- .../test/statistics/test_collector.py | 307 ------------------ tangostationcontrol/tox.ini | 2 + 7 files changed, 47 insertions(+), 325 deletions(-) delete mode 100644 tangostationcontrol/tangostationcontrol/test/statistics/test_collector.py diff --git a/tangostationcontrol/requirements.txt b/tangostationcontrol/requirements.txt index c91d2f965..21a4422bb 100644 --- a/tangostationcontrol/requirements.txt +++ b/tangostationcontrol/requirements.txt @@ -2,7 +2,7 @@ # order of appearance. Changing the order has an impact on the overall # integration process, which may cause wedges in the gate later. -lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client +lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client@0.3.0 asyncua >= 0.9.90 # LGPLv3 PyMySQL[rsa] >= 1.0.2 # MIT psycopg2-binary >= 2.9.2 # LGPL diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py b/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py index d5a16bbe7..8930cdb37 100644 --- a/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py +++ b/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py @@ -11,8 +11,9 @@ import logging from threading import Thread from queue import Queue +from lofar_station_client.statistics.collector import StatisticsCollector + from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread -from tangostationcontrol.statistics.collector import StatisticsCollector logger = logging.getLogger() diff --git a/tangostationcontrol/tangostationcontrol/statistics/collector.py b/tangostationcontrol/tangostationcontrol/statistics/collector.py index 5b559eabc..7befd038d 100644 --- a/tangostationcontrol/tangostationcontrol/statistics/collector.py +++ b/tangostationcontrol/tangostationcontrol/statistics/collector.py @@ -7,14 +7,28 @@ # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. -from lofar_station_client.statistics import SSTCollector +import abc +import logging + +from lofar_station_client.statistics.collector import SSTCollector from tango import DevFailed from tango import DeviceProxy from tango import DevState +logger = logging.getLogger() + + +class StationStatisticsCollectorInterface(abc.ABC): + + @abc.abstractmethod + def parse_device_attributes(self, device: DeviceProxy): + """Update information based on device attributes""" + raise NotImplementedError + + +class StationSSTCollector(StationStatisticsCollectorInterface, SSTCollector): -class StationSSTCollector(SSTCollector): def parse_packet(self, packet, obj): super(StationSSTCollector, self).parse_packet(packet, obj) diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py index 530a7c7d6..1385ccd13 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py @@ -1,19 +1,24 @@ -# imports for working with datetime objects +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +from abc import ABC, abstractmethod from datetime import datetime, timedelta -import pytz +import logging # python hdf5 import h5py - import numpy -import logging -from abc import ABC, abstractmethod +import pytz + +import lofar_station_client.statistics.collector as statistics_collector -# import statistics classes with workaround -import sys -sys.path.append("..") -from tangostationcontrol.statistics.packets import SSTPacket, XSTPacket, BSTPacket -import tangostationcontrol.statistics.collector as statistics_collector +from lofar_station_client.statistics.packet import SSTPacket, XSTPacket, BSTPacket logger = logging.getLogger("statistics_writer") diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py b/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py index 484345a7b..233b819c5 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py @@ -1,9 +1,16 @@ -import socket +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. -import sys -sys.path.append("..") -from tangostationcontrol.statistics.packets import StatisticsPacket import os +import socket + +from lofar_station_client.statistics.packet import StatisticsPacket class receiver: """ Reads data from a file descriptor. """ diff --git a/tangostationcontrol/tangostationcontrol/test/statistics/test_collector.py b/tangostationcontrol/tangostationcontrol/test/statistics/test_collector.py deleted file mode 100644 index aaf39ba86..000000000 --- a/tangostationcontrol/tangostationcontrol/test/statistics/test_collector.py +++ /dev/null @@ -1,307 +0,0 @@ -# -*- coding: utf-8 -*- -# -# This file is part of the LOFAR 2.0 Station Software -# -# -# -# Distributed under the terms of the APACHE license. -# See LICENSE.txt for more info. - -from tangostationcontrol.statistics.collector import XSTCollector, BSTCollector -from tangostationcontrol.statistics.packets import XSTPacket, BSTPacket - -from tangostationcontrol.test import base - -import numpy - - -class TestSelectSubbandSlot(base.TestCase): - def test_first_entry(self): - collector = XSTCollector() - - # on start, any subband should map on the first entry - self.assertEqual(0, collector.select_subband_slot(102)) - - def test_subsequent_entries(self): - collector = XSTCollector() - - # assign some subbands - collector.parameters["xst_subbands"][0] = 102 - collector.parameters["xst_subbands"][2] = 103 - collector.parameters["xst_subbands"][3] = 104 - - # give them non-zero timestamps to make them newer than the other entries - collector.parameters["xst_timestamps"][0] = 1 - collector.parameters["xst_timestamps"][2] = 1 - collector.parameters["xst_timestamps"][3] = 1 - - # these should be reported back when looking them up again - self.assertEqual(0, collector.select_subband_slot(102)) - self.assertEqual(2, collector.select_subband_slot(103)) - self.assertEqual(3, collector.select_subband_slot(104)) - - # a place for another subband should be the lowest - self.assertEqual(1, collector.select_subband_slot(101)) - - def test_spilling(self): - collector = XSTCollector() - - # assign all subbands, in decreasing age - for n in range(XSTCollector.MAX_PARALLEL_SUBBANDS): - collector.parameters["xst_subbands"][n] = 100 + n - collector.parameters["xst_timestamps"][n] = 100 - n - - # check where a new subband replaces the oldest - self.assertEqual(XSTCollector.MAX_PARALLEL_SUBBANDS - 1, collector.select_subband_slot(200)) - - -class TestXSTCollector(base.TestCase): - def test_valid_packet(self): - collector = XSTCollector() - - # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0) - packet = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # parse it ourselves to extract info nicely - fields = XSTPacket(packet) - fpga_index = fields.gn_index - - # baseline indeed should be (12,0) - self.assertEqual((12,0), fields.first_baseline) - - # subband should indeed be 102 - self.assertEqual(102, fields.subband_index) - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index]) - - self.assertListEqual([102,0,0,0,0,0,0,0], list(collector.parameters["xst_subbands"])) - - # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values()[0] - - for baseline_a in range(collector.MAX_INPUTS): - for baseline_b in range(collector.MAX_INPUTS): - if baseline_b > baseline_a: - # only scan top-left triangle - continue - - baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs) - baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs) - - if baseline_a_was_in_packet and baseline_b_was_in_packet: - self.assertEqual(1+1j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') - else: - self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') - - def test_conjugated_packet(self): - """ Test whether a packet with a baseline (a,b) with a<b will get its payload conjugated. """ - - collector = XSTCollector() - - # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload, at baseline (0,12) - # VV VV - packet = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x00\x0c\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # parse it ourselves to extract info nicely - fields = XSTPacket(packet) - - # baseline indeed should be (0,12) - self.assertEqual((0,12), fields.first_baseline) - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values()[0] - - for baseline_a in range(collector.MAX_INPUTS): - for baseline_b in range(collector.MAX_INPUTS): - if baseline_b > baseline_a: - # only scan top-left triangle - continue - - # use swapped indices! - baseline_a_was_in_packet = (fields.first_baseline[1] <= baseline_a < fields.first_baseline[1] + fields.nof_signal_inputs) - baseline_b_was_in_packet = (fields.first_baseline[0] <= baseline_b < fields.first_baseline[0] + fields.nof_signal_inputs) - - if baseline_a_was_in_packet and baseline_b_was_in_packet: - self.assertEqual(1-1j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up conjugated in XST matrix.') - else: - self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') - - def test_multiple_subbands(self): - collector = XSTCollector() - - # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0) - packet_subband_102 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - packet_subband_103 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00g\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x02' - - # make sure the subband_indices are indeed what we claim they are - fields = XSTPacket(packet_subband_102) - self.assertEqual(102, fields.subband_index) - - fields = XSTPacket(packet_subband_103) - self.assertEqual(103, fields.subband_index) - - # process our packets - collector.process_packet(packet_subband_102) - collector.process_packet(packet_subband_103) - - # counters should now be updated - self.assertListEqual([102,103,0,0,0,0,0,0], list(collector.parameters["xst_subbands"])) - - # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values() - - for subband_idx in range(collector.MAX_PARALLEL_SUBBANDS): - for baseline_a in range(collector.MAX_INPUTS): - for baseline_b in range(collector.MAX_INPUTS): - if baseline_b > baseline_a: - # only scan top-left triangle - continue - - baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs) - baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs) - - baseline_in_pk = baseline_a_was_in_packet and baseline_b_was_in_packet - - if baseline_in_pk and subband_idx == 0: - self.assertEqual(1+1j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') - elif baseline_in_pk and subband_idx == 1: - self.assertEqual(2+2j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') - else: - self.assertEqual(0+0j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') - - def test_invalid_packet(self): - collector = XSTCollector() - - # an invalid packet - # V - packet = b'S\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x00\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # this should throw - with self.assertRaises(ValueError): - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(1, collector.parameters["nof_invalid_packets"]) - - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_valid_payloads"])) - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_payload_errors"])) - - def test_payload_error(self): - collector = XSTCollector() - - # an valid packet with a payload error - # V - packet = b'X\x05\x00\x00\x00\x00\x00\x00\x14\x08\x00\x02\xfa\xef\x00f\x00\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # parse it ourselves to extract info nicely - fields = XSTPacket(packet) - fpga_index = fields.gn_index - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(0, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(1, collector.parameters["nof_payload_errors"][fpga_index]) - - -class TestBSTCollector(base.TestCase): - def test_valid_packet(self): - collector = BSTCollector() - - # a valid packet as obtained from DTS outside - packet = b'B\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # parse it ourselves to extract info nicely - fields = BSTPacket(packet) - fpga_index = fields.gn_index - - # beamlet_index should be zero - self.assertEqual(0, fields.beamlet_index) - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index]) - - self.assertEqual(1653044924.999997, collector.parameters["bst_timestamps"][fpga_index]) - self.assertAlmostEqual(0.99999744, collector.parameters["integration_intervals"][fpga_index], 8) - - numpy.testing.assert_array_equal(numpy.zeros((2,976), dtype=numpy.uint64), collector.parameters["bst_values"]) - - def test_invalid_packet(self): - collector = BSTCollector() - - # an invalid packet - # V - packet = b'S\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # this should throw - with self.assertRaises(ValueError): - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(1, collector.parameters["nof_invalid_packets"]) - - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_valid_payloads"])) - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_payload_errors"])) - - def test_payload_error(self): - collector = BSTCollector() - - # an valid packet with a payload error - # V - packet = b'B\x05\x00\x00\x00\x00\x03\x87\x14\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # parse it ourselves to extract info nicely - fields = BSTPacket(packet) - fpga_index = fields.gn_index - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(0, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(1, collector.parameters["nof_payload_errors"][fpga_index]) - - def test_index_error(self): - collector = BSTCollector() - - # An otherwise valid packet that has its beamlet_index set to 4096. This should throw an error as the max value is 962 * 3 - # V - packet = b'B\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x10\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # parse it ourselves to extract info nicely - fields = BSTPacket(packet) - fpga_index = fields.gn_index - - with self.assertRaises(ValueError): - collector.process_packet(packet) diff --git a/tangostationcontrol/tox.ini b/tangostationcontrol/tox.ini index 74e93462b..46de7e829 100644 --- a/tangostationcontrol/tox.ini +++ b/tangostationcontrol/tox.ini @@ -21,6 +21,7 @@ setenv = VIRTUAL_ENV={envdir} PYTHONWARNINGS=default::DeprecationWarning deps = + -r{toxinidir}/requirements.txt -r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt -r{toxinidir}/test-requirements.txt commands = {envpython} -m stestr run {posargs} @@ -40,6 +41,7 @@ setenv = VIRTUAL_ENV={envdir} PYTHON={envpython} -m coverage run --source tangostationcontrol --parallel-mode deps = + -r{toxinidir}/requirements.txt -r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt -r{toxinidir}/test-requirements.txt commands = -- GitLab