Skip to content
Snippets Groups Projects
Commit ca09ecfe authored by Corné Lukken's avatar Corné Lukken
Browse files

L2SS-868: Use migrated components now part of station client

parent 17fc70e1
No related branches found
No related tags found
1 merge request!394Resolve L2SS-868
......@@ -2,7 +2,7 @@
# order of appearance. Changing the order has an impact on the overall
# integration process, which may cause wedges in the gate later.
lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client
lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client@0.3.0
asyncua >= 0.9.90 # LGPLv3
PyMySQL[rsa] >= 1.0.2 # MIT
psycopg2-binary >= 2.9.2 # LGPL
......
......@@ -11,8 +11,9 @@ import logging
from threading import Thread
from queue import Queue
from lofar_station_client.statistics.collector import StatisticsCollector
from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread
from tangostationcontrol.statistics.collector import StatisticsCollector
logger = logging.getLogger()
......
......@@ -7,14 +7,28 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from lofar_station_client.statistics import SSTCollector
import abc
import logging
from lofar_station_client.statistics.collector import SSTCollector
from tango import DevFailed
from tango import DeviceProxy
from tango import DevState
logger = logging.getLogger()
class StationStatisticsCollectorInterface(abc.ABC):
@abc.abstractmethod
def parse_device_attributes(self, device: DeviceProxy):
"""Update information based on device attributes"""
raise NotImplementedError
class StationSSTCollector(StationStatisticsCollectorInterface, SSTCollector):
class StationSSTCollector(SSTCollector):
def parse_packet(self, packet, obj):
super(StationSSTCollector, self).parse_packet(packet, obj)
......
# imports for working with datetime objects
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import pytz
import logging
# python hdf5
import h5py
import numpy
import logging
from abc import ABC, abstractmethod
import pytz
import lofar_station_client.statistics.collector as statistics_collector
# import statistics classes with workaround
import sys
sys.path.append("..")
from tangostationcontrol.statistics.packets import SSTPacket, XSTPacket, BSTPacket
import tangostationcontrol.statistics.collector as statistics_collector
from lofar_station_client.statistics.packet import SSTPacket, XSTPacket, BSTPacket
logger = logging.getLogger("statistics_writer")
......
import socket
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import sys
sys.path.append("..")
from tangostationcontrol.statistics.packets import StatisticsPacket
import os
import socket
from lofar_station_client.statistics.packet import StatisticsPacket
class receiver:
""" Reads data from a file descriptor. """
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tangostationcontrol.statistics.collector import XSTCollector, BSTCollector
from tangostationcontrol.statistics.packets import XSTPacket, BSTPacket
from tangostationcontrol.test import base
import numpy
class TestSelectSubbandSlot(base.TestCase):
def test_first_entry(self):
collector = XSTCollector()
# on start, any subband should map on the first entry
self.assertEqual(0, collector.select_subband_slot(102))
def test_subsequent_entries(self):
collector = XSTCollector()
# assign some subbands
collector.parameters["xst_subbands"][0] = 102
collector.parameters["xst_subbands"][2] = 103
collector.parameters["xst_subbands"][3] = 104
# give them non-zero timestamps to make them newer than the other entries
collector.parameters["xst_timestamps"][0] = 1
collector.parameters["xst_timestamps"][2] = 1
collector.parameters["xst_timestamps"][3] = 1
# these should be reported back when looking them up again
self.assertEqual(0, collector.select_subband_slot(102))
self.assertEqual(2, collector.select_subband_slot(103))
self.assertEqual(3, collector.select_subband_slot(104))
# a place for another subband should be the lowest
self.assertEqual(1, collector.select_subband_slot(101))
def test_spilling(self):
collector = XSTCollector()
# assign all subbands, in decreasing age
for n in range(XSTCollector.MAX_PARALLEL_SUBBANDS):
collector.parameters["xst_subbands"][n] = 100 + n
collector.parameters["xst_timestamps"][n] = 100 - n
# check where a new subband replaces the oldest
self.assertEqual(XSTCollector.MAX_PARALLEL_SUBBANDS - 1, collector.select_subband_slot(200))
class TestXSTCollector(base.TestCase):
def test_valid_packet(self):
collector = XSTCollector()
# a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0)
packet = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01'
# parse it ourselves to extract info nicely
fields = XSTPacket(packet)
fpga_index = fields.gn_index
# baseline indeed should be (12,0)
self.assertEqual((12,0), fields.first_baseline)
# subband should indeed be 102
self.assertEqual(102, fields.subband_index)
# this should not throw
collector.process_packet(packet)
# counters should now be updated
self.assertEqual(1, collector.parameters["nof_packets"])
self.assertEqual(0, collector.parameters["nof_invalid_packets"])
self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index])
self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index])
self.assertListEqual([102,0,0,0,0,0,0,0], list(collector.parameters["xst_subbands"]))
# check whether the data ended up in the right block, and the rest is still zero
xst_values = collector.xst_values()[0]
for baseline_a in range(collector.MAX_INPUTS):
for baseline_b in range(collector.MAX_INPUTS):
if baseline_b > baseline_a:
# only scan top-left triangle
continue
baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs)
baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs)
if baseline_a_was_in_packet and baseline_b_was_in_packet:
self.assertEqual(1+1j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.')
else:
self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.')
def test_conjugated_packet(self):
""" Test whether a packet with a baseline (a,b) with a<b will get its payload conjugated. """
collector = XSTCollector()
# a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload, at baseline (0,12)
# VV VV
packet = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x00\x0c\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01'
# parse it ourselves to extract info nicely
fields = XSTPacket(packet)
# baseline indeed should be (0,12)
self.assertEqual((0,12), fields.first_baseline)
# this should not throw
collector.process_packet(packet)
# counters should now be updated
self.assertEqual(1, collector.parameters["nof_packets"])
self.assertEqual(0, collector.parameters["nof_invalid_packets"])
# check whether the data ended up in the right block, and the rest is still zero
xst_values = collector.xst_values()[0]
for baseline_a in range(collector.MAX_INPUTS):
for baseline_b in range(collector.MAX_INPUTS):
if baseline_b > baseline_a:
# only scan top-left triangle
continue
# use swapped indices!
baseline_a_was_in_packet = (fields.first_baseline[1] <= baseline_a < fields.first_baseline[1] + fields.nof_signal_inputs)
baseline_b_was_in_packet = (fields.first_baseline[0] <= baseline_b < fields.first_baseline[0] + fields.nof_signal_inputs)
if baseline_a_was_in_packet and baseline_b_was_in_packet:
self.assertEqual(1-1j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up conjugated in XST matrix.')
else:
self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.')
def test_multiple_subbands(self):
collector = XSTCollector()
# a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0)
packet_subband_102 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01'
packet_subband_103 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00g\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x02'
# make sure the subband_indices are indeed what we claim they are
fields = XSTPacket(packet_subband_102)
self.assertEqual(102, fields.subband_index)
fields = XSTPacket(packet_subband_103)
self.assertEqual(103, fields.subband_index)
# process our packets
collector.process_packet(packet_subband_102)
collector.process_packet(packet_subband_103)
# counters should now be updated
self.assertListEqual([102,103,0,0,0,0,0,0], list(collector.parameters["xst_subbands"]))
# check whether the data ended up in the right block, and the rest is still zero
xst_values = collector.xst_values()
for subband_idx in range(collector.MAX_PARALLEL_SUBBANDS):
for baseline_a in range(collector.MAX_INPUTS):
for baseline_b in range(collector.MAX_INPUTS):
if baseline_b > baseline_a:
# only scan top-left triangle
continue
baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs)
baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs)
baseline_in_pk = baseline_a_was_in_packet and baseline_b_was_in_packet
if baseline_in_pk and subband_idx == 0:
self.assertEqual(1+1j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.')
elif baseline_in_pk and subband_idx == 1:
self.assertEqual(2+2j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.')
else:
self.assertEqual(0+0j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.')
def test_invalid_packet(self):
collector = XSTCollector()
# an invalid packet
# V
packet = b'S\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x00\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01'
# this should throw
with self.assertRaises(ValueError):
collector.process_packet(packet)
# counters should now be updated
self.assertEqual(1, collector.parameters["nof_packets"])
self.assertEqual(1, collector.parameters["nof_invalid_packets"])
self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_valid_payloads"]))
self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_payload_errors"]))
def test_payload_error(self):
collector = XSTCollector()
# an valid packet with a payload error
# V
packet = b'X\x05\x00\x00\x00\x00\x00\x00\x14\x08\x00\x02\xfa\xef\x00f\x00\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01'
# parse it ourselves to extract info nicely
fields = XSTPacket(packet)
fpga_index = fields.gn_index
# this should not throw
collector.process_packet(packet)
# counters should now be updated
self.assertEqual(1, collector.parameters["nof_packets"])
self.assertEqual(0, collector.parameters["nof_invalid_packets"])
self.assertEqual(0, collector.parameters["nof_valid_payloads"][fpga_index])
self.assertEqual(1, collector.parameters["nof_payload_errors"][fpga_index])
class TestBSTCollector(base.TestCase):
def test_valid_packet(self):
collector = BSTCollector()
# a valid packet as obtained from DTS outside
packet = b'B\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0'
# parse it ourselves to extract info nicely
fields = BSTPacket(packet)
fpga_index = fields.gn_index
# beamlet_index should be zero
self.assertEqual(0, fields.beamlet_index)
# this should not throw
collector.process_packet(packet)
# counters should now be updated
self.assertEqual(1, collector.parameters["nof_packets"])
self.assertEqual(0, collector.parameters["nof_invalid_packets"])
self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index])
self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index])
self.assertEqual(1653044924.999997, collector.parameters["bst_timestamps"][fpga_index])
self.assertAlmostEqual(0.99999744, collector.parameters["integration_intervals"][fpga_index], 8)
numpy.testing.assert_array_equal(numpy.zeros((2,976), dtype=numpy.uint64), collector.parameters["bst_values"])
def test_invalid_packet(self):
collector = BSTCollector()
# an invalid packet
# V
packet = b'S\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0'
# this should throw
with self.assertRaises(ValueError):
collector.process_packet(packet)
# counters should now be updated
self.assertEqual(1, collector.parameters["nof_packets"])
self.assertEqual(1, collector.parameters["nof_invalid_packets"])
self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_valid_payloads"]))
self.assertListEqual([0] * collector.MAX_FPGAS, list(collector.parameters["nof_payload_errors"]))
def test_payload_error(self):
collector = BSTCollector()
# an valid packet with a payload error
# V
packet = b'B\x05\x00\x00\x00\x00\x03\x87\x14\x00\x00\x02\xfa\xf0\x00\x00\x00\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0'
# parse it ourselves to extract info nicely
fields = BSTPacket(packet)
fpga_index = fields.gn_index
# this should not throw
collector.process_packet(packet)
# counters should now be updated
self.assertEqual(1, collector.parameters["nof_packets"])
self.assertEqual(0, collector.parameters["nof_invalid_packets"])
self.assertEqual(0, collector.parameters["nof_valid_payloads"][fpga_index])
self.assertEqual(1, collector.parameters["nof_payload_errors"][fpga_index])
def test_index_error(self):
collector = BSTCollector()
# An otherwise valid packet that has its beamlet_index set to 4096. This should throw an error as the max value is 962 * 3
# V
packet = b'B\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x10\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e' + 1952 * b'\0\0\0\0'
# parse it ourselves to extract info nicely
fields = BSTPacket(packet)
fpga_index = fields.gn_index
with self.assertRaises(ValueError):
collector.process_packet(packet)
......@@ -21,6 +21,7 @@ setenv =
VIRTUAL_ENV={envdir}
PYTHONWARNINGS=default::DeprecationWarning
deps =
-r{toxinidir}/requirements.txt
-r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt
-r{toxinidir}/test-requirements.txt
commands = {envpython} -m stestr run {posargs}
......@@ -40,6 +41,7 @@ setenv =
VIRTUAL_ENV={envdir}
PYTHON={envpython} -m coverage run --source tangostationcontrol --parallel-mode
deps =
-r{toxinidir}/requirements.txt
-r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt
-r{toxinidir}/test-requirements.txt
commands =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment