diff --git a/tangostationcontrol/requirements.txt b/tangostationcontrol/requirements.txt index 21a4422bbb65100a0e3b9d3c10136b2f3345e9bb..9472aedca773bbc84e208b95aa96fabc025cc524 100644 --- a/tangostationcontrol/requirements.txt +++ b/tangostationcontrol/requirements.txt @@ -2,7 +2,7 @@ # order of appearance. Changing the order has an impact on the overall # integration process, which may cause wedges in the gate later. -lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client@0.3.0 +lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client@0.4.0 asyncua >= 0.9.90 # LGPLv3 PyMySQL[rsa] >= 1.0.2 # MIT psycopg2-binary >= 2.9.2 # LGPL diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/statistics/test_writer_sst.py b/tangostationcontrol/tangostationcontrol/integration_test/default/statistics/test_writer_sst.py index 376aab6c4706c497df7b81f4aac642ae3576e187..19f27af5b121979ceeefba746a4bb863e2e00352 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/statistics/test_writer_sst.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/statistics/test_writer_sst.py @@ -25,38 +25,57 @@ import numpy class TestStatisticsWriterSST(BaseIntegrationTestCase): + RECV_PROXY_STRING = "STAT/RECV/1" + def setUp(self): self.recv_proxy = self.setup_recv_proxy() return super().setUp() - - def setup_recv_proxy(self): + + @staticmethod + def setup_recv_proxy(): # setup RECV - recv_proxy = TestDeviceProxy("STAT/RECV/1") + recv_proxy = TestDeviceProxy(TestStatisticsWriterSST.RECV_PROXY_STRING) recv_proxy.off() recv_proxy.warm_boot() recv_proxy.set_defaults() return recv_proxy def test_retrieve_data_from_RECV(self): - recv_proxy = self.setup_recv_proxy() - self.assertEqual(DevState.ON, recv_proxy.state()) - self.assertIsNotNone(recv_proxy.RCU_Attenuator_dB_R) - self.assertIsNotNone(recv_proxy.RCU_Band_Select_R) - self.assertIsNotNone(recv_proxy.RCU_DTH_on_R) + self.assertEqual(DevState.ON, self.recv_proxy.state()) + self.assertIsNotNone(self.recv_proxy.RCU_Attenuator_dB_R) + self.assertIsNotNone(self.recv_proxy.RCU_Band_Select_R) + self.assertIsNotNone(self.recv_proxy.RCU_DTH_on_R) def test_insert_tango_SST_statistics(self): - self.setup_recv_proxy() self.assertEqual(DevState.ON, self.recv_proxy.state()) collector = StationSSTCollector(device=self.recv_proxy) # Test attribute values retrieval collector.parse_device_attributes() - numpy.testing.assert_equal(collector.parameters["rcu_attenuator_dB"].flatten(), self.recv_proxy.rcu_attenuator_dB_r) - numpy.testing.assert_equal(collector.parameters["rcu_band_select"].flatten(), self.recv_proxy.rcu_band_select_r.tolist()) - numpy.testing.assert_equal(collector.parameters["rcu_dth_on"].flatten(), self.recv_proxy.rcu_dth_on_r.tolist()) + numpy.testing.assert_equal( + collector.parameters["rcu_attenuator_dB"].flatten(), + self.recv_proxy.rcu_attenuator_dB_r + ) + numpy.testing.assert_equal( + collector.parameters["rcu_band_select"].flatten(), + self.recv_proxy.rcu_band_select_r.tolist() + ) + numpy.testing.assert_equal( + collector.parameters["rcu_dth_on"].flatten(), + self.recv_proxy.rcu_dth_on_r.tolist() + ) with TemporaryDirectory() as tmpdir: - new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir] + new_sys_argv = [ + sys.argv[0], + "--mode", "SST", + "--file", join( + dirname(dirname(dirname(dirname(__file__)))), + "test/statistics", "SDP_SST_statistics_packets.bin" + ), + "" + "--output_dir", tmpdir + ] with mock.patch.object(writer.sys, 'argv', new_sys_argv): with self.assertRaises(SystemExit): writer.main() @@ -65,12 +84,20 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase): self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5")) # test statistics reader - new_sys_argv = [sys.argv[0], "--files", f"{tmpdir}/SST_2021-09-20-12-17-40.h5", "--start_time", "2021-09-20#07:40:08.937+00:00", "--end_time", "2021-10-04#07:50:08.937+00:00"] + new_sys_argv = [ + sys.argv[0], + "--files", f"{tmpdir}/SST_2021-09-20-12-17-40.h5", + "--start_time", "2021-09-20#07:40:08.937+00:00", + "--end_time", "2021-10-04#07:50:08.937+00:00" + ] with mock.patch.object(statistics_reader.sys, 'argv', new_sys_argv): stat_parser = statistics_reader.setup_stat_parser() SSTstatistics = stat_parser.list_statistics() self.assertIsNotNone(SSTstatistics) - stat = stat_parser.get_statistic('2021-09-20T12:17:40.000+00:00') # same as stat_parser.statistics[0] + # same as stat_parser.statistics[0] + stat = stat_parser.get_statistic( + '2021-09-20T12:17:40.000+00:00' + ) self.assertIsNotNone(stat) self.assertEqual(121, stat.data_id_signal_input_index) # Test RECV attributes @@ -80,7 +107,17 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase): def test_no_tango_SST_statistics(self): with TemporaryDirectory() as tmpdir: - new_sys_argv = [sys.argv[0], "--mode", "SST", "--no-tango", "--file", join(dirname(dirname(dirname(dirname(__file__)))), "test/statistics", "SDP_SST_statistics_packets.bin"), "--output_dir", tmpdir] + new_sys_argv = [ + sys.argv[0], + "--mode", "SST", + "--no-tango", + "--file", join( + dirname(dirname(dirname(dirname(__file__)))), + "test/statistics", "SDP_SST_statistics_packets.bin" + ), + "--output_dir", tmpdir + ] + with mock.patch.object(writer.sys, 'argv', new_sys_argv): with self.assertRaises(SystemExit): writer.main() @@ -89,12 +126,20 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase): self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40.h5")) # test statistics reader - new_sys_argv = [sys.argv[0], "--files", f"{tmpdir}/SST_2021-09-20-12-17-40.h5", "--start_time", "2021-09-20#07:40:08.937+00:00", "--end_time", "2021-10-04#07:50:08.937+00:00"] + new_sys_argv = [ + sys.argv[0], + "--files", f"{tmpdir}/SST_2021-09-20-12-17-40.h5", + "--start_time", "2021-09-20#07:40:08.937+00:00", + "--end_time", "2021-10-04#07:50:08.937+00:00" + ] with mock.patch.object(statistics_reader.sys, 'argv', new_sys_argv): stat_parser = statistics_reader.setup_stat_parser() SSTstatistics = stat_parser.list_statistics() self.assertIsNotNone(SSTstatistics) - stat = stat_parser.get_statistic('2021-09-20T12:17:40.000+00:00') # same as stat_parser.statistics[0] + # same as stat_parser.statistics[0] + stat = stat_parser.get_statistic( + '2021-09-20T12:17:40.000+00:00' + ) self.assertIsNotNone(stat) self.assertEqual(121, stat.data_id_signal_input_index) # Test RECV attributes diff --git a/tangostationcontrol/tangostationcontrol/statistics/collector.py b/tangostationcontrol/tangostationcontrol/statistics/collector.py index 7befd038d108ba3c147126082447a8e813838e42..0f68d1adb2ab2679a665ce5d1f20ae434a6efa13 100644 --- a/tangostationcontrol/tangostationcontrol/statistics/collector.py +++ b/tangostationcontrol/tangostationcontrol/statistics/collector.py @@ -19,37 +19,51 @@ from tango import DevState logger = logging.getLogger() -class StationStatisticsCollectorInterface(abc.ABC): +class DeviceCollectorInterface(abc.ABC): + """Small interface for deviceproxy enabled collectors""" + + def __init__(self, device: DeviceProxy = None): + self.device = device @abc.abstractmethod - def parse_device_attributes(self, device: DeviceProxy): + def parse_device_attributes(self): """Update information based on device attributes""" raise NotImplementedError -class StationSSTCollector(StationStatisticsCollectorInterface, SSTCollector): +class StationSSTCollector(DeviceCollectorInterface, SSTCollector): + + def __init__(self, device: DeviceProxy = None): + """Manually combine the constructors with appropriate arguments""" + + DeviceCollectorInterface.__init__(self, device=device) + SSTCollector.__init__(self) - def parse_packet(self, packet, obj): - super(StationSSTCollector, self).parse_packet(packet, obj) + def _parse_packet(self, packet): + super()._parse_packet(packet) # add tango values to packet - self.parse_device_attributes(obj) + self.parse_device_attributes() - def parse_device_attributes(self, device: DeviceProxy): + def parse_device_attributes(self): # If Tango connection has been disabled, set explicitly to None, # because otherwise default_values are inserted - if device is None or device.state() != DevState.ON: + if not self.device or self.device.state() != DevState.ON: self.parameters["rcu_attenuator_dB"] = None self.parameters["rcu_band_select"] = None self.parameters["rcu_dth_on"] = None else: try: - self.parameters["rcu_attenuator_dB"] = device.RCU_Attenuator_dB_R - self.parameters["rcu_band_select"] = device.RCU_Band_Select_R - self.parameters["rcu_dth_on"] = device.RCU_DTH_on_R + self.parameters[ + "rcu_attenuator_dB" + ] = self.device.RCU_Attenuator_dB_R + self.parameters[ + "rcu_band_select" + ] = self.device.RCU_Band_Select_R + self.parameters["rcu_dth_on"] = self.device.RCU_DTH_on_R except DevFailed as e: - logger.warning(f"Device {device.name()} not responding.") + logger.warning("Device: %s not responding.", self.device.name()) self.parameters["rcu_attenuator_dB"] = None self.parameters["rcu_band_select"] = None self.parameters["rcu_dth_on"] = None diff --git a/tangostationcontrol/tangostationcontrol/statistics/writer.py b/tangostationcontrol/tangostationcontrol/statistics/writer.py index ba066bec466d82c23815c4c711dbd362ef723714..1138fa52fbe48f80742dc6dd7acd31236e642276 100644 --- a/tangostationcontrol/tangostationcontrol/statistics/writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics/writer.py @@ -13,10 +13,16 @@ import sys from tango import DeviceProxy from tangostationcontrol.statistics_writer.receiver import tcp_receiver, file_receiver -from tangostationcontrol.statistics_writer.hdf5_writer import sst_hdf5_writer, parallel_xst_hdf5_writer, bst_hdf5_writer +from tangostationcontrol.statistics_writer.hdf5_writer import BstHdf5Writer +from tangostationcontrol.statistics_writer.hdf5_writer import SstHdf5Writer +from tangostationcontrol.statistics_writer.hdf5_writer import ParallelXstHdf5Writer import logging -logging.basicConfig(level=logging.INFO, format = '%(asctime)s:%(levelname)s: %(message)s') + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s:%(levelname)s: %(message)s' +) logger = logging.getLogger("statistics_writer") @@ -77,16 +83,31 @@ def _create_receiver(filename, host, port): sys.exit(1) -def _create_writer(mode, interval, output_dir, decimation): +def _create_writer( + mode, interval, output_dir, decimation, device: DeviceProxy = None +): """Create the writer""" if mode == "XST": - return parallel_xst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation) + return ParallelXstHdf5Writer( + new_file_time_interval=interval, + file_location=output_dir, + decimation_factor=decimation, + ) elif mode == "SST": - return sst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation) + return SstHdf5Writer( + new_file_time_interval=interval, + file_location=output_dir, + decimation_factor=decimation, + device=device + ) elif mode == "BST": - return bst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation) + return BstHdf5Writer( + new_file_time_interval=interval, + file_location=output_dir, + decimation_factor=decimation, + ) else: - logger.fatal(f"Invalid mode: {mode}") + logger.fatal("Invalid mode: %s", mode) sys.exit(1) @@ -168,7 +189,7 @@ def main(): receiver = _create_receiver(filename, host, port) # create the writer - writer = _create_writer(mode, interval, output_dir, decimation) + writer = _create_writer(mode, interval, output_dir, decimation, device) # start looping _start_loop(receiver, writer, reconnect, filename) diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/README.md b/tangostationcontrol/tangostationcontrol/statistics_writer/README.md index dc9e285ea47039351d98cb5b8c82d104b85e11bd..378ec13c1ff5dd75bfdb3d92cb5ed0a5085edcab 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/README.md +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/README.md @@ -1,8 +1,8 @@ # TCP to HDF5 statistics writer -The TCP to HDF5 statistics writer can be started with `statistics_writer.py` This script imports the receiver script and `hdf5_writer.py`. `receiver.py` only takes care of receiving packets. -`hdf5_writer.py` takes the receive function from the receiver and uses it to obtain packets. +The TCP to HDF5 statistics writer can be started with `statistics_writer.py` This script imports the receiver script and `HDF5Writer.py`. `receiver.py` only takes care of receiving packets. +`HDF5Writer.py` takes the receive function from the receiver and uses it to obtain packets. Any function that can deliver statistics packets can be used by this code. -`hdf5_writer.py` takes care of processing the packets it receives, filling statistics matrices +`HDF5Writer.py` takes care of processing the packets it receives, filling statistics matrices and writing those matrices (as well as a bunch of metadata) to hdf5. diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py index acfa3f9f734e84db78400afa8d1f52a9a14bb901..cfc84b4cf49c2a481e19fb831b7f84cbacd2e0c0 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py @@ -20,30 +20,29 @@ from lofar_station_client.statistics.collector import BSTCollector from lofar_station_client.statistics.collector import XSTCollector from tangostationcontrol.statistics.collector import StationSSTCollector -<<<<<<< HEAD -from lofar_station_client.statistics.packet import SSTPacket, XSTPacket, BSTPacket -======= -# import statistics classes with workaround -import sys -sys.path.append("..") -from tango import DeviceProxy, DevFailed, DevState -from tangostationcontrol.devices.sdp.packet import SSTPacket, XSTPacket, BSTPacket -import tangostationcontrol.devices.sdp.statistics_collector as statistics_collector ->>>>>>> L2SS-863: refactor SST collector +from lofar_station_client.statistics.packet import BSTPacket +from lofar_station_client.statistics.packet import SSTPacket +from lofar_station_client.statistics.packet import XSTPacket +from tango import DeviceProxy logger = logging.getLogger("statistics_writer") -__all__ = ["hdf5_writer", "parallel_xst_hdf5_writer", "xst_hdf5_writer", "sst_hdf5_writer", "bst_hdf5_writer"] +__all__ = [ + "HDF5Writer", "ParallelXstHdf5Writer", "XstHdf5Writer", + "SstHdf5Writer", "BstHdf5Writer", +] -class hdf5_writer(ABC): - +class HDF5Writer(ABC): SST_MODE = "SST" XST_MODE = "XST" BST_MODE = "BST" - def __init__(self, new_file_time_interval, file_location, statistics_mode, decimation_factor, device): + def __init__( + self, new_file_time_interval: int, file_location, statistics_mode, + decimation_factor, device: DeviceProxy = None + ): # all variables that deal with the matrix that's currently being decoded self.current_matrix = None @@ -53,7 +52,8 @@ class hdf5_writer(ABC): self.statistics_counter = 0 # the header of the first packet of a new matrix is written as metadata. - # Assumes all subsequent headers of the same matrix are identical (minus index) + # Assumes all subsequent headers of the same matrix are identical + # (minus index) self.statistics_header = None # file handing @@ -63,14 +63,15 @@ class hdf5_writer(ABC): self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC) self.file = None - # parameters that are configured depending on the mode the statistics writer is in (SST,XST,BST) + # parameters that are configured depending on the mode the statistics + # writer is in (SST,XST,BST) self.mode = statistics_mode.upper() - # Proxy device object that contains attributes of interest + # Set device if any, defaults to None self.device = device @abstractmethod - def decoder(self): + def decoder(self, packet): pass @abstractmethod @@ -79,12 +80,16 @@ class hdf5_writer(ABC): def next_packet(self, packet): """ - All statistics packets come with a timestamp of the time they were measured. All the values will be spread across multiple packets. - As long as the timestamp is the same they belong in the same matrix. This code handles collecting the matrix from those multiple - packets as well as storing matrices and starting new ones - The code receives new packets and checks the statistics timestamp of them. If the timestamp is higher than the current timestamp - it will close the current matrix, store it and start a new one. + All statistics packets come with a timestamp of the time they were + measured. All the values will be spread across multiple packets. + As long as the timestamp is the same they belong in the same matrix. + This code handles collecting the matrix from those multiple packets as + well as storing matrices and starting new ones + + The code receives new packets and checks the statistics timestamp of + them. If the timestamp is higher than the current timestamp it will + close the current matrix, store it and start a new one. """ # process the packet @@ -96,18 +101,26 @@ class hdf5_writer(ABC): # grab the timestamp statistics_timestamp = statistics_packet.timestamp() - # ignore packets with no timestamp, as they indicate FPGA processing was disabled - # and are useless anyway. + # ignore packets with no timestamp, as they indicate FPGA processing was + # disabled and are useless anyway. if statistics_packet.block_serial_number == 0: - logger.warning(f"Received statistics with no timestamp. Packet dropped.") + logger.warning( + "Received statistics with no timestamp. Packet dropped." + ) return - # check if te statistics timestamp is unexpectedly older than the current one + # check if te statistics timestamp is unexpectedly older than the + # current one if statistics_timestamp < self.current_timestamp: - logger.warning(f"Received statistics with earlier timestamp than is currently being processed ({statistics_timestamp}). Packet dropped.") + logger.warning( + "Received statistics with earlier timestamp than is currently" + "being processed (%s). Packet dropped.", + statistics_timestamp + ) return - # if this statistics packet has a new timestamp it means we need to start a new matrix + # if this statistics packet has a new timestamp it means we need to + # start a new matrix if statistics_timestamp > self.current_timestamp: self.start_new_matrix(statistics_timestamp) self.current_timestamp = statistics_timestamp @@ -117,56 +130,70 @@ class hdf5_writer(ABC): def start_new_matrix(self, timestamp): """ is called when a statistics packet with a newer timestamp is received. - Writes the matrix to the hdf5 file - Creates a new hdf5 file if needed - updates current timestamp and statistics matrix collector + Writes the matrix to the hdf5 file, creates a new hdf5 file if needed + and updates current timestamp and statistics matrix collector. """ + # always increment + self.statistics_counter += 1 + # only write the specified fraction of statistics, skip the rest if self.statistics_counter % self.decimation_factor != 0: - logger.debug(f"Skipping statistic with timestamp: {timestamp}. Only writing 1/{self.decimation_factor} statistics") - - # increment even though its skipped - self.statistics_counter += 1 + logger.debug( + "Skipping statistic with timestamp: %s. Only writing" + "1/%d statistics", timestamp, + self.decimation_factor + ) return - # received new statistic, so increment counter - self.statistics_counter += 1 - - logger.debug(f"starting new matrix with timestamp: {timestamp}") + logger.debug("starting new matrix with timestamp: %s", timestamp) # write the finished (and checks if its the first matrix) if self.current_matrix is not None: try: self.write_matrix() except Exception as e: - time = self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S-%f")[:-3] - logger.exception(f"Exception while attempting to write matrix to HDF5. Matrix: {time} dropped") - - # only start a new file if its time AND we are done with the previous matrix. + time = self.current_timestamp.strftime( + "%Y-%m-%d-%H-%M-%S-%f" + )[:-3] + logger.exception( + "Exception while attempting to write matrix to HDF5." + "Matrix: %s dropped", time + ) + + # only start a new file if its time AND we are done with the previous + # matrix. if timestamp >= self.new_file_time_interval + self.last_file_time: self.start_new_hdf5(timestamp) # create a new and empty current_matrix self.current_matrix = self.new_collector() - self.current_device_matrix = self.new_device_collector() self.statistics_header = None def write_matrix(self): + """Writes the finished matrix to the hdf5 file""" + logger.debug("writing matrix to file") - """ - Writes the finished matrix to the hdf5 file - """ # create the new hdf5 group based on the timestamp of packets - current_group = self.file.create_group("{}_{}".format(self.mode, self.current_timestamp.isoformat(timespec="milliseconds"))) + current_group = self.file.create_group( + "{}_{}".format(self.mode, self.current_timestamp.isoformat( + timespec="milliseconds" + )) + ) # store the statistics values for the current group self.write_values_matrix(current_group) # might be optional, but they're easy to add. - current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"]) - current_group.create_dataset(name="nof_valid_payloads", data=self.current_matrix.parameters["nof_valid_payloads"]) + current_group.create_dataset( + name="nof_payload_errors", + data=self.current_matrix.parameters["nof_payload_errors"] + ) + current_group.create_dataset( + name="nof_valid_payloads", + data=self.current_matrix.parameters["nof_valid_payloads"] + ) # get the statistics header header = self.statistics_header @@ -176,10 +203,13 @@ class hdf5_writer(ABC): return # can't store datetime objects, convert to string instead - header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds") + header["timestamp"] = header["timestamp"].isoformat( + timespec="milliseconds" + ) - # Stores the header of the packet received for this matrix as a list of atttributes - for k,v in header.items(): + # Stores the header of the packet received for this matrix as a list of + # attributes + for k, v in header.items(): if type(v) == dict: for subk, subv in v.items(): current_group.attrs[f"{k}_{subk}"] = subv @@ -203,7 +233,6 @@ class hdf5_writer(ABC): return self.current_matrix.process_packet(packet) - self.current_device_matrix.parse_device_attributes() def start_new_hdf5(self, timestamp): @@ -211,7 +240,11 @@ class hdf5_writer(ABC): try: self.file.close() except Exception as e: - logger.exception(f"Error while attempting to close hdf5 file to disk. file {self.file} likely empty, please verify integrity.") + logger.exception( + "Error while attempting to close hdf5 file to disk. file" + "%s likely empty, please verify integrity.", + self.file, + ) filename = self.next_filename(timestamp) logger.info(f"creating new file: {filename}") @@ -225,9 +258,8 @@ class hdf5_writer(ABC): self.last_file_time = timestamp def close_writer(self): - """ - Function that can be used to stop the writer without data loss. - """ + """Function that can be used to stop the writer without data loss.""" + logger.debug("closing hdf5 file") if self.file is not None: if self.current_matrix is not None: @@ -238,42 +270,84 @@ class hdf5_writer(ABC): finally: filename = str(self.file) self.file.close() - logger.debug(f"{filename} closed") - logger.debug(f"Received a total of {self.statistics_counter} statistics while running. With {int(self.statistics_counter/self.decimation_factor)} written to disk ") - - -class sst_hdf5_writer(hdf5_writer): - def __init__(self, new_file_time_interval, file_location, decimation_factor, device): - super().__init__(new_file_time_interval, file_location, hdf5_writer.SST_MODE, decimation_factor, device) + logger.debug("%S closed", filename) + logger.debug( + "Received a total of %d statistics while running. With " + "%d written to disk", + self.statistics_counter, + int(self.statistics_counter / self.decimation_factor) + ) + + +class SstHdf5Writer(HDF5Writer): + def __init__( + self, + new_file_time_interval, + file_location, + decimation_factor, + device: DeviceProxy = None, + ): + super().__init__( + new_file_time_interval, + file_location, + HDF5Writer.SST_MODE, + decimation_factor, + device=device + ) def decoder(self, packet): return SSTPacket(packet) def new_collector(self): -<<<<<<< HEAD - return StationSSTCollector() -======= - return statistics_collector.SSTCollector() - - def new_device_collector(self): - return DeviceCollector(self.device) ->>>>>>> L2SS-863: refactor SST collector + return StationSSTCollector(self.device) def write_values_matrix(self, current_group): # store the SST values - current_group.create_dataset(name="values", data=self.current_matrix.parameters["sst_values"].astype(numpy.float32), compression="gzip") - try: - current_group.create_dataset(name="rcu_attenuator_dB", data=self.current_device_matrix.parameters["rcu_attenuator_dB"].astype(numpy.int64), compression="gzip") - current_group.create_dataset(name="rcu_band_select", data=self.current_device_matrix.parameters["rcu_band_select"].astype(numpy.int64), compression="gzip") - current_group.create_dataset(name="rcu_dth_on", data=self.current_device_matrix.parameters["rcu_dth_on"].astype(numpy.bool_), compression="gzip") + current_group.create_dataset( + name="values", + data=self.current_matrix.parameters[ + "sst_values" + ].astype(numpy.float32), + compression="gzip", + ) + try: + current_group.create_dataset( + name="rcu_attenuator_dB", + data=self.current_matrix.parameters["rcu_attenuator_dB"].astype( + numpy.int64 + ), + compression="gzip", + ) + current_group.create_dataset( + name="rcu_band_select", + data=self.current_matrix.parameters["rcu_band_select"].astype( + numpy.int64 + ), + compression="gzip", + ) + current_group.create_dataset( + name="rcu_dth_on", + data=self.current_matrix.parameters["rcu_dth_on"].astype( + numpy.bool_ + ), + compression="gzip", + ) except AttributeError as e: logger.warning("Device values not written.") except Exception as e: raise Exception from e -class bst_hdf5_writer(hdf5_writer): - def __init__(self, new_file_time_interval, file_location, decimation_factor): - super().__init__(new_file_time_interval, file_location, hdf5_writer.BST_MODE, decimation_factor, device=None) + +class BstHdf5Writer(HDF5Writer): + def __init__( + self, new_file_time_interval, file_location, decimation_factor + ): + super().__init__( + new_file_time_interval, + file_location, + HDF5Writer.BST_MODE, + decimation_factor, + ) def decoder(self, packet): return BSTPacket(packet) @@ -283,12 +357,29 @@ class bst_hdf5_writer(hdf5_writer): def write_values_matrix(self, current_group): # store the BST values - current_group.create_dataset(name="values", data=self.current_matrix.parameters["bst_values"].astype(numpy.float32), compression="gzip") - - -class xst_hdf5_writer(hdf5_writer): - def __init__(self, new_file_time_interval, file_location, decimation_factor, subband_index): - super().__init__(new_file_time_interval, file_location, hdf5_writer.XST_MODE, decimation_factor, device=None) + current_group.create_dataset( + name="values", + data=self.current_matrix.parameters["bst_values"].astype( + numpy.float32 + ), + compression="gzip", + ) + + +class XstHdf5Writer(HDF5Writer): + def __init__( + self, + new_file_time_interval, + file_location, + decimation_factor, + subband_index, + ): + super().__init__( + new_file_time_interval, + file_location, + HDF5Writer.XST_MODE, + decimation_factor, + ) self.subband_index = subband_index def decoder(self, packet): @@ -297,33 +388,46 @@ class xst_hdf5_writer(hdf5_writer): def new_collector(self): return XSTCollector() - def next_filename(self, timestamp): + def next_filename(self, timestamp, suffix=".h5"): time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) - return f"{self.file_location}/{self.mode}_SB{self.subband_index}_{time_str}.h5" + return ( + f"{self.file_location}/{self.mode}_SB{self.subband_index}_" + f"{time_str}{suffix}" + ) def write_values_matrix(self, current_group): - # requires a function call to transform the xst_blocks in to the right structure + # requires a function call to transform the xst_blocks in to the right + # structure # - # since we have a dedicated writer per subband_index, they all end up in slot 0 - # in their writer, so we only need to store the first set of xst_values. - current_group.create_dataset(name="values", data=self.current_matrix.xst_values([0])[0].astype(numpy.cfloat), compression="gzip") - - -class parallel_xst_hdf5_writer: - """ Writes multiple subbands in parallel. Each subband gets written to its own HDF5 file(s). """ - - def __init__(self, new_file_time_interval, file_location, decimation_factor): - # maintain a dedicated hdf5_writer per subband + # since we have a dedicated writer per subband_index, they all end up in + # slot 0 in their writer, so we only need to store the first set of + # xst_values. + current_group.create_dataset( + name="values", data=self.current_matrix.xst_values([0])[0].astype( + numpy.cfloat + ), + compression="gzip", + ) + + +class ParallelXstHdf5Writer: + """Writes multiple subbands in parallel. Each subband to separate file.""" + + def __init__( + self, new_file_time_interval, file_location, decimation_factor + ): + # maintain a dedicated HDF5Writer per subband self.writers = {} # function to create a new writer, to avoid having to store # all the init parameters just for this purpose. def new_writer(subband): - return xst_hdf5_writer( - new_file_time_interval, - file_location, - decimation_factor, - subband) + return XstHdf5Writer( + new_file_time_interval, + file_location, + decimation_factor, + subband, + ) self.new_writer = new_writer @@ -344,35 +448,3 @@ class parallel_xst_hdf5_writer: writer.close_writer() self.writers = {} - -class DeviceCollector(): - def __init__(self, device: DeviceProxy): - self.parameters = self._default_parameters() - self.device = device - - def _default_parameters(self): - return { - # RECV attribute values to monitor the station configuration - "rcu_attenuator_dB": numpy.zeros((32,3), dtype=numpy.int64), - "rcu_band_select": numpy.zeros((32,3), dtype=numpy.int64), - "rcu_dth_on": numpy.full((32,3), False) - } - - def parse_device_attributes(self): - - #If Tango connection has been disabled, set explicitly to None, - # because otherwise default_values are inserted - if self.device is None or self.device.state() != DevState.ON: - self.parameters["rcu_attenuator_dB"] = None - self.parameters["rcu_band_select"] = None - self.parameters["rcu_dth_on"] = None - else: - try: - self.parameters["rcu_attenuator_dB"] = self.device.RCU_Attenuator_dB_R - self.parameters["rcu_band_select"] = self.device.RCU_Band_Select_R - self.parameters["rcu_dth_on"] = self.device.RCU_DTH_on_R - except DevFailed as e: - logger.warning(f"Device {self.device.name()} not responding.") - self.parameters["rcu_attenuator_dB"] = None - self.parameters["rcu_band_select"] = None - self.parameters["rcu_dth_on"] = None