Skip to content
Snippets Groups Projects
Commit 05583190 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-819-annotate-sst-with-station-config' into 'master'

Resolve L2SS-819 "Annotate sst with station config"

Closes L2SS-819

See merge request !366
parents be3c1cca 80c1b284
No related branches found
No related tags found
1 merge request!366Resolve L2SS-819 "Annotate sst with station config"
......@@ -8,6 +8,8 @@ from .packet import SSTPacket, XSTPacket, BSTPacket
from tangostationcontrol.common.baselines import nr_baselines, baseline_index, baseline_from_index
from tangostationcontrol.clients.statistics_client_thread import StatisticsClientThread
from tango import DeviceProxy, DevFailed, DevState
logger = logging.getLogger()
class StatisticsCollector:
......@@ -30,21 +32,25 @@ class StatisticsCollector:
"last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8),
}
def process_packet(self, packet):
def process_packet(self, packet, device=None):
self.parameters["nof_packets"] += numpy.uint64(1)
try:
self.parse_packet(packet)
self.parse_packet(packet, device)
except Exception as e:
self.parameters["last_invalid_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
self.parameters["nof_invalid_packets"] += numpy.uint64(1)
raise ValueError("Could not parse statistics packet") from e
def parse_packet(self, packet):
def parse_packet(self, packet, device):
""" Update any information based on this packet. """
raise NotImplementedError
def parse_device_attributes(self):
""" Update information based on device attributes """
raise NotImplementedError
class SSTCollector(StatisticsCollector):
""" Class to process SST statistics packets. """
......@@ -70,11 +76,16 @@ class SSTCollector(StatisticsCollector):
"sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64),
"integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32),
"subbands_calibrated": numpy.zeros((self.MAX_INPUTS,), dtype=bool),
# RECV attribute values to monitor the station configuration
"rcu_attenuator_dB": numpy.zeros((32,3), dtype=numpy.int64),
"rcu_band_select": numpy.zeros((32,3), dtype=numpy.int64),
"rcu_dth_on": numpy.full((32,3), False),
})
return defaults
def parse_packet(self, packet):
def parse_packet(self, packet, device):
fields = SSTPacket(packet)
# determine which input this packet contains data for
......@@ -98,6 +109,27 @@ class SSTCollector(StatisticsCollector):
self.parameters["integration_intervals"][input_index] = fields.integration_interval()
self.parameters["subbands_calibrated"][input_index] = fields.subband_calibrated_flag
# add tango values to packet
self.parse_device_attributes(device)
def parse_device_attributes(self, device: DeviceProxy):
#If Tango connection has been disabled, set explicitly to None,
# because otherwise default_values are inserted
if device is None or device.state() != DevState.ON:
self.parameters["rcu_attenuator_dB"] = None
self.parameters["rcu_band_select"] = None
self.parameters["rcu_dth_on"] = None
else:
try:
self.parameters["rcu_attenuator_dB"] = device.RCU_Attenuator_dB_R
self.parameters["rcu_band_select"] = device.RCU_Band_Select_R
self.parameters["rcu_dth_on"] = device.RCU_DTH_on_R
except DevFailed as e:
logger.warning(f"Device {device.name()} not responding.")
self.parameters["rcu_attenuator_dB"] = None
self.parameters["rcu_band_select"] = None
self.parameters["rcu_dth_on"] = None
class XSTCollector(StatisticsCollector):
""" Class to process XST statistics packets.
......@@ -172,7 +204,7 @@ class XSTCollector(StatisticsCollector):
# prefer the first one in case of multiple minima
return numpy.where(self.parameters["xst_timestamps"] == oldest_timestamp)[0][0]
def parse_packet(self, packet):
def parse_packet(self, packet, device=None):
fields = XSTPacket(packet)
if fields.payload_error:
......@@ -300,7 +332,7 @@ class BSTCollector(StatisticsCollector):
return defaults
def parse_packet(self, packet):
def parse_packet(self, packet, device=None):
fields = BSTPacket(packet)
# To get the block_index we floor divide this beamlet_index by the max amount of beamlets per block
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tangostationcontrol.integration_test.base import BaseIntegrationTestCase
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
from tangostationcontrol.devices.sdp.statistics_collector import SSTCollector
from tangostationcontrol.statistics_writer import statistics_writer, statistics_reader
import sys
from os.path import dirname, isfile, join
from tempfile import TemporaryDirectory
from unittest import mock
from tango import DevState
class TestStatisticsWriterSST(BaseIntegrationTestCase):
def setUp(self):
self.recv_proxy = self.setup_recv_proxy()
return super().setUp()
def setup_recv_proxy(self):
# setup RECV
recv_proxy = TestDeviceProxy("STAT/RECV/1")
recv_proxy.off()
recv_proxy.warm_boot()
recv_proxy.set_defaults()
return recv_proxy
def test_retrieve_data_from_RECV(self):
recv_proxy = self.setup_recv_proxy()
self.assertEqual(DevState.ON, recv_proxy.state())
self.assertIsNotNone(recv_proxy.RCU_Attenuator_dB_R)
self.assertIsNotNone(recv_proxy.RCU_Band_Select_R)
self.assertIsNotNone(recv_proxy.RCU_DTH_on_R)
def test_insert_tango_SST_statistics(self):
self.setup_recv_proxy()
self.assertEqual(DevState.ON, self.recv_proxy.state())
collector = SSTCollector()
# Test attribute values retrieval
collector.parse_device_attributes(self.recv_proxy)
self.assertListEqual(collector.parameters["rcu_attenuator_dB"].tolist(), self.recv_proxy.rcu_attenuator_dB_r.tolist())
self.assertListEqual(collector.parameters["rcu_band_select"].tolist(), self.recv_proxy.rcu_band_select_r.tolist())
self.assertListEqual(collector.parameters["rcu_dth_on"].tolist(), self.recv_proxy.rcu_dth_on_r.tolist())
with TemporaryDirectory() as tmpdir:
new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv):
with self.assertRaises(SystemExit):
statistics_writer.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5"))
# test statistics reader
new_sys_argv = [sys.argv[0], "--files", f"{tmpdir}/SST_2021-09-20-12-17-40.h5", "--start_time", "2021-09-20#07:40:08.937+00:00", "--end_time", "2021-10-04#07:50:08.937+00:00"]
with mock.patch.object(statistics_reader.sys, 'argv', new_sys_argv):
stat_parser = statistics_reader.setup_stat_parser()
SSTstatistics = stat_parser.list_statistics()
self.assertIsNotNone(SSTstatistics)
stat = stat_parser.get_statistic('2021-09-20T12:17:40.000+00:00') # same as stat_parser.statistics[0]
self.assertIsNotNone(stat)
self.assertEqual(121, stat.data_id_signal_input_index)
# Test RECV attributes
self.assertListEqual(stat.rcu_attenuator_dB.tolist(), [[0] * 3] * 32)
self.assertListEqual(stat.rcu_band_select.tolist(), [[0] * 3] * 32)
self.assertListEqual(stat.rcu_dth_on.tolist(), [[False] * 3] * 32)
def test_no_tango_SST_statistics(self):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [sys.argv[0], "--mode", "SST", "--no-tango", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv):
with self.assertRaises(SystemExit):
statistics_writer.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5"))
# test statistics reader
new_sys_argv = [sys.argv[0], "--files", f"{tmpdir}/SST_2021-09-20-12-17-40.h5", "--start_time", "2021-09-20#07:40:08.937+00:00", "--end_time", "2021-10-04#07:50:08.937+00:00"]
with mock.patch.object(statistics_reader.sys, 'argv', new_sys_argv):
stat_parser = statistics_reader.setup_stat_parser()
SSTstatistics = stat_parser.list_statistics()
self.assertIsNotNone(SSTstatistics)
stat = stat_parser.get_statistic('2021-09-20T12:17:40.000+00:00') # same as stat_parser.statistics[0]
self.assertIsNotNone(stat)
self.assertEqual(121, stat.data_id_signal_input_index)
# Test RECV attributes
self.assertEqual(stat.rcu_attenuator_dB.tolist(), None)
self.assertEqual(stat.rcu_band_select.tolist(), None)
self.assertEqual(stat.rcu_dth_on.tolist(), None)
def test_SST_statistics_with_device_in_off(self):
self.setup_recv_proxy()
self.recv_proxy.Off()
self.assertEqual(DevState.OFF, self.recv_proxy.state())
with TemporaryDirectory() as tmpdir:
new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv):
with self.assertRaises(SystemExit):
statistics_writer.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5"))
# test statistics reader
new_sys_argv = [sys.argv[0], "--files", f"{tmpdir}/SST_2021-09-20-12-17-40.h5", "--start_time", "2021-09-20#07:40:08.937+00:00", "--end_time", "2021-10-04#07:50:08.937+00:00"]
with mock.patch.object(statistics_reader.sys, 'argv', new_sys_argv):
stat_parser = statistics_reader.setup_stat_parser()
SSTstatistics = stat_parser.list_statistics()
self.assertIsNotNone(SSTstatistics)
stat = stat_parser.get_statistic('2021-09-20T12:17:40.000+00:00') # same as stat_parser.statistics[0]
self.assertIsNotNone(stat)
self.assertEqual(121, stat.data_id_signal_input_index)
# Test RECV attributes
self.assertEqual(stat.rcu_attenuator_dB.tolist(), None)
self.assertEqual(stat.rcu_band_select.tolist(), None)
self.assertEqual(stat.rcu_dth_on.tolist(), None)
......@@ -58,7 +58,7 @@ class hdf5_writer(ABC):
def new_collector(self):
pass
def next_packet(self, packet):
def next_packet(self, packet, device):
"""
All statistics packets come with a timestamp of the time they were measured. All the values will be spread across multiple packets.
As long as the timestamp is the same they belong in the same matrix. This code handles collecting the matrix from those multiple
......@@ -93,7 +93,7 @@ class hdf5_writer(ABC):
self.start_new_matrix(statistics_timestamp)
self.current_timestamp = statistics_timestamp
self.process_packet(packet)
self.process_packet(packet, device)
def start_new_matrix(self, timestamp):
"""
......@@ -174,7 +174,7 @@ class hdf5_writer(ABC):
time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
return f"{self.file_location}/{self.mode}_{time_str}{suffix}"
def process_packet(self, packet):
def process_packet(self, packet, device):
"""
Adds the newly received statistics packet to the statistics matrix
"""
......@@ -182,7 +182,7 @@ class hdf5_writer(ABC):
if self.statistics_counter % self.decimation_factor != 0:
return
self.current_matrix.process_packet(packet)
self.current_matrix.process_packet(packet, device)
def start_new_hdf5(self, timestamp):
......@@ -234,6 +234,14 @@ class sst_hdf5_writer(hdf5_writer):
def write_values_matrix(self, current_group):
# store the SST values
current_group.create_dataset(name="values", data=self.current_matrix.parameters["sst_values"].astype(numpy.float32), compression="gzip")
try:
current_group.create_dataset(name="rcu_attenuator_dB", data=self.current_matrix.parameters["rcu_attenuator_dB"].astype(numpy.int64), compression="gzip")
current_group.create_dataset(name="rcu_band_select", data=self.current_matrix.parameters["rcu_band_select"].astype(numpy.int64), compression="gzip")
current_group.create_dataset(name="rcu_dth_on", data=self.current_matrix.parameters["rcu_dth_on"].astype(numpy.bool_), compression="gzip")
except AttributeError as e:
logger.warning("Device values not written.")
except Exception as e:
raise Exception from e
class bst_hdf5_writer(hdf5_writer):
def __init__(self, new_file_time_interval, file_location, decimation_factor):
......@@ -291,7 +299,7 @@ class parallel_xst_hdf5_writer:
self.new_writer = new_writer
def next_packet(self, packet):
def next_packet(self, packet, device=None):
# decode to get subband of this packet
fields = XSTPacket(packet)
subband = fields.subband_index
......@@ -301,7 +309,7 @@ class parallel_xst_hdf5_writer:
self.writers[subband] = self.new_writer(subband)
# demux packet to the correct writer
self.writers[subband].next_packet(packet)
self.writers[subband].next_packet(packet, device)
def close_writer(self):
for writer in self.writers.values():
......
......@@ -5,6 +5,7 @@ import argparse
import os
import psutil
import pytz
import sys # noqa: F401
process = psutil.Process(os.getpid())
......@@ -160,7 +161,8 @@ class statistics_data:
"source_info_fsub_type", "source_info_beam_repositioning_flag", "source_info_antenna_band_index", "source_info__raw",
"observation_id", "nof_statistics_per_packet", "nof_signal_inputs", "nof_bytes_per_statistic", "marker", "integration_interval_raw",
"integration_interval", "data_id__raw", "block_serial_number", "block_period_raw", "block_period", "data_id_signal_input_index",
"data_id_subband_index", "data_id_first_baseline", "data_id_beamlet_index", "nof_valid_payloads", "nof_payload_errors", "values", )
"data_id_subband_index", "data_id_first_baseline", "data_id_beamlet_index", "nof_valid_payloads", "nof_payload_errors", "values",
"rcu_attenuator_dB", "rcu_band_select", "rcu_dth_on")
def __init__(self, file, group_key):
......@@ -198,6 +200,9 @@ class statistics_data:
# get SST specific stuff
if self.marker == "S":
self.data_id_signal_input_index = file[group_key].attrs["data_id_signal_input_index"]
self.rcu_attenuator_dB = numpy.array(file.get(f"{group_key}/rcu_attenuator_dB"))
self.rcu_band_select = numpy.array(file.get(f"{group_key}/rcu_band_select"))
self.rcu_dth_on = numpy.array(file.get(f"{group_key}/rcu_dth_on"))
# get XST specific stuff
if self.marker == "X":
......
import argparse
import time
import sys
from tango import DeviceProxy
from tangostationcontrol.statistics_writer.receiver import tcp_receiver, file_receiver
from tangostationcontrol.statistics_writer.hdf5_writer import sst_hdf5_writer, parallel_xst_hdf5_writer, bst_hdf5_writer
......@@ -44,6 +45,14 @@ def _create_parser():
'-r', '--reconnect', dest='reconnect', action='store_true', default=False,
help='Set the writer to keep trying to reconnect whenever connection '
'is lost. (default: %(default)s)')
parser.add_argument(
'-nt', '--no-tango', dest='no_tango', action='store_true', default=False,
help='Disable connection to Tango attribute values retrieval'
)
parser.add_argument(
'-dev', '--device', type=str, choices=['STAT/RECV/1'], default='STAT/RECV/1',
help='List the Tango device names needed'
)
return parser
def _create_receiver(filename, host, port):
......@@ -68,13 +77,13 @@ def _create_writer(mode, interval, output_dir, decimation):
logger.fatal(f"Invalid mode: {mode}")
sys.exit(1)
def _start_loop(receiver, writer, reconnect, filename):
def _start_loop(receiver, writer, reconnect, filename, device):
"""Main loop"""
try:
while True:
try:
packet = receiver.get_packet()
writer.next_packet(packet)
writer.next_packet(packet, device)
except EOFError:
if reconnect and not filename:
logger.warning("Connection lost, attempting to reconnect")
......@@ -113,6 +122,7 @@ def main():
decimation = args.decimation
debug = args.debug
reconnect = args.reconnect
tango_disabled = args.no_tango
if not filename and not host:
raise ValueError("Supply either a filename (--file) or a hostname (--host)")
......@@ -127,6 +137,13 @@ def main():
if debug:
logger.setLevel(logging.DEBUG)
logger.debug("Setting loglevel to DEBUG")
# sets the Tango connection in order to retrieve attribute values
if tango_disabled:
logger.warning("Tango connection is DISABLED")
device = None
else:
device = DeviceProxy(args.device) if mode=='SST' else None
# creates the TCP receiver that is given to the writer
receiver = _create_receiver(filename, host, port)
......@@ -135,4 +152,4 @@ def main():
writer = _create_writer(mode, interval, output_dir, decimation)
# start looping
_start_loop(receiver, writer, reconnect, filename)
_start_loop(receiver, writer, reconnect, filename, device)
......@@ -15,15 +15,8 @@ from tempfile import TemporaryDirectory
from unittest import mock
class TestStatisticsWriter(base.TestCase):
def test_sst(self):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", dirname(__file__) + "/SDP_SST_statistics_packets.bin", "--output_dir", tmpdir]
with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv):
with self.assertRaises(SystemExit):
statistics_writer.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5"))
# test_sst --> moved to integration_test
def test_xst(self):
with TemporaryDirectory() as tmpdir:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment