Commit 07fa16f5 authored by Corné Lukken

Merge branch 'L2SS-868' into 'master'

Resolve L2SS-868

Closes L2SS-868

See merge request !394
parents 969fe156 95deb35e
1 merge request: !394 Resolve L2SS-868
Showing changed files with 261 additions and 979 deletions
......@@ -2,6 +2,7 @@
# order of appearance. Changing the order has an impact on the overall
# integration process, which may cause wedges in the gate later.
lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client@0.3.0
asyncua >= 0.9.90 # LGPLv3
PyMySQL[rsa] >= 1.0.2 # MIT
psycopg2-binary >= 2.9.2 # LGPL
......
......@@ -51,7 +51,7 @@ console_scripts =
l2ss-bst = tangostationcontrol.devices.sdp.bst:main
l2ss-sst = tangostationcontrol.devices.sdp.sst:main
l2ss-statistics-reader = tangostationcontrol.statistics_writer.statistics_reader:main
l2ss-statistics-writer = tangostationcontrol.statistics_writer.statistics_writer:main
l2ss-statistics-writer = tangostationcontrol.statistics.writer:main
l2ss-unb2 = tangostationcontrol.devices.unb2:main
l2ss-xst = tangostationcontrol.devices.sdp.xst:main
l2ss-temperature-manager = tangostationcontrol.devices.temperature_manager:main
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from queue import Queue
import logging
import numpy
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from abc import ABC
from abc import abstractmethod
import logging
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import logging
from threading import Thread
from queue import Queue
from lofar_station_client.statistics.collector import StatisticsCollector
from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread
from tangostationcontrol.devices.sdp.statistics_collector import StatisticsCollector
logger = logging.getLogger()
......
......@@ -11,17 +11,19 @@
"""
import numpy
from tango.server import device_property, attribute
from tango import AttrWriteType
from lofar_station_client.statistics.collector import BSTCollector
# Own imports
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.clients.opcua_client import OPCUAConnection
from tangostationcontrol.clients.statistics.client import StatisticsClient
from tangostationcontrol.devices.sdp.statistics import Statistics
from tangostationcontrol.devices.sdp.statistics_collector import BSTCollector
import numpy
__all__ = ["BST", "main"]
......
This diff is collapsed.
......@@ -14,14 +14,14 @@
# PyTango imports
from tango.server import device_property, attribute
from tango import AttrWriteType
# Additional import
# Additional import
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.clients.opcua_client import OPCUAConnection
from tangostationcontrol.clients.statistics.client import StatisticsClient
from tangostationcontrol.devices.sdp.statistics import Statistics
from tangostationcontrol.devices.sdp.statistics_collector import SSTCollector
from tangostationcontrol.statistics.collector import StationSSTCollector
import numpy
......@@ -30,7 +30,7 @@ __all__ = ["SST", "main"]
class SST(Statistics):
STATISTICS_COLLECTOR_CLASS = SSTCollector
STATISTICS_COLLECTOR_CLASS = StationSSTCollector
# -----------------
# Device Properties
......@@ -95,18 +95,18 @@ class SST(Statistics):
FPGA_sst_offload_nof_valid_R = attribute_wrapper(comms_annotation=["FPGA_sst_offload_nof_valid_R"], datatype=numpy.int32, dims=(16,))
# number of packets with valid payloads
nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(StationSSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
# number of packets with invalid payloads
nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(StationSSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
# latest SSTs
sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(SSTCollector.MAX_INPUTS, SSTCollector.MAX_SUBBANDS), datatype=numpy.uint64)
sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(StationSSTCollector.MAX_INPUTS, StationSSTCollector.MAX_SUBBANDS), datatype=numpy.uint64)
# reported timestamp
# for each row in the latest SSTs
sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64)
sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=numpy.uint64)
# integration interval for each row in the latest SSTs
integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.float32)
integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=numpy.float32)
# whether the subband data was calibrated by the SDP (that is, were subband weights applied)
subbands_calibrated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "subbands_calibrated"}, dims=(SSTCollector.MAX_INPUTS,), datatype=bool)
subbands_calibrated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "subbands_calibrated"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=bool)
# ----------
# Summarising Attributes
......
import logging
import numpy
import datetime
from .packet import SSTPacket, XSTPacket, BSTPacket
from tangostationcontrol.common.baselines import nr_baselines, baseline_index, baseline_from_index
from tango import DeviceProxy, DevFailed, DevState
logger = logging.getLogger()
class StatisticsCollector:
""" Base class to process statistics packets into parameters matrices. """
# Maximum number of FPGAs we receive data from (used for diagnostics)
MAX_FPGAS = 16
def __init__(self):
self.parameters = self._default_parameters()
def _default_parameters(self):
return {
"nof_packets": numpy.uint64(0),
# Packet count for packets that could not be parsed
"nof_invalid_packets": numpy.uint64(0),
# Full contents of the latest packet we deemed invalid.
"last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8),
}
def process_packet(self, packet, device=None):
self.parameters["nof_packets"] += numpy.uint64(1)
try:
self.parse_packet(packet, device)
except Exception as e:
self.parameters["last_invalid_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
self.parameters["nof_invalid_packets"] += numpy.uint64(1)
raise ValueError("Could not parse statistics packet") from e
def parse_packet(self, packet, device):
""" Update any information based on this packet. """
raise NotImplementedError
def parse_device_attributes(self):
""" Update information based on device attributes """
raise NotImplementedError
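For orientation (this module moves to lofar-station-client in this merge request): subclasses only extend _default_parameters() and implement parse_packet(); process_packet() handles the packet counting and invalid-packet bookkeeping. A minimal hypothetical sketch, reusing the base class and numpy import above:

# Hypothetical minimal collector, for illustration only; "demo_counts" is
# not a real parameter in this codebase.
class DemoCollector(StatisticsCollector):
    def _default_parameters(self):
        defaults = super()._default_parameters()
        defaults.update({
            "demo_counts": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
        })
        return defaults

    def parse_packet(self, packet, device=None):
        # decode `packet` and update self.parameters here; any exception
        # makes process_packet() count it as invalid and raise ValueError
        pass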
class SSTCollector(StatisticsCollector):
""" Class to process SST statistics packets. """
# Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192
# Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512
def _default_parameters(self):
defaults = super()._default_parameters()
defaults.update({
# Number of packets received so far that we could parse correctly and do not have a payload error
"nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Last value array we've constructed out of the packets
"sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64),
"sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64),
"integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32),
"subbands_calibrated": numpy.zeros((self.MAX_INPUTS,), dtype=bool),
# RECV attribute values to monitor the station configuration
"rcu_attenuator_dB": numpy.zeros((32,3), dtype=numpy.int64),
"rcu_band_select": numpy.zeros((32,3), dtype=numpy.int64),
"rcu_dth_on": numpy.full((32,3), False),
})
return defaults
def parse_packet(self, packet, device):
fields = SSTPacket(packet)
# determine which input this packet contains data for
if fields.signal_input_index >= self.MAX_INPUTS:
# packet describes an input that is out of bounds for us
raise ValueError("Packet describes input %d, but we are limited to describing MAX_INPUTS=%d" % (fields.signal_input_index, self.MAX_INPUTS))
input_index = fields.signal_input_index
if fields.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1)
# don't raise, as packet is valid
return
# process the packet
self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1)
self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload
self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp())
self.parameters["integration_intervals"][input_index] = fields.integration_interval()
self.parameters["subbands_calibrated"][input_index] = fields.subband_calibrated_flag
# add tango values to packet
self.parse_device_attributes(device)
def parse_device_attributes(self, device: DeviceProxy):
# If the Tango connection has been disabled, set these explicitly to None,
# because otherwise default values are inserted
if device is None or device.state() != DevState.ON:
self.parameters["rcu_attenuator_dB"] = None
self.parameters["rcu_band_select"] = None
self.parameters["rcu_dth_on"] = None
else:
try:
self.parameters["rcu_attenuator_dB"] = device.RCU_Attenuator_dB_R
self.parameters["rcu_band_select"] = device.RCU_Band_Select_R
self.parameters["rcu_dth_on"] = device.RCU_DTH_on_R
except DevFailed as e:
logger.warning(f"Device {device.name()} not responding: {e}")
self.parameters["rcu_attenuator_dB"] = None
self.parameters["rcu_band_select"] = None
self.parameters["rcu_dth_on"] = None
class XSTCollector(StatisticsCollector):
""" Class to process XST statistics packets.
XSTs are received for up to MAX_PARALLEL_SUBBANDS simultaneously, and only the values of the last
MAX_PARALLEL_SUBBANDS are kept. Raw data are collected for each subband in parameters["xst_blocks"],
and overwritten if newer (younger) data is received for the same subband. As such, the data represent
a rolling view on the XSTs.
The xst_values() function is a user-friendly way to read the xst_blocks.
The hardware can be configured to emit different and/or fewer subbands, causing some of the XSTs
to become stale. It is therefore advised to inspect parameters["xst_timestamps"] as well.
"""
# Maximum number of subbands for which we collect XSTs simultaneously
MAX_PARALLEL_SUBBANDS = 8
# Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192
# Maximum number of baselines we can receive
MAX_BASELINES = nr_baselines(MAX_INPUTS)
# Expected block size is BLOCK_LENGTH x BLOCK_LENGTH
BLOCK_LENGTH = 12
# Expected number of blocks: enough to cover all baselines without the conjugates (that is, the top-left triangle of the matrix).
MAX_BLOCKS = nr_baselines(MAX_INPUTS // BLOCK_LENGTH)
# Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512
# Complex values are (real, imag). A bit silly, but we don't want magical constants.
VALUES_PER_COMPLEX = 2
def _default_parameters(self):
defaults = super()._default_parameters()
defaults.update({
# Number of packets received so far that we could parse correctly and do not have a payload error
"nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Last value array we've constructed out of the packets
"xst_blocks": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.int64),
# Whether the values are actually conjugated and transposed
"xst_conjugated": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS,), dtype=bool),
# When the youngest data for each subband was received
"xst_timestamps": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float64),
"xst_subbands": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.uint16),
"xst_integration_intervals": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float32),
})
return defaults
def select_subband_slot(self, subband):
""" Return which subband slot (0..MAX_PARALLEL_SUBBANDS) to use when confronted with a new subband.
Keep recording the same subband if we're already tracking it, but allocate or replace a slot if not. """
indices = numpy.where(self.parameters["xst_subbands"] == subband)[0]
if len(indices) > 0:
# subband already being recorded, use same spot
return indices[0]
else:
# a new subband, kick out the oldest
oldest_timestamp = self.parameters["xst_timestamps"].min()
# prefer the first one in case of multiple minima
return numpy.where(self.parameters["xst_timestamps"] == oldest_timestamp)[0][0]
def parse_packet(self, packet, device=None):
fields = XSTPacket(packet)
if fields.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1)
# don't raise, as packet is valid
return
# the blocks must be of size BLOCK_LENGTH x BLOCK_LENGTH
if fields.nof_signal_inputs != self.BLOCK_LENGTH:
raise ValueError("Packet describes a block of {0} x {0} baselines, but we can only parse blocks of {1} x {1} baselines".format(fields.nof_signal_inputs, self.BLOCK_LENGTH))
# check whether set of baselines in this packet are not out of bounds
for antenna in (0,1):
if fields.first_baseline[antenna] + fields.nof_signal_inputs > self.MAX_INPUTS:
# packet describes an input that is out of bounds for us
raise ValueError(f"Packet describes {fields.nof_signal_inputs} x {fields.nof_signal_inputs} baselines starting at {fields.first_baseline}, but we are limited to describing MAX_INPUTS={self.MAX_INPUTS}")
# the blocks of baselines need to be tightly packed, and thus be provided at exact intervals
if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0:
raise ValueError(f"Packet describes baselines starting at {fields.first_baseline}, but we require a multiple of BLOCK_LENGTH={self.MAX_INPUTS}")
# Make sure we always have a baseline (a,b) with a>=b. If not, we swap the indices and mark that the data must be conjugated and transposed when processed.
first_baseline = fields.first_baseline
if first_baseline[0] < first_baseline[1]:
conjugated = True
first_baseline = (first_baseline[1], first_baseline[0])
else:
conjugated = False
# we keep track of multiple subbands. select slot for this one
subband_slot = self.select_subband_slot(fields.subband_index)
assert 0 <= subband_slot < self.MAX_PARALLEL_SUBBANDS, f"Selected slot {subband_slot}, but only have room for {self.MAX_PARALLEL_SUBBANDS}. Existing slots are {self.parameters['xst_subbands']}, processing subband {fields.subband_index}."
# log if we're replacing a subband we were once recording
self._log_replacing_subband(subband_slot, fields)
# the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH
# starting at baseline first_baseline.
#
# we honour this format, as we want to keep the metadata together with these blocks. we do need to put the blocks in a linear
# and tight order, however, so we calculate a block index.
block_index = baseline_index(first_baseline[0] // self.BLOCK_LENGTH, first_baseline[1] // self.BLOCK_LENGTH)
# We did enough checks on first_baseline for this to be a logic error in our code
assert 0 <= block_index < self.MAX_BLOCKS, f"Received block {block_index}, but have only room for {self.MAX_BLOCKS}. Block starts at baseline {first_baseline}."
# process the packet
self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1)
self.parameters["xst_blocks"][subband_slot, block_index, :fields.nof_statistics_per_packet] = fields.payload
self.parameters["xst_timestamps"][subband_slot] = numpy.float64(fields.timestamp().timestamp())
self.parameters["xst_conjugated"][subband_slot, block_index] = conjugated
self.parameters["xst_subbands"][subband_slot] = numpy.uint16(fields.subband_index)
self.parameters["xst_integration_intervals"][subband_slot] = fields.integration_interval()
def _log_replacing_subband(self, subband_slot, fields):
# log if we're replacing a subband we were once recording
previous_subband_in_slot = self.parameters["xst_subbands"][subband_slot]
if previous_subband_in_slot != fields.subband_index:
if self.parameters["xst_timestamps"][subband_slot] > 0:
previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot])
logger.info(f"Stopped recording XSTs for subband {previous_subband_in_slot}. Last data for this subband was received at {previous_subband_timestamp}.")
def xst_values(self, subband_indices = None):
""" xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS] of complex values.
The subband indices must be in [0..MAX_PARALLEL_SUBBANDS). By default, all recorded XSTs are returned.
"""
if subband_indices is None:
subband_indices = range(self.MAX_PARALLEL_SUBBANDS)
matrix = numpy.zeros((len(subband_indices), self.MAX_INPUTS, self.MAX_INPUTS), dtype=numpy.complex64)
xst_blocks = self.parameters["xst_blocks"]
xst_conjugated = self.parameters["xst_conjugated"]
for matrix_idx, subband_index in enumerate(subband_indices):
for block_index in range(self.MAX_BLOCKS):
# convert real/imag int to complex float values. this works as real/imag come in pairs
block = xst_blocks[subband_index][block_index].astype(numpy.float32).view(numpy.complex64)
if xst_conjugated[subband_index][block_index]:
# block is conjugated and transposed. process.
block = block.conjugate().transpose()
# reshape into [a][b]
block = block.reshape(self.BLOCK_LENGTH, self.BLOCK_LENGTH)
# compute destination in matrix
first_baseline = baseline_from_index(block_index)
first_baseline = (first_baseline[0] * self.BLOCK_LENGTH, first_baseline[1] * self.BLOCK_LENGTH)
# copy block into matrix
matrix[matrix_idx][first_baseline[0]:first_baseline[0]+self.BLOCK_LENGTH, first_baseline[1]:first_baseline[1]+self.BLOCK_LENGTH] = block
return matrix
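As a reading aid for the rolling-view logic above: with MAX_INPUTS = 192 and BLOCK_LENGTH = 12 there are 16 block rows, so MAX_BLOCKS = nr_baselines(16) = 16 * 17 / 2 = 136 triangular blocks per subband (assuming nr_baselines(n) = n(n+1)/2, i.e. baselines including auto-correlations). A hedged usage sketch:

# Hedged usage sketch for the XSTCollector above. `packets` is a placeholder
# for any source of raw XST packet bytes (capture file, TCP receiver, ...).
packets = []  # supply raw packet bytes here

collector = XSTCollector()
for raw in packets:
    try:
        collector.process_packet(raw)
    except ValueError:
        pass  # counted in parameters["nof_invalid_packets"]

# one 192x192 complex matrix per tracked subband slot; inspect the matching
# timestamps to spot stale subbands, as the docstring advises
matrices = collector.xst_values()
subbands = collector.parameters["xst_subbands"]
timestamps = collector.parameters["xst_timestamps"]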
class BSTCollector(StatisticsCollector):
""" Class to process SST statistics packets. """
# beamlets = 488 * 2 for the x and y polorisations
MAX_BEAMLETS = 976
MAX_BLOCKS = 2
def _default_parameters(self):
defaults = super()._default_parameters()
defaults.update({
# Number of packets received so far that we could parse correctly and do not have a payload error
"nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Last value array we've constructed out of the packets
"bst_values": numpy.zeros((self.MAX_BLOCKS, self.MAX_BEAMLETS), dtype=numpy.uint64),
"bst_timestamps": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float64),
"integration_intervals": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float32),
})
return defaults
def parse_packet(self, packet, device=None):
fields = BSTPacket(packet)
# to get the block_index, we floor-divide the beamlet_index by the maximum number of beamlets per block
block_index = fields.beamlet_index // self.MAX_BEAMLETS
# determine which block this packet contains data for
if block_index >= self.MAX_BLOCKS:
# packet describes a block that is out of bounds for us
raise ValueError("Packet describes beamlet %d, but we are limited to MAX_BLOCKS * MAX_BEAMLETS = %d beamlets" % (fields.beamlet_index, self.MAX_BLOCKS * self.MAX_BEAMLETS))
if fields.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1)
# don't raise, as packet is valid
return
# process the packet
self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1)
self.parameters["bst_values"][block_index][:self.MAX_BEAMLETS] = fields.payload
self.parameters["bst_timestamps"][block_index] = numpy.float64(fields.timestamp().timestamp())
self.parameters["integration_intervals"][block_index] = fields.integration_interval()
......@@ -15,6 +15,8 @@
from tango.server import device_property, attribute
from tango import AttrWriteType
from lofar_station_client.statistics.collector import XSTCollector
# Additional import
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
......@@ -22,7 +24,6 @@ from tangostationcontrol.clients.opcua_client import OPCUAConnection
from tangostationcontrol.clients.statistics.client import StatisticsClient
from tangostationcontrol.devices.sdp.statistics import Statistics
from tangostationcontrol.devices.sdp.statistics_collector import XSTCollector
import numpy
......
......@@ -9,8 +9,10 @@
from tangostationcontrol.integration_test.base import BaseIntegrationTestCase
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
from tangostationcontrol.devices.sdp.statistics_collector import SSTCollector
from tangostationcontrol.statistics_writer import statistics_writer, statistics_reader
from tangostationcontrol.statistics.collector import StationSSTCollector
from tangostationcontrol.statistics_writer import statistics_reader
from tangostationcontrol.statistics import writer
import sys
from os.path import dirname, isfile, join
from tempfile import TemporaryDirectory
......@@ -18,6 +20,7 @@ from unittest import mock
from tango import DevState
class TestStatisticsWriterSST(BaseIntegrationTestCase):
def setUp(self):
......@@ -42,7 +45,7 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
def test_insert_tango_SST_statistics(self):
self.setup_recv_proxy()
self.assertEqual(DevState.ON, self.recv_proxy.state())
collector = SSTCollector()
collector = StationSSTCollector()
# Test attribute values retrieval
collector.parse_device_attributes(self.recv_proxy)
......@@ -51,10 +54,10 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
self.assertListEqual(collector.parameters["rcu_dth_on"].tolist(), self.recv_proxy.rcu_dth_on_r.tolist())
with TemporaryDirectory() as tmpdir:
new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv):
new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
with mock.patch.object(writer.sys, 'argv', new_sys_argv):
with self.assertRaises(SystemExit):
statistics_writer.main()
writer.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5"))
......@@ -68,18 +71,18 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
stat = stat_parser.get_statistic('2021-09-20T12:17:40.000+00:00') # same as stat_parser.statistics[0]
self.assertIsNotNone(stat)
self.assertEqual(121, stat.data_id_signal_input_index)
# RECV attributes are not present since the stats-writer is not connecting to any host
self.assertEqual(stat.rcu_attenuator_dB.tolist(), None)
self.assertEqual(stat.rcu_band_select.tolist(), None)
self.assertEqual(stat.rcu_dth_on.tolist(), None)
# Test RECV attributes
self.assertListEqual(stat.rcu_attenuator_dB.tolist(), [0] * 96)
self.assertListEqual(stat.rcu_band_select.tolist(), [0] * 96)
self.assertListEqual(stat.rcu_dth_on.tolist(), [False] * 96)
def test_no_tango_SST_statistics(self):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [sys.argv[0], "--mode", "SST", "--no-tango", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv):
new_sys_argv = [sys.argv[0], "--mode", "SST", "--no-tango", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
with mock.patch.object(writer.sys, 'argv', new_sys_argv):
with self.assertRaises(SystemExit):
statistics_writer.main()
writer.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5"))
......@@ -104,10 +107,10 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
self.assertEqual(DevState.OFF, self.recv_proxy.state())
with TemporaryDirectory() as tmpdir:
new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv):
new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
with mock.patch.object(writer.sys, 'argv', new_sys_argv):
with self.assertRaises(SystemExit):
statistics_writer.main()
writer.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5"))
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import abc
import logging
from lofar_station_client.statistics.collector import SSTCollector
from tango import DevFailed
from tango import DeviceProxy
from tango import DevState
logger = logging.getLogger()
class StationStatisticsCollectorInterface(abc.ABC):
@abc.abstractmethod
def parse_device_attributes(self, device: DeviceProxy):
"""Update information based on device attributes"""
raise NotImplementedError
class StationSSTCollector(StationStatisticsCollectorInterface, SSTCollector):
def parse_packet(self, packet, obj):
super(StationSSTCollector, self).parse_packet(packet, obj)
# add tango values to packet
self.parse_device_attributes(obj)
def parse_device_attributes(self, device: DeviceProxy):
# If the Tango connection has been disabled, set these explicitly to None,
# because otherwise default values are inserted
if device is None or device.state() != DevState.ON:
self.parameters["rcu_attenuator_dB"] = None
self.parameters["rcu_band_select"] = None
self.parameters["rcu_dth_on"] = None
else:
try:
self.parameters["rcu_attenuator_dB"] = device.RCU_Attenuator_dB_R
self.parameters["rcu_band_select"] = device.RCU_Band_Select_R
self.parameters["rcu_dth_on"] = device.RCU_DTH_on_R
except DevFailed as e:
logger.warning(f"Device {device.name()} not responding: {e}")
self.parameters["rcu_attenuator_dB"] = None
self.parameters["rcu_band_select"] = None
self.parameters["rcu_dth_on"] = None
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import sys
import pprint
from lofar_station_client.statistics.packet import SDPPacket
from lofar_station_client.statistics.packet import PACKET_CLASS_FOR_MARKER
def read_packet(read_func) -> SDPPacket:
"""Read a packet using the given read function, with signature
```read_func(num_bytes: int) -> bytes```
and return it. The packet type is sensed from the data and
the correct subclass of SDPPacket is returned.
If read_func() returns None, this function will as well.
"""
# read just the marker
marker = read_func(1)
if not marker:
return None
# read the packet header based on type
packetClass = PACKET_CLASS_FOR_MARKER[marker]
# read the rest of the header
header = read_func(packetClass.HEADER_SIZE - len(marker))
if not header:
return None
header = marker + header
# parse the header to determine the expected packet size
packet = packetClass(header)
# read the payload
payload_size = packet.expected_size() - len(header)
payload = read_func(payload_size)
if not payload:
return None
# return full packet
return packetClass(header + payload)
def main(args=None, **kwargs):
# parse all packets from stdin
# packet counter
nr = 0
# byte offset in the stream
offset = 0
while True:
# read the packet from input
packet = read_packet(sys.stdin.buffer.read)
if not packet:
break
# print header
print(
f"# Packet {nr} of class {packet.__class__.__name__} starting at "
f"offset {offset} with length {packet.size()}"
)
pprint.pprint(packet.header())
# increment counters
nr += 1
offset += packet.size()
# this file is useful as a stand-alone tool to parse raw packet files, so make it
# work as such
if __name__ == "__main__":
main(sys.argv)
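Since read_packet() only depends on a read(num_bytes) callable, the same parser works on capture files; a short hedged sketch (path hypothetical, walrus operator requires Python 3.8+):

with open("SDP_SST_statistics_packets.bin", "rb") as f:
    while (packet := read_packet(f.read)) is not None:
        print(packet.__class__.__name__, packet.size())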
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import argparse
import time
import sys
......@@ -10,6 +19,7 @@ import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s:%(levelname)s: %(message)s')
logger = logging.getLogger("statistics_writer")
def _create_parser():
"""Define the parser"""
parser = argparse.ArgumentParser(
......@@ -55,6 +65,7 @@ def _create_parser():
)
return parser
def _create_receiver(filename, host, port):
""" creates the TCP receiver that is given to the writer """
if filename:
......@@ -65,6 +76,7 @@ def _create_receiver(filename, host, port):
logger.fatal("Must provide either a host and port, or a file to receive input from")
sys.exit(1)
def _create_writer(mode, interval, output_dir, decimation):
"""Create the writer"""
if mode == "XST":
......@@ -77,6 +89,7 @@ def _create_writer(mode, interval, output_dir, decimation):
logger.fatal(f"Invalid mode: {mode}")
sys.exit(1)
def _start_loop(receiver, writer, reconnect, filename, device):
"""Main loop"""
try:
......@@ -106,6 +119,7 @@ def _start_loop(receiver, writer, reconnect, filename, device):
finally:
writer.close_writer()
def main():
parser = _create_parser()
......@@ -139,11 +153,13 @@ def main():
logger.debug("Setting loglevel to DEBUG")
# set up the Tango connection in order to retrieve attribute values
if tango_disabled or not host:
if tango_disabled:
logger.warning("Tango connection is DISABLED")
device = None
else:
elif host:
device = DeviceProxy(f"tango://{host}:10000/{args.device}".lower()) if mode == 'SST' else None
else:
device = DeviceProxy(args.device) if mode == 'SST' else None
# creates the TCP receiver that is given to the writer
receiver = _create_receiver(filename, host, port)
......
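A hedged sketch of driving the relocated writer programmatically, mirroring the integration tests earlier in this diff: patch sys.argv and call main(). The capture file path is hypothetical.

import sys
from unittest import mock

from tangostationcontrol.statistics import writer

argv = [sys.argv[0], "--mode", "SST", "--no-tango",
        "--file", "test/statistics/SDP_SST_statistics_packets.bin",
        "--output_dir", "/tmp"]
with mock.patch.object(writer.sys, "argv", argv):
    try:
        writer.main()
    except SystemExit:
        pass  # the writer exits once the input file is drained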
# imports for working with datetime objects
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import pytz
import logging
# python hdf5
import h5py
import numpy
import logging
from abc import ABC, abstractmethod
import pytz
from lofar_station_client.statistics.collector import BSTCollector
from lofar_station_client.statistics.collector import XSTCollector
from tangostationcontrol.statistics.collector import StationSSTCollector
# import statistics classes with workaround
import sys
sys.path.append("..")
from tangostationcontrol.devices.sdp.packet import SSTPacket, XSTPacket, BSTPacket
import tangostationcontrol.devices.sdp.statistics_collector as statistics_collector
from lofar_station_client.statistics.packet import SSTPacket, XSTPacket, BSTPacket
logger = logging.getLogger("statistics_writer")
......@@ -229,7 +236,7 @@ class sst_hdf5_writer(hdf5_writer):
return SSTPacket(packet)
def new_collector(self):
return statistics_collector.SSTCollector()
return StationSSTCollector()
def write_values_matrix(self, current_group):
# store the SST values
......@@ -251,7 +258,7 @@ class bst_hdf5_writer(hdf5_writer):
return BSTPacket(packet)
def new_collector(self):
return statistics_collector.BSTCollector()
return BSTCollector()
def write_values_matrix(self, current_group):
# store the BST values
......@@ -267,7 +274,7 @@ class xst_hdf5_writer(hdf5_writer):
return XSTPacket(packet)
def new_collector(self):
return statistics_collector.XSTCollector()
return XSTCollector()
def next_filename(self, timestamp):
time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
......
import socket
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import sys
sys.path.append("..")
from tangostationcontrol.devices.sdp.packet import StatisticsPacket
import os
import socket
from lofar_station_client.statistics.packet import StatisticsPacket
class receiver:
""" Reads data from a file descriptor. """
......
......@@ -30,7 +30,7 @@ class UDP_Client:
time.sleep(1)
f = open("../../test/SDP_SST_statistics_packet.bin", "rb")
f = open("../../test/statistics/SDP_SST_statistics_packet.bin", "rb")
send_data = f.read()
s.sendto(send_data, (self.server_ip, self.server_port))
print("\n\n 1. Client Sent SST Packet at: ", datetime.now())
......
......@@ -144,7 +144,6 @@ class TestDelays(base.TestCase):
self.assertAlmostEqual(0.1, delays[0], 6, f"delays[0] = {delays[0]}")
def test_convert_bulk(self):
d = delay_calculator([0, 0, 0])
timestamp = datetime.datetime(2022, 3, 1, 0, 0, 0) # timestamp does not actually matter, but casacore doesn't know that.
......