diff --git a/tangostationcontrol/docs/source/devices/recv.rst b/tangostationcontrol/docs/source/devices/recv.rst index 847f8bb51f7f52850e66b0d25c01269b9961e726..00bbd89ef2e748b120e34143c8e6cc4cb5763887 100644 --- a/tangostationcontrol/docs/source/devices/recv.rst +++ b/tangostationcontrol/docs/source/devices/recv.rst @@ -13,3 +13,28 @@ The ``recv == DeviceProxy("STAT/RECV/1")`` device controls the RCUs, the LBA ant Typically, ``N_RCUs == 32``, and ``N_antennas_per_RCU == 3``. +Error information +--------------------- + +These attributes summarise the basic state of the device. Any elements which are not present in ``FPGA_mask_RW`` will be ignored and thus not report errors: + +:RCU_error_R: Whether the RCUs appear usable. + + :type: ``bool[N_RCUs]`` + +:ANT_error_R: Whether the antennas appear usable. + + :type: ``bool[N_RCUs][N_antennas_per_RCU]`` + +:RCU_IOUT_error_R: Whether there are alarms on any of the amplitudes in the measured currents. + + :type: ``bool[N_RCUs]`` + +:RCU_VOUT_error_R: Whether there are alarms on any of the voltages in the measured currents. + + :type: ``bool[N_RCUs]`` + +:RCU_TEMP_error_R: Whether there are alarms on any of the temperatures. NB: These values are also exposed for unused RCUs (the ``RCU_mask_RW`` is ignored). + + :type: ``bool[N_RCUs]`` + diff --git a/tangostationcontrol/docs/source/devices/sdp.rst b/tangostationcontrol/docs/source/devices/sdp.rst index 2ca1ea6fa95e7295a21041d12242e15cec3b8001..6386dd851b178ce44b0b1b53968ba0f219fe5140 100644 --- a/tangostationcontrol/docs/source/devices/sdp.rst +++ b/tangostationcontrol/docs/source/devices/sdp.rst @@ -30,16 +30,29 @@ The following points are significant for the operations of this device: Data-quality information --------------------------- -The following fields describe the data quality: +The following fields describe the data quality (see also :doc:`../signal_chain`): :FPGA_signal_input_mean_R: Mean value of the last second of input (in ADC quantisation units). Should be close to 0. :type: ``double[N_fpgas][N_ants_per_fpga]`` -:FPGA_signal_input_rms_R: Root means square value of the last second of input (in ADC quantisation units). ``rms^2 = mean^2 + std^2``. Values above 2048 indicate strong RFI. +:FPGA_signal_input_rms_R: Root means square value of the last second of input (in ADC quantisation units). ``rms^2 = mean^2 + std^2``. Values above 2048 indicate strong RFI. Values of 0 indicate a lack of signal input. :type: ``double[N_fpgas][N_ants_per_fpga]`` +Error information +--------------------- + +These attributes summarise the basic state of the device. Any elements which are not present in ``FPGA_mask_RW`` will be ignored and thus not report errors: + +:FPGA_error_R: Whether the FPGAs appear usable. + + :type: ``bool[N_fpgas]`` + +:FPGA_procesing_error_R: Whether the FPGAs are processing their input from the RCUs. NB: This will also raise an error if the Waveform Generator is enabled. + + :type: ``bool[N_fpgas]`` + Version Information --------------------- diff --git a/tangostationcontrol/docs/source/devices/using.rst b/tangostationcontrol/docs/source/devices/using.rst index 7aef380071836e7cd6ece9d7202f04658e68a99a..825ee74d83b5a7ad8a8e18c8813ee1ace81f5459 100644 --- a/tangostationcontrol/docs/source/devices/using.rst +++ b/tangostationcontrol/docs/source/devices/using.rst @@ -59,13 +59,12 @@ typically involves the following sequence of commands:: # turn the device off completely first. device.off() - # setup any connections and threads - device.initialise() + # turn on the device and fully reinitialise it + # alternatively, device.warm_boot() can be used, + # in which case no hardware is reinitialised. + device.boot() - # turn on the device - device.on() - -Of course, the device could go into ``FAULT`` again, even during the ``initialise()`` command, for example because the hardware it manages is unreachable. To debug the fault condition, check the :doc:`../interfaces/logs` of the device in question. +Of course, the device could go into ``FAULT`` again, even during the ``boot()`` command, for example because the hardware it manages is unreachable. To debug the fault condition, check the :doc:`../interfaces/logs` of the device in question. Initialise hardware ```````````````````` @@ -74,6 +73,10 @@ Most devices provide the following commands, in order to configure the hardware :initialise(): Initialise the device (connect to the hardware). Moves from ``OFF`` to ``STANDBY``. +:set_translator_defaults(): Select the hardware to configure and monitor. + +:prepare_hardware(): For devices that control hardware, this command prepares the hardware to accept commands (f.e. load firmware). + :set_defaults(): Upload default attribute settings from the TangoDB to the hardware. :initialise_hardware(): For devices that control hardware, this command runs the hardware initialisation procedure. @@ -145,4 +148,3 @@ For example, the ``RCU_mask_RW`` array is the RCU mask in the ``recv`` device. I # <--- only LED0 on RCU3 is now on # recv.RCU_LED0_R should show this, # if you have the RCU hardware installed. - diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics_client.py b/tangostationcontrol/tangostationcontrol/clients/statistics_client.py index 16d46bc71bbeba33c66aa8aa590f301cd0de0fa8..e602b410ab202906f2953ec1a722b74a2ff64734 100644 --- a/tangostationcontrol/tangostationcontrol/clients/statistics_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/statistics_client.py @@ -97,7 +97,14 @@ class StatisticsClient(AsyncCommClient): # redirect to right object. this works as long as the parameter names are unique among them. if annotation["type"] == "statistics": def read_function(): - return self.collector.parameters[parameter] + if annotation.get("reshape", False): + # force array into the shape of the attribute + if attribute.max_dim_y > 1: + return self.collector.parameters[parameter].reshape(attribute.max_dim_y, attribute.max_dim_x) + else: + return self.collector.parameters[parameter].reshape(attribute.max_dim_x) + else: + return self.collector.parameters[parameter] elif annotation["type"] == "udp": def read_function(): return self.udp.parameters[parameter] diff --git a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py index a53fdfcfd8b61167789a4ed2a05728379d026e61..82991eb261a1f397cff582d240f0ae6b499e3049 100644 --- a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py @@ -62,7 +62,10 @@ class lofar_device(Device, metaclass=DeviceMeta): version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version()) - # list of property names too be set first by set_defaults + # list of translator property names to be set by set_translator_defaults + translator_default_settings = [] + + # list of hardware property names to be set first by set_defaults first_default_settings = [] @classmethod @@ -224,6 +227,27 @@ class lofar_device(Device, metaclass=DeviceMeta): """Method always executed before any TANGO command is executed.""" pass + def _set_defaults(self, attribute_names: list): + """ Set hardware points to their default value. + + attribute_names: The names of the attributes to set to their default value. + + A hardware point XXX is set to the value of the object member named XXX_default, if it exists. + XXX_default can be f.e. a constant, or a device_property. + """ + + # set them all + for name in attribute_names: + try: + default_value = getattr(self, f"{name}_default") + + # set the attribute to the configured default + logger.debug(f"Setting attribute {name} to {default_value}") + self.proxy.write_attribute(name, default_value) + except Exception as e: + # log which attribute we're addressing + raise Exception(f"Cannot assign default to attribute {name}") from e + def properties_changed(self): pass @@ -240,7 +264,7 @@ class lofar_device(Device, metaclass=DeviceMeta): The points are set in the following order: 1) The python class property 'first_default_settings' is read, as an array of strings denoting property names. Each property is set in that order. - 2) Any remaining default properties are set. + 2) Any remaining default properties are set, except the translators (those in 'translator_default_settings'). """ # collect all attributes for which defaults are provided @@ -248,22 +272,33 @@ class lofar_device(Device, metaclass=DeviceMeta): # collect all attribute members if isinstance(getattr(self, name), Attribute) # with a default set - and hasattr(self, f"{name}_default")] + and hasattr(self, f"{name}_default") + and name not in self.translator_default_settings] # determine the order: first do the ones mentioned in default_settings_order attributes_to_set = self.first_default_settings + [name for name in attributes_with_defaults if name not in self.first_default_settings] - # set them all - for name in attributes_to_set: - try: - default_value = getattr(self, f"{name}_default") + # set them + self._set_defaults(attributes_to_set) - # set the attribute to the configured default - logger.debug(f"Setting attribute {name} to {default_value}") - self.proxy.write_attribute(name, default_value) - except Exception as e: - # log which attribute we're addressing - raise Exception(f"Cannot assign default to attribute {name}") from e + + @only_in_states([DevState.STANDBY, DevState.INIT, DevState.ON]) + @fault_on_error() + @command() + def set_translator_defaults(self): + """ Initialise the translator translators to their configured settings. """ + + # This is just the command version of _set_translator_defaults(). + self._set_translator_defaults() + + @only_in_states([DevState.STANDBY, DevState.INIT, DevState.ON]) + @fault_on_error() + @command() + def prepare_hardware(self): + """ Load firmware required before configuring anything. """ + + # This is just the command version of _prepare_hardware(). + self._prepare_hardware() @only_in_states([DevState.STANDBY, DevState.INIT, DevState.ON]) @fault_on_error() @@ -278,7 +313,12 @@ class lofar_device(Device, metaclass=DeviceMeta): # setup connections self.Initialise() + self.set_translator_defaults() + if initialise_hardware: + # prepare hardware to accept settings + self.prepare_hardware() + # initialise settings self.set_defaults() @@ -288,7 +328,6 @@ class lofar_device(Device, metaclass=DeviceMeta): # make device available self.On() - @only_in_states([DevState.OFF]) @command() def boot(self): @@ -299,6 +338,15 @@ class lofar_device(Device, metaclass=DeviceMeta): def warm_boot(self): self._boot(initialise_hardware=False) + def _set_translator_defaults(self): + """ Initialise any translators to their default settings. """ + + self._set_defaults(self.translator_default_settings) + + def _prepare_hardware(self): + """ Override this method to load any firmware before configuring the hardware. """ + pass + def _initialise_hardware(self): """ Override this method to initialise any hardware after configuring it. """ pass @@ -309,13 +357,21 @@ class lofar_device(Device, metaclass=DeviceMeta): Raises an Exception if it has not after the timeout. + value: The value that needs to be matched, or a function + that needs to evaluate to True given the attribute. timeout: time until an Exception is raised, in seconds. pollperiod: how often to check the attribute, in seconds. """ + if type(value) == type(lambda x: True): + # evaluate function + is_correct = value + else: + # compare to value + is_correct = lambda x: x == value # Poll every half a second for _ in range(math.ceil(timeout/pollperiod)): - if getattr(self.proxy, attr_name) != value: + if is_correct(getattr(self.proxy, attr_name)): return time.sleep(pollperiod) @@ -335,7 +391,7 @@ class lofar_device(Device, metaclass=DeviceMeta): # fetch attribute value as an array value = self.proxy.read_attribute(attr_name).value if is_scalar: - value = numpy.array(value) + value = numpy.array(value) # this stays a scalar in numpy # construct alarm state, in the same shape as the attribute alarm_state = numpy.zeros(value.shape, dtype=bool) @@ -346,9 +402,6 @@ class lofar_device(Device, metaclass=DeviceMeta): if alarms.min_alarm != 'Not specified': alarm_state |= value <= value.dtype.type(alarms.min_alarm) - # return alarm state, as the same type as the attribute - if is_scalar: - return alarm_state[0] - else: - return alarm_state + # return alarm state, with the same dimensions as the attribute + return alarm_state diff --git a/tangostationcontrol/tangostationcontrol/devices/recv.py b/tangostationcontrol/tangostationcontrol/devices/recv.py index 4fdcd59b31f30fc712247875267eec08de2e3db5..b1fbd0602047dab9ef2895ac512a31611b2bf2ea 100644 --- a/tangostationcontrol/tangostationcontrol/devices/recv.py +++ b/tangostationcontrol/tangostationcontrol/devices/recv.py @@ -74,8 +74,7 @@ class RECV(opcua_device): default_value=1 ) - first_default_settings = [ - # set the masks first, as those filter any subsequent settings + translator_default_settings = [ 'ANT_mask_RW', 'RCU_mask_RW' ] diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py index 93e9077f07580f79a3f06e89d81a4e5fe42ae7c8..6cacb14de48f067836bf7b480624995ae1667247 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py @@ -89,8 +89,7 @@ class SDP(opcua_device): default_value=[[8192] * 12 * 512] * 16 ) - first_default_settings = [ - # set the masks first, as those filter any subsequent settings + translator_default_settings = [ 'TR_fpga_mask_RW' ] @@ -109,8 +108,8 @@ class SDP(opcua_device): FPGA_beamlet_output_scale_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_scale_R"], datatype=numpy.uint32, dims=(16,)) FPGA_beamlet_output_scale_RW = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_scale_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_firmware_version_R = attribute_wrapper(comms_annotation=["FPGA_firmware_version_R"], datatype=numpy.str, dims=(16,)) - FPGA_reboot_R = attribute_wrapper(comms_annotation=["FPGA_reboot_R"], datatype=numpy.uint32, dims=(16,), doc="Active FPGA image (0=factory, 1=user)") - FPGA_reboot_RW = attribute_wrapper(comms_annotation=["FPGA_reboot_R"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_boot_image_R = attribute_wrapper(comms_annotation=["FPGA_reboot_R"], datatype=numpy.uint32, dims=(16,), doc="Active FPGA image (0=factory, 1=user)") + FPGA_boot_image_RW = attribute_wrapper(comms_annotation=["FPGA_reboot_R"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_global_node_index_R = attribute_wrapper(comms_annotation=["FPGA_global_node_index_R"], datatype=numpy.uint32, dims=(16,)) FPGA_hardware_version_R = attribute_wrapper(comms_annotation=["FPGA_hardware_version_R"], datatype=numpy.str, dims=(16,)) FPGA_processing_enable_R = attribute_wrapper(comms_annotation=["FPGA_processing_enable_R"], datatype=numpy.bool_, dims=(16,)) @@ -223,14 +222,23 @@ class SDP(opcua_device): def read_FPGA_processing_error_R(self): return self.proxy.TR_fpga_mask_RW & ( ~self.proxy.FPGA_processing_enable_R - | (self.proxy.FPGA_reboot_R == 0) - | ~self.proxy.FPGA_wg_enable_R.any(axis=1) + | (self.proxy.FPGA_boot_image_R == 0) + | self.proxy.FPGA_wg_enable_R.any(axis=1) + | (self.proxy.FPGA_signal_input_rms_R == 0) ) # -------- # overloaded functions # -------- + def _prepare_hardware(self): + # FPGAs need the correct firmware loaded + self.FPGA_boot_image_RW = [1] * N_pn + + # wait for the firmware to be loaded (ignoring masked out elements) + mask = self.proxy.TR_fpga_mask_RW + self.wait_attribute("FPGA_boot_image_R", lambda attr: (attr == 1) | ~mask, 10) + # -------- # Commands # -------- diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py index 5c00c90b1dff70e5aa281ee84093454c44b4ef96..05a76d9b39d0245251c1ddeb2ee67eb9f74870a9 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py @@ -2,6 +2,7 @@ from queue import Queue from threading import Thread import logging import numpy +import datetime from .statistics_packet import SSTPacket, XSTPacket from tangostationcontrol.common.baselines import nr_baselines, baseline_index, baseline_from_index @@ -99,7 +100,21 @@ class SSTCollector(StatisticsCollector): self.parameters["subbands_calibrated"][input_index] = fields.subband_calibrated_flag class XSTCollector(StatisticsCollector): - """ Class to process XST statistics packets. """ + """ Class to process XST statistics packets. + + XSTs are received for up to MAX_PARALLEL_SUBBANDS simultaneously, and only the values of the last + MAX_PARALLEL_SUBBANDS are kept. Raw data are collected for each subband in parameters["xst_blocks"], + and overwritten if newer (younger) data is received for the same subband. As such, the data represent + a rolling view on the XSTs. + + The xst_values() function is a user-friendly way to read the xst_blocks. + + The hardware can be configured to emit different and/or fewer subbands, causing some of the XSTs + to become stale. It is therefor advised to inspect parameters["xst_timestamps"] as well. + """ + + # Maximum number of subbands for which we collect XSTs simultaneously + MAX_PARALLEL_SUBBANDS = 8 # Maximum number of antenna inputs we support (used to determine array sizes) MAX_INPUTS = 192 @@ -130,16 +145,33 @@ class XSTCollector(StatisticsCollector): "nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64), # Last value array we've constructed out of the packets - "xst_blocks": numpy.zeros((self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.int64), + "xst_blocks": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.int64), # Whether the values are actually conjugated and transposed - "xst_conjugated": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.bool_), - "xst_timestamps": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float64), - "xst_subbands": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.uint16), - "integration_intervals": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float32), + "xst_conjugated": numpy.zeros((self.MAX_PARALLEL_SUBBANDS, self.MAX_BLOCKS,), dtype=numpy.bool_), + # When the youngest data for each subband was received + "xst_timestamps": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float64), + "xst_subbands": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.uint16), + "integration_intervals": numpy.zeros((self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float32), }) return defaults + def select_subband_slot(self, subband): + """ Return which subband slot (0..MAX_PARALLEL_SUBBANDS) to use when confronted with a new subband. + Keep recording the same subband if we're already tracking it, but allocate or replace a slot if not. """ + + indices = numpy.where(self.parameters["xst_subbands"] == subband)[0] + + if len(indices) > 0: + # subband already being recorded, use same spot + return indices[0] + else: + # a new subband, kick out the oldest + oldest_timestamp = self.parameters["xst_timestamps"].min() + + # prefer the first one in case of multiple minima + return numpy.where(self.parameters["xst_timestamps"] == oldest_timestamp)[0][0] + def parse_packet(self, packet): fields = XSTPacket(packet) @@ -172,6 +204,19 @@ class XSTCollector(StatisticsCollector): else: conjugated = False + # we keep track of multiple subbands. select slot for this one + subband_slot = self.select_subband_slot(fields.subband_index) + + assert 0 <= subband_slot < self.MAX_PARALLEL_SUBBANDS, f"Selected slot {subband_slot}, but only have room for {self.MAX_PARALLEL_SUBBANDS}. Existing slots are {self.parameters['xst_subbands']}, processing subband {fields.subband_index}." + + # log if we're replacing a subband we were once recording + previous_subband_in_slot = self.parameters["xst_subbands"][subband_slot] + if previous_subband_in_slot != fields.subband_index: + previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot]) + + if previous_subband_timestamp.timestamp() > 0: + logger.info(f"Stopped recording XSTs for subband {previous_subband_in_slot}. Last data for this subband was received at {previous_subband_timestamp}.") + # the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH # starting at baseline first_baseline. # @@ -185,36 +230,40 @@ class XSTCollector(StatisticsCollector): # process the packet self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1) - self.parameters["xst_blocks"][block_index][:fields.nof_statistics_per_packet] = fields.payload - self.parameters["xst_timestamps"][block_index] = numpy.float64(fields.timestamp().timestamp()) - self.parameters["xst_conjugated"][block_index] = conjugated - self.parameters["xst_subbands"][block_index] = numpy.uint16(fields.subband_index) - self.parameters["integration_intervals"][block_index] = fields.integration_interval() + self.parameters["xst_blocks"][subband_slot, block_index, :fields.nof_statistics_per_packet] = fields.payload + self.parameters["xst_timestamps"][subband_slot] = numpy.float64(fields.timestamp().timestamp()) + self.parameters["xst_conjugated"][subband_slot, block_index] = conjugated + self.parameters["xst_subbands"][subband_slot] = numpy.uint16(fields.subband_index) + self.parameters["integration_intervals"][subband_slot] = fields.integration_interval() - def xst_values(self): - """ xst_blocks, but as a matrix[MAX_INPUTS][MAX_INPUTS] of complex values. """ + def xst_values(self, subband_indices=range(MAX_PARALLEL_SUBBANDS)): + """ xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS] of complex values. + + The subband indices must be in [0..MAX_PARALLEL_SUBBANDS). By default, all recorded XSTs are returned. + """ - matrix = numpy.zeros((self.MAX_INPUTS, self.MAX_INPUTS), dtype=numpy.complex64) + matrix = numpy.zeros((len(subband_indices), self.MAX_INPUTS, self.MAX_INPUTS), dtype=numpy.complex64) xst_blocks = self.parameters["xst_blocks"] xst_conjugated = self.parameters["xst_conjugated"] - for block_index in range(self.MAX_BLOCKS): - # convert real/imag int to complex float values. this works as real/imag come in pairs - block = xst_blocks[block_index].astype(numpy.float32).view(numpy.complex64) + for subband_index in subband_indices: + for block_index in range(self.MAX_BLOCKS): + # convert real/imag int to complex float values. this works as real/imag come in pairs + block = xst_blocks[subband_index][block_index].astype(numpy.float32).view(numpy.complex64) - if xst_conjugated[block_index]: - # block is conjugated and transposed. process. - block = block.conjugate().transpose() + if xst_conjugated[subband_index][block_index]: + # block is conjugated and transposed. process. + block = block.conjugate().transpose() - # reshape into [a][b] - block = block.reshape(self.BLOCK_LENGTH, self.BLOCK_LENGTH) + # reshape into [a][b] + block = block.reshape(self.BLOCK_LENGTH, self.BLOCK_LENGTH) - # compute destination in matrix - first_baseline = baseline_from_index(block_index) - first_baseline = (first_baseline[0] * self.BLOCK_LENGTH, first_baseline[1] * self.BLOCK_LENGTH) + # compute destination in matrix + first_baseline = baseline_from_index(block_index) + first_baseline = (first_baseline[0] * self.BLOCK_LENGTH, first_baseline[1] * self.BLOCK_LENGTH) - # copy block into matrix - matrix[first_baseline[0]:first_baseline[0]+self.BLOCK_LENGTH, first_baseline[1]:first_baseline[1]+self.BLOCK_LENGTH] = block + # copy block into matrix + matrix[subband_index][first_baseline[0]:first_baseline[0]+self.BLOCK_LENGTH, first_baseline[1]:first_baseline[1]+self.BLOCK_LENGTH] = block return matrix diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py index 89da8dddcf19dc9195db6bc6c88c61475c61c2f6..ec93b11f0d765aa06e8324babe027b043ae93541 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py @@ -116,33 +116,86 @@ class XST(Statistics): # number of packets with invalid payloads nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(XSTCollector.MAX_FPGAS,), datatype=numpy.uint64) # latest XSTs - xst_blocks_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_blocks"}, dims=(XSTCollector.BLOCK_LENGTH * XSTCollector.BLOCK_LENGTH * XSTCollector.VALUES_PER_COMPLEX, XSTCollector.MAX_BLOCKS), datatype=numpy.int64) + xst_blocks_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_blocks", "reshape": True}, dims=(XSTCollector.BLOCK_LENGTH * XSTCollector.BLOCK_LENGTH * XSTCollector.VALUES_PER_COMPLEX, XSTCollector.MAX_BLOCKS), datatype=numpy.int64) # whether the values in the block are conjugated and transposed - xst_conjugated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_conjugated"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.bool_) - # reported timestamp for each row in the latest XSTs - xst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_timestamps"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint64) + xst_conjugated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_conjugated", "reshape": True}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.bool_) + # reported timestamp for each subband in the latest XSTs + xst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_timestamps"}, dims=(XSTCollector.PARALLEL_SUBBANDS,), datatype=numpy.uint64) # which subband the XSTs describe - xst_subbands_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_subbands"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint16) - # integration interval for each row in the latest XSTs - integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.float32) + xst_subbands_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_subbands"}, dims=(XSTCollector.MAX_PARALLEL_SUBBANDS,), datatype=numpy.uint16) + # integration interval for each subband in the latest XSTs + integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(XSTCollector.MAX_PARALLEL_SUBBANDS,), datatype=numpy.float32) - # xst_R, but as a matrix of input x input - xst_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) - xst_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) - xst_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) - xst_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),)) + # xst_R, but as a matrix of subband x (input x input) + xst_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),)) + xst_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),)) + xst_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),)) + xst_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_PARALLEL_SUBBANDS, dtype=((numpy.float32,),)) def read_xst_real_R(self): - return numpy.real(self.statistics_client.collector.xst_values()) + return numpy.real(self.statistics_client.collector.xst_values()).reshape(XSTCollector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS) def read_xst_imag_R(self): - return numpy.imag(self.statistics_client.collector.xst_values()) + return numpy.imag(self.statistics_client.collector.xst_values()).reshape(XSTCollector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS) def read_xst_power_R(self): - return numpy.abs(self.statistics_client.collector.xst_values()) + return numpy.abs(self.statistics_client.collector.xst_values()).reshape(XSTCo llector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS) def read_xst_phase_R(self): - return numpy.angle(self.statistics_client.collector.xst_values()) + return numpy.angle(self.statistics_client.collector.xst_values()).reshape(XSTCllector.MAX_PARALLEL_SUBBANDS, XSTCollector.MAX_INPUTS * XSTCollector.MAX_INPUTS) + + # xst_R, but as a matrix of input x input, for each specific subband index + xst_0_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(0)) + xst_0_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(0)) + xst_0_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(0)) + xst_0_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(0)) + + xst_1_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(1)) + xst_1_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(1)) + xst_1_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(1)) + xst_1_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(1)) + + xst_2_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(2)) + xst_2_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(2)) + xst_2_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(2)) + xst_2_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(2)) + + xst_3_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(3)) + xst_3_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(3)) + xst_3_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(3)) + xst_3_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(3)) + + xst_4_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(4)) + xst_4_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(4)) + xst_4_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(4)) + xst_4_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(4)) + + xst_5_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(5)) + xst_5_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(5)) + xst_5_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(5)) + xst_5_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(5)) + + xst_6_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(6)) + xst_6_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(6)) + xst_6_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(6)) + xst_6_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(6)) + + xst_7_real_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_real(7)) + xst_7_imag_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_imag(7)) + xst_7_power_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_power(7)) + xst_7_phase_R = attribute(max_dim_x=XSTCollector.MAX_INPUTS, max_dim_y=XSTCollector.MAX_INPUTS, dtype=((numpy.float32,),), fget = lambda self: self.read_xst_N_phase(7)) + + def read_xst_N_real_R(self, subband_idx): + return numpy.real(self.statistics_client.collector.xst_values(subband_idx)[0]) + + def read_xst_N_imag_R(self, subband_idx): + return numpy.imag(self.statistics_client.collector.xst_values(subband_idx)[0]) + + def read_xst_N_power_R(self, subband_idx): + return numpy.abs(self.statistics_client.collector.xst_values(subband_idx)[0]) + + def read_xst_N_phase_R(self, subband_idx): + return numpy.angle(self.statistics_client.collector.xst_values(subband_idx)[0]) # ---------- # Summarising Attributes diff --git a/tangostationcontrol/tangostationcontrol/devices/unb2.py b/tangostationcontrol/tangostationcontrol/devices/unb2.py index a49b120bc6f3502b7d836b86ca68e8ec0b2df9b8..0b2d5c48256405a83b6de67e6e39103f76d044bd 100644 --- a/tangostationcontrol/tangostationcontrol/devices/unb2.py +++ b/tangostationcontrol/tangostationcontrol/devices/unb2.py @@ -59,8 +59,7 @@ class UNB2(opcua_device): default_value=[True] * 2 ) - first_default_settings = [ - # set the masks first, as those filter any subsequent settings + translator_default_settings = [ 'UNB2_mask_RW' ] diff --git a/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py b/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py index a1286d28be21b7e2e41eb01de5e59cfb66d5484a..1e6015c7437014904492f82038fb8b251fcd103a 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py @@ -10,6 +10,7 @@ from tangostationcontrol.integration_test.base import BaseIntegrationTestCase from tangostationcontrol.toolkit.archiver import * from tangostationcontrol.toolkit.retriever import RetrieverTimescale +from tangostationcontrol.toolkit.archiver_util import attribute_fqdn from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy import time @@ -61,7 +62,7 @@ class TestArchiver(BaseIntegrationTestCase): self.archiver.add_attribute_to_archiver(attr_fullname, polling_period=1000, event_period=3000) time.sleep(3) # Test if the attribute has been correctly added to event subscriber - self.assertTrue(self.archiver.is_attribute_archived(attr_fullname)) + self.assertTrue(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname))) # Retrieve data from DB views self.retriever = RetrieverTimescale() @@ -78,7 +79,7 @@ class TestArchiver(BaseIntegrationTestCase): self.archiver.remove_attribute_from_archiver(attr_fullname) time.sleep(3) # Test if the attribute has been correctly removed - self.assertFalse(self.archiver.is_attribute_archived(attr_fullname)) + self.assertFalse(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname))) recv_proxy.off() def test_archive_array_attribute(self): @@ -101,7 +102,7 @@ class TestArchiver(BaseIntegrationTestCase): self.archiver.add_attribute_to_archiver(attr_fullname, polling_period=1000, event_period=3000) time.sleep(3) # Test if the attribute has been correctly added to event subscriber - self.assertTrue(self.archiver.is_attribute_archived(attr_fullname)) + self.assertTrue(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname))) # Retrieve data from DB views self.retriever = RetrieverTimescale() @@ -119,5 +120,5 @@ class TestArchiver(BaseIntegrationTestCase): self.archiver.remove_attribute_from_archiver(attr_fullname) time.sleep(3) # Test if the attribute has been correctly removed - self.assertFalse(self.archiver.is_attribute_archived(attr_fullname)) + self.assertFalse(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname))) sdp_proxy.off() diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py index 30710a871e157909aa9e4d42169ac686fc23e889..d2bdbad1c02eae894ec100b63477a92ce1e84994 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py @@ -7,6 +7,7 @@ import h5py import numpy import logging +from abc import ABC, abstractmethod # import statistics classes with workaround import sys @@ -17,9 +18,10 @@ import tangostationcontrol.devices.sdp.statistics_collector as statistics_collec logger = logging.getLogger("statistics_writer") -__all__ = ["hdf5_writer"] +__all__ = ["hdf5_writer", "parallel_xst_hdf5_writer", "xst_hdf5_writer", "sst_hdf5_writer"] -class hdf5_writer: + +class hdf5_writer(ABC): SST_MODE = "SST" XST_MODE = "XST" @@ -39,18 +41,22 @@ class hdf5_writer: self.statistics_header = None # file handing - self.file_location = file_location + self.file_location = file_location or '.' self.decimation_factor = decimation_factor self.new_file_time_interval = timedelta(seconds=new_file_time_interval) self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC) self.file = None # parameters that are configured depending on the mode the statistics writer is in (SST,XST,BST) - self.decoder = None - self.collector = None - self.store_function = None self.mode = statistics_mode.upper() - self.config_mode() + + @abstractmethod + def decoder(self): + pass + + @abstractmethod + def new_collector(self): + pass def next_packet(self, packet): """ @@ -123,7 +129,7 @@ class hdf5_writer: self.start_new_hdf5(timestamp) # create a new and empty current_matrix - self.current_matrix = self.collector() + self.current_matrix = self.new_collector() self.statistics_header = None def write_matrix(self): @@ -136,7 +142,7 @@ class hdf5_writer: current_group = self.file.create_group("{}_{}".format(self.mode, self.current_timestamp.isoformat(timespec="milliseconds"))) # store the statistics values for the current group - self.store_function(current_group) + self.write_values_matrix(current_group) # might be optional, but they're easy to add. current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"]) @@ -145,6 +151,10 @@ class hdf5_writer: # get the statistics header header = self.statistics_header + if not header: + # edge case: no valid packet received at all + return + # can't store datetime objects, convert to string instead header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds") @@ -156,17 +166,13 @@ class hdf5_writer: else: current_group.attrs[k] = v - def write_sst_matrix(self, current_group): - # store the SST values - current_group.create_dataset(name="values", data=self.current_matrix.parameters["sst_values"].astype(numpy.float32), compression="gzip") - - def write_xst_matrix(self, current_group): - # requires a function call to transform the xst_blocks in to the right structure - current_group.create_dataset(name="values", data=self.current_matrix.xst_values().astype(numpy.cfloat), compression="gzip") - - def write_bst_matrix(self, current_group): - raise NotImplementedError("BST values not implemented") + @abstractmethod + def write_values_matrix(self, current_group): + pass + def next_filename(self, timestamp, suffix=".h5"): + time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) + return f"{self.file_location}/{self.mode}_{time_str}{suffix}" def process_packet(self, packet): """ @@ -186,44 +192,17 @@ class hdf5_writer: except Exception as e: logger.exception(f"Error while attempting to close hdf5 file to disk. file {self.file} likely empty, please verify integrity.") - current_time = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) - logger.info(f"creating new file: {self.file_location}/{self.mode}_{current_time}.h5") + filename = self.next_filename(timestamp) + logger.info(f"creating new file: {filename}") try: - self.file = h5py.File(f"{self.file_location}/{self.mode}_{current_time}.h5", 'w') + self.file = h5py.File(filename, 'w') except Exception as e: logger.exception(f"Error while creating new file") raise e self.last_file_time = timestamp - def config_mode(self): - logger.debug(f"attempting to configure {self.mode} mode") - - """ - Configures the object for the correct statistics type to be used. - decoder: the class to decode a single packet - collector: the class to collect statistics packets - store_function: the function to write the mode specific data to file - """ - - if self.mode == self.SST_MODE: - self.decoder = SSTPacket - self.collector = statistics_collector.SSTCollector - self.store_function = self.write_sst_matrix - - elif self.mode == self.XST_MODE: - self.decoder = XSTPacket - self.collector = statistics_collector.XSTCollector - self.store_function = self.write_xst_matrix - - elif self.mode == self.BST_MODE: - self.store_function = self.write_bst_matrix - raise NotImplementedError("BST collector has not yet been implemented") - - else: - raise ValueError("invalid statistics mode specified '{}', please use 'SST', 'XST' or 'BST' ".format(self.mode)) - def close_writer(self): """ Function that can be used to stop the writer without data loss. @@ -240,3 +219,79 @@ class hdf5_writer: self.file.close() logger.debug(f"{filename} closed") logger.debug(f"Received a total of {self.statistics_counter} statistics while running. With {int(self.statistics_counter/self.decimation_factor)} written to disk ") + + +class sst_hdf5_writer(hdf5_writer): + def __init__(self, new_file_time_interval, file_location, decimation_factor): + super().__init__(new_file_time_interval, file_location, hdf5_writer.SST_MODE, decimation_factor) + + def decoder(self, packet): + return SSTPacket(packet) + + def new_collector(self): + return statistics_collector.SSTCollector() + + def write_values_matrix(self, current_group): + # store the SST values + current_group.create_dataset(name="values", data=self.current_matrix.parameters["sst_values"].astype(numpy.float32), compression="gzip") + + +class xst_hdf5_writer(hdf5_writer): + def __init__(self, new_file_time_interval, file_location, decimation_factor, subband_index): + super().__init__(new_file_time_interval, file_location, hdf5_writer.XST_MODE, decimation_factor) + self.subband_index = subband_index + + def decoder(self, packet): + return XSTPacket(packet) + + def new_collector(self): + return statistics_collector.XSTCollector() + + def next_filename(self, timestamp): + time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) + return f"{self.file_location}/{self.mode}_SB{self.subband_index}_{time_str}.h5" + + def write_values_matrix(self, current_group): + # requires a function call to transform the xst_blocks in to the right structure + current_group.create_dataset(name="values", data=self.current_matrix.xst_values([self.subband_index])[0].astype(numpy.cfloat), compression="gzip") + + +class parallel_xst_hdf5_writer: + """ Writes multiple subbands in parallel. Each subband gets written to its own HDF5 file(s). """ + + def __init__(self, new_file_time_interval, file_location, decimation_factor): + # maintain a dedicated hdf5_writer per subband + self.writers = {} + + # function to create a new writer, to avoid having to store + # all the init parameters just for this purpose. + # + def new_writer(subband): + # Since we use a dedicated writer per subband, the data will end + # up at subband_index == 0 in each of them. + return xst_hdf5_writer( + new_file_time_interval, + file_location, + decimation_factor, + 0) + + self.new_writer = new_writer + + def next_packet(self, packet): + # decode to get subband of this packet + fields = XSTPacket(packet) + subband = fields.subband_index + + # make sure there is a writer for it + if subband not in self.writers: + self.writers[subband] = self.new_writer(subband) + + # demux packet to the correct writer + self.writers[subband].next_packet(packet) + + def close_writer(self): + for writer in self.writers.values(): + writer.close_writer() + + self.writers = {} + diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py b/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py index 1a1ecb671159e1b3ca143ecbf860000d6cdbe0c5..52747fff71d436d62bcbe40844aa0a3a45a2ab25 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py @@ -3,7 +3,7 @@ import time import sys from tangostationcontrol.statistics_writer.receiver import tcp_receiver, file_receiver -from tangostationcontrol.statistics_writer.hdf5_writer import hdf5_writer +from tangostationcontrol.statistics_writer.hdf5_writer import sst_hdf5_writer, parallel_xst_hdf5_writer import logging logging.basicConfig(level=logging.INFO, format = '%(asctime)s:%(levelname)s: %(message)s') @@ -13,13 +13,13 @@ def main(): parser = argparse.ArgumentParser( description='Converts a stream of statistics packets into HDF5 files.') parser.add_argument( - '-a', '--host', type=str, required=True, help='the host to connect to') + '-a', '--host', type=str, required=False, help='the host to connect to') parser.add_argument( '-p', '--port', type=int, default=0, help='the port to connect to, or 0 to use default port for the ' 'selected mode (default: %(default)s)') parser.add_argument( - '-f', '--file', type=str, required=True, help='the file to read from') + '-f', '--file', type=str, required=False, help='the file to read from') parser.add_argument( '-m', '--mode', type=str, choices=['SST', 'XST', 'BST'], default='SST', help='sets the statistics type to be decoded options (default: ' @@ -57,6 +57,9 @@ def main(): debug = args.debug reconnect = args.reconnect + if not filename and not host: + raise ValueError("Supply either a filename (--file) or a hostname (--host)") + if decimation < 1: raise ValueError("Please use an integer --Decimation value 1 or higher to only store one every n statistics' ") @@ -78,7 +81,16 @@ def main(): sys.exit(1) # create the writer - writer = hdf5_writer(new_file_time_interval=interval, file_location=output_dir, statistics_mode=mode, decimation_factor=decimation) + if mode == "XST": + writer = parallel_xst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation) + elif mode == "SST": + writer = sst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation) + elif mode == "BST": + logger.fatal(f"BST mode not supported") + sys.exit(1) + else: + logger.fatal(f"Invalid mode: {mode}") + sys.exit(1) # start looping try: diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py b/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py index 4b58141c06d9b09d68dba295b007b41081ca3618..7113ee837631789e99218f3356a602637ccac116 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py @@ -3,6 +3,45 @@ from tangostationcontrol.devices.sdp.statistics_packet import XSTPacket from tangostationcontrol.test import base +class TestSelectSubbandSlot(base.TestCase): + def test_first_entry(self): + collector = XSTCollector() + + # on start, any subband should map on the first entry + self.assertEqual(0, collector.select_subband_slot(102)) + + def test_subsequent_entries(self): + collector = XSTCollector() + + # assign some subbands + collector.parameters["xst_subbands"][0] = 102 + collector.parameters["xst_subbands"][2] = 103 + collector.parameters["xst_subbands"][3] = 104 + + # give them non-zero timestamps to make them newer than the other entries + collector.parameters["xst_timestamps"][0] = 1 + collector.parameters["xst_timestamps"][2] = 1 + collector.parameters["xst_timestamps"][3] = 1 + + # these should be reported back when looking them up again + self.assertEqual(0, collector.select_subband_slot(102)) + self.assertEqual(2, collector.select_subband_slot(103)) + self.assertEqual(3, collector.select_subband_slot(104)) + + # a place for another subband should be the lowest + self.assertEqual(1, collector.select_subband_slot(101)) + + def test_spilling(self): + collector = XSTCollector() + + # assign all subbands, in decreasing age + for n in range(XSTCollector.MAX_PARALLEL_SUBBANDS): + collector.parameters["xst_subbands"][n] = 100 + n + collector.parameters["xst_timestamps"][n] = 100 - n + + # check where a new subband replaces the oldest + self.assertEqual(XSTCollector.MAX_PARALLEL_SUBBANDS - 1, collector.select_subband_slot(200)) + class TestXSTCollector(base.TestCase): def test_valid_packet(self): collector = XSTCollector() @@ -17,6 +56,9 @@ class TestXSTCollector(base.TestCase): # baseline indeed should be (12,0) self.assertEqual((12,0), fields.first_baseline) + # subband should indeed be 102 + self.assertEqual(102, fields.subband_index) + # this should not throw collector.process_packet(packet) @@ -27,8 +69,10 @@ class TestXSTCollector(base.TestCase): self.assertEqual(1, collector.parameters["nof_valid_payloads"][fpga_index]) self.assertEqual(0, collector.parameters["nof_payload_errors"][fpga_index]) + self.assertListEqual([102,0,0,0,0,0,0,0], list(collector.parameters["xst_subbands"])) + # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values() + xst_values = collector.xst_values()[0] for baseline_a in range(collector.MAX_INPUTS): for baseline_b in range(collector.MAX_INPUTS): @@ -67,7 +111,7 @@ class TestXSTCollector(base.TestCase): self.assertEqual(0, collector.parameters["nof_invalid_packets"]) # check whether the data ended up in the right block, and the rest is still zero - xst_values = collector.xst_values() + xst_values = collector.xst_values()[0] for baseline_a in range(collector.MAX_INPUTS): for baseline_b in range(collector.MAX_INPUTS): @@ -84,6 +128,48 @@ class TestXSTCollector(base.TestCase): else: self.assertEqual(0+0j, xst_values[baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') + def test_multiple_subbands(self): + collector = XSTCollector() + + # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0) + packet_subband_102 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x01' + packet_subband_103 = b'X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00g\x0c\x00\x0c\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3' + 288 * b'\x00\x00\x00\x00\x00\x00\x00\x02' + + # make sure the subband_indices are indeed what we claim they are + fields = XSTPacket(packet_subband_102) + self.assertEqual(102, fields.subband_index) + + fields = XSTPacket(packet_subband_103) + self.assertEqual(103, fields.subband_index) + + # process our packets + collector.process_packet(packet_subband_102) + collector.process_packet(packet_subband_103) + + # counters should now be updated + self.assertListEqual([102,103,0,0,0,0,0,0], list(collector.parameters["xst_subbands"])) + + # check whether the data ended up in the right block, and the rest is still zero + xst_values = collector.xst_values() + + for subband_idx in range(collector.MAX_PARALLEL_SUBBANDS): + for baseline_a in range(collector.MAX_INPUTS): + for baseline_b in range(collector.MAX_INPUTS): + if baseline_b > baseline_a: + # only scan top-left triangle + continue + + baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs) + baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs) + + if baseline_a_was_in_packet and baseline_b_was_in_packet and subband_idx == 0: + self.assertEqual(1+1j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') + elif baseline_a_was_in_packet and baseline_b_was_in_packet and subband_idx == 1: + self.assertEqual(2+2j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') + else: + self.assertEqual(0+0j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') + + def test_invalid_packet(self): collector = XSTCollector() diff --git a/tangostationcontrol/tangostationcontrol/test/test_statistics_writer.py b/tangostationcontrol/tangostationcontrol/test/test_statistics_writer.py new file mode 100644 index 0000000000000000000000000000000000000000..4b10230c8fca8c0773d085c6e42719b74dd7b2c3 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/test/test_statistics_writer.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +from tangostationcontrol.test import base +from tangostationcontrol.statistics_writer import statistics_writer +import sys +from os.path import dirname +from tempfile import TemporaryDirectory +from unittest import mock + +class TestStatisticsWriter(base.TestCase): + def test_sst(self): + with TemporaryDirectory() as tmpdir: + new_sys_argv = [sys.argv[0], "--mode", "SST", "--file", dirname(__file__) + "/SDP_SST_statistics_packets.bin", "--output_dir", tmpdir] + with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv): + with self.assertRaises(SystemExit): + statistics_writer.main() + + def test_xst(self): + with TemporaryDirectory() as tmpdir: + new_sys_argv = [sys.argv[0], "--mode", "XST", "--file", dirname(__file__) + "/SDP_XST_statistics_packets.bin", "--output_dir", tmpdir] + with mock.patch.object(statistics_writer.sys, 'argv', new_sys_argv): + with self.assertRaises(SystemExit): + statistics_writer.main() diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py index 4f5e43530af284d6fd51e1a99538b5bbeba2b3fa..eae77f0abee704eaf406d4456fd0c0e4e4297d2d 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py @@ -3,7 +3,7 @@ import logging from tango import DeviceProxy, AttributeProxy, DevState, DevFailed -from tangostationcontrol.toolkit.archiver_util import get_db_config, attribute_name_from_url, device_name_url +from tangostationcontrol.toolkit.archiver_util import get_db_config, device_fqdn, attribute_fqdn import time import json @@ -79,7 +79,7 @@ class Archiver(): """ es_list = self.get_subscribers() # Only one subscriber in ConfManager list - if len(es_list)==1: + if len(es_list) == 1: return es_list[0] else : # Choose the best subscriber analysing their load @@ -114,7 +114,7 @@ class Archiver(): self.remove_attributes_by_device(device, exclude=include_att_list) # Include attributes by custom configuration for att in include_att_list: - att_fqname = f"{device}/{att}".lower() + att_fqname = attribute_fqdn(f"{device}/{att}") self.add_attribute_to_archiver(att_fqname,self.dev_polling_time,self.dev_archive_time) elif dev_env == 'production': # PROD environment -> all attributes are included by default exclude_att_list = env_dict[device].get('exclude', []) @@ -123,7 +123,7 @@ class Archiver(): # The following cycle is a security check in the special case that an attribute is in the # included list in DEV mode, and in the excluded list in PROD mode for att in exclude_att_list: - att_fqname = f"{device}/{att}".lower() + att_fqname = attribute_fqdn(f"{device}/{att}") self.remove_attribute_from_archiver(att_fqname) except Exception as e: if 'API_DeviceNotExported' in str(e): # ignore if device is offline @@ -148,7 +148,7 @@ class Archiver(): es = DeviceProxy(es_name) if es.state() == DevState.FAULT: raise Exception(f"Event Subscriber {es_name} is in FAULT state") - self.cm.ArchiverAdd(device_name_url(es_name)) + self.cm.ArchiverAdd(device_fqdn(es_name)) except DevFailed as e: if e.args[0].reason == "Archiver already present": logger.warning(f"Event Subscriber {es_name} already present in Configuration Manager") @@ -162,7 +162,7 @@ class Archiver(): The ConfigurationManager and EventSubscriber devices must be already up and running. The archiving-DBMS must be already properly configured. """ - attribute_name = attribute_name_from_url(attribute_name) + attribute_name = attribute_fqdn(attribute_name) try: self.cm.write_attribute('SetAttributeName', attribute_name) self.cm.write_attribute('SetArchiver', es_name or self.get_next_subscriber()) @@ -177,16 +177,16 @@ class Archiver(): else: raise - def add_attributes_by_device(self,device_name,global_archive_period:int = None, es_name:str=None, exclude:list = []): + def add_attributes_by_device(self, device_name, global_archive_period:int = None, es_name:str=None, exclude:list = []): """ Add sequentially all the attributes of the selected device in the event subscriber list, if not already present """ - d = DeviceProxy(device_name) + d = DeviceProxy(device_fqdn(device_name)) dev_attrs_list = d.get_attribute_list() exclude_list = [a.lower() for a in exclude] attrs_list = [a.lower() for a in list(dev_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison for a in attrs_list: - attr_fullname = f"{device_name}/{a}".lower() + attr_fullname = attribute_fqdn(f"{device_name}/{a}") attr_proxy = AttributeProxy(attr_fullname) if attr_proxy.is_polled() and not self.is_attribute_archived(attr_fullname): # if not polled attribute is also not archived try: @@ -195,7 +195,6 @@ class Archiver(): archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0]) or self.dev_archive_time self.add_attribute_to_archiver(attr_fullname,polling_period=polling_period, event_period=archive_period, es_name = es.name()) - #time.sleep(0.5) except IndexError as e: logger.warning(f"Attribute {attr_fullname} will not be archived because archive event period is not defined!") except Exception as e: @@ -208,30 +207,29 @@ class Archiver(): """ Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list. """ - attribute_name = attribute_name_from_url(attribute_name) - + attribute_name = attribute_fqdn(attribute_name) self.cm.AttributeStop(attribute_name) self.cm.AttributeRemove(attribute_name) logger.warning(f"Attribute {attribute_name} removed!") - def remove_attributes_by_device(self,device_name:str,exclude:list=[]): + def remove_attributes_by_device(self, device_name:str, exclude:list=[]): """ Stops the data archiving of all the attributes of the selected device, and remove them from the subscriber's list """ - d = DeviceProxy(device_name) - dev_attrs_list = d.get_attribute_list() + d = DeviceProxy(device_fqdn(device_name)) + dev_attrs_list = d.get_attribute_list() # this list contains only the attribute names (not fqdn) exclude_list = [a.lower() for a in exclude] attrs_list = [a.lower() for a in list(dev_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison for a in attrs_list: try: - attr_fullname = f"{device_name}/{a}".lower() + attr_fullname = attribute_fqdn(f"{device_name}/{a}") if self.is_attribute_archived(attr_fullname): self.remove_attribute_from_archiver(attr_fullname) except Exception as e: raise Exception from e - def remove_attributes_in_error(self,exclude:list=[],es_name:str=None): + def remove_attributes_in_error(self, exclude:list=[], es_name:str=None): """ Remove from the subscribers list all the attributes currently in error (not being archived) """ @@ -242,11 +240,10 @@ class Archiver(): for es_name in es_list: es = DeviceProxy(es_name) attributes_nok = es.AttributeNokList or [] - exclude_list = [a.lower() for a in exclude] + exclude_list = [attribute_fqdn(a.lower()) for a in exclude] attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list] for a in attrs_list: - attr_fullname = attribute_name_from_url(a) - self.remove_attribute_from_archiver(attr_fullname) + self.remove_attribute_from_archiver(a) @warn_if_attribute_not_found() def start_archiving_attribute(self, attribute_name:str): @@ -254,8 +251,6 @@ class Archiver(): Starts the archiving of the attribute passed as input. The attribute must be already present in the subscriber's list """ - attribute_name = attribute_name_from_url(attribute_name) - self.cm.AttributeStart(attribute_name) @warn_if_attribute_not_found() @@ -264,25 +259,24 @@ class Archiver(): Stops the archiving of the attribute passed as input. The attribute must be already present in the subscriber's list """ - attribute_name = attribute_name_from_url(attribute_name) - self.cm.AttributeStop(attribute_name) def is_attribute_archived(self,attribute_name:str): """ Check if an attribute is in the archiving list """ - attribute_name = attribute_name_from_url(attribute_name) - attributes = self.cm.AttributeSearch(attribute_name.lower()) + attribute_name = attribute_fqdn(attribute_name) + attributes = self.cm.AttributeSearch(attribute_name) # search returns all matches in which attribute_name is part of the name, # so check whether an exact match is included. - return any(attribute_name.lower() in a for a in attributes) + return any(attribute_name == a for a in attributes) def update_archiving_attribute(self, attribute_name: str, polling_period: int, event_period: int, strategy: str = 'RUN'): """ Update the archiving properties of an attribute already in a subscriber list """ + attribute_name = attribute_fqdn(attribute_name) self.remove_attribute_from_archiver(attribute_name) time.sleep(3.) self.add_attribute_to_archiver(attribute_name,polling_period,event_period,strategy) @@ -322,10 +316,10 @@ class Archiver(): """ Return the error related to the attribute """ - attribute_name = attribute_name_from_url(attribute_name) + attribute_name = attribute_fqdn(attribute_name) errs_dict = self.get_subscriber_errors() for e in errs_dict: - if attribute_name in e: + if attribute_name == e: return errs_dict[e] return None @@ -344,27 +338,31 @@ class Archiver(): """ Given an attribute name, return the event subscriber associated with it """ - attribute_name = attribute_name_from_url(attribute_name) - # If the ConfManager manages more than one subscriber - if len(self.get_subscribers())>1: - for es_name in self.get_subscribers(): - # Search the attribute in the subscriber list (search substring because of the Tango naming conventions) - for a in list(DeviceProxy(es_name).AttributeList or []): - if attribute_name.lower() in a: - return es_name + attribute_name = attribute_fqdn(attribute_name) + # Check if attribute is archived + if self.is_attribute_archived(attribute_name): + # If the ConfManager manages more than one subscriber + if len(self.get_subscribers())>1: + for es_name in self.get_subscribers(): + # Search the attribute in the subscriber list + for a in list(DeviceProxy(es_name).AttributeList or []): + if attribute_name.lower() == a: + return es_name + else: + return self.get_next_subscriber() else: - return self.get_next_subscriber() + logger.warning(f"Attribute {attribute_name} not found!") def get_attribute_freq(self,attribute_name:str): """ Return the attribute archiving frequency in events/minute """ - attribute_name = attribute_name_from_url(attribute_name) + attribute_name = attribute_fqdn(attribute_name) if self.is_attribute_archived(attribute_name): es = DeviceProxy(self.get_attribute_subscriber(attribute_name)) freq_dict = {a: r for a,r in zip(es.AttributeList,es.AttributeRecordFreqList)} for f in freq_dict: - if attribute_name.lower() in f: + if attribute_name.lower() == f: return freq_dict[f] else: logger.warning(f"Attribute {attribute_name} not found!") @@ -373,12 +371,12 @@ class Archiver(): """ Return the attribute failure archiving frequency in events/minute """ - attribute_name = attribute_name_from_url(attribute_name) + attribute_name = attribute_fqdn(attribute_name) if self.is_attribute_archived(attribute_name): es = DeviceProxy(self.get_attribute_subscriber(attribute_name)) fail_dict = {a: r for a,r in zip(es.AttributeList,es.AttributeFailureFreqList)} for f in fail_dict: - if attribute_name.lower() in f: + if attribute_name.lower() == f: return fail_dict[f] else: logger.warning(f"Attribute {attribute_name} not found!") diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py index 7d808edc3e0fbe030b9bb4728a81abf93e56d73c..211fbd33286148b4615764f42b76d7fee1b2f010 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py @@ -18,7 +18,7 @@ def get_db_config(device_name:str) -> dict: config = dict(config_str.split("=",1) for config_str in config_strs) return config -def attribute_name_from_url(attribute_name:str): +def get_attribute_from_fqdn(attribute_name:str): """ For some operations Tango attribute must be transformed from the form 'tango://db:port/domain/family/name/attribute' to canonical 'domain/family/name/attribute' @@ -31,35 +31,35 @@ def attribute_name_from_url(attribute_name:str): return attribute_name -def device_name_url(device_name:str, tango_host:str = 'databaseds:10000'): +def device_fqdn(device_name:str, tango_host:str = 'databaseds:10000'): """ For some operations Tango devices must be transformed from the form 'domain/family/name' to 'tango://db:port/domain/family/name' """ if device_name.startswith('tango://'): - return device_name + return device_name.lower() if len(device_name.split('/')) != 3: raise ValueError(f"Expected device name of format 'domain/family/name', got {device_name}") - return f"tango://{tango_host}/{device_name}" + return f"tango://{tango_host}/{device_name}".lower() -def attribute_name_url(attribute_name:str, tango_host:str = 'databaseds:10000'): +def attribute_fqdn(attribute_name:str, tango_host:str = 'databaseds:10000'): """ For some operations Tango devices must be transformed from the form 'domain/family/name/attribute' to 'tango://db:port/domain/family/name/attribute' """ if attribute_name.startswith('tango://'): - return attribute_name + return attribute_name.lower() if len(attribute_name.split('/')) != 4: raise ValueError(f"Expected attribute name of format 'domain/family/name/attribute', got {attribute_name}") - return f"tango://{tango_host}/{attribute_name}" + return f"tango://{tango_host}/{attribute_name}".lower() def split_tango_name(tango_fqname:str, tango_type:str): """ - Helper function to split device or attribute Tango full qualified names + Helper function to split device or attribute Tango full qualified domain names into its components """ if tango_type.lower() == 'device':