diff --git a/tangostationcontrol/requirements.txt b/tangostationcontrol/requirements.txt index c91d2f965f4a374e08a109d2e1b7f51d0afb9161..21a4422bbb65100a0e3b9d3c10136b2f3345e9bb 100644 --- a/tangostationcontrol/requirements.txt +++ b/tangostationcontrol/requirements.txt @@ -2,7 +2,7 @@ # order of appearance. Changing the order has an impact on the overall # integration process, which may cause wedges in the gate later. -lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client +lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client@0.3.0 asyncua >= 0.9.90 # LGPLv3 PyMySQL[rsa] >= 1.0.2 # MIT psycopg2-binary >= 2.9.2 # LGPL diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py b/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py index d5a16bbe7d74e50f0ae41058dbbfea02fc6f9ab4..8930cdb371cd4ac23b299d479e19dcd28dd08d38 100644 --- a/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py +++ b/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py @@ -11,8 +11,9 @@ import logging from threading import Thread from queue import Queue +from lofar_station_client.statistics.collector import StatisticsCollector + from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread -from tangostationcontrol.statistics.collector import StatisticsCollector logger = logging.getLogger() diff --git a/tangostationcontrol/tangostationcontrol/statistics/collector.py b/tangostationcontrol/tangostationcontrol/statistics/collector.py index 5b559eabc66bd5da4c00c34948adfd13ba60bd32..7befd038d108ba3c147126082447a8e813838e42 100644 --- a/tangostationcontrol/tangostationcontrol/statistics/collector.py +++ b/tangostationcontrol/tangostationcontrol/statistics/collector.py @@ -7,14 +7,28 @@ # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. -from lofar_station_client.statistics import SSTCollector +import abc +import logging + +from lofar_station_client.statistics.collector import SSTCollector from tango import DevFailed from tango import DeviceProxy from tango import DevState +logger = logging.getLogger() + + +class StationStatisticsCollectorInterface(abc.ABC): + + @abc.abstractmethod + def parse_device_attributes(self, device: DeviceProxy): + """Update information based on device attributes""" + raise NotImplementedError + + +class StationSSTCollector(StationStatisticsCollectorInterface, SSTCollector): -class StationSSTCollector(SSTCollector): def parse_packet(self, packet, obj): super(StationSSTCollector, self).parse_packet(packet, obj) diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py index 530a7c7d6ae3e5b6395cae8dca8521da515303e8..1385ccd13acad48c0bbc4903f84706bd4a0e74f5 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py @@ -1,19 +1,24 @@ -# imports for working with datetime objects +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +from abc import ABC, abstractmethod from datetime import datetime, timedelta -import pytz +import logging # python hdf5 import h5py - import numpy -import logging -from abc import ABC, abstractmethod +import pytz + +import lofar_station_client.statistics.collector as statistics_collector -# import statistics classes with workaround -import sys -sys.path.append("..") -from tangostationcontrol.statistics.packets import SSTPacket, XSTPacket, BSTPacket -import tangostationcontrol.statistics.collector as statistics_collector +from lofar_station_client.statistics.packet import SSTPacket, XSTPacket, BSTPacket logger = logging.getLogger("statistics_writer") diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py b/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py index 484345a7b4f73469c99b0546b3d6bce7e25ce9b8..233b819c5173fcfefc010f9025563ddcaef3e65c 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py @@ -1,9 +1,16 @@ -import socket +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. -import sys -sys.path.append("..") -from tangostationcontrol.statistics.packets import StatisticsPacket import os +import socket + +from lofar_station_client.statistics.packet import StatisticsPacket class receiver: """ Reads data from a file descriptor. """ diff --git a/tangostationcontrol/tangostationcontrol/test/statistics/test_collector.py b/tangostationcontrol/tangostationcontrol/test/statistics/test_collector.py deleted file mode 100644 index aaf39ba86f336477cfac6bb35585d44cdbd53867..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/test/statistics/test_collector.py +++ /dev/null @@ -1,307 +0,0 @@ -# -*- coding: utf-8 -*- -# -# This file is part of the LOFAR 2.0 Station Software -# -# -# -# Distributed under the terms of the APACHE license. -# See LICENSE.txt for more info. - -from tangostationcontrol.statistics.collector import XSTCollector, BSTCollector -from tangostationcontrol.statistics.packets import XSTPacket, BSTPacket - -from tangostationcontrol.test import base - -import numpy - - -class TestSelectSubbandSlot(base.TestCase): - def test_first_entry(self): - collector = XSTCollector() - - # on start, any subband should map on the first entry - self.assertEqual(0, collector.select_subband_slot(102)) - - def test_subsequent_entries(self): - collector = XSTCollector() - - # assign some subbands - collector.parameters["xst_subbands"][0] = 102 - collector.parameters["xst_subbands"][2] = 103 - collector.parameters["xst_subbands"][3] = 104 - - # give them non-zero timestamps to make them newer than the other entries - collector.parameters["xst_timestamps"][0] = 1 - collector.parameters["xst_timestamps"][2] = 1 - collector.parameters["xst_timestamps"][3] = 1 - - # these should be reported back when looking them up again - self.assertEqual(0, collector.select_subband_slot(102)) - self.assertEqual(2, collector.select_subband_slot(103)) - self.assertEqual(3, collector.select_subband_slot(104)) - - # a place for another subband should be the lowest - self.assertEqual(1, collector.select_subband_slot(101)) - - def test_spilling(self): - collector = XSTCollector() - - # assign all subbands, in decreasing age - for n in range(XSTCollector.MAX_PARALLEL_SUBBANDS): - collector.parameters["xst_subbands"][n] = 100 + n - collector.parameters["xst_timestamps"][n] = 100 - n - - # check where a new subband replaces the oldest - self.assertEqual(XSTCollector.MAX_PARALLEL_SUBBANDS - 1, collector.select_subband_slot(200)) - - -class TestXSTCollector(base.TestCase): - def test_valid_packet(self): - collector = XSTCollector() - - # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0) - packet = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # parse it ourselves to extract info nicely - fields = XSTPacket(packet) - fpga_index = fields.gn_index - - # baseline indeed should be (12,0) - self.assertEqual((12,0), fields.first_baseline) - - # subband should indeed be 102 - self.assertEqual(102, fields.subband_index) - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index]) - - self.assertListEqual([102,0,0,0,0,0,0,0], list(collector.parameters["xst_subbands"])) - - # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values()[0] - - for baseline_a in range(collector.MAX_INPUTS): - for baseline_b in range(collector.MAX_INPUTS): - if baseline_b > baseline_a: - # only scan top-left triangle - continue - - baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs) - baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs) - - if baseline_a_was_in_packet and baseline_b_was_in_packet: - self.assertEqual(1+1j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') - else: - self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') - - def test_conjugated_packet(self): - """ Test whether a packet with a baseline (a,b) with a<b will get its payload conjugated. """ - - collector = XSTCollector() - - # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload, at baseline (0,12) - # VV VV - packet = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x00\x0c\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # parse it ourselves to extract info nicely - fields = XSTPacket(packet) - - # baseline indeed should be (0,12) - self.assertEqual((0,12), fields.first_baseline) - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values()[0] - - for baseline_a in range(collector.MAX_INPUTS): - for baseline_b in range(collector.MAX_INPUTS): - if baseline_b > baseline_a: - # only scan top-left triangle - continue - - # use swapped indices! - baseline_a_was_in_packet = (fields.first_baseline[1] <= baseline_a < fields.first_baseline[1] + fields.nof_signal_inputs) - baseline_b_was_in_packet = (fields.first_baseline[0] <= baseline_b < fields.first_baseline[0] + fields.nof_signal_inputs) - - if baseline_a_was_in_packet and baseline_b_was_in_packet: - self.assertEqual(1-1j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up conjugated in XST matrix.') - else: - self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') - - def test_multiple_subbands(self): - collector = XSTCollector() - - # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0) - packet_subband_102 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - packet_subband_103 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00g\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x02' - - # make sure the subband_indices are indeed what we claim they are - fields = XSTPacket(packet_subband_102) - self.assertEqual(102, fields.subband_index) - - fields = XSTPacket(packet_subband_103) - self.assertEqual(103, fields.subband_index) - - # process our packets - collector.process_packet(packet_subband_102) - collector.process_packet(packet_subband_103) - - # counters should now be updated - self.assertListEqual([102,103,0,0,0,0,0,0], list(collector.parameters["xst_subbands"])) - - # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values() - - for subband_idx in range(collector.MAX_PARALLEL_SUBBANDS): - for baseline_a in range(collector.MAX_INPUTS): - for baseline_b in range(collector.MAX_INPUTS): - if baseline_b > baseline_a: - # only scan top-left triangle - continue - - baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs) - baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs) - - baseline_in_pk = baseline_a_was_in_packet and baseline_b_was_in_packet - - if baseline_in_pk and subband_idx == 0: - self.assertEqual(1+1j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') - elif baseline_in_pk and subband_idx == 1: - self.assertEqual(2+2j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') - else: - self.assertEqual(0+0j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') - - def test_invalid_packet(self): - collector = XSTCollector() - - # an invalid packet - # V - packet = b'S\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x00\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # this should throw - with self.assertRaises(ValueError): - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(1, collector.parameters["nof_invalid_packets"]) - - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_valid_payloads"])) - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_payload_errors"])) - - def test_payload_error(self): - collector = XSTCollector() - - # an valid packet with a payload error - # V - packet = b'X\x05\x00\x00\x00\x00\x00\x00\x14\x08\x00\x02\xfa\xef\x00f\x00\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' - - # parse it ourselves to extract info nicely - fields = XSTPacket(packet) - fpga_index = fields.gn_index - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(0, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(1, collector.parameters["nof_payload_errors"][fpga_index]) - - -class TestBSTCollector(base.TestCase): - def test_valid_packet(self): - collector = BSTCollector() - - # a valid packet as obtained from DTS outside - packet = b'B\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # parse it ourselves to extract info nicely - fields = BSTPacket(packet) - fpga_index = fields.gn_index - - # beamlet_index should be zero - self.assertEqual(0, fields.beamlet_index) - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index]) - - self.assertEqual(1653044924.999997, collector.parameters["bst_timestamps"][fpga_index]) - self.assertAlmostEqual(0.99999744, collector.parameters["integration_intervals"][fpga_index], 8) - - numpy.testing.assert_array_equal(numpy.zeros((2,976), dtype=numpy.uint64), collector.parameters["bst_values"]) - - def test_invalid_packet(self): - collector = BSTCollector() - - # an invalid packet - # V - packet = b'S\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # this should throw - with self.assertRaises(ValueError): - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(1, collector.parameters["nof_invalid_packets"]) - - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_valid_payloads"])) - self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_payload_errors"])) - - def test_payload_error(self): - collector = BSTCollector() - - # an valid packet with a payload error - # V - packet = b'B\x05\x00\x00\x00\x00\x03\x87\x14\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # parse it ourselves to extract info nicely - fields = BSTPacket(packet) - fpga_index = fields.gn_index - - # this should not throw - collector.process_packet(packet) - - # counters should now be updated - self.assertEqual(1, collector.parameters["nof_packets"]) - self.assertEqual(0, collector.parameters["nof_invalid_packets"]) - - self.assertEqual(0, collector.parameters["nof_valid_payloads"][fpga_index]) - self.assertEqual(1, collector.parameters["nof_payload_errors"][fpga_index]) - - def test_index_error(self): - collector = BSTCollector() - - # An otherwise valid packet that has its beamlet_index set to 4096. This should throw an error as the max value is 962 * 3 - # V - packet = b'B\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x10\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0' - - # parse it ourselves to extract info nicely - fields = BSTPacket(packet) - fpga_index = fields.gn_index - - with self.assertRaises(ValueError): - collector.process_packet(packet) diff --git a/tangostationcontrol/tox.ini b/tangostationcontrol/tox.ini index 74e93462b8e97325c12e81b53f372b34961a05e1..46de7e82971df8e253affc1f8c7b6f47cd409f9f 100644 --- a/tangostationcontrol/tox.ini +++ b/tangostationcontrol/tox.ini @@ -21,6 +21,7 @@ setenv = VIRTUAL_ENV={envdir} PYTHONWARNINGS=default::DeprecationWarning deps = + -r{toxinidir}/requirements.txt -r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt -r{toxinidir}/test-requirements.txt commands = {envpython} -m stestr run {posargs} @@ -40,6 +41,7 @@ setenv = VIRTUAL_ENV={envdir} PYTHON={envpython} -m coverage run --source tangostationcontrol --parallel-mode deps = + -r{toxinidir}/requirements.txt -r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt -r{toxinidir}/test-requirements.txt commands =