Skip to content
Snippets Groups Projects
Commit 5be9be2c authored by Corné Lukken's avatar Corné Lukken
Browse files

L2SS-1492: Support controlling multiple individual antenna fields in single observation

parent 18c0d691
Branches
Tags
1 merge request!757L2SS-1492: Support controlling multiple individual antenna fields in single observation
Showing
with 166 additions and 1568 deletions
......@@ -17,6 +17,8 @@
**/vscode-server.tar
.orig
bin/jumppad
tangostationcontrol/build
tangostationcontrol/cover
tangostationcontrol/dist
......
{
"servers": {
"ObservationControl": {
"STAT": {
"Observation": {
"STAT/Observation/1": {},
"STAT/Observation/2": {},
"STAT/Observation/3": {}
}
}
}
}
}
......@@ -1115,26 +1115,6 @@
}
}
}
},
"ObservationControl": {
"STAT": {
"Observation": {
"STAT/Observation/1": {
}
},
"ObservationControl": {
"STAT/ObservationControl/1": {
"properties": {
"Power_Children": [
"STAT/Observation/1"
],
"Control_Children": [
"STAT/Observation/1"
]
}
}
}
}
}
}
}
This diff is collapsed.
......@@ -48,6 +48,7 @@ You will also need:
* bash
* dig (dnsutils package on ubuntu/debian)
* wget
* hexdump (bsdextrautils package on ubuntu/debian)
## Start dev environment
......@@ -70,25 +71,6 @@ to start the dev environment including tango.
Nomad is now available at http://localhost:4646/
## Start dev environment
For local development a dev environment is needed. To setup this environment run
```
./sbin/prepare_dev_env.sh
```
This will install `jumppad`, if not present yet as well as creating the docker volume needed to simulate the station
nomad cluster.
Afterwards run
```
jumppad up infra/dev
```
to start the dev environment including tango.
## Bootstrap
The bootstrap procedure is needed only once. First we build all docker
......@@ -157,6 +139,8 @@ Next change the version in the following places:
# Release Notes
* 0.24.0 Allow multiple antenna fields to be used in single observation,
This renames the `Observation` device to `ObservationField`.
* 0.23.0 Migrate execution environment to nomad
* 0.22.0 Split `Antennafield` in `AFL` and `AFH` devices in order to separate Low-Band
and High-Band functionalities
......
......@@ -30,7 +30,7 @@
},
"stat/docker/1": {
},
"stat/observation/*":{
"stat/observationfield/*":{
},
"stat/observationcontrol/1":{
},
......
......@@ -67,7 +67,7 @@
},
"stat/docker/1": {
},
"stat/observation/*":{
"stat/observationfield/*":{
"exclude": [
"saps_pointing_R",
"saps_subbands_R"
......
......@@ -41,7 +41,7 @@
},
"stat/docker/1": {
},
"stat/observation/*":{
"stat/observationfield/*":{
"include": [
"saps_pointing_R",
"saps_subbands_R"
......
0.23.0
0.24.0
......@@ -24,9 +24,9 @@
}
}
},
"Observation": {
"ObservationField": {
"STAT": {
"Observation": {
"ObservationField": {
}
}
},
......@@ -37,10 +37,10 @@
}
}
},
"AntennaField": {
"AFH": {
"STAT": {
"AntennaField": {
"STAT/AntennaField/HBA": {
"AFH": {
"STAT/AFH/HBA": {
"properties": {
"Control_Children": ["STAT/RECVH/1"]
}
......
{
"servers": {
"AntennaField": {
"AFH": {
"STAT": {
"AntennaField": {
"STAT/AntennaField/HBA": {
"AFH": {
"STAT/AFH/HBA": {
"properties": {
"Control_to_RECV_mapping": [
"1", "0",
......
......@@ -10,13 +10,6 @@
}
}
}
},
"ObservationControl": {
"STAT": {
"Observation": {
"STAT/Observation/1": {}
}
}
}
}
}
......@@ -22,10 +22,10 @@ INITIAL_CONFIGURATION = None
class TestDeviceConfiguration(AbstractTestBases.TestDeviceBase):
TEST_CONFIGURATION = """{
"servers": {
"AntennaField": {
"AFH": {
"STAT": {
"AntennaField": {
"STAT/AntennaField/HBA": {
"AFH": {
"STAT/AFH/HBA": {
"properties": {
"Control_Children": [ "STAT/MOCKRCU/1" ]
}
......@@ -35,8 +35,8 @@ class TestDeviceConfiguration(AbstractTestBases.TestDeviceBase):
},
"ObservationControl": {
"STAT": {
"Observation": {
"STAT/Observation/11": {}
"ObservationControl": {
"STAT/ObservationControl/11": {}
}
}
}
......@@ -148,20 +148,19 @@ class TestDeviceConfiguration(AbstractTestBases.TestDeviceBase):
msg=f"{dbdata}",
) # configuration device
self.assertFalse(
"stat/observation/11"
in dbdata["servers"]["observationcontrol"]["stat"]["observation"],
"stat/observationcontrol/11"
in dbdata["servers"]["observationcontrol"]["stat"][
"observationcontrol"
],
msg=f"{dbdata}",
) # observation device
self.assertTrue(
"stat/antennafield/hba"
in dbdata["servers"]["antennafield"]["stat"]["antennafield"],
"stat/afh/hba" in dbdata["servers"]["afh"]["stat"]["afh"],
msg=f"{dbdata}",
) # antennafield device
antennafield_properties = CaseInsensitiveDict(
dbdata["servers"]["antennafield"]["stat"]["antennafield"][
"stat/antennafield/hba"
]["properties"]
dbdata["servers"]["afh"]["stat"]["afh"]["stat/afh/hba"]["properties"]
)
self.assertIn(
......@@ -183,9 +182,9 @@ class TestDeviceConfiguration(AbstractTestBases.TestDeviceBase):
)
# Test whether new device has been added
self.assertTrue(
"stat/observation/11"
"stat/observationcontrol/11"
in updated_dbdata["servers"]["observationcontrol"]["stat"][
"observation"
"observationcontrol"
],
msg=f"{updated_dbdata}",
) # observation device
......@@ -195,15 +194,14 @@ class TestDeviceConfiguration(AbstractTestBases.TestDeviceBase):
) # recvh device
# Test whether old attribute has been updated
self.assertTrue(
"stat/antennafield/hba"
in updated_dbdata["servers"]["antennafield"]["stat"]["antennafield"],
"stat/afh/hba" in updated_dbdata["servers"]["afh"]["stat"]["afh"],
msg=f"{updated_dbdata}",
)
antennafield_properties = CaseInsensitiveDict(
updated_dbdata["servers"]["antennafield"]["stat"]["antennafield"][
"stat/antennafield/hba"
]["properties"]
updated_dbdata["servers"]["afh"]["stat"]["afh"]["stat/afh/hba"][
"properties"
]
)
self.assertIn(
......@@ -241,8 +239,10 @@ class TestDeviceConfiguration(AbstractTestBases.TestDeviceBase):
)
# Test whether new device has been added
self.assertTrue(
"stat/observation/11"
in updated_dbdata["servers"]["observationcontrol"]["stat"]["observation"],
"stat/observationcontrol/11"
in updated_dbdata["servers"]["observationcontrol"]["stat"][
"observationcontrol"
],
msg=f"{updated_dbdata}",
) # observation device
# Test whether old device has NOT been deleted
......@@ -252,14 +252,13 @@ class TestDeviceConfiguration(AbstractTestBases.TestDeviceBase):
) # recvh device
# Test whether old attribute has been updated
self.assertTrue(
"stat/antennafield/hba"
in updated_dbdata["servers"]["antennafield"]["stat"]["antennafield"],
"stat/afh/hba" in updated_dbdata["servers"]["afh"]["stat"]["afh"],
msg=f"{updated_dbdata}",
)
antennafield_properties = CaseInsensitiveDict(
updated_dbdata["servers"]["antennafield"]["stat"]["antennafield"][
"stat/antennafield/hba"
]["properties"]
updated_dbdata["servers"]["afh"]["stat"]["afh"]["stat/afh/hba"][
"properties"
]
)
self.assertIn(
......
......@@ -16,10 +16,10 @@ class TestStationConfiguration(BaseIntegrationTestCase):
TEST_CONFIGURATION = """{
"servers": {
"AntennaField": {
"AFH": {
"STAT": {
"AntennaField": {
"STAT/AntennaField/1": {
"AFH": {
"STAT/AFH/1": {
"properties": {
"Control_Children": [ "STAT/MOCKRCU/1" ]
}
......@@ -154,9 +154,9 @@ class TestStationConfiguration(BaseIntegrationTestCase):
self.assertTrue(self.check_configuration_validity(self.TEST_CONFIGURATION))
# Insert invalid field
json_test_configuration = json.loads(self.TEST_CONFIGURATION)
invalid_configuration = json_test_configuration["servers"]["AntennaField"][
"STAT"
]["AntennaField"]["STAT/AntennaField/1"]["new_field"] = "test"
invalid_configuration = json_test_configuration["servers"]["AFH"]["STAT"][
"AFH"
]["STAT/AFH/1"]["new_field"] = "test"
self.assertFalse(
self.check_configuration_validity(json.dumps(invalid_configuration))
)
......
......@@ -4,7 +4,6 @@
import time
import numpy
from integration_test.device_proxy import TestDeviceProxy
from integration_test.default.devices.base import AbstractTestBases
from tango import DevState
......@@ -28,7 +27,9 @@ from tangostationcontrol.devices.base_device_classes.antennafield_device import
class TestHBADevice(AbstractTestBases.TestDeviceBase):
def setUp(self):
self.stationmanager_proxy = self.setup_stationmanager_proxy()
self.stationmanager_proxy = self.setup_proxy("STAT/StationManager/1")
# Setup will dump current properties and restore them for us
super().setUp("STAT/AFH/HBA0")
# Typical tests emulate 'CS001_TILES' number of antennas in
......@@ -48,66 +49,15 @@ class TestHBADevice(AbstractTestBases.TestDeviceBase):
).flatten(),
}
)
self.recv_proxy = self.setup_recv_proxy()
self.sdpfirmware_proxy = self.setup_sdpfirmware_proxy()
self.sdp_proxy = self.setup_sdp_proxy()
self.addCleanup(self.shutdown_recv)
self.addCleanup(self.shutdown_sdp)
self.recv_proxy = self.setup_proxy("STAT/RECVH/H0", defaults=True)
self.sdpfirmware_proxy = self.setup_proxy("STAT/SDPFirmware/HBA0")
self.sdp_proxy = self.setup_proxy("STAT/SDP/HBA0")
# configure the frequencies, which allows access
# to the calibration attributes and commands
self.sdpfirmware_proxy.clock_RW = CLK_160_MHZ
self.recv_proxy.RCU_band_select_RW = [[1] * N_rcu_inp] * N_rcu
def restore_antennafield(self):
self.proxy.put_property(
{
"Power_to_RECV_mapping": [-1, -1] * CS001_TILES,
"Control_to_RECV_mapping": [-1, -1] * CS001_TILES,
}
)
@staticmethod
def shutdown_recv():
recv_proxy = TestDeviceProxy("STAT/RECVH/H0")
recv_proxy.off()
@staticmethod
def shutdown_sdp():
sdp_proxy = TestDeviceProxy("STAT/SDP/HBA0")
sdp_proxy.off()
def setup_recv_proxy(self):
# setup RECV
recv_proxy = TestDeviceProxy("STAT/RECVH/H0")
recv_proxy.off()
recv_proxy.boot()
recv_proxy.set_defaults()
return recv_proxy
def setup_sdpfirmware_proxy(self):
# setup SDPFirmware
sdpfirmware_proxy = TestDeviceProxy("STAT/SDPFirmware/HBA0")
sdpfirmware_proxy.off()
sdpfirmware_proxy.boot()
return sdpfirmware_proxy
def setup_sdp_proxy(self):
# setup SDP
sdp_proxy = TestDeviceProxy("STAT/SDP/HBA0")
sdp_proxy.off()
sdp_proxy.boot()
return sdp_proxy
def setup_stationmanager_proxy(self):
"""Setup StationManager"""
stationmanager_proxy = TestDeviceProxy("STAT/StationManager/1")
stationmanager_proxy.off()
stationmanager_proxy.boot()
self.assertEqual(stationmanager_proxy.state(), DevState.ON)
return stationmanager_proxy
def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask(self):
"""Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values"""
......
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from typing import Callable
from tango._tango import DevState, AttrWriteType
......@@ -29,19 +30,56 @@ class AbstractTestBases:
# make a backup of the properties, in case they're changed
# NB: "or {}" is needed to deal with devices that have no properties.
self.original_properties = (
self.proxy.get_property(self.proxy.get_property_list("*") or {}) or {}
)
self.original_properties = self.current_properties()
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.name)
self.addCleanup(self.restore_properties)
super().setUp()
def restore_properties(self):
def current_properties(self):
"""Return the current properties of self.proxy"""
return (
self.proxy.get_property(self.proxy.get_property_list("*") or {}) or {}
)
def setup_proxy(
self,
name: str,
defaults: bool = False,
restore_properties: bool = False,
cb: Callable[[TestDeviceProxy], None] = None,
):
"""Setup A TestDeviceProxy and handle cleanup"""
proxy = TestDeviceProxy(name)
if restore_properties:
self.addCleanup(proxy.test_device_turn_off, name)
self.addCleanup(self.restore_properties, self.current_properties())
if cb:
cb(proxy)
proxy.off()
proxy.boot()
if defaults:
proxy.set_defaults()
self.assertEqual(proxy.state(), DevState.ON)
if not restore_properties:
self.addCleanup(proxy.test_device_turn_off, name)
return proxy
def restore_properties(self, properties=None):
"""Restore the properties as they were before the test."""
self.proxy.put_property(self.original_properties)
if not properties:
properties = self.original_properties
self.proxy.put_property(properties)
def test_device_fetch_state(self):
"""Test if we can successfully fetch state"""
......
......@@ -7,7 +7,6 @@ from ctypes import c_short
import numpy
import numpy.testing
from integration_test.device_proxy import TestDeviceProxy
from integration_test.default.devices.base import AbstractTestBases
from tango import DevState
......@@ -26,42 +25,15 @@ class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase):
"""Intentionally recreate the device object in each test"""
super().setUp("STAT/Beamlet/HBA0")
def test_device_read_all_attributes(self):
# We need to connect to SDP first to read some of our attributes
self.sdp_proxy = self.setup_sdp()
self.sdpfirmware_proxy = self.setup_sdpfirmware()
self.sdp_proxy = self.setup_proxy("STAT/SDP/HBA0", defaults=True)
self.sdp_proxy.nyquist_zone_RW = [[2] * S_pn] * N_pn
super().test_device_read_all_attributes()
def setup_sdp(self, clock=CLK_200_MHZ):
# setup SDP, on which this device depends
sdp_proxy = TestDeviceProxy("STAT/SDP/HBA0")
sdp_proxy.off()
sdp_proxy.boot()
sdp_proxy.set_defaults()
# setup the frequencies as expected in the test
sdp_proxy.nyquist_zone_RW = [[2] * S_pn] * N_pn
return sdp_proxy
def setup_sdpfirmware(self, clock=CLK_200_MHZ):
# setup SDP, on which this device depends
sdpfirmware_proxy = TestDeviceProxy("STAT/SDPFirmware/HBA0")
sdpfirmware_proxy.off()
sdpfirmware_proxy.boot()
sdpfirmware_proxy.set_defaults()
# setup the frequencies as expected in the test
sdpfirmware_proxy.clock_RW = clock
return sdpfirmware_proxy
self.sdpfirmware_proxy = self.setup_proxy(
"STAT/SDPFirmware/HBA0", defaults=True
)
self.sdpfirmware_proxy.clock_RW = CLK_200_MHZ
def test_pointing_to_zenith(self):
# Setup configuration
sdp_proxy = self.setup_sdp()
sdpfirmware_proxy = self.setup_sdpfirmware()
self.proxy.initialise()
self.proxy.on()
......@@ -82,10 +54,6 @@ class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase):
numpy.testing.assert_almost_equal(expected_bf_weights, calculated_bf_weights)
def test_subband_select_change(self):
# Setup configuration
sdp_proxy = self.setup_sdp()
sdpfirmware_proxy = self.setup_sdpfirmware()
# Change subband
self.proxy.off()
self.proxy.initialise()
......@@ -110,10 +78,7 @@ class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase):
)
def test_sdp_clock_change(self):
# Setup configuration
sdp_proxy = self.setup_sdp()
sdpfirmware_proxy = self.setup_sdpfirmware()
sdpfirmware_proxy = self.sdpfirmware_proxy
self.proxy.initialise()
self.proxy.subband_select_RW = numpy.array(
list(range(317)) + [316] + list(range(318, N_beamlets_ctrl)),
......
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from integration_test.device_proxy import TestDeviceProxy
from integration_test.default.devices.base import AbstractTestBases
......@@ -12,14 +11,7 @@ class TestDeviceBST(AbstractTestBases.TestDeviceBase):
def test_device_read_all_attributes(self):
# We need to connect to SDP first to read some of our attributes
self.sdp_proxy = self.setup_sdp()
self.setup_proxy("STAT/sdpfirmware/HBA0", defaults=True)
self.setup_proxy("STAT/SDP/HBA0", defaults=True)
super().test_device_read_all_attributes()
def setup_sdp(self):
# setup SDP, on which this device depends
sdp_proxy = TestDeviceProxy("STAT/SDP/HBA0")
sdp_proxy.off()
sdp_proxy.boot()
sdp_proxy.set_defaults()
return sdp_proxy
......@@ -2,7 +2,6 @@
# SPDX-License-Identifier: Apache-2.0
import numpy
from tango import DevState
from tangostationcontrol.common.constants import (
N_rcu,
......@@ -11,7 +10,6 @@ from tangostationcontrol.common.constants import (
CLK_200_MHZ,
)
from integration_test.device_proxy import TestDeviceProxy
from integration_test.default.devices.base import AbstractTestBases
......@@ -68,22 +66,19 @@ class TestCalibrationDevice(AbstractTestBases.TestDeviceBase):
]
def setUp(self):
self.stationmanager_proxy = self.setup_stationmanager_proxy()
super().setUp("STAT/Calibration/1")
self.recv_proxy = self.setup_recv_proxy()
self.sdpfirmware_proxy = self.setup_sdpfirmware_proxy()
self.sdp_proxy = self.setup_sdp_proxy()
self.stationmanager_proxy = self.setup_proxy("STAT/StationManager/1")
self.antennafield_proxy = self.setup_proxy("STAT/AFH/HBA0")
super().setUp("STAT/Calibration/1")
# make sure we restore any properties we modify
self.original_antennafield_properties = self.antennafield_proxy.get_property(
self.antennafield_proxy.get_property_list("*")
)
self.addCleanup(self.restore_antennafield)
self.recv_proxy = self.setup_proxy("STAT/RECVH/H0")
self.sdpfirmware_proxy = self.setup_proxy("STAT/SDPFirmware/HBA0")
self.sdp_proxy = self.setup_proxy("STAT/SDP/HBA0")
self.antennafield_proxy.put_property(
self.antennafield_proxy = self.setup_proxy(
"STAT/AFH/HBA0",
restore_properties=True,
cb=lambda x: {
x.put_property(
{
"Power_to_RECV_mapping": [1, 1, 1, 0]
+ [-1] * ((DEFAULT_N_HBA_TILES * 2) - 4),
......@@ -93,7 +88,8 @@ class TestCalibrationDevice(AbstractTestBases.TestDeviceBase):
* (DEFAULT_N_HBA_TILES * 2),
}
)
self.antennafield_proxy.boot()
},
)
# configure the frequencies, which allows access
# to the calibration attributes and commands
......@@ -103,45 +99,6 @@ class TestCalibrationDevice(AbstractTestBases.TestDeviceBase):
def restore_antennafield(self):
self.proxy.put_property(self.original_antennafield_properties)
@staticmethod
def shutdown(device: str):
def off():
proxy = TestDeviceProxy(device)
proxy.off()
return off
def setup_recv_proxy(self):
# setup RECV
recv_proxy = self.setup_proxy("STAT/RECVH/H0")
recv_proxy.boot()
return recv_proxy
def setup_sdpfirmware_proxy(self):
# setup SDP
sdpfirmware_proxy = self.setup_proxy("STAT/SDPFirmware/HBA0")
sdpfirmware_proxy.boot()
return sdpfirmware_proxy
def setup_sdp_proxy(self):
# setup SDP
sdp_proxy = self.setup_proxy("STAT/SDP/HBA0")
sdp_proxy.boot()
return sdp_proxy
def setup_proxy(self, dev: str):
proxy = TestDeviceProxy(dev)
proxy.off()
self.addCleanup(self.shutdown(dev))
return proxy
def setup_stationmanager_proxy(self):
"""Setup StationManager"""
stationmanager_proxy = self.setup_proxy("STAT/StationManager/1")
stationmanager_proxy.boot()
self.assertEqual(stationmanager_proxy.state(), DevState.ON)
return stationmanager_proxy
def test_calibrate_recv(self):
calibration_properties = {
"Antenna_Cables": ["50m", "80m"] * (DEFAULT_N_HBA_TILES // 2),
......@@ -155,9 +112,10 @@ class TestCalibrationDevice(AbstractTestBases.TestDeviceBase):
"Frequency_Band_RW_default": ["HBA_110_190"] * (DEFAULT_N_HBA_TILES * 2),
}
self.antennafield_proxy = self.setup_proxy("STAT/AFH/HBA0")
self.antennafield_proxy.put_property(calibration_properties)
self.antennafield_proxy.boot()
self.antennafield_proxy = self.setup_proxy(
"STAT/AFH/HBA0",
cb=lambda x: {x.put_property(calibration_properties)},
)
self.proxy.boot()
......@@ -215,9 +173,10 @@ class TestCalibrationDevice(AbstractTestBases.TestDeviceBase):
"Frequency_Band_RW_default": ["HBA_110_190"] * (DEFAULT_N_HBA_TILES * 2),
}
self.antennafield_proxy = self.setup_proxy("STAT/AFH/HBA0")
self.antennafield_proxy.put_property(calibration_properties)
self.antennafield_proxy.boot()
self.antennafield_proxy = self.setup_proxy(
"STAT/AFH/HBA0",
cb=lambda x: {x.put_property(calibration_properties)},
)
self.proxy.boot()
......
......@@ -47,84 +47,46 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
self.proxy.put_property({"Beamlet_Select": [True] * N_beamlets_ctrl})
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.recv_iden)
self.recv_proxy = self.setup_recv_proxy()
self.sdpfirmware_proxy = self.setup_sdpfirmware_proxy()
self.sdp_proxy = self.setup_sdp_proxy()
self.recv_proxy = self.setup_proxy(self.recv_iden, defaults=True)
self.sdpfirmware_proxy = self.setup_proxy(self.sdpfirmware_iden, defaults=True)
self.sdp_proxy = self.setup_proxy(self.sdp_iden)
self.beamlet_proxy = self.initialise_beamlet_proxy()
def setup_recv_proxy(self):
recv_proxy = TestDeviceProxy(self.recv_iden)
recv_proxy.off()
recv_proxy.boot()
recv_proxy.set_defaults()
return recv_proxy
def initialise_beamlet_proxy(self):
beamlet_proxy = TestDeviceProxy(self.beamlet_iden)
beamlet_proxy.off()
beamlet_proxy.initialise()
return beamlet_proxy
def setup_beamlet_proxy(self):
beamlet_proxy = TestDeviceProxy(self.beamlet_iden)
beamlet_proxy.off()
beamlet_proxy.boot()
beamlet_proxy.set_defaults()
return beamlet_proxy
def setup_sdpfirmware_proxy(self):
# setup SDPFirmware
sdpfirmware_proxy = TestDeviceProxy(self.sdpfirmware_iden)
sdpfirmware_proxy.off()
sdpfirmware_proxy.boot()
sdpfirmware_proxy.set_defaults()
return sdpfirmware_proxy
def setup_sdp_proxy(self):
# setup SDP, on which this device depends
sdp_proxy = TestDeviceProxy(self.sdp_iden)
sdp_proxy.off()
sdp_proxy.boot()
sdp_proxy.set_defaults()
return sdp_proxy
def setup_antennafield_proxy(self, antenna_qualities, antenna_use):
# setup AntennaField
NR_TILES = CS001_TILES
antennafield_proxy = TestDeviceProxy(self.antennafield_iden)
control_mapping = [[1, i] for i in range(NR_TILES)]
sdp_mapping = [[i // 6, i % 6] for i in range(NR_TILES)]
antennafield_proxy.put_property(
self.antennafield_proxy = self.setup_proxy(
self.antennafield_iden,
cb=lambda x: {
x.put_property(
{
"Control_to_RECV_mapping": numpy.array(control_mapping).flatten(),
"Control_to_RECV_mapping": numpy.array(
control_mapping
).flatten(),
"Antenna_to_SDP_Mapping": numpy.array(sdp_mapping).flatten(),
"Antenna_Quality": antenna_qualities,
"Antenna_Use": antenna_use,
"Antenna_Quality": self.antenna_qualities_ok,
"Antenna_Use": self.antenna_use_ok,
"Antenna_Cables": ["50m", "80m"] * (CS001_TILES // 2),
"Antenna_Sets": ["FIRST", "ALL"],
"Antenna_Set_Masks": [
"1" + ("0" * (NR_TILES - 1)),
"1" * NR_TILES,
],
"Antenna_Type": "HBA",
}
)
antennafield_proxy.off()
antennafield_proxy.boot()
return antennafield_proxy
},
)
def test_pointing_to_zenith_clock_change(self):
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden)
self.sdpfirmware_proxy = self.setup_sdpfirmware_proxy()
self.sdp_proxy = self.setup_sdp_proxy()
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
def initialise_beamlet_proxy(self):
beamlet_proxy = TestDeviceProxy(self.beamlet_iden)
beamlet_proxy.off()
beamlet_proxy.initialise()
return beamlet_proxy
self.beamlet_proxy = self.initialise_beamlet_proxy()
def test_pointing_to_zenith_clock_change(self):
self.beamlet_proxy.on()
# Set first (default) clock configuration
......@@ -160,15 +122,6 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
)
def test_pointing_to_zenith_subband_change(self):
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden)
self.sdpfirmware_proxy = self.setup_sdpfirmware_proxy()
self.sdp_proxy = self.setup_sdp_proxy()
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
self.beamlet_proxy = self.initialise_beamlet_proxy()
self.beamlet_proxy.subband_select_RW = numpy.array(
list(range(317)) + [316] + list(range(318, N_beamlets_ctrl)),
dtype=numpy.uint32,
......@@ -207,16 +160,6 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
def test_set_pointing_masked_enable(self):
"""Verify that only selected inputs are written"""
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden)
self.setup_sdpfirmware_proxy()
self.setup_sdp_proxy()
self.antennafield_proxy = self.setup_antennafield_proxy(
self.antenna_qualities_ok, self.antenna_use_ok
)
self.proxy.initialise()
self.proxy.Tracking_enabled_RW = False
self.proxy.on()
......@@ -261,14 +204,6 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
def test_beam_tracking_90_percent_interval(self):
"""Verify that the beam tracking operates within 95% of interval"""
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden)
self.setup_sdpfirmware_proxy()
self.setup_sdp_proxy()
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
self.proxy.initialise()
self.proxy.Tracking_enabled_RW = True
self.proxy.on()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment