diff --git a/tangostationcontrol/tangostationcontrol/clients/tcp_replicator.py b/tangostationcontrol/tangostationcontrol/clients/tcp_replicator.py index f49e7fce2ef814e5a44fc4d58b40e5534d4271cd..5b32466eabe8690936cae768814841a1f7f1f1b0 100644 --- a/tangostationcontrol/tangostationcontrol/clients/tcp_replicator.py +++ b/tangostationcontrol/tangostationcontrol/clients/tcp_replicator.py @@ -47,7 +47,7 @@ class TCPReplicator(Thread, StatisticsClientThread): """Default options for TCPReplicator we kindly ask to not change this static variable at runtime. """ - _default_options = { + _DEFAULT_OPTIONS = { "tcp_bind": '0.0.0.0', "tcp_port": 6666, "tcp_buffer_size": 128000000, # In bytes @@ -108,7 +108,7 @@ class TCPReplicator(Thread, StatisticsClientThread): @property def _options(self) -> dict: - return TCPReplicator._default_options + return TCPReplicator._DEFAULT_OPTIONS class TCPServerProtocol(asyncio.Protocol): """TCP protocol used for connected clients""" diff --git a/tangostationcontrol/tangostationcontrol/clients/udp_receiver.py b/tangostationcontrol/tangostationcontrol/clients/udp_receiver.py index 2bda038a1fb0646af2d2e14082e10ca3d7866816..9f90782106602aab556061c2cfec19caef5d7c87 100644 --- a/tangostationcontrol/tangostationcontrol/clients/udp_receiver.py +++ b/tangostationcontrol/tangostationcontrol/clients/udp_receiver.py @@ -18,7 +18,7 @@ class UDPReceiver(Thread, StatisticsClientThread): """ # Default options for UDPReceiver - _default_options = { + _DEFAULT_OPTIONS = { "udp_host": None, "udp_port": None, "poll_timeout": 0.1, @@ -81,7 +81,7 @@ class UDPReceiver(Thread, StatisticsClientThread): @property def _options(self) -> dict: - return UDPReceiver._default_options + return UDPReceiver._DEFAULT_OPTIONS def run(self): # all variables are manually defined and are updated each time diff --git a/tangostationcontrol/tangostationcontrol/common/lofar_logging.py b/tangostationcontrol/tangostationcontrol/common/lofar_logging.py index 277ab60b5634982bf7a9880e96b42ba5c5d71aa0..bc613361db7d78cc65e636f8e1de262b0b7de828 100644 --- a/tangostationcontrol/tangostationcontrol/common/lofar_logging.py +++ b/tangostationcontrol/tangostationcontrol/common/lofar_logging.py @@ -8,7 +8,7 @@ import time from .lofar_version import get_version class TangoLoggingHandler(logging.Handler): - level_to_device_stream = { + LEVEL_TO_DEVICE_STREAM = { logging.DEBUG: Device.debug_stream, logging.INFO: Device.info_stream, logging.WARN: Device.warn_stream, @@ -26,7 +26,7 @@ class TangoLoggingHandler(logging.Handler): return # determine which log stream to use - stream = self.level_to_device_stream[record.levelno] + stream = self.LEVEL_TO_DEVICE_STREAM[record.levelno] # send the log message to Tango try: diff --git a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py index a7d76bc536cf8c67d470d464bacd3929356c1a88..1ff328f783a64b4b6762495d27c0c874d26815ac 100644 --- a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py @@ -78,10 +78,10 @@ class lofar_device(Device, metaclass=DeviceMeta): version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version()) # list of translator property names to be set by set_translator_defaults - translator_default_settings = [] + TRANSLATOR_DEFAULT_SETTINGS = [] # list of hardware property names to be set first by set_defaults - first_default_settings = [] + FIRST_DEFAULT_SETTINGS = [] @classmethod def attr_list(cls): @@ -299,9 +299,9 @@ class lofar_device(Device, metaclass=DeviceMeta): XXX_default can be f.e. a constant, or a device_property. The points are set in the following order: - 1) The python class property 'first_default_settings' is read, as an array of strings denoting property names. Each property + 1) The python class property 'FIRST_DEFAULT_SETTINGS' is read, as an array of strings denoting property names. Each property is set in that order. - 2) Any remaining default properties are set, except the translators (those in 'translator_default_settings'). + 2) Any remaining default properties are set, except the translators (those in 'TRANSLATOR_DEFAULT_SETTINGS'). """ # collect all attributes for which defaults are provided @@ -310,10 +310,10 @@ class lofar_device(Device, metaclass=DeviceMeta): if isinstance(getattr(self, name), Attribute) # with a default set and hasattr(self, f"{name}_default") - and name not in self.translator_default_settings] + and name not in self.TRANSLATOR_DEFAULT_SETTINGS] # determine the order: first do the ones mentioned in default_settings_order - attributes_to_set = self.first_default_settings + [name for name in attributes_with_defaults if name not in self.first_default_settings] + attributes_to_set = self.FIRST_DEFAULT_SETTINGS + [name for name in attributes_with_defaults if name not in self.FIRST_DEFAULT_SETTINGS] # set them self._set_defaults(attributes_to_set) @@ -387,7 +387,7 @@ class lofar_device(Device, metaclass=DeviceMeta): def _set_translator_defaults(self): """ Initialise any translators to their default settings. """ - self._set_defaults(self.translator_default_settings) + self._set_defaults(self.TRANSLATOR_DEFAULT_SETTINGS) def _prepare_hardware(self): """ Override this method to load any firmware before configuring the hardware. """ diff --git a/tangostationcontrol/tangostationcontrol/devices/recv.py b/tangostationcontrol/tangostationcontrol/devices/recv.py index 4914f73134a60e746c1e38047fcae7964f423987..96da333820f69238ad34e2159752fac2cf0263e4 100644 --- a/tangostationcontrol/tangostationcontrol/devices/recv.py +++ b/tangostationcontrol/tangostationcontrol/devices/recv.py @@ -81,7 +81,7 @@ class RECV(opcua_device): default_value=1 ) - translator_default_settings = [ + TRANSLATOR_DEFAULT_SETTINGS = [ 'ANT_mask_RW', 'RCU_mask_RW' ] diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py b/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py index 5d6e8412ba81f12f72db4a0d1edabf329e5c3fa8..9b72ce655249770b84b99a9b43ce2777f84f785e 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py @@ -30,13 +30,13 @@ logger = logging.getLogger() class Beamlet(opcua_device): # List of OPC-UA CP for BF beamlets - S_pn = SDP.S_pn - N_pn = SDP.N_pn - A_pn = 6 - N_pol = 2 - N_beamlets_ctrl = 488 - N_beamsets_ctrl = 2 - N_pol_bf = 2 + S_PN = SDP.S_pn + N_PN = SDP.N_pn + A_PN = 6 + N_POL = 2 + N_BEAMLETS_CTRL = 488 + N_BEAMSETS_CTRL = 2 + N_POL_BF = 2 # ----------------- # Device Properties @@ -72,16 +72,16 @@ class Beamlet(opcua_device): FPGA_bf_weights_xy_yx_RW_default = device_property( dtype='DevVarULongArray', mandatory=False, - default_value = [[0] * A_pn * N_pol * N_beamlets_ctrl] * N_pn + default_value = [[0] * A_PN * N_POL * N_BEAMLETS_CTRL] * N_PN ) subband_select_RW_default = device_property( dtype='DevVarULongArray', mandatory=False, - default_value = [102] * N_beamlets_ctrl + default_value = [102] * N_BEAMLETS_CTRL ) - first_default_settings = [ + FIRST_DEFAULT_SETTINGS = [ 'FPGA_beamlet_output_hdr_eth_destination_mac_RW', 'FPGA_beamlet_output_hdr_ip_destination_address_RW', 'FPGA_beamlet_output_hdr_udp_destination_port_RW', @@ -103,65 +103,65 @@ class Beamlet(opcua_device): FPGA_beamlet_output_hdr_udp_destination_port_RW = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_hdr_udp_destination_port_RW"], datatype=numpy.uint16, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_beamlet_output_scale_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_scale_R"], datatype=numpy.double, dims=(16,)) FPGA_beamlet_output_scale_RW = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_scale_RW"], datatype=numpy.double, dims=(16,), access=AttrWriteType.READ_WRITE) - FPGA_beamlet_output_bsn_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_bsn_R"], datatype=numpy.int64, dims=(N_pn, N_beamsets_ctrl)) + FPGA_beamlet_output_bsn_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_bsn_R"], datatype=numpy.int64, dims=(N_PN, N_BEAMSETS_CTRL)) - FPGA_beamlet_output_nof_packets_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_nof_packets_R"], datatype=numpy.int32, dims=(N_beamsets_ctrl, N_pn)) - FPGA_beamlet_output_nof_valid_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_nof_valid_R"], datatype=numpy.int32, dims=(N_beamsets_ctrl, N_pn)) + FPGA_beamlet_output_nof_packets_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_nof_packets_R"], datatype=numpy.int32, dims=(N_BEAMSETS_CTRL, N_PN)) + FPGA_beamlet_output_nof_valid_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_nof_valid_R"], datatype=numpy.int32, dims=(N_BEAMSETS_CTRL, N_PN)) - # boolean[N_pn][N_beamsets_ctrl] - FPGA_beamlet_output_ready_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_ready_R"], datatype=bool, dims=(N_beamsets_ctrl, N_pn)) - # boolean[N_pn][N_beamsets_ctrl] - FPGA_beamlet_output_xon_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_xon_R"], datatype=bool, dims=(N_beamsets_ctrl, N_pn)) + # boolean[N_PN][N_BEAMSETS_CTRL] + FPGA_beamlet_output_ready_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_ready_R"], datatype=bool, dims=(N_BEAMSETS_CTRL, N_PN)) + # boolean[N_PN][N_BEAMSETS_CTRL] + FPGA_beamlet_output_xon_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_output_xon_R"], datatype=bool, dims=(N_BEAMSETS_CTRL, N_PN)) - # uint16[N_pn][A_pn][N_pol][N_beamlets_ctrl] + # uint16[N_PN][A_PN][N_POL][N_BEAMLETS_CTRL] # Select subband per dual-polarisation beamlet. # 0 for antenna polarization X in beamlet polarization X, # 1 for antenna polarization Y in beamlet polarization Y. - FPGA_beamlet_subband_select_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_R"], datatype=numpy.uint32, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn)) - FPGA_beamlet_subband_select_RW = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_RW"], datatype=numpy.uint32, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) + FPGA_beamlet_subband_select_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_R"], datatype=numpy.uint32, dims=(A_PN * N_POL * N_BEAMLETS_CTRL, N_PN)) + FPGA_beamlet_subband_select_RW = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_RW"], datatype=numpy.uint32, dims=(A_PN * N_POL * N_BEAMLETS_CTRL, N_PN), access=AttrWriteType.READ_WRITE) - # uint32[N_pn][N_beamset_ctrl] - FPGA_bf_ring_nof_transport_hops_R = attribute_wrapper(comms_annotation=["FPGA_bf_ring_nof_transport_hops_R"], datatype=numpy.uint32, dims=(N_beamsets_ctrl, N_pn)) - FPGA_bf_ring_nof_transport_hops_RW = attribute_wrapper(comms_annotation=["FPGA_bf_ring_nof_transport_hops_RW"], datatype=numpy.uint32, dims=(N_beamsets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) + # uint32[N_PN][N_beamset_ctrl] + FPGA_bf_ring_nof_transport_hops_R = attribute_wrapper(comms_annotation=["FPGA_bf_ring_nof_transport_hops_R"], datatype=numpy.uint32, dims=(N_BEAMSETS_CTRL, N_PN)) + FPGA_bf_ring_nof_transport_hops_RW = attribute_wrapper(comms_annotation=["FPGA_bf_ring_nof_transport_hops_RW"], datatype=numpy.uint32, dims=(N_BEAMSETS_CTRL, N_PN), access=AttrWriteType.READ_WRITE) - # cint16[N_pn][A_pn][N_pol][N_beamlets_ctrl] - # Co-polarization BF weights. The N_pol = 2 parameter index is: + # cint16[N_PN][A_PN][N_POL][N_BEAMLETS_CTRL] + # Co-polarization BF weights. The N_POL = 2 parameter index is: # 0 for antenna polarization X in beamlet polarization X, # 1 for antenna polarization Y in beamlet polarization Y. - FPGA_bf_weights_xx_yy_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_yy_R"], datatype=numpy.uint32, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn)) - FPGA_bf_weights_xx_yy_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_yy_RW"], datatype=numpy.uint32, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) + FPGA_bf_weights_xx_yy_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_yy_R"], datatype=numpy.uint32, dims=(A_PN * N_POL * N_BEAMLETS_CTRL, N_PN)) + FPGA_bf_weights_xx_yy_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_yy_RW"], datatype=numpy.uint32, dims=(A_PN * N_POL * N_BEAMLETS_CTRL, N_PN), access=AttrWriteType.READ_WRITE) - # cint16[N_pn][A_pn][N_pol][N_beamlets_ctrl] - # Cross-polarization BF weights. The N_pol = 2 parameter index is (note that index pol in range 0:N_pol-1 is the antenna polarization, so index !pol is the beamlet polarization): + # cint16[N_PN][A_PN][N_POL][N_BEAMLETS_CTRL] + # Cross-polarization BF weights. The N_POL = 2 parameter index is (note that index pol in range 0:N_POL-1 is the antenna polarization, so index !pol is the beamlet polarization): # 0 for antenna polarization X in beamlet polarization Y, # 1 for antenna polarization Y in beamlet polarization X. - FPGA_bf_weights_xy_yx_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xy_yx_R"], datatype=numpy.uint32, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn)) - FPGA_bf_weights_xy_yx_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xy_yx_RW"], datatype=numpy.uint32, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) + FPGA_bf_weights_xy_yx_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xy_yx_R"], datatype=numpy.uint32, dims=(A_PN * N_POL * N_BEAMLETS_CTRL, N_PN)) + FPGA_bf_weights_xy_yx_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xy_yx_RW"], datatype=numpy.uint32, dims=(A_PN * N_POL * N_BEAMLETS_CTRL, N_PN), access=AttrWriteType.READ_WRITE) - # cint16[N_pn][N_pol_bf][A_pn][N_pol][N_beamlets_ctrl] + # cint16[N_PN][N_POL_BF][A_PN][N_POL][N_BEAMLETS_CTRL] # Full Jones matrix of BF weights. - FPGA_bf_weights_xx_xy_yx_yy_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_xy_yx_yy_R"], datatype=numpy.uint32, dims=(N_pol_bf * A_pn * N_pol * N_beamlets_ctrl, N_pn)) - FPGA_bf_weights_xx_xy_yx_yy_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_xy_yx_yy_RW"], datatype=numpy.uint32, dims=(N_pol_bf * A_pn * N_pol * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) + FPGA_bf_weights_xx_xy_yx_yy_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_xy_yx_yy_R"], datatype=numpy.uint32, dims=(N_POL_BF * A_PN * N_POL * N_BEAMLETS_CTRL, N_PN)) + FPGA_bf_weights_xx_xy_yx_yy_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_xy_yx_yy_RW"], datatype=numpy.uint32, dims=(N_POL_BF * A_PN * N_POL * N_BEAMLETS_CTRL, N_PN), access=AttrWriteType.READ_WRITE) - # cint16[N_pn][A_pn][N_beamlets_ctrl] + # cint16[N_PN][A_PN][N_BEAMLETS_CTRL] # BF weights for separate access to respectively w_xx, w_xy, w_yx, and w_yy. - FPGA_bf_weights_xx_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_R"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn)) - FPGA_bf_weights_xx_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_RW"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) - FPGA_bf_weights_xy_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xy_R"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn)) - FPGA_bf_weights_xy_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xy_RW"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) - FPGA_bf_weights_yx_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yx_R"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn)) - FPGA_bf_weights_yx_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yx_RW"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) - FPGA_bf_weights_yy_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yy_R"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn)) - FPGA_bf_weights_yy_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yy_RW"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) + FPGA_bf_weights_xx_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_R"], datatype=numpy.uint32, dims=(A_PN * N_BEAMLETS_CTRL, N_PN)) + FPGA_bf_weights_xx_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xx_RW"], datatype=numpy.uint32, dims=(A_PN * N_BEAMLETS_CTRL, N_PN), access=AttrWriteType.READ_WRITE) + FPGA_bf_weights_xy_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xy_R"], datatype=numpy.uint32, dims=(A_PN * N_BEAMLETS_CTRL, N_PN)) + FPGA_bf_weights_xy_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_xy_RW"], datatype=numpy.uint32, dims=(A_PN * N_BEAMLETS_CTRL, N_PN), access=AttrWriteType.READ_WRITE) + FPGA_bf_weights_yx_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yx_R"], datatype=numpy.uint32, dims=(A_PN * N_BEAMLETS_CTRL, N_PN)) + FPGA_bf_weights_yx_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yx_RW"], datatype=numpy.uint32, dims=(A_PN * N_BEAMLETS_CTRL, N_PN), access=AttrWriteType.READ_WRITE) + FPGA_bf_weights_yy_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yy_R"], datatype=numpy.uint32, dims=(A_PN * N_BEAMLETS_CTRL, N_PN)) + FPGA_bf_weights_yy_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yy_RW"], datatype=numpy.uint32, dims=(A_PN * N_BEAMLETS_CTRL, N_PN), access=AttrWriteType.READ_WRITE) - subband_select_RW = attribute(dtype=(numpy.uint32,), max_dim_x=N_beamlets_ctrl, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_access_allowed") + subband_select_RW = attribute(dtype=(numpy.uint32,), max_dim_x=N_BEAMLETS_CTRL, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_access_allowed") def read_subband_select_RW(self): return self._subband_select def write_subband_select_RW(self, subbands): # Use the same subband for all inputs and polarisations of a beamlet - self.proxy.FPGA_beamlet_subband_select_RW = numpy.tile(subbands, (self.N_pn, self.A_pn * self.N_pol)) + self.proxy.FPGA_beamlet_subband_select_RW = numpy.tile(subbands, (self.N_PN, self.A_PN * self.N_POL)) self.cache_clear() @@ -179,7 +179,7 @@ class Beamlet(opcua_device): def configure_for_initialise(self): super().configure_for_initialise() - self._subband_select = numpy.zeros(self.N_beamlets_ctrl, dtype=numpy.uint32) + self._subband_select = numpy.zeros(self.N_BEAMLETS_CTRL, dtype=numpy.uint32) self.sdp_proxy = DeviceProxy("STAT/SDP/1") self.sdp_proxy.set_source(DevSource.DEV) @@ -314,7 +314,7 @@ class Beamlet(opcua_device): """ converts a difference in delays (in seconds) to a FPGA weight (in complex number) """ # Calculate the FPGA weight array - delays = delays.reshape(self.N_pn, self.A_pn * self.N_pol * self.N_beamlets_ctrl) + delays = delays.reshape(self.N_PN, self.A_PN * self.N_POL * self.N_BEAMLETS_CTRL) beamlet_frequencies = self._beamlet_frequencies() bf_weights = self._calculate_bf_weights(delays, beamlet_frequencies) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py index 8cab71dc7a656ae13f5d7a2a3f68aacbb59c0b32..3ff59ef6cdeb5ba5206c17a9414d46eee260b386 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py @@ -53,7 +53,7 @@ class BST(Statistics): default_value=[True] * 16 ) - first_default_settings = [ + FIRST_DEFAULT_SETTINGS = [ 'FPGA_bst_offload_hdr_eth_destination_mac_RW', 'FPGA_bst_offload_hdr_ip_destination_address_RW', 'FPGA_bst_offload_hdr_udp_destination_port_RW', diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py index c2e2fa4dfe3696a027b0a67a3842fccc7bc824fc..7bb139292584d117442cfff418023f100d0dcf5f 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py @@ -185,7 +185,7 @@ class DigitalBeam(beam_device): result = result.reshape(result.shape[0] * 2, result.shape[1] // 2) # cast into (fpga_nr, [input_nr][pol_nr][beamlet]) - result = result.reshape((Beamlet.N_pn, Beamlet.A_pn * Beamlet.N_pol * Beamlet.N_beamlets_ctrl)) + result = result.reshape((Beamlet.N_PN, Beamlet.A_PN * Beamlet.N_POL * Beamlet.N_BEAMLETS_CTRL)) return result @@ -201,7 +201,7 @@ class DigitalBeam(beam_device): fpga_delays = self._map_inputs_on_polarised_inputs(antenna_delays) beam_weights = self.beamlet_proxy.calculate_bf_weights(fpga_delays.flatten()) - beam_weights = beam_weights.reshape((Beamlet.N_pn, Beamlet.A_pn * Beamlet.N_pol * Beamlet.N_beamlets_ctrl)) + beam_weights = beam_weights.reshape((Beamlet.N_PN, Beamlet.A_PN * Beamlet.N_POL * Beamlet.N_BEAMLETS_CTRL)) # Filter out unwanted antennas beam_weights *= self._map_inputs_on_polarised_inputs(self._input_select) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/packet.py b/tangostationcontrol/tangostationcontrol/devices/sdp/packet.py index f9c7a2276e14bbac7ace287ad871bd0609462c13..f36bff3199759422c16596adc9cdc3482295d8ac 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/packet.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/packet.py @@ -68,8 +68,8 @@ class SDPPacket(object): f"Invalid packet of type {self.__class__.__name__}: packet marker (first byte) is {self.marker}, not one of {self.valid_markers()}.") # format string for the header, see unpack below - header_format = ">cBL HH xxxxx xxxxxxx HQ" - header_size = struct.calcsize(header_format) + HEADER_FORMAT = ">cBL HH xxxxx xxxxxxx HQ" + HEADER_SIZE = struct.calcsize(HEADER_FORMAT) @classmethod def valid_markers(cls): @@ -94,7 +94,7 @@ class SDPPacket(object): self.station_id, self.source_info, self.block_period_raw, - self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size]) + self.block_serial_number) = struct.unpack(self.HEADER_FORMAT, self.packet[:self.HEADER_SIZE]) except struct.error as e: raise ValueError("Error parsing statistics packet") from e @@ -195,7 +195,7 @@ class SDPPacket(object): bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic]) return numpy.array( - struct.unpack(format_str, self.packet[self.header_size:self.header_size + struct.calcsize(format_str)])) + struct.unpack(format_str, self.packet[self.HEADER_SIZE:self.HEADER_SIZE + struct.calcsize(format_str)])) class BeamletPacket(SDPPacket): @@ -214,8 +214,8 @@ class BeamletPacket(SDPPacket): nof_beamlets_per_block: number of beamlets in the payload. """ # format string for the header, see unpack below - header_format = ">cBL HH xxxxx HHBHHQ" - header_size = struct.calcsize(header_format) + HEADER_FORMAT = ">cBL HH xxxxx HHBHHQ" + HEADER_SIZE = struct.calcsize(HEADER_FORMAT) def unpack(self): """ Unpack the packet into properties of this object. """ @@ -232,7 +232,7 @@ class BeamletPacket(SDPPacket): self.nof_blocks_per_packet, self.nof_beamlets_per_block, self.block_period_raw, - self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size]) + self.block_serial_number) = struct.unpack(self.HEADER_FORMAT, self.packet[:self.HEADER_SIZE]) except struct.error as e: raise ValueError("Error parsing beamlet packet") from e @@ -249,7 +249,7 @@ class BeamletPacket(SDPPacket): def expected_size(self) -> int: """ The size this packet should be (header + payload), according to the header. """ - return self.header_size + self.nof_statistics_per_packet * self.nof_bytes_per_statistic + return self.HEADER_SIZE + self.nof_statistics_per_packet * self.nof_bytes_per_statistic @property def nof_statistics_per_packet(self) -> int: @@ -322,8 +322,8 @@ class StatisticsPacket(SDPPacket): """ # statistics format string for the header, see unpack below - header_format = ">cBL HHB BHL BBH HQ" - header_size = struct.calcsize(header_format) + HEADER_FORMAT = ">cBL HHB BHL BBH HQ" + HEADER_SIZE = struct.calcsize(HEADER_FORMAT) def unpack(self): """ Unpack the packet into properties of this object. """ @@ -345,7 +345,7 @@ class StatisticsPacket(SDPPacket): self.nof_bytes_per_statistic, self.nof_statistics_per_packet, self.block_period_raw, - self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size]) + self.block_serial_number) = struct.unpack(self.HEADER_FORMAT, self.packet[:self.HEADER_SIZE]) self.integration_interval_raw = (integration_interval_hi << 16) + integration_interval_lo except struct.error as e: @@ -358,7 +358,7 @@ class StatisticsPacket(SDPPacket): def expected_size(self) -> int: """ The size this packet should be (header + payload), according to the header. """ - return self.header_size + self.nof_statistics_per_packet * self.nof_bytes_per_statistic + return self.HEADER_SIZE + self.nof_statistics_per_packet * self.nof_bytes_per_statistic def unpack_source_info(self): """ Unpack the source_info field into properties of this object. """ @@ -517,7 +517,7 @@ def read_packet(read_func) -> SDPPacket: packetClass = PACKET_CLASS_FOR_MARKER[marker] # read the rest of the header - header = read_func(packetClass.header_size - len(marker)) + header = read_func(packetClass.HEADER_SIZE - len(marker)) if not header: return None diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py index d8254cabee83c3cafd72a9a3ecfc5f6963975d75..d4c4c1c4a2cea160c676af37d399a795d4a80476 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py @@ -102,7 +102,7 @@ class SDP(opcua_device): default_value = 200 * 1000000 ) - translator_default_settings = [ + TRANSLATOR_DEFAULT_SETTINGS = [ 'TR_fpga_mask_RW' ] diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py index b9d818345769098d7f5d42e90e5d2151d85e1d76..a1ae871efd63be5de397984584460de8a0012987 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py @@ -63,7 +63,7 @@ class SST(Statistics): default_value=[True] * 16 ) - first_default_settings = [ + FIRST_DEFAULT_SETTINGS = [ 'FPGA_sst_offload_hdr_eth_destination_mac_RW', 'FPGA_sst_offload_hdr_ip_destination_address_RW', 'FPGA_sst_offload_hdr_udp_destination_port_RW', diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py index 5f19d895dfdbff975084578b76f2418d0e6cb0cf..9f93b9b59cb8145948751f8968d08b884f553533 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py @@ -332,7 +332,7 @@ class StatisticsConsumer(Thread, StatisticsClientThread): DISCONNECT_TIMEOUT = 10.0 # No default options required, for now? - _default_options = {} + _DEFAULT_OPTIONS = {} def __init__(self, queue: Queue, collector: StatisticsCollector): self.queue = queue @@ -344,7 +344,7 @@ class StatisticsConsumer(Thread, StatisticsClientThread): @property def _options(self) -> dict: - return StatisticsConsumer._default_options + return StatisticsConsumer._DEFAULT_OPTIONS def run(self): logger.info("Starting statistics thread") diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py index 3adf539d5a5fe4de728c4a7273d14dab4f25cb69..19068ff768dcf0eda9782dc213cf293bcb0ed004 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py @@ -76,7 +76,7 @@ class XST(Statistics): default_value=[True] * 16 ) - first_default_settings = [ + FIRST_DEFAULT_SETTINGS = [ 'FPGA_xst_offload_hdr_eth_destination_mac_RW', 'FPGA_xst_offload_hdr_ip_destination_address_RW', 'FPGA_xst_offload_hdr_udp_destination_port_RW', @@ -117,34 +117,34 @@ class XST(Statistics): FPGA_xst_offload_nof_packets_R = attribute_wrapper(comms_annotation=["FPGA_xst_offload_nof_packets_R"], datatype=numpy.int32, dims=(16,)) FPGA_xst_offload_nof_valid_R = attribute_wrapper(comms_annotation=["FPGA_xst_offload_nof_valid_R"], datatype=numpy.int32, dims=(16,)) - N_pn = 16 # Number of FPGAs per antenna band that is controlled via the SC - SDP interface. - P_sq = 9 - - FPGA_xst_ring_rx_clear_total_counts_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_clear_total_counts_RW"], datatype=bool, dims=(N_pn,), access=AttrWriteType.READ_WRITE) - FPGA_xst_ring_rx_clear_total_counts_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_clear_total_counts_R"], datatype=bool, dims=(N_pn,)) - FPGA_xst_rx_align_stream_enable_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_stream_enable_RW"], datatype=numpy.uint32, dims=(N_pn,), access=AttrWriteType.READ_WRITE) - FPGA_xst_rx_align_stream_enable_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_stream_enable_R"], datatype=numpy.uint32, dims=(N_pn,)) - FPGA_xst_ring_rx_total_nof_packets_received_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_total_nof_packets_received_R"], datatype=numpy.uint32, dims=(N_pn,)) - FPGA_xst_ring_rx_total_nof_packets_discarded_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_total_nof_packets_discarded_R"], datatype=numpy.uint32, dims=(N_pn,)) - FPGA_xst_ring_rx_total_nof_sync_received_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_total_nof_sync_received_R"], datatype=numpy.uint32, dims=(N_pn,)) - FPGA_xst_ring_rx_total_nof_sync_discarded_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_total_nof_sync_discarded_R"], datatype=numpy.uint32, dims=(N_pn,)) - FPGA_xst_ring_rx_bsn_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_bsn_R"], datatype=numpy.int64, dims=(N_pn, N_pn)) - FPGA_xst_ring_rx_nof_packets_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_nof_packets_R"], datatype=numpy.int32, dims=(N_pn, N_pn)) - FPGA_xst_ring_rx_nof_valid_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_nof_valid_R"], datatype=numpy.int32, dims=(N_pn, N_pn)) - FPGA_xst_ring_rx_latency_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_latency_R"], datatype=numpy.int32, dims=(N_pn, N_pn)) - FPGA_xst_rx_align_bsn_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_bsn_R"], datatype=numpy.int64, dims=(N_pn,P_sq)) - FPGA_xst_rx_align_nof_packets_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_nof_packets_R"], datatype=numpy.int32, dims=(N_pn,P_sq)) - FPGA_xst_rx_align_nof_valid_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_nof_valid_R"], datatype=numpy.int32, dims=(N_pn,P_sq)) - FPGA_xst_rx_align_latency_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_latency_R"], datatype=numpy.int32, dims=(N_pn,P_sq)) - FPGA_xst_rx_align_nof_replaced_packets_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_nof_replaced_packets_R"], datatype=numpy.int32, dims=(N_pn,P_sq)) - FPGA_xst_aligned_bsn_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_aligned_bsn_R"], datatype=numpy.int64, dims=(N_pn,)) - FPGA_xst_aligned_nof_packets_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_aligned_nof_packets_R"], datatype=numpy.int32, dims=(N_pn,)) - FPGA_xst_aligned_nof_valid_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_aligned_nof_valid_R"], datatype=numpy.int32, dims=(N_pn,)) - FPGA_xst_aligned_latency_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_aligned_latency_R"], datatype=numpy.int32, dims=(N_pn,)) - FPGA_xst_ring_tx_bsn_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_tx_bsn_R"], datatype=numpy.int64, dims=(N_pn,N_pn)) - FPGA_xst_ring_tx_nof_packets_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_tx_nof_packets_R"], datatype=numpy.int32, dims=(N_pn,N_pn)) - FPGA_xst_ring_tx_nof_valid_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_tx_nof_valid_R"], datatype=numpy.int32, dims=(N_pn,N_pn)) - FPGA_xst_ring_tx_latency_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_tx_latency_R"], datatype=numpy.int32, dims=(N_pn,N_pn)) + N_PN = 16 # Number of FPGAs per antenna band that is controlled via the SC - SDP interface. + P_SQ = 9 + + FPGA_xst_ring_rx_clear_total_counts_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_clear_total_counts_RW"], datatype=bool, dims=(N_PN,), access=AttrWriteType.READ_WRITE) + FPGA_xst_ring_rx_clear_total_counts_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_clear_total_counts_R"], datatype=bool, dims=(N_PN,)) + FPGA_xst_rx_align_stream_enable_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_stream_enable_RW"], datatype=numpy.uint32, dims=(N_PN,), access=AttrWriteType.READ_WRITE) + FPGA_xst_rx_align_stream_enable_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_stream_enable_R"], datatype=numpy.uint32, dims=(N_PN,)) + FPGA_xst_ring_rx_total_nof_packets_received_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_total_nof_packets_received_R"], datatype=numpy.uint32, dims=(N_PN,)) + FPGA_xst_ring_rx_total_nof_packets_discarded_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_total_nof_packets_discarded_R"], datatype=numpy.uint32, dims=(N_PN,)) + FPGA_xst_ring_rx_total_nof_sync_received_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_total_nof_sync_received_R"], datatype=numpy.uint32, dims=(N_PN,)) + FPGA_xst_ring_rx_total_nof_sync_discarded_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_total_nof_sync_discarded_R"], datatype=numpy.uint32, dims=(N_PN,)) + FPGA_xst_ring_rx_bsn_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_bsn_R"], datatype=numpy.int64, dims=(N_PN, N_PN)) + FPGA_xst_ring_rx_nof_packets_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_nof_packets_R"], datatype=numpy.int32, dims=(N_PN, N_PN)) + FPGA_xst_ring_rx_nof_valid_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_nof_valid_R"], datatype=numpy.int32, dims=(N_PN, N_PN)) + FPGA_xst_ring_rx_latency_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_rx_latency_R"], datatype=numpy.int32, dims=(N_PN, N_PN)) + FPGA_xst_rx_align_bsn_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_bsn_R"], datatype=numpy.int64, dims=(N_PN,P_SQ)) + FPGA_xst_rx_align_nof_packets_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_nof_packets_R"], datatype=numpy.int32, dims=(N_PN,P_SQ)) + FPGA_xst_rx_align_nof_valid_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_nof_valid_R"], datatype=numpy.int32, dims=(N_PN,P_SQ)) + FPGA_xst_rx_align_latency_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_latency_R"], datatype=numpy.int32, dims=(N_PN,P_SQ)) + FPGA_xst_rx_align_nof_replaced_packets_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_rx_align_nof_replaced_packets_R"], datatype=numpy.int32, dims=(N_PN,P_SQ)) + FPGA_xst_aligned_bsn_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_aligned_bsn_R"], datatype=numpy.int64, dims=(N_PN,)) + FPGA_xst_aligned_nof_packets_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_aligned_nof_packets_R"], datatype=numpy.int32, dims=(N_PN,)) + FPGA_xst_aligned_nof_valid_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_aligned_nof_valid_R"], datatype=numpy.int32, dims=(N_PN,)) + FPGA_xst_aligned_latency_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_aligned_latency_R"], datatype=numpy.int32, dims=(N_PN,)) + FPGA_xst_ring_tx_bsn_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_tx_bsn_R"], datatype=numpy.int64, dims=(N_PN,N_PN)) + FPGA_xst_ring_tx_nof_packets_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_tx_nof_packets_R"], datatype=numpy.int32, dims=(N_PN,N_PN)) + FPGA_xst_ring_tx_nof_valid_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_tx_nof_valid_R"], datatype=numpy.int32, dims=(N_PN,N_PN)) + FPGA_xst_ring_tx_latency_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_xst_ring_tx_latency_R"], datatype=numpy.int32, dims=(N_PN,N_PN)) # number of packets with valid payloads nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(XSTCollector.MAX_FPGAS,), datatype=numpy.uint64) diff --git a/tangostationcontrol/tangostationcontrol/devices/unb2.py b/tangostationcontrol/tangostationcontrol/devices/unb2.py index cb7096a5d86144c591ae89add89e271f9fa0fa89..51cd13735a8f8b72a1ee8ed77efb1da6dc3fef2a 100644 --- a/tangostationcontrol/tangostationcontrol/devices/unb2.py +++ b/tangostationcontrol/tangostationcontrol/devices/unb2.py @@ -58,10 +58,10 @@ class UNB2(opcua_device): # Attributes # ---------- - N_unb = 2 - N_fpga = 4 - N_ddr = 2 - N_qsfp = 6 + N_UNB = 2 + N_FPGA = 4 + N_DDR = 2 + N_QSFP = 6 UNB2_mask_RW_default = device_property( dtype='DevVarBooleanArray', @@ -69,7 +69,7 @@ class UNB2(opcua_device): default_value=[True] * 2 ) - translator_default_settings = [ + TRANSLATOR_DEFAULT_SETTINGS = [ 'UNB2_mask_RW' ] diff --git a/tangostationcontrol/tangostationcontrol/examples/load_from_disk/ini_client.py b/tangostationcontrol/tangostationcontrol/examples/load_from_disk/ini_client.py index 17e4690467fadacf585345a93f9ed75b4eda0048..a31fdce9267413305663e2647e55367bddc77896 100644 --- a/tangostationcontrol/tangostationcontrol/examples/load_from_disk/ini_client.py +++ b/tangostationcontrol/tangostationcontrol/examples/load_from_disk/ini_client.py @@ -8,7 +8,7 @@ logger = logging.getLogger() __all__ = ["ini_client"] -numpy_to_ini_dict = { +NUMPY_TO_INI_DICT = { numpy.int64: int, numpy.double: float, numpy.float64: float, @@ -16,7 +16,7 @@ numpy_to_ini_dict = { str: str, } -numpy_to_ini_get_dict = { +NUMPY_TO_INI_GET_DICT = { numpy.int64: configparser.ConfigParser.getint, numpy.double: configparser.ConfigParser.getfloat, numpy.float64: configparser.ConfigParser.getfloat, @@ -24,7 +24,7 @@ numpy_to_ini_get_dict = { str: str, } -ini_to_numpy_dict = { +INI_TO_NUMPY_DICT = { int: numpy.int64, float: numpy.float64, bool: bool, diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py index 02c65cd161eb99fdd932f41a6f5aadc6ac6447ef..32dfa8e322100fa16a86c1ab7d8610f617e315ed 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py @@ -28,7 +28,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): # The AntennaField is setup with self.NR_TILES tiles in the test configuration NR_TILES = 48 - pointing_direction = numpy.array([["J2000","0deg","0deg"]] * NR_TILES).flatten() + POINTING_DIRECTION = numpy.array([["J2000","0deg","0deg"]] * NR_TILES).flatten() def setUp(self): super().setUp("STAT/TileBeam/1") @@ -64,7 +64,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): self.proxy.warm_boot() # verify delays method returns the correct dimensions - delays = self.proxy.delays(self.pointing_direction) + delays = self.proxy.delays(self.POINTING_DIRECTION) self.assertEqual(self.NR_TILES*16, len(delays)) def test_set_pointing(self): @@ -82,7 +82,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): time.sleep(3) # Verify writing operation does not lead to errors - self.proxy.set_pointing(self.pointing_direction) # write values to RECV + self.proxy.set_pointing(self.POINTING_DIRECTION) # write values to RECV delays_r2 = numpy.array(antennafield_proxy.read_attribute('HBAT_BF_delay_steps_RW').value) self.assertIsNotNone(delays_r2) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_lofar_device.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_lofar_device.py index 3a420bbd3c3f03b34d4664f39d631de154c69bd0..8187216ca293fc79c4537eafcf8b061b49023611 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_lofar_device.py @@ -18,25 +18,25 @@ class TestProxyAttributeAccess(base.IntegrationTestCase): """ Test whether DeviceProxy's can always access attributes immediately after turning them on. """ # We use RECV as our victim. Any device would do. - device_name = "STAT/RECV/1" + DEVICE_NAME = "STAT/RECV/1" # an attribute to access - attribute_name = "ANT_mask_RW" + ATTRIBUTE_NAME = "ANT_mask_RW" def setUp(self): - self.proxy = TestDeviceProxy(self.device_name) + self.proxy = TestDeviceProxy(self.DEVICE_NAME) self.proxy.off() def poll_attribute(self): # make sure the attribute is polled, so we force proxies to access the poll first - if self.proxy.is_attribute_polled(self.attribute_name): - self.proxy.stop_poll_attribute(self.attribute_name) - self.proxy.poll_attribute(self.attribute_name, 1000) + if self.proxy.is_attribute_polled(self.ATTRIBUTE_NAME): + self.proxy.stop_poll_attribute(self.ATTRIBUTE_NAME) + self.proxy.poll_attribute(self.ATTRIBUTE_NAME, 1000) def dont_poll_attribute(self): # make sure the attribute is NOT polled, so we force proxies to access the device - if self.proxy.is_attribute_polled(self.attribute_name): - self.proxy.stop_poll_attribute(self.attribute_name) + if self.proxy.is_attribute_polled(self.ATTRIBUTE_NAME): + self.proxy.stop_poll_attribute(self.ATTRIBUTE_NAME) def read_attribute(self): # turn on the device @@ -44,7 +44,7 @@ class TestProxyAttributeAccess(base.IntegrationTestCase): self.assertEqual(DevState.STANDBY, self.proxy.state()) # read the attribute -- shouldn't throw - _ = self.proxy.read_attribute(self.attribute_name) + _ = self.proxy.read_attribute(self.ATTRIBUTE_NAME) def test_fast_setup_polled_attribute(self): """ Setup a device as fast as possible and access its attributes immediately. """ diff --git a/tangostationcontrol/tangostationcontrol/integration_test/recv_cluster/test_recv_cluster.py b/tangostationcontrol/tangostationcontrol/integration_test/recv_cluster/test_recv_cluster.py index f97827681c39e90ca425553d319454bae428ff04..0c1fdcafc848ff87d23c1ba8ddd0eb9b87b46bab 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/recv_cluster/test_recv_cluster.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/recv_cluster/test_recv_cluster.py @@ -23,7 +23,7 @@ logger = logging.getLogger() class TestRecvCluster(base.IntegrationTestCase): - pointing_direction = numpy.array([["J2000", "0deg", "0deg"]] * 96).flatten() + POINTING_DIRECTION = numpy.array([["J2000", "0deg", "0deg"]] * 96).flatten() def setUp(self): @@ -72,7 +72,7 @@ class TestRecvCluster(base.IntegrationTestCase): for _i in range(25): start_time = time.monotonic_ns() for proxy in beam_proxies: - proxy.set_pointing(self.pointing_direction) + proxy.set_pointing(self.POINTING_DIRECTION) stop_time = time.monotonic_ns() results.append(stop_time - start_time) diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py b/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py index 31a9873586f056a522d0e907dd78542854248532..fd45244f870ccf0da97f378891638a17447bb28b 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py @@ -23,13 +23,13 @@ import asyncio import mock import numpy -scalar_dims = (1,) -spectrum_dims = (4,) -image_dims = (3,2) +SCALAR_DIMS = (1,) +SPECTRUM_DIMS = (4,) +IMAGE_DIMS = (3,2) -str_scalar_val = '1' -str_spectrum_val = ['1','1', '1','1'] -str_image_val = [['1','1'],['1','1'],['1','1']] +STR_SCALAR_VAL = '1' +STR_SPECTRUM_VAL = ['1','1', '1','1'] +STR_IMAGE_VAL = [['1','1'],['1','1'],['1','1']] def dev_init(device): @@ -132,85 +132,85 @@ class TestAttributeTypes(base.TestCase): dev_init(self) class str_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="str_spectrum_R", datatype=str, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="str_spectrum_RW", datatype=str, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="str_spectrum_R", datatype=str, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="str_spectrum_RW", datatype=str, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) class bool_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="bool_spectrum_R", datatype=bool, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="bool_spectrum_RW", datatype=bool, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="bool_spectrum_R", datatype=bool, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="bool_spectrum_RW", datatype=bool, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) class float32_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="float32_spectrum_R", datatype=numpy.float32, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="float32_spectrum_RW", datatype=numpy.float32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="float32_spectrum_R", datatype=numpy.float32, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="float32_spectrum_RW", datatype=numpy.float32, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) class float64_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="float64_spectrum_R", datatype=numpy.float64, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="float64_spectrum_RW", datatype=numpy.float64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="float64_spectrum_R", datatype=numpy.float64, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="float64_spectrum_RW", datatype=numpy.float64, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) class double_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="double_spectrum_R", datatype=numpy.double, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="double_spectrum_RW", datatype=numpy.double, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="double_spectrum_R", datatype=numpy.double, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="double_spectrum_RW", datatype=numpy.double, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) class uint8_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="uint8_spectrum_R", datatype=numpy.uint8, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="uint8_spectrum_RW", datatype=numpy.uint8, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="uint8_spectrum_R", datatype=numpy.uint8, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="uint8_spectrum_RW", datatype=numpy.uint8, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) class uint16_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="uint16_spectrum_R", datatype=numpy.uint16, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="uint16_spectrum_RW", datatype=numpy.uint16, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="uint16_spectrum_R", datatype=numpy.uint16, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="uint16_spectrum_RW", datatype=numpy.uint16, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) class uint32_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="uint32_spectrum_R", datatype=numpy.uint32, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="uint32_spectrum_RW", datatype=numpy.uint32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="uint32_spectrum_R", datatype=numpy.uint32, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="uint32_spectrum_RW", datatype=numpy.uint32, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) class uint64_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="uint64_spectrum_R", datatype=numpy.uint64, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="uint64_spectrum_RW", datatype=numpy.uint64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="uint64_spectrum_R", datatype=numpy.uint64, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="uint64_spectrum_RW", datatype=numpy.uint64, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) class int16_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="int16_spectrum_R", datatype=numpy.int16, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="int16_spectrum_RW", datatype=numpy.int16, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="int16_spectrum_R", datatype=numpy.int16, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="int16_spectrum_RW", datatype=numpy.int16, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) class int32_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="int32_spectrum_R", datatype=numpy.int32, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="int32_spectrum_RW", datatype=numpy.int32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="int32_spectrum_R", datatype=numpy.int32, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="int32_spectrum_RW", datatype=numpy.int32, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) class int64_spectrum_device(lofar_device): - spectrum_R = attribute_wrapper(comms_annotation="int64_spectrum_R", datatype=numpy.int64, dims=spectrum_dims) - spectrum_RW = attribute_wrapper(comms_annotation="int64_spectrum_RW", datatype=numpy.int64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + spectrum_R = attribute_wrapper(comms_annotation="int64_spectrum_R", datatype=numpy.int64, dims=SPECTRUM_DIMS) + spectrum_RW = attribute_wrapper(comms_annotation="int64_spectrum_RW", datatype=numpy.int64, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS) def configure_for_initialise(self): dev_init(self) @@ -311,10 +311,10 @@ class TestAttributeTypes(base.TestCase): expected = numpy.zeros((1,), dtype=dtype) val = proxy.scalar_R elif test_type == "spectrum": - expected = numpy.zeros(spectrum_dims, dtype=dtype) + expected = numpy.zeros(SPECTRUM_DIMS, dtype=dtype) val = proxy.spectrum_R elif test_type == "image": - expected = numpy.zeros(image_dims, dtype=dtype) + expected = numpy.zeros(IMAGE_DIMS, dtype=dtype) val = numpy.array(proxy.image_R) #is needed for STR since they act differently # cant use all() for 2d arrays so instead compare the dimensions and then flatten to 2d @@ -344,22 +344,22 @@ class TestAttributeTypes(base.TestCase): if test_type == "scalar": if dtype is str: - val = str_scalar_val + val = STR_SCALAR_VAL else: val = dtype(1) proxy.scalar_RW = val elif test_type == "spectrum": if dtype is str: - val = str_spectrum_val + val = STR_SPECTRUM_VAL else: - val = numpy.full(spectrum_dims, dtype=dtype, fill_value=1) + val = numpy.full(SPECTRUM_DIMS, dtype=dtype, fill_value=1) print(val) proxy.spectrum_RW = val elif test_type == "image": if dtype is str: - val = str_image_val + val = STR_IMAGE_VAL else: - val = numpy.full(image_dims, dtype=dtype, fill_value=1) + val = numpy.full(IMAGE_DIMS, dtype=dtype, fill_value=1) proxy.image_RW = val else: self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type)) @@ -385,10 +385,10 @@ class TestAttributeTypes(base.TestCase): expected = numpy.zeros((1,), dtype=dtype) val = proxy.scalar_RW elif test_type == "spectrum": - expected = numpy.zeros(spectrum_dims, dtype=dtype) + expected = numpy.zeros(SPECTRUM_DIMS, dtype=dtype) val = proxy.spectrum_RW elif test_type == "image": - expected = numpy.zeros(image_dims, dtype=dtype) + expected = numpy.zeros(IMAGE_DIMS, dtype=dtype) val = numpy.array(proxy.image_RW) #is needed for STR since they act differently # cant use all() for 2d arrays so instead compare the dimensions and then flatten to 2d @@ -414,7 +414,7 @@ class TestAttributeTypes(base.TestCase): def _get_result_type(self, dtype, test_type, proxy): if test_type == "scalar": if dtype is str: - val = str_scalar_val + val = STR_SCALAR_VAL else: val = dtype(1) proxy.scalar_RW = val @@ -422,17 +422,17 @@ class TestAttributeTypes(base.TestCase): result_RW = proxy.scalar_RW elif test_type == "spectrum": if dtype is str: - val = str_spectrum_val + val = STR_SPECTRUM_VAL else: - val = numpy.full(spectrum_dims, dtype=dtype, fill_value=1) + val = numpy.full(SPECTRUM_DIMS, dtype=dtype, fill_value=1) proxy.spectrum_RW = val result_R = proxy.spectrum_R result_RW = proxy.spectrum_RW elif test_type == "image": if dtype is str: - val = str_image_val + val = STR_IMAGE_VAL else: - val = numpy.full(image_dims, dtype=dtype, fill_value=1) + val = numpy.full(IMAGE_DIMS, dtype=dtype, fill_value=1) # info += " write value: {}".format(val) proxy.image_RW = val @@ -440,7 +440,7 @@ class TestAttributeTypes(base.TestCase): result_RW = proxy.image_RW if dtype != str: - self.assertEqual(result_R.shape, image_dims, "not the correct dimensions") + self.assertEqual(result_R.shape, IMAGE_DIMS, "not the correct dimensions") result_R = result_R.reshape(-1) result_RW = result_RW.reshape(-1) @@ -497,7 +497,7 @@ class TestAttributeTypes(base.TestCase): might have unexpected results. Each type is bound to a device scalar, spectrum and image class """ - attribute_type_tests = [ + ATTRIBUTE_TYPE_TESTS = [ { 'type': str, 'scalar': str_scalar_device, 'spectrum': str_spectrum_device, "image": str_image_device @@ -549,71 +549,71 @@ class TestAttributeTypes(base.TestCase): ] def test_scalar_R(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.read_R_test( attribute_type_test['scalar'], attribute_type_test['type'], 'scalar') def test_scalar_RW(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.read_RW_test( attribute_type_test['scalar'], attribute_type_test['type'], 'scalar') def test_scalar_W(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.write_RW_test( attribute_type_test['scalar'], attribute_type_test['type'], 'scalar') def test_scalar_readback(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.readback_test( attribute_type_test['scalar'], attribute_type_test['type'], 'scalar') def test_spectrum_R(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.read_R_test( attribute_type_test['spectrum'], attribute_type_test['type'], 'spectrum') def test_spectrum_RW(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.read_RW_test( attribute_type_test['spectrum'], attribute_type_test['type'], 'spectrum') def test_spectrum_W(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.write_RW_test( attribute_type_test['spectrum'], attribute_type_test['type'], 'spectrum') def test_spectrum_readback(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.readback_test( attribute_type_test['spectrum'], attribute_type_test['type'], 'spectrum') def test_image_R(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.read_R_test( attribute_type_test['image'], attribute_type_test['type'], 'image') def test_image_RW(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.read_RW_test( attribute_type_test['image'], attribute_type_test['type'], 'image') def test_image_W(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.write_RW_test(attribute_type_test['image'], attribute_type_test['type'], 'image') def test_image_readback(self): - for attribute_type_test in self.attribute_type_tests: + for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS: self.readback_test( attribute_type_test['image'], attribute_type_test['type'], 'image') diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_mib_loading.py b/tangostationcontrol/tangostationcontrol/test/clients/test_mib_loading.py index 6abf0f52e6ccda67f5ba482a8c12f811d5421fcb..c084bed8113934defe6290c3f59c85ae97d0c1b5 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_mib_loading.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_mib_loading.py @@ -11,10 +11,10 @@ from os import path class TestMibLoading(base.TestCase): #name and directory of the pymib file - mib = "TEST-MIB" + MIB = "TEST-MIB" # mib file is in a folder that is in the same folder as this test - rel_dir = "SNMP_mib_loading" + REL_DIR = "SNMP_mib_loading" def test_content(self): """ @@ -23,9 +23,9 @@ class TestMibLoading(base.TestCase): """ - abs_dir = path.dirname(__file__) + "/" + self.rel_dir + abs_dir = path.dirname(__file__) + "/" + self.REL_DIR loader = mib_loader(abs_dir) - loader.load_pymib(self.mib) + loader.load_pymib(self.MIB) # used to view mibs client side mibView = view.MibViewController(loader.mibBuilder) @@ -37,14 +37,14 @@ class TestMibLoading(base.TestCase): testNamedValue_value = 1 # get testValue and set a value of 1 - obj_T = pysnmp.ObjectType(ObjectIdentity(self.mib, testNamedValue), pysnmp.Integer32(1)) + obj_T = pysnmp.ObjectType(ObjectIdentity(self.MIB, testNamedValue), pysnmp.Integer32(1)) obj_T.resolveWithMib(mibView) # get the oid self.assertEqual(str(obj_T[0]), testNamedValue_oid) # get the name format: mib::name - self.assertEqual(obj_T[0].prettyPrint(), f"{self.mib}::{testNamedValue}") + self.assertEqual(obj_T[0].prettyPrint(), f"{self.MIB}::{testNamedValue}") # get the namedValue self.assertEqual(str(obj_T[1]), testNamedValue_named) diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py b/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py index 697f6b220a338c4d256ba126bc61f771a0e4fd19..5df197618c702ad9900c3c1bc7df624b7ffde3a0 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py @@ -17,7 +17,7 @@ class attr_props: self.numpy_type = numpy_type -attr_test_types = [ +ATTR_TEST_TYPES = [ attr_props(numpy_type=bool), attr_props(numpy_type=str), attr_props(numpy_type=numpy.float32), @@ -32,10 +32,10 @@ attr_test_types = [ attr_props(numpy_type=numpy.int64) ] -scalar_shape = (1,) -spectrum_shape = (4,) -image_shape = (2, 3) -dimension_tests = [scalar_shape, spectrum_shape, image_shape] +SCALAR_SHAPE = (1,) +SPECTRUM_SHAPE = (4,) +IMAGE_SHAPE = (2, 3) +DIMENSION_TESTS = [SCALAR_SHAPE, SPECTRUM_SHAPE, IMAGE_SHAPE] class TestOPCua(base.AsyncTestCase): @@ -90,14 +90,14 @@ class TestOPCua(base.AsyncTestCase): m_opc_client_members.get_objects_node = asynctest.Mock(return_value=m_objects_node) m_opc_client.return_value = m_opc_client_members - for i in attr_test_types: + for i in ATTR_TEST_TYPES: class mock_attr: def __init__(self, dtype, x, y): self.numpy_type = dtype self.dim_x = x self.dim_y = y - for j in dimension_tests: + for j in DIMENSION_TESTS: if len(j) == 1: dim_x = j[0] dim_y = 0 @@ -127,9 +127,9 @@ class TestOPCua(base.AsyncTestCase): """ # for all datatypes - for i in attr_test_types: + for i in ATTR_TEST_TYPES: # for all dimensions - for j in dimension_tests: + for j in DIMENSION_TESTS: node = mock.Mock() @@ -158,8 +158,8 @@ class TestOPCua(base.AsyncTestCase): This tests the read functions. """ - for j in dimension_tests: - for i in attr_test_types: + for j in DIMENSION_TESTS: + for i in ATTR_TEST_TYPES: def get_test_value(): return numpy.zeros(j, i.numpy_type) @@ -250,10 +250,10 @@ class TestOPCua(base.AsyncTestCase): """ # for all dimensionalities - for j in dimension_tests: + for j in DIMENSION_TESTS: #for all datatypes - for i in attr_test_types: + for i in ATTR_TEST_TYPES: # get numpy array of the test value def get_test_value(): @@ -275,7 +275,7 @@ class TestOPCua(base.AsyncTestCase): async def compare_values(val): # test valuest val = val.tolist() if type(val) == numpy.ndarray else val - if j != dimension_tests[0]: + if j != DIMENSION_TESTS[0]: comp = val.Value == get_mock_value(get_test_value().flatten()).Value self.assertTrue(comp.all(), "Array attempting to write unequal to expected array: \n\t got: {} \n\texpected: {}".format(val,get_mock_value(get_test_value()))) diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py b/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py index 9db811338d77f5afab496dc5f5ec1e305cc98a94..3c3a38ef679118046e8b47cbfa070bc30ee78373 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py @@ -10,7 +10,7 @@ from tangostationcontrol.clients.snmp_client import SNMP_client, snmp_attribute, class server_imitator: # conversion dict - snmp_to_numpy_dict = { + SNMP_TO_NUMPY_DICT = { hlapi.Integer32: numpy.int64, hlapi.TimeTicks: numpy.int64, hlapi.OctetString: str, @@ -20,7 +20,7 @@ class server_imitator: } # shortcut for testing dimensionality - dim_list = { + DIM_LIST = { "scalar": (1, 0), "spectrum": (4, 0), } @@ -30,7 +30,7 @@ class server_imitator: provides the return value for the set/get functions that an actual server would return. """ - if dims == self.dim_list["scalar"]: + if dims == self.DIM_LIST["scalar"]: if snmp_type is hlapi.ObjectIdentity: read_val = (None, snmp_type("1.3.6.1.2.1.1.1.0")) elif snmp_type is hlapi.IpAddress: @@ -41,7 +41,7 @@ class server_imitator: read_val = (None, snmp_type(1)) - elif dims == self.dim_list["spectrum"]: + elif dims == self.DIM_LIST["spectrum"]: if snmp_type is hlapi.ObjectIdentity: read_val = [] for _i in range(dims[0]): @@ -69,14 +69,14 @@ class server_imitator: provides the values we expect and would provide to the attribute after converting the """ - if dims == self.dim_list["scalar"]: + if dims == self.DIM_LIST["scalar"]: snmp_type_dict = {hlapi.ObjectIdentity:"1.3.6.1.2.1.1.1.0.1", hlapi.IpAddress: "1.1.1.1", hlapi.OctetString: "1"} check_val = 1 for k,v in snmp_type_dict.items(): if snmp_type is k: check_val = v - elif dims == self.dim_list["spectrum"]: + elif dims == self.DIM_LIST["spectrum"]: snmp_type_dict = {hlapi.ObjectIdentity:["1.3.6.1.2.1.1.1.0.1"] * dims[0], hlapi.IpAddress: ["1.1.1.1"] * dims[0], hlapi.OctetString: ["1"] * dims[0]} @@ -118,9 +118,9 @@ class TestSNMP(base.TestCase): server = server_imitator() - for j in server.dim_list: - for i in server.snmp_to_numpy_dict: - m_next.return_value = (None, None, None, server.get_return_val(i, server.dim_list[j])) + for j in server.DIM_LIST: + for i in server.SNMP_TO_NUMPY_DICT: + m_next.return_value = (None, None, None, server.get_return_val(i, server.DIM_LIST[j])) def __fakeInit__(self): pass @@ -128,11 +128,11 @@ class TestSNMP(base.TestCase): with mock.patch.object(SNMP_comm, '__init__', __fakeInit__): m_comms = SNMP_comm() - snmp_attr = snmp_attribute(comm=m_comms, mib="test", name="test", idx=0, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1]) + snmp_attr = snmp_attribute(comm=m_comms, mib="test", name="test", idx=0, dtype=server.SNMP_TO_NUMPY_DICT[i], dim_x=server.DIM_LIST[j][0], dim_y=server.DIM_LIST[j][1]) val = snmp_attr.read_function() - checkval = server.val_check(i, server.dim_list[j]) + checkval = server.val_check(i, server.DIM_LIST[j]) self.assertEqual(checkval, val, f"During test {j} {i}; Expected: {checkval} of type {i}, got: {val} of type {type(val)}") @mock.patch('pysnmp.hlapi.ObjectIdentity') @@ -146,9 +146,9 @@ class TestSNMP(base.TestCase): server = server_imitator() - for j in server.dim_list: - for i in server.snmp_to_numpy_dict: - m_next.return_value = (None, None, None, server.get_return_val(i, server.dim_list[j])) + for j in server.DIM_LIST: + for i in server.SNMP_TO_NUMPY_DICT: + m_next.return_value = (None, None, None, server.get_return_val(i, server.DIM_LIST[j])) def __fakeInit__(self): pass @@ -156,14 +156,14 @@ class TestSNMP(base.TestCase): with mock.patch.object(SNMP_comm, '__init__', __fakeInit__): m_comms = SNMP_comm() - set_val = server.val_check(i, server.dim_list[j]) + set_val = server.val_check(i, server.DIM_LIST[j]) - snmp_attr = snmp_attribute(comm=m_comms, mib="test", name="test", idx=0, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1]) + snmp_attr = snmp_attribute(comm=m_comms, mib="test", name="test", idx=0, dtype=server.SNMP_TO_NUMPY_DICT[i], dim_x=server.DIM_LIST[j][0], dim_y=server.DIM_LIST[j][1]) res_lst = [] def test(*value): res_lst.append(value[1]) - return None, None, None, server.get_return_val(i, server.dim_list[j]) + return None, None, None, server.get_return_val(i, server.DIM_LIST[j]) hlapi.ObjectType = test @@ -172,7 +172,7 @@ class TestSNMP(base.TestCase): if len(res_lst) == 1: res_lst = res_lst[0] - checkval = server.val_check(i, server.dim_list[j]) + checkval = server.val_check(i, server.DIM_LIST[j]) self.assertEqual(checkval, res_lst, f"During test {j} {i}; Expected: {checkval}, got: {res_lst}") @mock.patch('tangostationcontrol.clients.snmp_client.SNMP_comm.getter') diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_tcp_replicator.py b/tangostationcontrol/tangostationcontrol/test/clients/test_tcp_replicator.py index 04c87f1d5a15705f7d0b8ce1460141255cb02b27..03a1c3b10eff18625cdaeccd869874b85e8748c1 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_tcp_replicator.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_tcp_replicator.py @@ -61,7 +61,7 @@ class TestTCPReplicator(base.TestCase): """Validate option parsing""" # Perform string copy of current tcp_bind value - t_tcp_bind = str(TCPReplicator._default_options['tcp_bind']) + t_tcp_bind = str(TCPReplicator._DEFAULT_OPTIONS['tcp_bind']) test_options = { "random": 12346, # I should be ignored @@ -72,7 +72,7 @@ class TestTCPReplicator(base.TestCase): self.assertTrue(replicator.is_alive()) # Ensure replicator initialization does not modify static variable - self.assertEqual(t_tcp_bind, TCPReplicator._default_options['tcp_bind']) + self.assertEqual(t_tcp_bind, TCPReplicator._DEFAULT_OPTIONS['tcp_bind']) # Ensure options are correctly updated upon initialization self.assertEqual(test_options['tcp_bind'], replicator.options['tcp_bind']) diff --git a/tangostationcontrol/tangostationcontrol/test/common/test_measures.py b/tangostationcontrol/tangostationcontrol/test/common/test_measures.py index bcb020a049a1d4e756623ba80c3e94fb526fb04e..bb22bedafafe5a62155000beb51b9e7ae05928bb 100644 --- a/tangostationcontrol/tangostationcontrol/test/common/test_measures.py +++ b/tangostationcontrol/tangostationcontrol/test/common/test_measures.py @@ -19,8 +19,8 @@ from tangostationcontrol.test import base # where our WSRT_Measures.ztar surrogate is located # two versions with different timestamps are provided -fake_measures = os.path.dirname(__file__) + "/fake_measures.ztar" -fake_measures_newer = os.path.dirname(__file__) + "/fake_measures_newer.ztar" +FAKE_MEASURES = os.path.dirname(__file__) + "/fake_measures.ztar" +FAKE_MEASURES_NEWER = os.path.dirname(__file__) + "/fake_measures_newer.ztar" class TestMeasures(base.TestCase): @mock.patch.object(urllib.request, 'urlretrieve') @@ -32,7 +32,7 @@ class TestMeasures(base.TestCase): mock.patch('tangostationcontrol.common.measures.DOWNLOAD_DIR', tmpdirname) as downloaddir: # emulate the download - m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(fake_measures, tmpdirname + "/WSRT_Measures.ztar") + m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(FAKE_MEASURES, tmpdirname + "/WSRT_Measures.ztar") # 'download' and process our fake measures newdir = measures.download_measures() @@ -53,9 +53,9 @@ class TestMeasures(base.TestCase): mock.patch('tangostationcontrol.common.measures.DOWNLOAD_DIR', tmpdirname) as downloaddir: # 'download' two measures with different timestamps - m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(fake_measures, tmpdirname + "/WSRT_Measures.ztar") + m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(FAKE_MEASURES, tmpdirname + "/WSRT_Measures.ztar") newdir1 = measures.download_measures() - m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(fake_measures_newer, tmpdirname + "/WSRT_Measures.ztar") + m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(FAKE_MEASURES_NEWER, tmpdirname + "/WSRT_Measures.ztar") newdir2 = measures.download_measures() # check if both are available diff --git a/tangostationcontrol/tangostationcontrol/test/devices/random_data.py b/tangostationcontrol/tangostationcontrol/test/devices/random_data.py index d010a09c06d307af32209bf30275c0366e619644..52f65ff2edceb86f44f3cd40784428aca6894b30 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/random_data.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/random_data.py @@ -19,13 +19,13 @@ class Random_Data(Device): Random data monitor point device """ - dim_array = 1024 # x-axis dimension of a random values array + DIM_ARRAY = 1024 # x-axis dimension of a random values array def read(self): return random.random() def read_array(self): - return random.rand(self.dim_array).astype(double) + return random.rand(self.DIM_ARRAY).astype(double) # Attributes rnd1 = attribute( @@ -390,7 +390,7 @@ class Random_Data(Device): rnd21 = attribute( dtype = ('DevDouble',), - max_dim_x = dim_array, + max_dim_x = DIM_ARRAY, max_dim_y = 1, polling_period = 1000, period = 1000, diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py index 48692d08b255bea2a232776737a7d17f18633606..94256ba5ce4bbba274f45d41ca6098acdd434381 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py @@ -20,15 +20,15 @@ from tangostationcontrol.test.devices import device_base class TestHBATToRecvMapper(base.TestCase): # A mapping where HBATs are all not mapped to power RCUs - power_not_connected = [[-1, -1]] * 48 + POWER_NOT_CONNECTED = [[-1, -1]] * 48 # A mapping where HBATs are all not mapped to control RCUs - control_not_connected = [[-1, -1]] * 48 + CONTROL_NOT_CONNECTED = [[-1, -1]] * 48 # A mapping where first two HBATs are mapped on the first Receiver. # The first HBAT control line on RCU 1 and the second HBAT control line on RCU 0. - control_hba_0_and_1_on_rcu_1_and_0_of_recv_1 = [[1, 1], [1, 0]] + [[-1, -1]] * 46 + CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * 46 def test_ant_read_mask_r_no_mapping(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * 3] * 32, [[False] * 3] * 32, [[False] * 3] * 32] expected = [False] * 48 @@ -36,7 +36,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_ant_read_mask_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False, True, False]] + [[False, False, False]] * 31, [[False] * 3] * 32, [[False] * 3] * 32] expected = [True, False] + [False] * 46 @@ -45,14 +45,14 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_rcu_band_select_no_mapping(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[0] * 3] * 32, [[0] * 3] * 32, [[0] * 3] * 32] expected = [0] * 48 actual = mapper.map_read("RCU_band_select_RW", receiver_values) numpy.testing.assert_equal(expected, actual) def test_bf_read_delay_steps_r_no_mapping(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[0] * 32] * 96, [[0] * 32] * 96, [[0] * 32] * 96] expected = [[0] * 32] * 48 @@ -60,7 +60,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_bf_read_delay_steps_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[2] * 32, [1] * 32] + [[0] * 32] * 94, [[0] * 32] * 96, [[0] * 32] * 96] expected = [[1] * 32, [2] * 32] + [[0] * 32] * 46 @@ -69,7 +69,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_bf_read_delay_steps_rw_no_mapping(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[0] * 32] * 96, [[0] * 32] * 96, [[0] * 32] * 96] expected = [[0] * 32] * 48 @@ -77,7 +77,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_bf_read_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[2] * 32, [1] * 32] + [[0] * 32] * 94, [[0] * 32] * 96, [[0] * 32] * 96] expected = [[1] * 32, [2] * 32] + [[0] * 32] * 46 @@ -86,7 +86,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_led_on_r_unmapped(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] expected = [[False] * 32] * 48 @@ -94,7 +94,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_led_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] @@ -103,7 +103,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_led_on_rw_unmapped(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] expected = [[False] * 32] * 48 @@ -111,7 +111,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] @@ -120,7 +120,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_r_unmapped(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] expected = [[False] * 32] * 48 @@ -128,7 +128,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] @@ -137,7 +137,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_rw_unmapped(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] expected = [[False] * 32] * 48 @@ -145,7 +145,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] @@ -154,7 +154,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_r_unmapped(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] expected = [[False] * 32] * 48 @@ -162,7 +162,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] @@ -171,7 +171,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_rw_unmapped(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] expected = [[False] * 32] * 48 @@ -179,7 +179,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] @@ -190,7 +190,7 @@ class TestHBATToRecvMapper(base.TestCase): # Rename to write def test_map_write_ant_mask_rw_no_mapping_and_one_receiver(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) set_values = [False] * 48 expected = [[[False] * 3] * 32] @@ -198,7 +198,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_ant_mask_rw_no_mapping_and_two_receivers(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) set_values = [False] * 48 expected = [[[False] * 3] * 32] * 2 @@ -206,7 +206,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_ant_mask_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) set_values = [True, False] + [False] * 46 expected = [[[False, True, False]] + [[False] * 3] * 31] @@ -214,7 +214,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) set_values = [0] * 48 expected = [[[0] * 3] * 32] @@ -222,7 +222,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_no_mapping_and_two_receivers(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) set_values = [0] * 48 expected = [[[0] * 3] * 32] * 2 @@ -230,7 +230,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) set_values = [1, 0] + [0] * 46 expected = [[[0, 1, 0]] + [[0] * 3] * 31] @@ -238,7 +238,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_no_mapping_and_one_receiver(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) set_values = [[1] * 32] * 48 expected = [[[0] * 32] * 96] @@ -246,7 +246,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_no_mapping_and_two_receivers(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) set_values = [[1] * 32] * 48 expected = [[[0] * 32] * 96, [[0] * 32] * 96] @@ -254,7 +254,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) set_values = [[1] * 32, [2] * 32] + [[0] * 32] * 46 expected = [[[2] * 32, [1] * 32] + [[0] * 32] * 94] @@ -262,7 +262,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_no_mapping_and_one_receiver(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) set_values = [[False] * 32] * 48 expected = [[[False] * 32] * 96] @@ -270,7 +270,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_no_mapping_and_two_receivers(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) set_values = [[False] * 32] * 48 expected = [[[False] * 32] * 96, [[False] * 32] * 96] @@ -278,7 +278,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] @@ -286,7 +286,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) set_values = [[False] * 32] * 48 expected = [[[False] * 32] * 96] @@ -294,7 +294,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) set_values = [[False] * 32] * 48 expected = [[[False] * 32] * 96, [[False] * 32] * 96] @@ -302,7 +302,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] @@ -310,7 +310,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) set_values = [[False] * 32] * 48 expected = [[[False] * 32] * 96] @@ -318,7 +318,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self): - mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2) + mapper = HBATToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) set_values = [[False] * 32] * 48 expected = [[[False] * 32] * 96, [[False] * 32] * 96] @@ -326,7 +326,7 @@ class TestHBATToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1) + mapper = HBATToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] @@ -337,7 +337,7 @@ class TestHBATToRecvMapper(base.TestCase): class TestAntennafieldDevice(device_base.DeviceTestCase): # some dummy values for mandatory properties - at_properties = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0, + AT_PROPERTIES = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0, 'Antenna_Field_Reference_ITRF' : [3.0, 3.0, 3.0], 'Antenna_Field_Reference_ETRS' : [7.0, 7.0, 7.0]} def setUp(self): @@ -347,7 +347,7 @@ class TestAntennafieldDevice(device_base.DeviceTestCase): def test_read_Antenna_Field_Reference(self): """Verify if Antenna coordinates are correctly provided""" # Device uses ITRF coordinates by default - with DeviceTestContext(antennafield.AntennaField, properties=self.at_properties, process=True) as proxy: + with DeviceTestContext(antennafield.AntennaField, properties=self.AT_PROPERTIES, process=True) as proxy: self.assertEqual(3.0, proxy.Antenna_Field_Reference_ITRF_R[0]) # Device derives coordinates from ETRS if ITRF ones are not found at_properties_v2 = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0, 'Antenna_Field_Reference_ETRS' : [7.0, 7.0, 7.0]} diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py index 1e1c08570f20c217aa5be54fdee43571aa886707..fe53bc69729aa809dcbddc9bec410a7bcde69330 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py @@ -19,7 +19,7 @@ from tangostationcontrol.test.devices import device_base class TestRecvDevice(device_base.DeviceTestCase): # some dummy values for mandatory properties - recv_properties = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0} + RECV_PROPERTIES = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0} def setUp(self): # DeviceTestCase setUp patches lofar_device DeviceProxy @@ -27,14 +27,14 @@ class TestRecvDevice(device_base.DeviceTestCase): def test_calculate_HBAT_bf_delay_steps(self): """Verify HBAT beamforming calculations are correctly executed""" - with DeviceTestContext(recv.RECV, properties=self.recv_properties, process=True) as proxy: + with DeviceTestContext(recv.RECV, properties=self.RECV_PROPERTIES, process=True) as proxy: delays = numpy.random.rand(96,16).flatten() HBAT_bf_delay_steps = proxy.calculate_HBAT_bf_delay_steps(delays) self.assertEqual(3072, len(HBAT_bf_delay_steps)) # 96x32=3072 def test_get_rcu_band_from_filter(self): """Verify filter lookup table values are correctly retrieved""" - with DeviceTestContext(recv.RECV, properties=self.recv_properties, process=True) as proxy: + with DeviceTestContext(recv.RECV, properties=self.RECV_PROPERTIES, process=True) as proxy: filter_name = "HBA_170_230" self.assertEqual(1, proxy.get_rcu_band_from_filter(filter_name)) self.assertEqual(-1, proxy.get_rcu_band_from_filter('MOCK_FILTER')) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_snmp_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_snmp_device.py index 23d81f6d5634b6e4cb1e7ca9ee935484efbdbcc7..609b30ede7f95d3fcb88ef40b9dfbf86f75cbdd0 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_snmp_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_snmp_device.py @@ -19,15 +19,15 @@ from tangostationcontrol.test.devices import device_base class TestSNMPDevice(device_base.DeviceTestCase): # some dummy values for mandatory properties - snmp_properties = {'SNMP_community': 'localhost', 'SNMP_host': 161, 'SNMP_rel_mib_dir': "SNMP_mib_loading", 'SNMP_timeout': 5.0} + SNMP_PROPERTIES = {'SNMP_community': 'localhost', 'SNMP_host': 161, 'SNMP_rel_mib_dir': "SNMP_mib_loading", 'SNMP_timeout': 5.0} def setUp(self): # DeviceTestCase setUp patches lofar_device DeviceProxy super(TestSNMPDevice, self).setUp() def test_get_mib_dir(self): - with DeviceTestContext(snmp_device.SNMP, properties=self.snmp_properties, process=True) as proxy: + with DeviceTestContext(snmp_device.SNMP, properties=self.SNMP_PROPERTIES, process=True) as proxy: mib_dir = proxy.get_mib_dir() - self.assertEqual(mib_dir, f"{path.dirname(snmp_device.__file__)}/{self.snmp_properties['SNMP_rel_mib_dir']}") + self.assertEqual(mib_dir, f"{path.dirname(snmp_device.__file__)}/{self.SNMP_PROPERTIES['SNMP_rel_mib_dir']}") diff --git a/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_configurator.py b/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_configurator.py index 28465a5b48b65417505fb6491d320e033abc6414..aaf8a622faf044362c2ff13b931ecad30b8bd327 100644 --- a/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_configurator.py +++ b/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_configurator.py @@ -14,37 +14,37 @@ import pkg_resources class TestArchiverConfigurator(base.TestCase): - device_name = 'STAT/RECV/1' - attribute_name = 'ant_mask_rw' - prod_config_dict = json.load(pkg_resources.resource_stream('tangostationcontrol.toolkit', f'archiver_config/lofar2_prod.json')) - dev_config_dict = json.load(pkg_resources.resource_stream('tangostationcontrol.toolkit', f'archiver_config/lofar2_dev.json')) - dev_suffixes = dev_config_dict['global']['suffixes'] - prod_suffixes = prod_config_dict['global']['suffixes'] - prod_infixes = prod_config_dict['global']['infixes'] + DEVICE_NAME = 'STAT/RECV/1' + ATTRIBUTE_NAME = 'ant_mask_rw' + PROD_CONFIG_DICT = json.load(pkg_resources.resource_stream('tangostationcontrol.toolkit', f'archiver_config/lofar2_prod.json')) + DEV_CONFIG_DICT = json.load(pkg_resources.resource_stream('tangostationcontrol.toolkit', f'archiver_config/lofar2_dev.json')) + DEV_SUFFIXES = DEV_CONFIG_DICT['global']['suffixes'] + PROD_SUFFIXES = PROD_CONFIG_DICT['global']['suffixes'] + PROD_INFIXES = PROD_CONFIG_DICT['global']['infixes'] def test_get_parameters_from_attribute(self): """Test if the attribute archiving parameters are correctly retrieved from the JSON config file""" - self.assertIsNotNone(self.dev_config_dict) - archive_period, event_period, abs_change, rel_change = get_parameters_from_attribute(self.device_name, self.attribute_name, - self.dev_config_dict) - self.assertEqual(archive_period,int(self.dev_suffixes[2]['archive_period'])) - self.assertEqual(event_period,int(self.dev_suffixes[2]['event_period'])) - self.assertEqual(abs_change,float(self.dev_suffixes[2]['abs_change'])) - self.assertEqual(rel_change, self.dev_suffixes[2]['rel_change'] and int(self.dev_suffixes[2]['rel_change'])) + self.assertIsNotNone(self.DEV_CONFIG_DICT) + archive_period, event_period, abs_change, rel_change = get_parameters_from_attribute(self.DEVICE_NAME, self.ATTRIBUTE_NAME, + self.DEV_CONFIG_DICT) + self.assertEqual(archive_period,int(self.DEV_SUFFIXES[2]['archive_period'])) + self.assertEqual(event_period,int(self.DEV_SUFFIXES[2]['event_period'])) + self.assertEqual(abs_change,float(self.DEV_SUFFIXES[2]['abs_change'])) + self.assertEqual(rel_change, self.DEV_SUFFIXES[2]['rel_change'] and int(self.DEV_SUFFIXES[2]['rel_change'])) """Test if the attribute archiving parameters are correctly retrieved from the infixes list (production environment)""" attribute_name = 'rcu_temp_r' # 'TEMP' is in the infixes list - archive_period, event_period, abs_change, rel_change = get_parameters_from_attribute(self.device_name, attribute_name, - self.prod_config_dict) - self.assertEqual(archive_period,int(self.prod_infixes[2]['archive_period'])) - self.assertEqual(event_period,int(self.prod_infixes[2]['event_period'])) - self.assertEqual(abs_change,float(self.prod_infixes[2]['abs_change'])) - self.assertEqual(rel_change,self.prod_infixes[2]['rel_change'] and int(self.prod_infixes[2]['rel_change'])) + archive_period, event_period, abs_change, rel_change = get_parameters_from_attribute(self.DEVICE_NAME, attribute_name, + self.PROD_CONFIG_DICT) + self.assertEqual(archive_period,int(self.PROD_INFIXES[2]['archive_period'])) + self.assertEqual(event_period,int(self.PROD_INFIXES[2]['event_period'])) + self.assertEqual(abs_change,float(self.PROD_INFIXES[2]['abs_change'])) + self.assertEqual(rel_change,self.PROD_INFIXES[2]['rel_change'] and int(self.PROD_INFIXES[2]['rel_change'])) def test_get_global_env_parameters(self): """Test if the include attribute list is correctly retrieved from the JSON config file""" - self.assertIsNotNone(self.prod_config_dict) - polling_time, archive_abs_change, archive_rel_change, archive_period, event_period, strategy = get_global_env_parameters(self.prod_config_dict) + self.assertIsNotNone(self.PROD_CONFIG_DICT) + polling_time, archive_abs_change, archive_rel_change, archive_period, event_period, strategy = get_global_env_parameters(self.PROD_CONFIG_DICT) self.assertEqual(type(polling_time),int) self.assertEqual(type(archive_abs_change), int) self.assertEqual(f"{type(archive_rel_change)}", f"<class 'NoneType'>") diff --git a/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_util.py b/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_util.py index 842aff8367ab78a30ba55eb31bd48fcb29193d8e..1d2f92b754620969a63669ea27a0e075381efb88 100644 --- a/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_util.py +++ b/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_util.py @@ -12,28 +12,28 @@ from tangostationcontrol.toolkit.archiver_util import get_attribute_from_fqdn, s class TestArchiverUtil(base.TestCase): - device_name = 'STAT/RECV/1' - attribute_name = 'ant_mask_rw' + DEVICE_NAME = 'STAT/RECV/1' + ATTRIBUTE_NAME = 'ant_mask_rw' def test_get_attribute_from_fqdn(self): """Test if a Tango attribute name is correctly retrieved from a Tango FQDN""" - fqdn = f"tango://databaseds:10000/{self.device_name}/{self.attribute_name}" + fqdn = f"tango://databaseds:10000/{self.DEVICE_NAME}/{self.ATTRIBUTE_NAME}" self.assertEqual('STAT/RECV/1/ant_mask_rw', get_attribute_from_fqdn(fqdn)) def test_device_fqdn(self): """Test if a device name is correctly converted in a Tango FQDN""" - self.assertEqual(f"tango://databaseds:10000/{self.device_name}".lower(), device_fqdn(self.device_name, "databaseds:10000")) + self.assertEqual(f"tango://databaseds:10000/{self.DEVICE_NAME}".lower(), device_fqdn(self.DEVICE_NAME, "databaseds:10000")) def test_attribute_fqdn(self): """Test if an attribute name is correctly converted in a Tango FQDN""" - self.assertEqual(f"tango://databaseds:10000/{self.device_name}/{self.attribute_name}".lower(), - attribute_fqdn(f"{self.device_name}/{self.attribute_name}", "databaseds:10000")) - self.assertRaises(ValueError, lambda: attribute_fqdn(self.attribute_name)) + self.assertEqual(f"tango://databaseds:10000/{self.DEVICE_NAME}/{self.ATTRIBUTE_NAME}".lower(), + attribute_fqdn(f"{self.DEVICE_NAME}/{self.ATTRIBUTE_NAME}", "databaseds:10000")) + self.assertRaises(ValueError, lambda: attribute_fqdn(self.ATTRIBUTE_NAME)) def test_split_tango_name(self): """Test if the Tango full qualified domain names are correctly splitted""" - self.assertEqual(('STAT','RECV','1'), split_tango_name(self.device_name, 'device')) - self.assertEqual(('STAT','RECV','1', 'ant_mask_rw'), split_tango_name(f"{self.device_name}/{self.attribute_name}", 'attribute')) + self.assertEqual(('STAT','RECV','1'), split_tango_name(self.DEVICE_NAME, 'device')) + self.assertEqual(('STAT','RECV','1', 'ant_mask_rw'), split_tango_name(f"{self.DEVICE_NAME}/{self.ATTRIBUTE_NAME}", 'attribute')) def test_get_size_from_datatype(self): """Test if the bytesize of a certain datatype is correctly retrieved""" diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py index 610ae3c0554e9e5bfbc1bd0df6fe900a863296b1..4e40ebeab891fca63f20708348f456cc4dd6de5c 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py @@ -55,12 +55,12 @@ class Archiver(): """ # Global environment variables set by configuration file - global_polling_time = 1000 - global_archive_abs_change = 1 - global_archive_rel_change = None - global_archive_period = 10000 # 3600000 (prod) - global_event_period = 1000 # 60000 (prod) - global_strategy = 'RUN' + GLOBAL_POLLING_TIME = 1000 + GLOBAL_ARCHIVE_ABS_CHANGE = 1 + GLOBAL_ARCHIVE_REL_CHANGE = None + GLOBAL_ARCHIVE_PERIOD = 10000 # 3600000 (prod) + GLOBAL_EVENT_PERIOD = 1000 # 60000 (prod) + GLOBAL_STRATEGY = 'RUN' def __init__(self, cm_name: str = 'archiving/hdbppts/confmanager01', context: str = 'RUN'): self.cm_name = cm_name @@ -122,7 +122,7 @@ class Archiver(): Apply the customized strategy defined by the given archiver configuration. """ # Set global development env variables - self.global_polling_time, self.global_archive_abs_change, self.global_archive_rel_change, self.global_archive_period, self.global_event_period, self.global_strategy = get_global_env_parameters(config_dict) + self.GLOBAL_POLLING_TIME, self.GLOBAL_ARCHIVE_ABS_CHANGE, self.GLOBAL_ARCHIVE_REL_CHANGE, self.GLOBAL_ARCHIVE_PERIOD, self.GLOBAL_EVENT_PERIOD, self.GLOBAL_STRATEGY = get_global_env_parameters(config_dict) # Set devices archiving env_dict = config_dict['devices'] # Check if device has more than one member (domain/family/*) @@ -164,11 +164,11 @@ class Archiver(): archive_period, event_period, abs_change, rel_change = get_parameters_from_attribute(device,att,config_dict) att_fqname = attribute_fqdn(att) # Add the attribute to the archiver setting either specific or global parameters - self.add_attribute_to_archiver(att_fqname, self.global_polling_time, archive_period or self.global_archive_period, - self.global_strategy, abs_change or self.global_archive_abs_change, - rel_change or self.global_archive_rel_change) - self.add_attributes_by_device(device, self.global_archive_period, self.global_archive_abs_change, - self.global_archive_rel_change, exclude=exclude_att_list) + self.add_attribute_to_archiver(att_fqname, self.GLOBAL_POLLING_TIME, archive_period or self.GLOBAL_ARCHIVE_PERIOD, + self.GLOBAL_STRATEGY, abs_change or self.GLOBAL_ARCHIVE_ABS_CHANGE, + rel_change or self.GLOBAL_ARCHIVE_REL_CHANGE) + self.add_attributes_by_device(device, self.GLOBAL_ARCHIVE_PERIOD, self.GLOBAL_ARCHIVE_ABS_CHANGE, + self.GLOBAL_ARCHIVE_REL_CHANGE, exclude=exclude_att_list) # Remove attributes by custom configuration if already present # The following cycle is a security check in the special case that an attribute is in the # included list in DEV mode, and in the excluded list in PROD mode