Commit 23c394a3 authored by Stefano Di Frischia, committed by Corné Lukken

L2SS-863: Update refactoring to match inherited LSC parts

parent 27f25aac
1 merge request: !392 L2SS-863: refactor SST collector
@@ -2,7 +2,7 @@
 # order of appearance. Changing the order has an impact on the overall
 # integration process, which may cause wedges in the gate later.
-lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client@0.3.0
+lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client@0.4.0
 asyncua >= 0.9.90 # LGPLv3
 PyMySQL[rsa] >= 1.0.2 # MIT
 psycopg2-binary >= 2.9.2 # LGPL
...
@@ -25,38 +25,57 @@ import numpy
 class TestStatisticsWriterSST(BaseIntegrationTestCase):

+    RECV_PROXY_STRING = "STAT/RECV/1"
+
     def setUp(self):
         self.recv_proxy = self.setup_recv_proxy()
         return super().setUp()

-    def setup_recv_proxy(self):
+    @staticmethod
+    def setup_recv_proxy():
         # setup RECV
-        recv_proxy = TestDeviceProxy("STAT/RECV/1")
+        recv_proxy = TestDeviceProxy(TestStatisticsWriterSST.RECV_PROXY_STRING)
         recv_proxy.off()
         recv_proxy.warm_boot()
         recv_proxy.set_defaults()
         return recv_proxy

     def test_retrieve_data_from_RECV(self):
-        recv_proxy = self.setup_recv_proxy()
-        self.assertEqual(DevState.ON, recv_proxy.state())
-        self.assertIsNotNone(recv_proxy.RCU_Attenuator_dB_R)
-        self.assertIsNotNone(recv_proxy.RCU_Band_Select_R)
-        self.assertIsNotNone(recv_proxy.RCU_DTH_on_R)
+        self.assertEqual(DevState.ON, self.recv_proxy.state())
+        self.assertIsNotNone(self.recv_proxy.RCU_Attenuator_dB_R)
+        self.assertIsNotNone(self.recv_proxy.RCU_Band_Select_R)
+        self.assertIsNotNone(self.recv_proxy.RCU_DTH_on_R)

     def test_insert_tango_SST_statistics(self):
-        self.setup_recv_proxy()
         self.assertEqual(DevState.ON, self.recv_proxy.state())
         collector = StationSSTCollector(device=self.recv_proxy)

         # Test attribute values retrieval
         collector.parse_device_attributes()
-        numpy.testing.assert_equal(collector.parameters["rcu_attenuator_dB"].flatten(), self.recv_proxy.rcu_attenuator_dB_r)
-        numpy.testing.assert_equal(collector.parameters["rcu_band_select"].flatten(), self.recv_proxy.rcu_band_select_r.tolist())
-        numpy.testing.assert_equal(collector.parameters["rcu_dth_on"].flatten(), self.recv_proxy.rcu_dth_on_r.tolist())
+        numpy.testing.assert_equal(
+            collector.parameters["rcu_attenuator_dB"].flatten(),
+            self.recv_proxy.rcu_attenuator_dB_r
+        )
+        numpy.testing.assert_equal(
+            collector.parameters["rcu_band_select"].flatten(),
+            self.recv_proxy.rcu_band_select_r.tolist()
+        )
+        numpy.testing.assert_equal(
+            collector.parameters["rcu_dth_on"].flatten(),
+            self.recv_proxy.rcu_dth_on_r.tolist()
+        )

         with TemporaryDirectory() as tmpdir:
-            new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
+            new_sys_argv = [
+                sys.argv[0],
+                "--mode", "SST",
+                "--file", join(
+                    dirname(dirname(dirname(dirname(__file__)))),
+                    "test/statistics", "SDP_SST_statistics_packets.bin"
+                ),
+                ""
+                "--output_dir", tmpdir
+            ]
             with mock.patch.object(writer.sys, 'argv', new_sys_argv):
                 with self.assertRaises(SystemExit):
                     writer.main()
@@ -65,12 +84,20 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
             self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5"))

             # test statistics reader
-            new_sys_argv = [sys.argv[0], "--files", f"{tmpdir}/SST_2021-09-20-12-17-40.h5", "--start_time", "2021-09-20#07:40:08.937+00:00", "--end_time", "2021-10-04#07:50:08.937+00:00"]
+            new_sys_argv = [
+                sys.argv[0],
+                "--files", f"{tmpdir}/SST_2021-09-20-12-17-40.h5",
+                "--start_time", "2021-09-20#07:40:08.937+00:00",
+                "--end_time", "2021-10-04#07:50:08.937+00:00"
+            ]
             with mock.patch.object(statistics_reader.sys, 'argv', new_sys_argv):
                 stat_parser = statistics_reader.setup_stat_parser()
                 SSTstatistics = stat_parser.list_statistics()
                 self.assertIsNotNone(SSTstatistics)
-                stat = stat_parser.get_statistic('2021-09-20T12:17:40.000+00:00')  # same as stat_parser.statistics[0]
+                # same as stat_parser.statistics[0]
+                stat = stat_parser.get_statistic(
+                    '2021-09-20T12:17:40.000+00:00'
+                )
                 self.assertIsNotNone(stat)
                 self.assertEqual(121, stat.data_id_signal_input_index)
                 # Test RECV attributes
@@ -80,7 +107,17 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
     def test_no_tango_SST_statistics(self):
         with TemporaryDirectory() as tmpdir:
-            new_sys_argv = [sys.argv[0], "--mode", "SST", "--no-tango", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir]
+            new_sys_argv = [
+                sys.argv[0],
+                "--mode", "SST",
+                "--no-tango",
+                "--file", join(
+                    dirname(dirname(dirname(dirname(__file__)))),
+                    "test/statistics", "SDP_SST_statistics_packets.bin"
+                ),
+                "--output_dir", tmpdir
+            ]
             with mock.patch.object(writer.sys, 'argv', new_sys_argv):
                 with self.assertRaises(SystemExit):
                     writer.main()
@@ -89,12 +126,20 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
             self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5"))

             # test statistics reader
-            new_sys_argv = [sys.argv[0], "--files", f"{tmpdir}/SST_2021-09-20-12-17-40.h5", "--start_time", "2021-09-20#07:40:08.937+00:00", "--end_time", "2021-10-04#07:50:08.937+00:00"]
+            new_sys_argv = [
+                sys.argv[0],
+                "--files", f"{tmpdir}/SST_2021-09-20-12-17-40.h5",
+                "--start_time", "2021-09-20#07:40:08.937+00:00",
+                "--end_time", "2021-10-04#07:50:08.937+00:00"
+            ]
             with mock.patch.object(statistics_reader.sys, 'argv', new_sys_argv):
                 stat_parser = statistics_reader.setup_stat_parser()
                 SSTstatistics = stat_parser.list_statistics()
                 self.assertIsNotNone(SSTstatistics)
-                stat = stat_parser.get_statistic('2021-09-20T12:17:40.000+00:00')  # same as stat_parser.statistics[0]
+                # same as stat_parser.statistics[0]
+                stat = stat_parser.get_statistic(
+                    '2021-09-20T12:17:40.000+00:00'
+                )
                 self.assertIsNotNone(stat)
                 self.assertEqual(121, stat.data_id_signal_input_index)
                 # Test RECV attributes
...
@@ -19,37 +19,51 @@ from tango import DevState
 logger = logging.getLogger()

-class StationStatisticsCollectorInterface(abc.ABC):
+class DeviceCollectorInterface(abc.ABC):
+    """Small interface for deviceproxy enabled collectors"""
+
+    def __init__(self, device: DeviceProxy = None):
+        self.device = device

     @abc.abstractmethod
-    def parse_device_attributes(self, device: DeviceProxy):
+    def parse_device_attributes(self):
         """Update information based on device attributes"""
         raise NotImplementedError

-class StationSSTCollector(StationStatisticsCollectorInterface, SSTCollector):
+class StationSSTCollector(DeviceCollectorInterface, SSTCollector):
+    def __init__(self, device: DeviceProxy = None):
+        """Manually combine the constructors with appropriate arguments"""
+        DeviceCollectorInterface.__init__(self, device=device)
+        SSTCollector.__init__(self)

-    def parse_packet(self, packet, obj):
-        super(StationSSTCollector, self).parse_packet(packet, obj)
+    def _parse_packet(self, packet):
+        super()._parse_packet(packet)

         # add tango values to packet
-        self.parse_device_attributes(obj)
+        self.parse_device_attributes()

-    def parse_device_attributes(self, device: DeviceProxy):
+    def parse_device_attributes(self):
         # If Tango connection has been disabled, set explicitly to None,
         # because otherwise default_values are inserted
-        if device is None or device.state() != DevState.ON:
+        if not self.device or self.device.state() != DevState.ON:
             self.parameters["rcu_attenuator_dB"] = None
             self.parameters["rcu_band_select"] = None
             self.parameters["rcu_dth_on"] = None
         else:
             try:
-                self.parameters["rcu_attenuator_dB"] = device.RCU_Attenuator_dB_R
-                self.parameters["rcu_band_select"] = device.RCU_Band_Select_R
-                self.parameters["rcu_dth_on"] = device.RCU_DTH_on_R
+                self.parameters[
+                    "rcu_attenuator_dB"
+                ] = self.device.RCU_Attenuator_dB_R
+                self.parameters[
+                    "rcu_band_select"
+                ] = self.device.RCU_Band_Select_R
+                self.parameters["rcu_dth_on"] = self.device.RCU_DTH_on_R
             except DevFailed as e:
-                logger.warning(f"Device {device.name()} not responding.")
+                logger.warning("Device: %s not responding.", self.device.name())
                 self.parameters["rcu_attenuator_dB"] = None
                 self.parameters["rcu_band_select"] = None
                 self.parameters["rcu_dth_on"] = None
@@ -13,10 +13,16 @@ import sys
 from tango import DeviceProxy
 from tangostationcontrol.statistics_writer.receiver import tcp_receiver, file_receiver
-from tangostationcontrol.statistics_writer.hdf5_writer import sst_hdf5_writer, parallel_xst_hdf5_writer, bst_hdf5_writer
+from tangostationcontrol.statistics_writer.hdf5_writer import BstHdf5Writer
+from tangostationcontrol.statistics_writer.hdf5_writer import SstHdf5Writer
+from tangostationcontrol.statistics_writer.hdf5_writer import ParallelXstHdf5Writer

 import logging
-logging.basicConfig(level=logging.INFO, format = '%(asctime)s:%(levelname)s: %(message)s')
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s:%(levelname)s: %(message)s'
+)

 logger = logging.getLogger("statistics_writer")
@@ -77,16 +83,31 @@ def _create_receiver(filename, host, port):
         sys.exit(1)

-def _create_writer(mode, interval, output_dir, decimation):
+def _create_writer(
+    mode, interval, output_dir, decimation, device: DeviceProxy = None
+):
     """Create the writer"""
     if mode == "XST":
-        return parallel_xst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation)
+        return ParallelXstHdf5Writer(
+            new_file_time_interval=interval,
+            file_location=output_dir,
+            decimation_factor=decimation,
+        )
     elif mode == "SST":
-        return sst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation)
+        return SstHdf5Writer(
+            new_file_time_interval=interval,
+            file_location=output_dir,
+            decimation_factor=decimation,
+            device=device
+        )
     elif mode == "BST":
-        return bst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation)
+        return BstHdf5Writer(
+            new_file_time_interval=interval,
+            file_location=output_dir,
+            decimation_factor=decimation,
+        )
     else:
-        logger.fatal(f"Invalid mode: {mode}")
+        logger.fatal("Invalid mode: %s", mode)
         sys.exit(1)
@@ -168,7 +189,7 @@ def main():
     receiver = _create_receiver(filename, host, port)

     # create the writer
-    writer = _create_writer(mode, interval, output_dir, decimation)
+    writer = _create_writer(mode, interval, output_dir, decimation, device)

     # start looping
     _start_loop(receiver, writer, reconnect, filename)
...
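With this change `_create_writer` accepts an optional Tango `DeviceProxy` and forwards it only to `SstHdf5Writer`; the XST and BST writers are constructed without it. The sketch below shows how that wiring looks in isolation. It is illustrative only: `_create_writer` is a private helper, the module path `tangostationcontrol.statistics_writer.statistics_writer` is an assumption about the package layout, and the device name `STAT/RECV/1` is taken from the integration test above.

```python
# Hedged sketch of passing a RECV device proxy into the SST writer factory.
# Assumes a reachable Tango control system and the module path below.
from tango import DeviceProxy

from tangostationcontrol.statistics_writer.statistics_writer import _create_writer

# Proxy name taken from the integration test (RECV_PROXY_STRING).
recv_device = DeviceProxy("STAT/RECV/1")

# Only mode == "SST" forwards the device; XST and BST writers ignore it.
writer = _create_writer(
    mode="SST",
    interval=3600,      # new_file_time_interval; unit assumed to be seconds
    output_dir="/tmp",
    decimation=1,
    device=recv_device,
)
```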
 # TCP to HDF5 statistics writer

-The TCP to HDF5 statistics writer can be started with `statistics_writer.py` This script imports the receiver script and `hdf5_writer.py`. `receiver.py` only takes care of receiving packets.
-`hdf5_writer.py` takes the receive function from the receiver and uses it to obtain packets.
+The TCP to HDF5 statistics writer can be started with `statistics_writer.py` This script imports the receiver script and `HDF5Writer.py`. `receiver.py` only takes care of receiving packets.
+`HDF5Writer.py` takes the receive function from the receiver and uses it to obtain packets.
 Any function that can deliver statistics packets can be used by this code.
-`hdf5_writer.py` takes care of processing the packets it receives, filling statistics matrices
+`HDF5Writer.py` takes care of processing the packets it receives, filling statistics matrices
 and writing those matrices (as well as a bunch of metadata) to hdf5.
...
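The README paragraph above describes the split between `receiver.py`, the HDF5 writer, and `statistics_writer.py`. For reference, here is a hedged example of driving the writer and reader the same way the integration tests in this merge request do, by patching `sys.argv` and calling `main()`; the module import paths and file names are assumptions about the package layout, while the flags match those exercised by the tests.

```python
# Sketch: run the statistics writer and reader programmatically, mirroring
# the integration tests. Module paths below are assumed, not confirmed.
import sys
from unittest import mock

from tangostationcontrol.statistics_writer import statistics_writer as writer
from tangostationcontrol.statistics_writer import statistics_reader

# Write SST packets from a capture file to HDF5, without contacting Tango.
writer_argv = [
    sys.argv[0],
    "--mode", "SST",
    "--no-tango",
    "--file", "SDP_SST_statistics_packets.bin",
    "--output_dir", "/tmp",
]
with mock.patch.object(writer.sys, "argv", writer_argv):
    try:
        writer.main()
    except SystemExit:
        pass  # the writer exits once the input file is exhausted

# Read the statistics back for a given time window.
reader_argv = [
    sys.argv[0],
    "--files", "/tmp/SST_2021-09-20-12-17-40.h5",
    "--start_time", "2021-09-20#07:40:08.937+00:00",
    "--end_time", "2021-10-04#07:50:08.937+00:00",
]
with mock.patch.object(statistics_reader.sys, "argv", reader_argv):
    stat_parser = statistics_reader.setup_stat_parser()
    print(stat_parser.list_statistics())
```

With `--no-tango` no `DeviceProxy` is attached, so the collector stores `None` for the RECV attributes instead of querying the device (see `parse_device_attributes` above).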