Skip to content
Snippets Groups Projects
Commit 960587c4 authored by Stefano Di Frischia's avatar Stefano Di Frischia Committed by Corné Lukken
Browse files

L2SS-863: refactor SST collector

parent 55ccae4f
No related branches found
No related tags found
1 merge request!392L2SS-863: refactor SST collector
......@@ -9,6 +9,7 @@
from tangostationcontrol.integration_test.base import BaseIntegrationTestCase
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
from tangostationcontrol.statistics.collector import StationSSTCollector
from tangostationcontrol.statistics_writer import statistics_reader
from tangostationcontrol.statistics import writer
......@@ -19,6 +20,7 @@ from tempfile import TemporaryDirectory
from unittest import mock
from tango import DevState
import numpy
class TestStatisticsWriterSST(BaseIntegrationTestCase):
......@@ -45,13 +47,13 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
def test_insert_tango_SST_statistics(self):
self.setup_recv_proxy()
self.assertEqual(DevState.ON, self.recv_proxy.state())
collector = StationSSTCollector()
collector = StationSSTCollector(device=self.recv_proxy)
# Test attribute values retrieval
collector.parse_device_attributes(self.recv_proxy)
self.assertListEqual(collector.parameters["rcu_attenuator_dB"].tolist(), self.recv_proxy.rcu_attenuator_dB_r.tolist())
self.assertListEqual(collector.parameters["rcu_band_select"].tolist(), self.recv_proxy.rcu_band_select_r.tolist())
self.assertListEqual(collector.parameters["rcu_dth_on"].tolist(), self.recv_proxy.rcu_dth_on_r.tolist())
collector.parse_device_attributes()
numpy.testing.assert_equal(collector.parameters["rcu_attenuator_dB"].flatten(), self.recv_proxy.rcu_attenuator_dB_r)
numpy.testing.assert_equal(collector.parameters["rcu_band_select"].flatten(), self.recv_proxy.rcu_band_select_r.tolist())
numpy.testing.assert_equal(collector.parameters["rcu_dth_on"].flatten(), self.recv_proxy.rcu_dth_on_r.tolist())
with TemporaryDirectory() as tmpdir:
new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
......@@ -77,7 +79,6 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
self.assertListEqual(stat.rcu_dth_on.tolist(), [False] * 96)
def test_no_tango_SST_statistics(self):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [sys.argv[0], "--mode", "SST", "--no-tango", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
with mock.patch.object(writer.sys, 'argv', new_sys_argv):
......
......@@ -90,7 +90,7 @@ def _create_writer(mode, interval, output_dir, decimation):
sys.exit(1)
def _start_loop(receiver, writer, reconnect, filename, device):
def _start_loop(receiver, writer, reconnect, filename):
"""Main loop"""
try:
while True:
......@@ -171,4 +171,5 @@ def main():
writer = _create_writer(mode, interval, output_dir, decimation)
# start looping
_start_loop(receiver, writer, reconnect, filename, device)
_start_loop(receiver, writer, reconnect, filename)
......@@ -20,7 +20,16 @@ from lofar_station_client.statistics.collector import BSTCollector
from lofar_station_client.statistics.collector import XSTCollector
from tangostationcontrol.statistics.collector import StationSSTCollector
<<<<<<< HEAD
from lofar_station_client.statistics.packet import SSTPacket, XSTPacket, BSTPacket
=======
# import statistics classes with workaround
import sys
sys.path.append("..")
from tango import DeviceProxy, DevFailed, DevState
from tangostationcontrol.devices.sdp.packet import SSTPacket, XSTPacket, BSTPacket
import tangostationcontrol.devices.sdp.statistics_collector as statistics_collector
>>>>>>> L2SS-863: refactor SST collector
logger = logging.getLogger("statistics_writer")
......@@ -34,7 +43,7 @@ class hdf5_writer(ABC):
XST_MODE = "XST"
BST_MODE = "BST"
def __init__(self, new_file_time_interval, file_location, statistics_mode, decimation_factor):
def __init__(self, new_file_time_interval, file_location, statistics_mode, decimation_factor, device):
# all variables that deal with the matrix that's currently being decoded
self.current_matrix = None
......@@ -57,6 +66,9 @@ class hdf5_writer(ABC):
# parameters that are configured depending on the mode the statistics writer is in (SST,XST,BST)
self.mode = statistics_mode.upper()
# Proxy device object that contains attributes of interest
self.device = device
@abstractmethod
def decoder(self):
pass
......@@ -65,7 +77,7 @@ class hdf5_writer(ABC):
def new_collector(self):
pass
def next_packet(self, packet, device):
def next_packet(self, packet):
"""
All statistics packets come with a timestamp of the time they were measured. All the values will be spread across multiple packets.
As long as the timestamp is the same they belong in the same matrix. This code handles collecting the matrix from those multiple
......@@ -100,7 +112,7 @@ class hdf5_writer(ABC):
self.start_new_matrix(statistics_timestamp)
self.current_timestamp = statistics_timestamp
self.process_packet(packet, device)
self.process_packet(packet)
def start_new_matrix(self, timestamp):
"""
......@@ -137,6 +149,7 @@ class hdf5_writer(ABC):
# create a new and empty current_matrix
self.current_matrix = self.new_collector()
self.current_device_matrix = self.new_device_collector()
self.statistics_header = None
def write_matrix(self):
......@@ -181,7 +194,7 @@ class hdf5_writer(ABC):
time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
return f"{self.file_location}/{self.mode}_{time_str}{suffix}"
def process_packet(self, packet, device):
def process_packet(self, packet):
"""
Adds the newly received statistics packet to the statistics matrix
"""
......@@ -189,7 +202,8 @@ class hdf5_writer(ABC):
if self.statistics_counter % self.decimation_factor != 0:
return
self.current_matrix.process_packet(packet, device)
self.current_matrix.process_packet(packet)
self.current_device_matrix.parse_device_attributes()
def start_new_hdf5(self, timestamp):
......@@ -229,22 +243,29 @@ class hdf5_writer(ABC):
class sst_hdf5_writer(hdf5_writer):
def __init__(self, new_file_time_interval, file_location, decimation_factor):
super().__init__(new_file_time_interval, file_location, hdf5_writer.SST_MODE, decimation_factor)
def __init__(self, new_file_time_interval, file_location, decimation_factor, device):
super().__init__(new_file_time_interval, file_location, hdf5_writer.SST_MODE, decimation_factor, device)
def decoder(self, packet):
return SSTPacket(packet)
def new_collector(self):
<<<<<<< HEAD
return StationSSTCollector()
=======
return statistics_collector.SSTCollector()
def new_device_collector(self):
return DeviceCollector(self.device)
>>>>>>> L2SS-863: refactor SST collector
def write_values_matrix(self, current_group):
# store the SST values
current_group.create_dataset(name="values", data=self.current_matrix.parameters["sst_values"].astype(numpy.float32), compression="gzip")
try:
current_group.create_dataset(name="rcu_attenuator_dB", data=self.current_matrix.parameters["rcu_attenuator_dB"].astype(numpy.int64), compression="gzip")
current_group.create_dataset(name="rcu_band_select", data=self.current_matrix.parameters["rcu_band_select"].astype(numpy.int64), compression="gzip")
current_group.create_dataset(name="rcu_dth_on", data=self.current_matrix.parameters["rcu_dth_on"].astype(numpy.bool_), compression="gzip")
current_group.create_dataset(name="rcu_attenuator_dB", data=self.current_device_matrix.parameters["rcu_attenuator_dB"].astype(numpy.int64), compression="gzip")
current_group.create_dataset(name="rcu_band_select", data=self.current_device_matrix.parameters["rcu_band_select"].astype(numpy.int64), compression="gzip")
current_group.create_dataset(name="rcu_dth_on", data=self.current_device_matrix.parameters["rcu_dth_on"].astype(numpy.bool_), compression="gzip")
except AttributeError as e:
logger.warning("Device values not written.")
except Exception as e:
......@@ -306,7 +327,7 @@ class parallel_xst_hdf5_writer:
self.new_writer = new_writer
def next_packet(self, packet, device=None):
def next_packet(self, packet):
# decode to get subband of this packet
fields = XSTPacket(packet)
subband = fields.subband_index
......@@ -316,7 +337,7 @@ class parallel_xst_hdf5_writer:
self.writers[subband] = self.new_writer(subband)
# demux packet to the correct writer
self.writers[subband].next_packet(packet, device)
self.writers[subband].next_packet(packet)
def close_writer(self):
for writer in self.writers.values():
......@@ -324,3 +345,34 @@ class parallel_xst_hdf5_writer:
self.writers = {}
class DeviceCollector():
def __init__(self, device: DeviceProxy):
self.parameters = self._default_parameters()
self.device = device
def _default_parameters(self):
return {
# RECV attribute values to monitor the station configuration
"rcu_attenuator_dB": numpy.zeros((32,3), dtype=numpy.int64),
"rcu_band_select": numpy.zeros((32,3), dtype=numpy.int64),
"rcu_dth_on": numpy.full((32,3), False)
}
def parse_device_attributes(self):
#If Tango connection has been disabled, set explicitly to None,
# because otherwise default_values are inserted
if self.device is None or self.device.state() != DevState.ON:
self.parameters["rcu_attenuator_dB"] = None
self.parameters["rcu_band_select"] = None
self.parameters["rcu_dth_on"] = None
else:
try:
self.parameters["rcu_attenuator_dB"] = self.device.RCU_Attenuator_dB_R
self.parameters["rcu_band_select"] = self.device.RCU_Band_Select_R
self.parameters["rcu_dth_on"] = self.device.RCU_DTH_on_R
except DevFailed as e:
logger.warning(f"Device {self.device.name()} not responding.")
self.parameters["rcu_attenuator_dB"] = None
self.parameters["rcu_band_select"] = None
self.parameters["rcu_dth_on"] = None
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment