Skip to content
Snippets Groups Projects
Commit 52ae821f authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'master' into L2SS-822-cache-archiver-ts-docker-image

parents ebdb617c 068ddf5f
No related branches found
No related tags found
1 merge request!417Resolve L2SS-822 "Cache archiver ts docker image"
Showing
with 572 additions and 152 deletions
...@@ -32,12 +32,16 @@ services: ...@@ -32,12 +32,16 @@ services:
- control - control
ports: ports:
- "5704:5704" # unique port for this DS - "5704:5704" # unique port for this DS
- "5804:5804" # ZeroMQ event port
- "5904:5904" # ZeroMQ heartbeat port
extra_hosts: extra_hosts:
- "host.docker.internal:host-gateway" - "host.docker.internal:host-gateway"
volumes: volumes:
- ..:/opt/lofar/tango:rw - ..:/opt/lofar/tango:rw
environment: environment:
- TANGO_HOST=${TANGO_HOST} - TANGO_HOST=${TANGO_HOST}
- TANGO_ZMQ_EVENT_PORT=5804
- TANGO_ZMQ_HEARTBEAT_PORT=5904
working_dir: /opt/lofar/tango working_dir: /opt/lofar/tango
entrypoint: entrypoint:
- bin/start-ds.sh - bin/start-ds.sh
......
...@@ -35,12 +35,16 @@ services: ...@@ -35,12 +35,16 @@ services:
- "5002:5002/udp" # port to receive XST UDP packets on - "5002:5002/udp" # port to receive XST UDP packets on
- "5102:5102/tcp" # port to emit XST TCP packets on - "5102:5102/tcp" # port to emit XST TCP packets on
- "5706:5706" # unique port for this DS - "5706:5706" # unique port for this DS
- "5806:5806" # ZeroMQ event port
- "5906:5906" # ZeroMQ heartbeat port
extra_hosts: extra_hosts:
- "host.docker.internal:host-gateway" - "host.docker.internal:host-gateway"
volumes: volumes:
- ..:/opt/lofar/tango:rw - ..:/opt/lofar/tango:rw
environment: environment:
- TANGO_HOST=${TANGO_HOST} - TANGO_HOST=${TANGO_HOST}
- TANGO_ZMQ_EVENT_PORT=5806
- TANGO_ZMQ_HEARTBEAT_PORT=5906
working_dir: /opt/lofar/tango working_dir: /opt/lofar/tango
entrypoint: entrypoint:
- bin/start-ds.sh - bin/start-ds.sh
......
# -*- coding: utf-8 -*-
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from collections.abc import Sequence
import numpy
def is_sequence(obj):
"""Identify sequences / collections"""
return isinstance(obj, Sequence) or isinstance(obj, numpy.ndarray)
def sequence_not_str(obj):
"""Separate sequences / collections from str, byte or bytearray"""
return is_sequence(obj) and not isinstance(obj, (str, bytes, bytearray))
def type_not_sequence(obj):
"""Separate sequences / collections from types"""
return not is_sequence(obj) and isinstance(obj, type)
...@@ -9,8 +9,7 @@ If a new device is added, it will (likely) need to be referenced in several plac ...@@ -9,8 +9,7 @@ If a new device is added, it will (likely) need to be referenced in several plac
- Adjust `CDB/LOFAR_ConfigDb.json` to create the device in the Tango device database, - Adjust `CDB/LOFAR_ConfigDb.json` to create the device in the Tango device database,
- Adjust `docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py` to make an alias for it available in Jupyter, - Adjust `docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py` to make an alias for it available in Jupyter,
- Adjust `tangostationcontrol/tangostationcontrol/devices/boot.py` to add the device to the station initialisation sequence, - Adjust `tangostationcontrol/tangostationcontrol/devices/boot.py` to add the device to the station initialisation sequence,
- Add to `docker-compose/` to create a YaML file to start the device in a docker container. NOTE: it needs a unique 57xx port assigned, - Add to `docker-compose/` to create a YaML file to start the device in a docker container. NOTE: it needs a unique 57xx port assigned (current _unused_ port value: 5722), a unique 58xx port for ZMQ events, and a unique 59xx port for ZMQ heartbeat
current _unused_ port value: 5718
- Adjust `tangostationcontrol/setup.cfg` to add an entry point for the device in the package installation, - Adjust `tangostationcontrol/setup.cfg` to add an entry point for the device in the package installation,
- Add to `tangostationcontrol/tangostationcontrol/integration_test/default/devices/` to add an integration test, - Add to `tangostationcontrol/tangostationcontrol/integration_test/default/devices/` to add an integration test,
- Adjust `sbin/run_integration_test.sh` to have the device started when running the integration tests, - Adjust `sbin/run_integration_test.sh` to have the device started when running the integration tests,
......
...@@ -9,12 +9,15 @@ ...@@ -9,12 +9,15 @@
from enum import IntEnum from enum import IntEnum
from math import pi from math import pi
import numpy import numpy
from typing import List
# PyTango imports # PyTango imports
from tango import DeviceProxy, DevSource, AttrWriteType, DevVarFloatArray, DevVarLongArray from tango import DeviceProxy, DevSource, AttrWriteType, DevVarFloatArray, DevVarLongArray
from tango.server import device_property, attribute, command from tango.server import device_property, attribute, command
# Additional import # Additional import
from tangostationcontrol.common.type_checking import sequence_not_str
from tangostationcontrol.common.type_checking import type_not_sequence
from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.devices.lofar_device import lofar_device from tangostationcontrol.devices.lofar_device import lofar_device
from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions
...@@ -47,7 +50,10 @@ class mapped_attribute(attribute): ...@@ -47,7 +50,10 @@ class mapped_attribute(attribute):
if access == AttrWriteType.READ_WRITE: if access == AttrWriteType.READ_WRITE:
@fault_on_error() @fault_on_error()
def write_func_wrapper(device, value): def write_func_wrapper(device, value):
write_func = device.set_mapped_attribute(mapping_attribute, value) cast_type = dtype
while not type_not_sequence(cast_type):
cast_type = cast_type[0]
write_func = device.set_mapped_attribute(mapping_attribute, value, cast_type)
self.fset = write_func_wrapper self.fset = write_func_wrapper
...@@ -384,7 +390,7 @@ class AntennaField(lofar_device): ...@@ -384,7 +390,7 @@ class AntennaField(lofar_device):
power_mapping = numpy.reshape(self.Power_to_RECV_mapping, (-1, 2)) power_mapping = numpy.reshape(self.Power_to_RECV_mapping, (-1, 2))
self.__mapper = AntennaToRecvMapper(control_mapping, power_mapping, number_of_receivers) self.__mapper = AntennaToRecvMapper(control_mapping, power_mapping, number_of_receivers)
def get_mapped_attribute(self, mapped_point): def get_mapped_attribute(self, mapped_point: str):
recv_results = [] recv_results = []
for recv_proxy in self.recv_proxies: for recv_proxy in self.recv_proxies:
...@@ -395,11 +401,24 @@ class AntennaField(lofar_device): ...@@ -395,11 +401,24 @@ class AntennaField(lofar_device):
return mapped_values return mapped_values
def set_mapped_attribute(self, mapped_point, value): def set_mapped_attribute(self, mapped_point: str, value, cast_type: type):
"""Set the attribute to new value only for controlled points
:warning: This method is susceptible to a lost update race condition if the
attribute on the RECV device is written to in between `read_attribute`
and `write_attribute`!
"""
mapped_value = self.__mapper.map_write(mapped_point, value) mapped_value = self.__mapper.map_write(mapped_point, value)
for idx, recv_proxy in enumerate(self.recv_proxies): for idx, recv_proxy in enumerate(self.recv_proxies):
recv_proxy.write_attribute(mapped_point, mapped_value[idx]) new_values = mapped_value[idx]
# TODO(Corne): Resolve potential lost update race condition
current_values = recv_proxy.read_attribute(mapped_point).value
self.__mapper.merge_write(new_values, current_values)
recv_proxy.write_attribute(mapped_point, new_values.astype(dtype=cast_type))
# -------- # --------
# Overloaded functions # Overloaded functions
...@@ -452,67 +471,115 @@ class AntennaField(lofar_device): ...@@ -452,67 +471,115 @@ class AntennaField(lofar_device):
return result_values.flatten() return result_values.flatten()
class AntennaToRecvMapper(object): class AntennaToRecvMapper(object):
_VALUE_MAP_NONE_96 = numpy.full(96, None)
_VALUE_MAP_NONE_96_32 = numpy.full((96, 32), None)
def __init__(self, control_to_recv_mapping, power_to_recv_mapping, number_of_receivers): def __init__(self, control_to_recv_mapping, power_to_recv_mapping, number_of_receivers):
number_of_antennas = len(control_to_recv_mapping) number_of_antennas = len(control_to_recv_mapping)
self.__control_mapping = control_to_recv_mapping # Reduce memory footprint of default values by creating single instance of
self.__power_mapping = power_to_recv_mapping # common fields
self.__number_of_receivers = number_of_receivers value_map_ant_32_int = numpy.zeros([number_of_antennas, 32], dtype=numpy.int64)
self.__default_value_mapping_read = { value_map_ant_32_bool = numpy.full((number_of_antennas, 32), False)
"ANT_mask_RW": numpy.full(number_of_antennas, False), value_map_ant_bool = numpy.full(number_of_antennas, False)
"RCU_PWR_ANT_on_R": numpy.full(number_of_antennas, False),
"RCU_PWR_ANT_on_RW": numpy.full(number_of_antennas, False), self._control_mapping = control_to_recv_mapping
"HBAT_BF_delay_steps_R": numpy.zeros([number_of_antennas,32], dtype=numpy.int64), self._power_mapping = power_to_recv_mapping
"HBAT_BF_delay_steps_RW": numpy.zeros([number_of_antennas,32], dtype=numpy.int64), self._number_of_receivers = number_of_receivers
"HBAT_LED_on_R": numpy.full((number_of_antennas,32), False), self._default_value_mapping_read = {
"HBAT_LED_on_RW": numpy.full((number_of_antennas,32), False), "ANT_mask_RW": value_map_ant_bool,
"HBAT_PWR_LNA_on_R": numpy.full((number_of_antennas,32), False), "RCU_PWR_ANT_on_R": value_map_ant_bool,
"HBAT_PWR_LNA_on_RW": numpy.full((number_of_antennas,32), False), "RCU_PWR_ANT_on_RW": value_map_ant_bool,
"HBAT_PWR_on_R": numpy.full((number_of_antennas,32), False), "HBAT_BF_delay_steps_R": value_map_ant_32_int,
"HBAT_PWR_on_RW": numpy.full((number_of_antennas,32), False), "HBAT_BF_delay_steps_RW": value_map_ant_32_int,
"HBAT_LED_on_R": value_map_ant_32_bool,
"HBAT_LED_on_RW": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_R": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_RW": value_map_ant_32_bool,
"HBAT_PWR_on_R": value_map_ant_32_bool,
"HBAT_PWR_on_RW": value_map_ant_32_bool,
"RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64) "RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64)
} }
self.__default_value_mapping_write = { self._masked_value_mapping_write = {
"ANT_mask_RW": numpy.full(96, False), "ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"RCU_PWR_ANT_on_RW": numpy.full(96, False), "RCU_PWR_ANT_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"HBAT_BF_delay_steps_RW": numpy.zeros([96,32], dtype=numpy.int64), "HBAT_BF_delay_steps_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_LED_on_RW": numpy.full((96,32), False), "HBAT_LED_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_LNA_on_RW": numpy.full((96,32), False), "HBAT_PWR_LNA_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_on_RW": numpy.full((96,32), False), "HBAT_PWR_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"RCU_band_select_RW": numpy.zeros(96, dtype=numpy.int64) "RCU_band_select_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
} }
self.__reshape_attributes_in = { self._reshape_attributes_in = {
"HBAT_BF_delay_steps_RW": (96, 32), "HBAT_BF_delay_steps_RW": (96, 32),
} }
self.__reshape_attributes_out = { self._reshape_attributes_out = {
"HBAT_BF_delay_steps_RW": (96, 32), "HBAT_BF_delay_steps_RW": (96, 32),
} }
def map_read(self, mapped_attribute, recv_results): def map_read(self, mapped_attribute: str, recv_results: List[any]) -> List[any]:
default_values = self.__default_value_mapping_read[mapped_attribute] """Perform a mapped read for the attribute using the recv_results
:param mapped_attribute: attribute identifier as present in
py:attribute:`~_default_value_mapping_read`
:param recv_results: Results as gathered by appending attributes from RECV
devices
:return: recv_results as mapped given attribute dimensions and control mapping
"""
default_values = self._default_value_mapping_read[mapped_attribute]
if mapped_attribute in self.__reshape_attributes_in: if mapped_attribute in self._reshape_attributes_in:
recv_results = numpy.reshape(recv_results, recv_results = numpy.reshape(recv_results,
(self.__number_of_receivers,) + self.__reshape_attributes_in[mapped_attribute]) (self._number_of_receivers,) + self._reshape_attributes_in[mapped_attribute])
return self._mapped_r_values(recv_results, default_values) return self._mapped_r_values(recv_results, default_values)
def map_write(self, mapped_attribute, set_values): def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]:
default_values = self.__default_value_mapping_write[mapped_attribute] """Perform a mapped write for the attribute using the set_values
:param mapped_attribute: attribute identifier as present in
py:attribute:`~_default_value_mapping_write`
:param set_values: The values to be set for the specified attribute
:return: set_values as mapped given attribute dimensions and control mapping
"""
default_values = self._masked_value_mapping_write[mapped_attribute]
mapped_values = self._mapped_rw_values(set_values, default_values) mapped_values = self._mapped_rw_values(set_values, default_values)
if mapped_attribute in self.__reshape_attributes_out: if mapped_attribute in self._reshape_attributes_out:
mapped_values = numpy.reshape(mapped_values, mapped_values = numpy.reshape(mapped_values,
(self.__number_of_receivers,) + self.__reshape_attributes_out[mapped_attribute]) (self._number_of_receivers,) + self._reshape_attributes_out[mapped_attribute])
return mapped_values return mapped_values
def _mapped_r_values(self, recv_results, default_values): def merge_write(self, merge_values: List[any], current_values: List[any]):
"""Merge values as retrieved from :py:func:`~map_write` with current_values
This method will modify the contents of merge_values.
To be used by the :py:class:`~AntennaField` device to remove None fields
from mapped_values with recently retrieved current_values from RECV device.
:param merge_values: values as retrieved from :py:func:`~map_write`
:param current_values: values retrieved from RECV device on specific attribute
"""
for idx, value in enumerate(merge_values):
if sequence_not_str(value):
self.merge_write(merge_values[idx], current_values[idx])
elif value is None:
merge_values[idx] = current_values[idx]
def _mapped_r_values(self, recv_results: List[any], default_values: List[any]):
"""Mapping for read using :py:attribute:`~_control_mapping` and shallow copy"""
mapped_values = numpy.array(default_values) mapped_values = numpy.array(default_values)
for idx, mapping in enumerate(self.__control_mapping): for idx, mapping in enumerate(self._control_mapping):
recv = mapping[0] recv = mapping[0]
rcu = mapping[1] rcu = mapping[1]
if recv > 0: if recv > 0:
...@@ -520,14 +587,16 @@ class AntennaToRecvMapper(object): ...@@ -520,14 +587,16 @@ class AntennaToRecvMapper(object):
return mapped_values return mapped_values
def _mapped_rw_values(self, set_values, default_values): def _mapped_rw_values(self, set_values: List[any], default_values: List[any]):
"""Mapping for write using :py:attribute:`~_control_mapping` and shallow copy"""
mapped_values = [] mapped_values = []
for _ in range(self.__number_of_receivers): for _ in range(self._number_of_receivers):
mapped_values.append(default_values) mapped_values.append(default_values)
mapped_values = numpy.array(mapped_values) mapped_values = numpy.array(mapped_values)
for idx, mapping in enumerate(self.__control_mapping): for idx, mapping in enumerate(self._control_mapping):
recv = mapping[0] recv = mapping[0]
rcu = mapping[1] rcu = mapping[1]
if recv > 0: if recv > 0:
......
...@@ -23,7 +23,7 @@ These arguments and modules can also be passed at the level of the Makefile ...@@ -23,7 +23,7 @@ These arguments and modules can also be passed at the level of the Makefile
instead of through tox directly: instead of through tox directly:
```shell ```shell
make integration default import.path.class.functionname` make integration import.path.class.functionname`
``` ```
## Breakpoints & Debuggers with Integration Tests ## Breakpoints & Debuggers with Integration Tests
......
...@@ -7,20 +7,39 @@ ...@@ -7,20 +7,39 @@
# Distributed under the terms of the APACHE license. # Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info. # See LICENSE.txt for more info.
from tango._tango import DevState
import numpy import numpy
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse
from .base import AbstractTestBases from .base import AbstractTestBases
class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
def setUp(self): def setUp(self):
super().setUp("STAT/AntennaField/1") super().setUp("STAT/AntennaField/1")
self.proxy.put_property({"RECV_devices": ["STAT/RECV/1"], self.proxy.put_property({
"Power_to_RECV_mapping": [1, 1, 1, 0] + [-1] * 92}) "RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [1, 1, 1, 0] + [-1] * 92
})
self.recv_proxy = self.setup_recv_proxy() self.recv_proxy = self.setup_recv_proxy()
self.addCleanup(self.restore_antennafield)
self.addCleanup(self.shutdown_recv)
def restore_antennafield(self):
self.proxy.put_property({
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping": [-1, -1] * 48
})
@staticmethod
def shutdown_recv():
recv_proxy = TestDeviceProxy("STAT/RECV/1")
recv_proxy.off()
def setup_recv_proxy(self): def setup_recv_proxy(self):
# setup RECV # setup RECV
recv_proxy = TestDeviceProxy("STAT/RECV/1") recv_proxy = TestDeviceProxy("STAT/RECV/1")
...@@ -35,38 +54,222 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): ...@@ -35,38 +54,222 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask(self): def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask(self):
""" Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values""" """ Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values"""
recv_proxy = self.setup_recv_proxy()
antennafield_proxy = self.proxy antennafield_proxy = self.proxy
numpy.testing.assert_equal(numpy.array([True] * 96), recv_proxy.ANT_mask_RW) numpy.testing.assert_equal(
numpy.array([True] * 96), self.recv_proxy.ANT_mask_RW
)
antenna_qualities = numpy.array([AntennaQuality.OK] * 96) antenna_qualities = numpy.array([AntennaQuality.OK] * 96)
antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * 95) antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * 95)
antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use} antenna_properties = {
mapping_properties = {"RECV_devices": ["STAT/RECV/1"], 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use
}
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48, "Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46} # Two inputs of recv device connected, only defined for 48 inputs
# each pair is one input
"Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46
}
antennafield_proxy.off() antennafield_proxy.off()
antennafield_proxy.put_property(antenna_properties) antennafield_proxy.put_property(antenna_properties)
antennafield_proxy.put_property(mapping_properties) antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot() # initialise hardware values as well antennafield_proxy.boot() # initialises hardware values as well
numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R)
numpy.testing.assert_equal(numpy.array([True] * 2 + [False] * 46), antennafield_proxy.ANT_mask_RW) # Verify all antennas are indicated to work
numpy.testing.assert_equal(numpy.array([True] * 2 + [False] * 46 + [False] * 48), recv_proxy.ANT_mask_RW) numpy.testing.assert_equal(
numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R
)
# Verify only connected inputs + Antenna_Usage_Mask_R are true
# As well as dimensions of ANT_mask_RW must match control mapping
numpy.testing.assert_equal(
numpy.array([True] * 2 + [False] * 46),
antennafield_proxy.ANT_mask_RW
)
# Verify recv proxy values unaffected as default for ANT_mask_RW is true
numpy.testing.assert_equal(
numpy.array([True] * 2 + [True] * 94),
self.recv_proxy.ANT_mask_RW
)
def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask_only_one_functioning_antenna(self): def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask_only_one_functioning_antenna(self):
"""Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values (only second antenna is OK)""" """Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values (only second antenna is OK)"""
recv_proxy = self.setup_recv_proxy()
antennafield_proxy = self.proxy antennafield_proxy = self.proxy
# Broken antennas except second
antenna_qualities = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94) antenna_qualities = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94)
antenna_use = numpy.array([AntennaUse.AUTO] * 96) antenna_use = numpy.array([AntennaUse.AUTO] * 96)
antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use} antenna_properties = {
mapping_properties = {"RECV_devices": ["STAT/RECV/1"], 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use
}
# Configure control mapping to control all 96 inputs of recv device
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48, "Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46} "Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
numpy.array([[1, x] for x in range(0, 96)]).flatten()
}
# Cycle device and set properties
antennafield_proxy.off() antennafield_proxy.off()
antennafield_proxy.put_property(antenna_properties) antennafield_proxy.put_property(antenna_properties)
antennafield_proxy.put_property(mapping_properties) antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot() # initialise hardware values as well antennafield_proxy.boot() # initialises hardware values as well
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R)
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 46), antennafield_proxy.ANT_mask_RW) # Antenna_Usage_Mask_R should be false except one
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 46 + [False] * 48), recv_proxy.ANT_mask_RW) numpy.testing.assert_equal(
numpy.array([False] + [True] + [False] * 94),
antennafield_proxy.Antenna_Usage_Mask_R
)
# device.boot() writes Antenna_Usage_Mask_R to ANT_mask_RW
numpy.testing.assert_equal(
numpy.array([False] + [True] + [False] * 94),
antennafield_proxy.ANT_mask_RW
)
# ANT_mask_RW on antennafield writes to configured recv devices for all
# mapped inputs
numpy.testing.assert_equal(
numpy.array([False] + [True] + [False] * 94),
self.recv_proxy.ANT_mask_RW
)
def test_antennafield_set_mapped_attribute_ignore_all(self):
"""Verify RECV device attribute unaffected by antennafield if not mapped"""
# Connect recv/1 device but no control inputs
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping": [-1, -1] * 48
}
# Cycle device an put properties
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
# Set HBAT_PWR_on_RW to false on recv device and read results
self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96)
current_values = self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
# write true through antennafield
antennafield_proxy.write_attribute("HBAT_PWR_on_RW", [[True] * 32] * 48)
# Test that original recv values for HBAT_PWR_on_RW match current
numpy.testing.assert_equal(
current_values,
self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute(self):
"""Verify RECV device attribute changed by antennafield if mapped inputs"""
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
# Each pair is one mapping so 2 inputs are connected
"Control_to_RECV_mapping": [1, 0, 1, 1] + [-1, -1] * 46
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96)
try:
antennafield_proxy.write_attribute(
"HBAT_PWR_on_RW", [[True] * 32] * 48
)
numpy.testing.assert_equal(
numpy.array([[True] * 32] * 2 + [[False] * 32] * 94),
self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
)
finally:
# Always restore recv again
self.recv_proxy.write_attribute(
"HBAT_PWR_on_RW", [[False] * 32] * 96
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute_all(self):
"""Verify RECV device attribute changed by antennafield all inputs mapped"""
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
numpy.array([[1, x] for x in range(0, 96)]).flatten()
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96)
try:
antennafield_proxy.write_attribute(
"HBAT_PWR_on_RW", [[True] * 32] * 96
)
numpy.testing.assert_equal(
numpy.array([[True] * 32] * 96),
self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
)
finally:
# Always restore recv again
self.recv_proxy.write_attribute(
"HBAT_PWR_on_RW", [[False] * 32] * 96
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute_small(self):
"""Verify small RECV device attribute changed all inputs mapped"""
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
numpy.array([[1, x] for x in range(0, 96)]).flatten()
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("RCU_band_select_RW", [False] * 96)
try:
antennafield_proxy.write_attribute(
"RCU_band_select_RW", [True] * 96
)
numpy.testing.assert_equal(
numpy.array([True] * 96),
self.recv_proxy.read_attribute("RCU_band_select_RW").value
)
finally:
# Always restore recv again
self.recv_proxy.write_attribute(
"RCU_band_select_RW", [False] * 96
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
...@@ -63,9 +63,11 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): ...@@ -63,9 +63,11 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
NR_TILES = 48 NR_TILES = 48
antennafield_proxy = TestDeviceProxy(self.antennafield_iden) antennafield_proxy = TestDeviceProxy(self.antennafield_iden)
control_mapping = [[1,i] for i in range(NR_TILES)] control_mapping = [[1,i] for i in range(NR_TILES)]
antennafield_proxy.put_property({"RECV_devices": [self.recv_iden], antennafield_proxy.put_property({
"HBAT_Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), "RECV_devices": [self.recv_iden],
'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}) "Control_to_RECV_mapping": numpy.array(control_mapping).flatten(),
'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}
)
antennafield_proxy.off() antennafield_proxy.off()
antennafield_proxy.boot() antennafield_proxy.boot()
return antennafield_proxy return antennafield_proxy
......
...@@ -182,9 +182,9 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase): ...@@ -182,9 +182,9 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
self.assertIsNotNone(stat) self.assertIsNotNone(stat)
self.assertEqual(121, stat.data_id_signal_input_index) self.assertEqual(121, stat.data_id_signal_input_index)
# Test RECV attributes # Test RECV attributes
self.assertEqual(stat.rcu_attenuator_dB.tolist(), None) self.assertEqual(stat.rcu_attenuator_dB, None)
self.assertEqual(stat.rcu_band_select.tolist(), None) self.assertEqual(stat.rcu_band_select, None)
self.assertEqual(stat.rcu_dth_on.tolist(), None) self.assertEqual(stat.rcu_dth_on, None)
def test_SST_statistics_with_device_in_off(self): def test_SST_statistics_with_device_in_off(self):
self.setup_recv_proxy() self.setup_recv_proxy()
...@@ -222,6 +222,6 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase): ...@@ -222,6 +222,6 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
self.assertIsNotNone(stat) self.assertIsNotNone(stat)
self.assertEqual(121, stat.data_id_signal_input_index) self.assertEqual(121, stat.data_id_signal_input_index)
# Test RECV attributes # Test RECV attributes
self.assertEqual(stat.rcu_attenuator_dB.tolist(), None) self.assertEqual(stat.rcu_attenuator_dB, None)
self.assertEqual(stat.rcu_band_select.tolist(), None) self.assertEqual(stat.rcu_band_select, None)
self.assertEqual(stat.rcu_dth_on.tolist(), None) self.assertEqual(stat.rcu_dth_on, None)
...@@ -22,7 +22,7 @@ logger = logging.getLogger() ...@@ -22,7 +22,7 @@ logger = logging.getLogger()
class TestRecvCluster(base.IntegrationTestCase): class TestRecvCluster(base.IntegrationTestCase):
# The AntennaField is setup with self.NR_TILES tiles in the test configuration
NR_TILES = 48 NR_TILES = 48
POINTING_DIRECTION = numpy.array([["J2000", "0deg", "0deg"]] * NR_TILES).flatten() POINTING_DIRECTION = numpy.array([["J2000", "0deg", "0deg"]] * NR_TILES).flatten()
......
...@@ -202,9 +202,24 @@ class statistics_data: ...@@ -202,9 +202,24 @@ class statistics_data:
# get SST specific stuff # get SST specific stuff
if self.marker == "S": if self.marker == "S":
self.data_id_signal_input_index = file[group_key].attrs["data_id_signal_input_index"] self.data_id_signal_input_index = file[group_key].attrs["data_id_signal_input_index"]
# check if the dataset is empty or not. if empty, set to None, if not get the value
if file.get(f'{group_key}/rcu_attenuator_dB').shape is None:
self.rcu_attenuator_dB = None
else:
self.rcu_attenuator_dB = numpy.array(file.get(f"{group_key}/rcu_attenuator_dB")) self.rcu_attenuator_dB = numpy.array(file.get(f"{group_key}/rcu_attenuator_dB"))
if file.get(f'{group_key}/rcu_band_select').shape is None:
self.rcu_band_select = None
else:
self.rcu_band_select = numpy.array(file.get(f"{group_key}/rcu_band_select")) self.rcu_band_select = numpy.array(file.get(f"{group_key}/rcu_band_select"))
if file.get(f'{group_key}/rcu_dth_on').shape is None:
self.rcu_dth_on = None
else:
self.rcu_dth_on = numpy.array(file.get(f"{group_key}/rcu_dth_on")) self.rcu_dth_on = numpy.array(file.get(f"{group_key}/rcu_dth_on"))
# get XST specific stuff # get XST specific stuff
......
...@@ -332,32 +332,43 @@ class SstHdf5Writer(HDF5Writer): ...@@ -332,32 +332,43 @@ class SstHdf5Writer(HDF5Writer):
].astype(numpy.float32), ].astype(numpy.float32),
compression="gzip", compression="gzip",
) )
try: try:
current_group.create_dataset( current_group.create_dataset(
name="rcu_attenuator_dB", name="rcu_attenuator_dB",
data=self.current_matrix.parameters["rcu_attenuator_dB"].astype( data=self.current_matrix.parameters["rcu_attenuator_dB"].astype(numpy.int64),
numpy.int64
),
compression="gzip", compression="gzip",
) )
except AttributeError:
current_group.create_dataset(
name="rcu_attenuator_dB",
data=h5py.Empty("f"),
)
try:
current_group.create_dataset( current_group.create_dataset(
name="rcu_band_select", name="rcu_band_select",
data=self.current_matrix.parameters["rcu_band_select"].astype( data=self.current_matrix.parameters["rcu_band_select"].astype(numpy.int64),
numpy.int64
),
compression="gzip", compression="gzip",
) )
except AttributeError:
current_group.create_dataset(
name="rcu_band_select",
data=h5py.Empty("f"),
)
try:
current_group.create_dataset( current_group.create_dataset(
name="rcu_dth_on", name="rcu_dth_on",
data=self.current_matrix.parameters["rcu_dth_on"].astype( data=self.current_matrix.parameters["rcu_dth_on"].astype(numpy.bool_),
numpy.bool_
),
compression="gzip", compression="gzip",
) )
except AttributeError as e: except AttributeError:
logger.warning("Device values not written.") current_group.create_dataset(
except Exception as e: name="rcu_dth_on",
raise Exception from e data=h5py.Empty("f"),
)
class BstHdf5Writer(HDF5Writer): class BstHdf5Writer(HDF5Writer):
......
...@@ -7,6 +7,13 @@ ...@@ -7,6 +7,13 @@
# Distributed under the terms of the APACHE license. # Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info. # See LICENSE.txt for more info.
import time
import statistics
import logging
import unittest
from unittest import mock
import numpy import numpy
from tango.test_context import DeviceTestContext from tango.test_context import DeviceTestContext
...@@ -16,6 +23,8 @@ from tangostationcontrol.devices.antennafield import AntennaToRecvMapper, Antenn ...@@ -16,6 +23,8 @@ from tangostationcontrol.devices.antennafield import AntennaToRecvMapper, Antenn
from tangostationcontrol.test import base from tangostationcontrol.test import base
from tangostationcontrol.test.devices import device_base from tangostationcontrol.test.devices import device_base
logger = logging.getLogger()
class TestAntennaToRecvMapper(base.TestCase): class TestAntennaToRecvMapper(base.TestCase):
...@@ -190,50 +199,54 @@ class TestAntennaToRecvMapper(base.TestCase): ...@@ -190,50 +199,54 @@ class TestAntennaToRecvMapper(base.TestCase):
# Rename to write # Rename to write
def test_map_write_ant_mask_rw_no_mapping_and_one_receiver(self): def test_map_write_ant_mask_rw_no_mapping_and_one_receiver(self):
"""Verify results None without control and array sizes"""
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [False] * 48 set_values = [None] * 48
expected = [[False] * 96] expected = [[None] * 96]
actual = mapper.map_write("ANT_mask_RW", set_values) actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_ant_mask_rw_no_mapping_and_two_receivers(self): def test_map_write_ant_mask_rw_no_mapping_and_two_receivers(self):
"""Verify results None without control and array sizes"""
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [False] * 48 set_values = [None] * 48
expected = [[False] * 96] * 2 expected = [[None] * 96] * 2
actual = mapper.map_write("ANT_mask_RW", set_values) actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_ant_mask_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): def test_map_write_ant_mask_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [True, False] + [False] * 46 set_values = [True, False] + [None] * 46
expected = [[False, True, False] + [False] * 93] expected = [[False, True] + [None] * 94]
actual = mapper.map_write("ANT_mask_RW", set_values) actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self): def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [0] * 48 set_values = [None] * 48
expected = [[0] * 96] expected = [[None] * 96]
actual = mapper.map_write("RCU_band_select_RW", set_values) actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_no_mapping_and_two_receivers(self): def test_map_write_rcu_band_select_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [0] * 48 set_values = [None] * 48
expected = [[0] * 96] * 2 expected = [[None] * 96] * 2
actual = mapper.map_write("RCU_band_select_RW", set_values) actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): def test_map_write_rcu_band_select_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [1, 0] + [0] * 46 set_values = [1, 0] + [None] * 46
expected = [[0, 1, 0] + [0] * 93] expected = [[0, 1] + [None] * 94]
actual = mapper.map_write("RCU_band_select_RW", set_values) actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
...@@ -241,7 +254,7 @@ class TestAntennaToRecvMapper(base.TestCase): ...@@ -241,7 +254,7 @@ class TestAntennaToRecvMapper(base.TestCase):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[1] * 32] * 48 set_values = [[1] * 32] * 48
expected = [[[0] * 32] * 96] expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
...@@ -249,104 +262,138 @@ class TestAntennaToRecvMapper(base.TestCase): ...@@ -249,104 +262,138 @@ class TestAntennaToRecvMapper(base.TestCase):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[1] * 32] * 48 set_values = [[1] * 32] * 48
expected = [[[0] * 32] * 96, [[0] * 32] * 96] expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[1] * 32, [2] * 32] + [[0] * 32] * 46 set_values = [[1] * 32, [2] * 32] + [[None] * 32] * 46
expected = [[[2] * 32, [1] * 32] + [[0] * 32] * 94] expected = [[[2] * 32, [1] * 32] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_no_mapping_and_one_receiver(self): def test_map_write_led_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[False] * 32] * 48 set_values = [[None] * 32] * 48
expected = [[[False] * 32] * 96] expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_LED_on_RW", set_values) actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_no_mapping_and_two_receivers(self): def test_map_write_led_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[False] * 32] * 48 set_values = [[None] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96] expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_LED_on_RW", set_values) actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_LED_on_RW", set_values) actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self): def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[False] * 32] * 48 set_values = [[None] * 32] * 48
expected = [[[False] * 32] * 96] expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self): def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[False] * 32] * 48 set_values = [[None] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96] expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self): def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[None] * 32] * 48
set_values = [[False] * 32] * 48 expected = [[[None] * 32] * 96]
expected = [[[False] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values) actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self): def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[False] * 32] * 48 set_values = [[None] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96] expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values) actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values) actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual) numpy.testing.assert_equal(expected, actual)
def test_merge_write(self):
"""Verify all None fields are replaced by merge_write if no control"""
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
merge_values = [[None] * 32] * 96
current_values = [[False] * 32] * 96
mapper.merge_write(merge_values, current_values)
numpy.testing.assert_equal(merge_values, current_values)
results = []
for _i in range(25):
start_time = time.monotonic_ns()
mapper.merge_write(merge_values, current_values)
stop_time = time.monotonic_ns()
results.append(stop_time - start_time)
logging.error(
f"Merge write performance: Median {statistics.median(results) / 1.e9} "
f"Stdev {statistics.stdev(results) / 1.e9}"
)
def test_merge_write_values(self):
"""Verify all fields with values are retained by merge_write"""
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
merge_values = [[True] * 32] * 2 + [[None] * 32] * 94
current_values = [[True] * 32] * 2 + [[False] * 32] * 94
mapper.merge_write(merge_values, current_values)
numpy.testing.assert_equal(merge_values, current_values)
class TestAntennafieldDevice(device_base.DeviceTestCase): class TestAntennafieldDevice(device_base.DeviceTestCase):
# some dummy values for mandatory properties # some dummy values for mandatory properties
AT_PROPERTIES = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0, AT_PROPERTIES = {
'Antenna_Field_Reference_ITRF' : [3.0, 3.0, 3.0], 'Antenna_Field_Reference_ETRS' : [7.0, 7.0, 7.0]} 'OPC_Server_Name': 'example.com',
'OPC_Server_Port': 4840,
# A mapping where Antennas are all not mapped to power RCUs 'OPC_Time_Out': 5.0,
POWER_NOT_CONNECTED = [[-1, -1]] * 48 'Antenna_Field_Reference_ITRF': [3.0, 3.0, 3.0],
# A mapping where Antennas are all not mapped to control RCUs 'Antenna_Field_Reference_ETRS': [7.0, 7.0, 7.0],
CONTROL_NOT_CONNECTED = [[-1, -1]] * 48 }
# A mapping where first two Antennas are mapped on the first Receiver.
# The first Antenna control line on RCU 1 and the second Antenna control line on RCU 0.
CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * 46
def setUp(self): def setUp(self):
# DeviceTestCase setUp patches lofar_device DeviceProxy # DeviceTestCase setUp patches lofar_device DeviceProxy
...@@ -397,3 +444,33 @@ class TestAntennafieldDevice(device_base.DeviceTestCase): ...@@ -397,3 +444,33 @@ class TestAntennafieldDevice(device_base.DeviceTestCase):
with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy: with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy:
for i in range(len(antenna_names)): for i in range(len(antenna_names)):
self.assertTrue(proxy.Antenna_Names_R[i]==f"C{i}") self.assertTrue(proxy.Antenna_Names_R[i]==f"C{i}")
@unittest.skip("Test for manual use, enable at most one (process=false)")
@mock.patch.object(antennafield, "DeviceProxy")
def test_set_mapped_attribute(self, m_proxy):
"""Verify set_mapped_attribute only modifies controlled inputs"""
antenna_properties = {
'RECV_devices': ['stat/RECV/1'],
}
data = numpy.array([[False] * 32] * 96)
m_proxy.return_value = mock.Mock(
read_attribute=mock.Mock(
return_value=mock.Mock(value=data)
)
)
with DeviceTestContext(
antennafield.AntennaField, process=False,
properties={**self.AT_PROPERTIES, **antenna_properties}
) as proxy:
proxy.boot()
proxy.write_attribute("HBAT_PWR_on_RW", numpy.array([[False] * 32] * 48))
numpy.testing.assert_equal(
m_proxy.return_value.write_attribute.call_args[0][1],
data
)
...@@ -24,7 +24,11 @@ deps = ...@@ -24,7 +24,11 @@ deps =
-r{toxinidir}/requirements.txt -r{toxinidir}/requirements.txt
-r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt -r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt
-r{toxinidir}/test-requirements.txt -r{toxinidir}/test-requirements.txt
commands = {envpython} -m stestr run {posargs} commands_pre =
{envpython} --version
commands =
{envpython} -m stestr --version
{envpython} -m stestr run {posargs}
[testenv:integration] [testenv:integration]
; Warning running integration tests will make changes to your docker system! ; Warning running integration tests will make changes to your docker system!
...@@ -37,6 +41,8 @@ setenv = ...@@ -37,6 +41,8 @@ setenv =
PYTHON={envpython} -m coverage run --source tangostationcontrol --parallel-mode PYTHON={envpython} -m coverage run --source tangostationcontrol --parallel-mode
commands = commands =
echo "Integration test directory configured for{env:TESTS_DIR} ({env:TEST_MODULE:default})" echo "Integration test directory configured for{env:TESTS_DIR} ({env:TEST_MODULE:default})"
{envpython} -m stestr --version
{envpython} -m coverage --version
{envpython} -m stestr run --serial {posargs} {envpython} -m stestr run --serial {posargs}
{envpython} -m coverage combine {envpython} -m coverage combine
{envpython} -m coverage html --omit='*test*' -d cover {envpython} -m coverage html --omit='*test*' -d cover
...@@ -54,6 +60,8 @@ deps = ...@@ -54,6 +60,8 @@ deps =
-r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt -r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt
-r{toxinidir}/test-requirements.txt -r{toxinidir}/test-requirements.txt
commands = commands =
{envpython} -m stestr --version
{envpython} -m coverage --version
{envpython} -m coverage erase {envpython} -m coverage erase
{envpython} -m stestr run {posargs} {envpython} -m stestr run {posargs}
{envpython} -m coverage combine {envpython} -m coverage combine
...@@ -64,6 +72,8 @@ commands = ...@@ -64,6 +72,8 @@ commands =
; TODO(Corne): Integrate Hacking to customize pep8 rules ; TODO(Corne): Integrate Hacking to customize pep8 rules
[testenv:pep8] [testenv:pep8]
commands = commands =
{envpython} -m doc8 --version
{envpython} -m flake8 --version
{envpython} -m doc8 docs/source/ --ignore D001 {envpython} -m doc8 docs/source/ --ignore D001
{envpython} -m flake8 {envpython} -m flake8
...@@ -73,10 +83,12 @@ commands = ...@@ -73,10 +83,12 @@ commands =
; It thus matters what interfaces Docker will bind our ; It thus matters what interfaces Docker will bind our
; containers to, not what our containers listen on. ; containers to, not what our containers listen on.
commands = commands =
{envpython} -m bandit --version
{envpython} -m bandit -r devices/ -n5 -ll -s B104 {envpython} -m bandit -r devices/ -n5 -ll -s B104
[testenv:xenon]; [testenv:xenon];
commands = commands =
{envpython} -m xenon --version
{envpython} -m xenon tangostationcontrol -b B -m A -a A -i libhdbpp-python {envpython} -m xenon tangostationcontrol -b B -m A -a A -i libhdbpp-python
[testenv:docs] [testenv:docs]
...@@ -84,6 +96,7 @@ deps = ...@@ -84,6 +96,7 @@ deps =
-r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt -r{toxinidir}/../docker-compose/lofar-device-base/lofar-requirements.txt
-r{toxinidir}/docs/docs-requirements.txt -r{toxinidir}/docs/docs-requirements.txt
commands = commands =
sphinx-build --version
sphinx-build -W -b html docs/source docs/build/html sphinx-build -W -b html docs/source docs/build/html
[flake8] [flake8]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment