Skip to content
Snippets Groups Projects
Commit b144cda1 authored by Auke Klazema's avatar Auke Klazema
Browse files

Merge branch 'L2SS-704' into 'master'

L2SS-704: Implement AntennaField device with tests

Closes L2SS-704

See merge request !302
parents 5963237f 4bba98ab
No related branches found
No related tags found
1 merge request!302L2SS-704: Implement AntennaField device with tests
Showing
with 609 additions and 4 deletions
...@@ -100,6 +100,7 @@ docker_build_image_all: ...@@ -100,6 +100,7 @@ docker_build_image_all:
- bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-tilebeam latest - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-tilebeam latest
- bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-beamlet latest - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-beamlet latest
- bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-digitalbeam latest - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-digitalbeam latest
- bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-antennafield latest
- bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-boot latest - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-boot latest
- bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-docker latest - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-docker latest
- bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-observation_control latest - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-observation_control latest
...@@ -331,6 +332,17 @@ docker_build_image_device_ovservation_control: ...@@ -331,6 +332,17 @@ docker_build_image_device_ovservation_control:
script: script:
# Do not remove 'bash' or statement will be ignored by primitive docker shell # Do not remove 'bash' or statement will be ignored by primitive docker shell
- bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-observation_control $tag - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-observation_control $tag
docker_build_image_device_antennafield:
extends: .base_docker_images_except
only:
refs:
- merge_requests
changes:
- docker-compose/device-antennafield.yml
- docker-compose/lofar-device-base/*
script:
# Do not remove 'bash' or statement will be ignored by primitive docker shell
- bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-antennafield $tag
docker_build_image_device_recv: docker_build_image_device_recv:
extends: .base_docker_images_except extends: .base_docker_images_except
only: only:
......
...@@ -14,6 +14,17 @@ ...@@ -14,6 +14,17 @@
} }
} }
}, },
"AntennaField": {
"STAT": {
"AntennaField": {
"STAT/AntennaField/1": {
"properties": {
"RECV_devices": ["STAT/RECV/1"]
}
}
}
}
},
"PDU": { "PDU": {
"STAT": { "STAT": {
"PDU": { "PDU": {
......
#
# Docker compose file that launches an interactive iTango session.
#
# Connect to the interactive session with 'docker attach itango'.
# Disconnect with the Docker deattach sequence: <CTRL>+<P> <CTRL>+<Q>
#
# Defines:
# - itango: iTango interactive session
#
# Requires:
# - lofar-device-base.yml
#
version: '2'
services:
device-antennafield:
image: device-antennafield
# build explicitly, as docker-compose does not understand a local image
# being shared among services.
build:
context: ..
dockerfile: docker-compose/lofar-device-base/Dockerfile
args:
SOURCE_IMAGE: ${LOCAL_DOCKER_REGISTRY_HOST}/${LOCAL_DOCKER_REGISTRY_USER}/tango-itango:${TANGO_ITANGO_VERSION}
container_name: ${CONTAINER_NAME_PREFIX}device-antennafield
logging:
driver: "json-file"
options:
max-size: "100m"
max-file: "10"
networks:
- control
ports:
- "5715:5715" # unique port for this DS
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- ..:/opt/lofar/tango:rw
environment:
- TANGO_HOST=${TANGO_HOST}
working_dir: /opt/lofar/tango
entrypoint:
- bin/start-ds.sh
# configure CORBA to _listen_ on 0:port, but tell others we're _reachable_ through ${HOSTNAME}:port, since CORBA
# can't know about our Docker port forwarding
- l2ss-antennafield AntennaField STAT -v -ORBendPoint giop:tcp:0:5715 -ORBendPointPublish giop:tcp:${HOSTNAME}:5715
restart: unless-stopped
...@@ -11,7 +11,8 @@ tilebeam = DeviceProxy("STAT/TileBeam/1") ...@@ -11,7 +11,8 @@ tilebeam = DeviceProxy("STAT/TileBeam/1")
pdu = DeviceProxy("STAT/PDU/1") pdu = DeviceProxy("STAT/PDU/1")
beamlet = DeviceProxy("STAT/Beamlet/1") beamlet = DeviceProxy("STAT/Beamlet/1")
digitalbeam = DeviceProxy("STAT/DigitalBeam/1") digitalbeam = DeviceProxy("STAT/DigitalBeam/1")
antennafield = DeviceProxy("STAT/AntennaField/1")
docker = DeviceProxy("STAT/Docker/1") docker = DeviceProxy("STAT/Docker/1")
# Put them in a list in case one wants to iterate # Put them in a list in case one wants to iterate
devices = [apsct, apspu, recv, sdp, sst, xst, unb2, boot, tilebeam, beamlet, digitalbeam, docker] devices = [apsct, apspu, recv, sdp, sst, xst, unb2, boot, tilebeam, beamlet, digitalbeam, antennafield, docker]
...@@ -20,7 +20,7 @@ sleep 1 # dsconfig container must be up and running... ...@@ -20,7 +20,7 @@ sleep 1 # dsconfig container must be up and running...
# shellcheck disable=SC2016 # shellcheck disable=SC2016
echo '/usr/local/bin/wait-for-it.sh ${TANGO_HOST} --strict --timeout=300 -- true' | make run dsconfig bash - echo '/usr/local/bin/wait-for-it.sh ${TANGO_HOST} --strict --timeout=300 -- true' | make run dsconfig bash -
DEVICES="device-boot device-apsct device-apspu device-sdp device-pdu device-recv device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-pdu" DEVICES="device-boot device-apsct device-apspu device-sdp device-recv device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-pdu device-antennafield"
SIMULATORS="sdptr-sim recv-sim unb2-sim apsct-sim apspu-sim" SIMULATORS="sdptr-sim recv-sim unb2-sim apsct-sim apspu-sim"
# Build only the required images, please do not build everything that makes CI # Build only the required images, please do not build everything that makes CI
......
antennfield
====================
``antennafield == DeviceProxy("STAT/AntennaField/1")``
...@@ -22,6 +22,7 @@ Even without having access to any LOFAR2.0 hardware, you can install the full st ...@@ -22,6 +22,7 @@ Even without having access to any LOFAR2.0 hardware, you can install the full st
devices/tilebeam devices/tilebeam
devices/beamlet devices/beamlet
devices/digitalbeam devices/digitalbeam
devices/antennafield
devices/boot devices/boot
devices/docker devices/docker
devices/pdu devices/pdu
......
...@@ -39,6 +39,7 @@ console_scripts = ...@@ -39,6 +39,7 @@ console_scripts =
l2ss-tilebeam = tangostationcontrol.devices.tilebeam:main l2ss-tilebeam = tangostationcontrol.devices.tilebeam:main
l2ss-beamlet = tangostationcontrol.devices.sdp.beamlet:main l2ss-beamlet = tangostationcontrol.devices.sdp.beamlet:main
l2ss-digitalbeam = tangostationcontrol.devices.sdp.digitalbeam:main l2ss-digitalbeam = tangostationcontrol.devices.sdp.digitalbeam:main
l2ss-antennafield = tangostationcontrol.devices.antennafield:main
l2ss-boot = tangostationcontrol.devices.boot:main l2ss-boot = tangostationcontrol.devices.boot:main
l2ss-docker-device = tangostationcontrol.devices.docker_device:main l2ss-docker-device = tangostationcontrol.devices.docker_device:main
l2ss-observation = tangostationcontrol.devices.observation:main l2ss-observation = tangostationcontrol.devices.observation:main
......
...@@ -10,7 +10,7 @@ If a new device is added, it will (likely) need to be referenced in several plac ...@@ -10,7 +10,7 @@ If a new device is added, it will (likely) need to be referenced in several plac
- Adjust `docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py` to make an alias for it available in Jupyter, - Adjust `docker-compose/jupyter/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py` to make an alias for it available in Jupyter,
- Adjust `tangostationcontrol/tangostationcontrol/devices/boot.py` to add the device to the station initialisation sequence, - Adjust `tangostationcontrol/tangostationcontrol/devices/boot.py` to add the device to the station initialisation sequence,
- Add to `docker-compose/` to create a YaML file to start the device in a docker container. NOTE: it needs a unique 57xx port assigned, - Add to `docker-compose/` to create a YaML file to start the device in a docker container. NOTE: it needs a unique 57xx port assigned,
current _unused_ port value: 5715 current _unused_ port value: 5716
- Adjust `tangostationcontrol/setup.cfg` to add an entry point for the device in the package installation, - Adjust `tangostationcontrol/setup.cfg` to add an entry point for the device in the package installation,
- Add to `tangostationcontrol/tangostationcontrol/integration_test/default/devices/` to add an integration test, - Add to `tangostationcontrol/tangostationcontrol/integration_test/default/devices/` to add an integration test,
- Adjust `sbin/run_integration_test.sh` to have the device started when running the integration tests, - Adjust `sbin/run_integration_test.sh` to have the device started when running the integration tests,
......
# -*- coding: utf-8 -*-
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" AntennaField Device Server for LOFAR2.0
"""
from tango import DeviceProxy, DevSource
from tango.server import device_property, attribute, AttrWriteType
import numpy
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.devices.lofar_device import lofar_device
from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions
from tangostationcontrol.devices.device_decorators import fault_on_error
import logging
logger = logging.getLogger()
__all__ = ["AntennaField", "main"]
NUMBER_OF_HBAT = 48
NUMBER_OF_ELEMENTS_PER_TILE = 16
class mapped_attribute(attribute):
def __init__(self, mapping_attribute, dtype, max_dim_x, max_dim_y=0, access=AttrWriteType.READ, **kwargs):
if access == AttrWriteType.READ_WRITE:
@fault_on_error()
def write_func_wrapper(device, value):
write_func = device.set_mapped_attribute(mapping_attribute, value)
self.fset = write_func_wrapper
@fault_on_error()
def read_func_wrapper(device):
return device.get_mapped_attribute(mapping_attribute)
self.fget = read_func_wrapper
super().__init__(dtype=dtype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, fisallowed="is_attribute_wrapper_allowed", **kwargs)
@device_logging_to_python()
class AntennaField(lofar_device):
HBAT_Power_to_RECV_mapping = device_property(
dtype=(numpy.int32,),
doc='The mapping of HBAT power lines to RECV mapping. Each RECV can handle 96 inputs. The HBAT number is the index and the value shows to which receiver device it is connected and on which input. The first integer is the input. The second integer is the RECV id. Example: [0, 3] = first receiver of property RECV_devices with input 3. -1 means that the HBAT is not connected. The property is stored in a one dimensional structure. It needs to be reshaped to a list of lists of two items.',
mandatory=False,
default_value = [-1] * NUMBER_OF_HBAT * 2
)
HBAT_Control_to_RECV_mapping = device_property(
dtype=(numpy.int32,),
doc='The mapping of HBAT control lines to RECV mapping. Each RECV can handle 96 inputs. The HBAT number is the index and the value shows to which receiver device it is connected and on which input. The first integer is the input. The second interger is the RECV id. Example: [1, 3] = STAT/RECV/1 with input 3. -1 means that the HBAT is not connected. The property is stored in a one dimensional structure. It needs to be reshaped to a list of lists of two items.',
mandatory=False,
default_value = [-1] * NUMBER_OF_HBAT * 2
)
RECV_devices = device_property(
dtype=(str,),
doc='Which Recv devices are in use by the AntennaField. The order is important and it should match up with the order of the mapping.',
mandatory=False,
default_value = []
)
HBAT_ANT_mask_RW = mapped_attribute("ANT_mask_RW", dtype=(numpy.bool_,), max_dim_x=NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_BF_delay_steps_R = mapped_attribute("HBAT_BF_delay_steps_R", dtype=((numpy.int64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=NUMBER_OF_HBAT)
HBAT_BF_delay_steps_RW = mapped_attribute("HBAT_BF_delay_steps_RW", dtype=((numpy.int64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_LED_on_R = mapped_attribute("HBAT_LED_on_R", dtype=((numpy.bool_,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=NUMBER_OF_HBAT)
HBAT_LED_on_RW = mapped_attribute("HBAT_LED_on_RW", dtype=((numpy.bool_,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_PWR_LNA_on_R = mapped_attribute("HBAT_PWR_LNA_on_R", dtype=((numpy.bool_,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=NUMBER_OF_HBAT)
HBAT_PWR_LNA_on_RW = mapped_attribute("HBAT_PWR_LNA_on_RW", dtype=((numpy.bool_,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_PWR_on_R = mapped_attribute("HBAT_PWR_on_R", dtype=((numpy.bool_,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=NUMBER_OF_HBAT)
HBAT_PWR_on_RW = mapped_attribute("HBAT_PWR_on_RW", dtype=((numpy.bool_,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
@log_exceptions()
def configure_for_initialise(self):
super().configure_for_initialise()
self.__setup_all_receiver_proxies()
self.__setup_mapper()
def __setup_all_receiver_proxies(self):
self.recv_proxies = []
for recv in self.RECV_devices:
recv_proxy = DeviceProxy(recv)
# Want to force non-cached results from the receiver proxies
recv_proxy.set_source(DevSource.DEV)
self.recv_proxies.append(recv_proxy)
def __setup_mapper(self):
number_of_receivers = len(self.RECV_devices)
# Reshape of mapping is needed because properties are stored in 1d arrays
control_mapping = numpy.reshape(self.HBAT_Control_to_RECV_mapping, (NUMBER_OF_HBAT, 2))
power_mapping = numpy.reshape(self.HBAT_Power_to_RECV_mapping, (NUMBER_OF_HBAT, 2))
self.__mapper = HBATToRecvMapper(control_mapping, power_mapping, number_of_receivers)
def get_mapped_attribute(self, mapped_point):
recv_results = []
for recv_proxy in self.recv_proxies:
result = recv_proxy.read_attribute(mapped_point)
recv_results.append(result)
mapped_values = self.__mapper.map_read(mapped_point, recv_results)
return mapped_values
def set_mapped_attribute(self, mapped_point, value):
mapped_value = self.__mapper.map_write(mapped_point, value)
for idx, recv_proxy in enumerate(self.recv_proxies):
recv_proxy.write_attribute(mapped_point, mapped_value[idx])
class HBATToRecvMapper(object):
def __init__(self, hbat_control_to_recv_mapping, hbat_power_to_recv_mapping, number_of_receivers):
self.__control_mapping = hbat_control_to_recv_mapping
self.__power_mapping = hbat_power_to_recv_mapping
self.__number_of_receivers = number_of_receivers
self.__default_value_mapping_read = {
"ANT_mask_RW": [False] * 48,
"HBAT_BF_delay_steps_R": [[0] * 32] * 48,
"HBAT_BF_delay_steps_RW": [[0] * 32] * 48,
"HBAT_LED_on_R": [[False] * 32] * 48,
"HBAT_LED_on_RW": [[False] * 32] * 48,
"HBAT_PWR_LNA_on_R": [[False] * 32] * 48,
"HBAT_PWR_LNA_on_RW": [[False] * 32] * 48,
"HBAT_PWR_on_R": [[False] * 32] * 48,
"HBAT_PWR_on_RW": [[False] * 32] * 48
}
self.__default_value_mapping_write = {
"ANT_mask_RW": [False] * 96,
"HBAT_BF_delay_steps_RW": [[0] * 32] * 96,
"HBAT_LED_on_RW": [[False] * 32] * 96,
"HBAT_PWR_LNA_on_RW": [[False] * 32] * 96,
"HBAT_PWR_on_RW": [[False] * 32] * 96
}
self.__reshape_attributes = {
"ANT_mask_RW": [32, 3]
}
def map_read(self, mapped_attribute, recv_results):
default_values = self.__default_value_mapping_read[mapped_attribute]
return self._mapped_r_values(recv_results, default_values)
def map_write(self, mapped_attribute, set_values):
default_values = self.__default_value_mapping_write[mapped_attribute]
mapped_values = self._mapped_rw_values(set_values, default_values)
if mapped_attribute in self.__reshape_attributes:
mapped_values = numpy.reshape(mapped_values,
(self.__number_of_receivers,
self.__reshape_attributes[mapped_attribute][0],
self.__reshape_attributes[mapped_attribute][1]))
return mapped_values
def _mapped_r_values(self, recv_results, default_values):
mapped_values = default_values
for idx, mapping in enumerate(self.__control_mapping):
recv = mapping[0]
rcu = mapping[1]
if recv > 0:
mapped_values[idx] = recv_results[recv - 1][rcu]
return mapped_values
def _mapped_rw_values(self, set_values, default_values):
mapped_values = []
for _ in range(self.__number_of_receivers):
mapped_values.append(default_values)
for idx, mapping in enumerate(self.__control_mapping):
recv = mapping[0]
rcu = mapping[1]
if recv > 0:
mapped_values[recv - 1][rcu] = set_values[idx]
return mapped_values
# ----------
# Run server
# ----------
def main(**kwargs):
"""Main function of the ObservationControl module."""
return entry(AntennaField, **kwargs)
...@@ -244,6 +244,7 @@ class Boot(lofar_device): ...@@ -244,6 +244,7 @@ class Boot(lofar_device):
"STAT/Beamlet/1", "STAT/Beamlet/1",
"STAT/TileBeam/1", # Accesses RECV and Beamlet "STAT/TileBeam/1", # Accesses RECV and Beamlet
"STAT/DigitalBeam/1", "STAT/DigitalBeam/1",
"STAT/AntennaField/1",
], ],
) )
......
...@@ -143,11 +143,12 @@ class lofar_device(Device, metaclass=DeviceMeta): ...@@ -143,11 +143,12 @@ class lofar_device(Device, metaclass=DeviceMeta):
@log_exceptions() @log_exceptions()
def Initialise(self): def Initialise(self):
""" """
Command to ask for initialisation of this device. Can only be called in FAULT or OFF state. Command to ask for initialisation of this device. Can only be called in OFF state.
:return:None :return:None
""" """
self.set_state(DevState.INIT) self.set_state(DevState.INIT)
self.set_status("Device is in the INIT state.") self.set_status("Device is in the INIT state.")
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
from .base import AbstractTestBases
class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
def setUp(self):
super().setUp("STAT/AntennaField/1")
self.proxy.put_property({"RECV_devices": ["STAT/RECV/1"],
"HBAT_Power_to_RECV_mapping": [1, 1, 1, 0] + [-1] * 92})
self.recv_proxy = self.setup_recv_proxy()
def setup_recv_proxy(self):
# setup RECV
recv_proxy = TestDeviceProxy("STAT/RECV/1")
recv_proxy.off()
recv_proxy.warm_boot()
recv_proxy.set_defaults()
return recv_proxy
def test_property_recv_devices_has_one_receiver(self):
result = self.proxy.get_property("RECV_devices")
self.assertSequenceEqual(result["RECV_devices"], ["STAT/RECV/1"])
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import numpy
import unittest
from tangostationcontrol.devices.antennafield import HBATToRecvMapper
class TestHBATToRecvMapper(unittest.TestCase):
# A mapping where HBATs are all not mapped to power RCUs
power_not_connected = [[-1, -1] * 48]
# A mapping where HBATs are all not mapped to control RCUs
control_not_connected = [[-1, -1] * 48]
# A mapping where first two HBATs are mapped on the first Receiver.
# The first HBAT control line on RCU 1 and the second HBAT control line on RCU 0.
control_hba_0_and_1_on_rcu_1_and_0_of_recv_1 = [[1, 1], [1, 0]] + [[-1, -1]] * 46
def test_ant_read_mask_r_no_mapping(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3)
receiver_values = [[[False] * 3] * 32, [[False] * 3] * 32, [[False] * 3] * 32]
expected = [False] * 48
actual = mapper.map_read("ANT_mask_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_ant_read_mask_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3)
receiver_values = [[False, True, False], [[False] * 3] * 31, [[False] * 3] * 32, [[False] * 3] * 32]
expected = [True, False] + [False] * 46
actual = mapper.map_read("ANT_mask_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_bf_read_delay_steps_r_no_mapping(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3)
receiver_values = [[[0] * 32] * 96, [[0] * 32] * 96, [[0] * 32] * 96]
expected = [[0] * 32] * 48
actual = mapper.map_read("HBAT_BF_delay_steps_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_bf_read_delay_steps_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3)
receiver_values = [[[2] * 32, [1] * 32] + [[0] * 32] * 94, [[0] * 32] * 96, [[0] * 32] * 96]
expected = [[1] * 32, [2] * 32] + [[0] * 32] * 46
actual = mapper.map_read("HBAT_BF_delay_steps_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_bf_read_delay_steps_rw_no_mapping(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3)
receiver_values = [[[0] * 32] * 96, [[0] * 32] * 96, [[0] * 32] * 96]
expected = [[0] * 32] * 48
actual = mapper.map_read("HBAT_BF_delay_steps_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_bf_read_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3)
receiver_values = [[[2] * 32, [1] * 32] + [[0] * 32] * 94, [[0] * 32] * 96, [[0] * 32] * 96]
expected = [[1] * 32, [2] * 32] + [[0] * 32] * 46
actual = mapper.map_read("HBAT_BF_delay_steps_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_led_on_r_unmapped(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3)
receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[False] * 32] * 48
actual = mapper.map_read("HBAT_LED_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_led_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46
actual = mapper.map_read("HBAT_LED_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_led_on_rw_unmapped(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3)
receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[False] * 32] * 48
actual = mapper.map_read("HBAT_LED_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46
actual = mapper.map_read("HBAT_LED_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_lna_on_r_unmapped(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3)
receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[False] * 32] * 48
actual = mapper.map_read("HBAT_PWR_LNA_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_lna_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46
actual = mapper.map_read("HBAT_PWR_LNA_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_lna_on_rw_unmapped(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3)
receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[False] * 32] * 48
actual = mapper.map_read("HBAT_PWR_LNA_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46
actual = mapper.map_read("HBAT_PWR_LNA_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_on_r_unmapped(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3)
receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[False] * 32] * 48
actual = mapper.map_read("HBAT_PWR_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46
actual = mapper.map_read("HBAT_PWR_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_on_rw_unmapped(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3)
receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[False] * 32] * 48
actual = mapper.map_read("HBAT_PWR_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 3)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46
actual = mapper.map_read("HBAT_PWR_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
# Rename to write
def test_map_write_ant_mask_rw_no_mapping_and_one_receiver(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1)
set_values = [False] * 48
expected = [[[False] * 3] * 32]
actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_ant_mask_rw_no_mapping_and_two_receivers(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2)
set_values = [False] * 48
expected = [[[False] * 3] * 32] * 2
actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_ant_mask_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1)
set_values = [True, False] + [False] * 46
expected = [[[False, True, False]] + [[False] * 3] * 31]
actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_bf_delay_steps_rw_no_mapping_and_one_receiver(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1)
set_values = [[1] * 32] * 48
expected = [[[0] * 32] * 96]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_bf_delay_steps_rw_no_mapping_and_two_receivers(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2)
set_values = [[1] * 32] * 48
expected = [[[0] * 32] * 96, [[0] * 32] * 96]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1)
set_values = [[1] * 32, [2] * 32] + [[0] * 32] * 46
expected = [[[2] * 32, [1] * 32] + [[0] * 32] * 94]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_no_mapping_and_one_receiver(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96]
actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_no_mapping_and_two_receivers(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96]
actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[0] * 32] * 94]
actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[0] * 32] * 94]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self):
mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[0] * 32] * 94]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
...@@ -52,3 +52,4 @@ class TestLofarDevice(base.TestCase): ...@@ -52,3 +52,4 @@ class TestLofarDevice(base.TestCase):
proxy.initialise() proxy.initialise()
self.assertEqual(42.0, proxy.read_attribute_A) self.assertEqual(42.0, proxy.read_attribute_A)
self.assertListEqual([42.0, 43.0], proxy.read_attribute_B_array.tolist()) self.assertListEqual([42.0, 43.0], proxy.read_attribute_B_array.tolist())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment