diff --git a/sbin/run_integration_test.sh b/sbin/run_integration_test.sh index 70e8aaff339909b96f07db7e6125b748d49e251e..691ba0b54209bedd8c536bd406efc00b16911b59 100755 --- a/sbin/run_integration_test.sh +++ b/sbin/run_integration_test.sh @@ -69,7 +69,7 @@ sleep 1 # dsconfig container must be up and running... # shellcheck disable=SC2016 echo '/usr/local/bin/wait-for-it.sh ${TANGO_HOST} --strict --timeout=300 -- true' | make run dsconfig bash - -DEVICES="device-boot device-apsct device-apspu device-sdp device-recv device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-pdu device-antennafield device-temperature-manager" +DEVICES="device-boot device-apsct device-apspu device-sdp device-recv device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-pdu device-antennafield device-temperature-manager device-observation" SIMULATORS="sdptr-sim recv-sim unb2-sim apsct-sim apspu-sim" # Build only the required images, please do not build everything that makes CI diff --git a/tangostationcontrol/tangostationcontrol/devices/antennafield.py b/tangostationcontrol/tangostationcontrol/devices/antennafield.py index 86258de6f32b3bb5c0acc4123a2451467c2f5834..38701a47ae231c938012099596802aa74fc54c1c 100644 --- a/tangostationcontrol/tangostationcontrol/devices/antennafield.py +++ b/tangostationcontrol/tangostationcontrol/devices/antennafield.py @@ -173,6 +173,7 @@ class AntennaField(lofar_device): HBAT_PWR_LNA_on_RW = mapped_attribute("HBAT_PWR_LNA_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE) HBAT_PWR_on_R = mapped_attribute("HBAT_PWR_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT) HBAT_PWR_on_RW = mapped_attribute("HBAT_PWR_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE) + RCU_band_select_RW = mapped_attribute("RCU_band_select_RW", dtype=(numpy.int64,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE) # ----- Position information @@ -362,22 +363,26 @@ class HBATToRecvMapper(object): "HBAT_PWR_LNA_on_R": numpy.full((number_of_antennas,32), False), "HBAT_PWR_LNA_on_RW": numpy.full((number_of_antennas,32), False), "HBAT_PWR_on_R": numpy.full((number_of_antennas,32), False), - "HBAT_PWR_on_RW": numpy.full((number_of_antennas,32), False) + "HBAT_PWR_on_RW": numpy.full((number_of_antennas,32), False), + "RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64) } self.__default_value_mapping_write = { "ANT_mask_RW": numpy.full(96, False), "HBAT_BF_delay_steps_RW": numpy.zeros([96,32], dtype=numpy.int64), "HBAT_LED_on_RW": numpy.full((96,32), False), "HBAT_PWR_LNA_on_RW": numpy.full((96,32), False), - "HBAT_PWR_on_RW": numpy.full((96,32), False) + "HBAT_PWR_on_RW": numpy.full((96,32), False), + "RCU_band_select_RW": numpy.zeros(96, dtype=numpy.int64) } self.__reshape_attributes_in = { "ANT_mask_RW": (96,), - "HBAT_BF_delay_steps_RW": (96, 32) + "HBAT_BF_delay_steps_RW": (96, 32), + "RCU_band_select_RW": (96,), } self.__reshape_attributes_out = { "ANT_mask_RW": (32, 3), - "HBAT_BF_delay_steps_RW": (96, 32) + "HBAT_BF_delay_steps_RW": (96, 32), + "RCU_band_select_RW": (32, 3) } def map_read(self, mapped_attribute, recv_results): diff --git a/tangostationcontrol/tangostationcontrol/devices/observation.py b/tangostationcontrol/tangostationcontrol/devices/observation.py index f57e3ed992e22b77ed12cf50d495c30b405d90b5..bc30872734ba2006c1d7245281cfb04dfd6fb0ba 100644 --- a/tangostationcontrol/tangostationcontrol/devices/observation.py +++ b/tangostationcontrol/tangostationcontrol/devices/observation.py @@ -6,9 +6,8 @@ # See LICENSE.txt for more info. # PyTango imports -from tango import AttrWriteType +from tango import AttrWriteType, DeviceProxy, DevState, DevSource, Util from tango.server import attribute -from tango import DevState import numpy @@ -19,6 +18,8 @@ from tangostationcontrol.devices.device_decorators import fault_on_error from tangostationcontrol.devices.device_decorators import only_when_on from tangostationcontrol.devices.device_decorators import only_in_states from tangostationcontrol.devices.lofar_device import lofar_device +from tangostationcontrol.devices.antennafield import MAX_NUMBER_OF_HBAT +from tangostationcontrol.devices.sdp.digitalbeam import DigitalBeam from datetime import datetime from json import loads @@ -40,14 +41,24 @@ class Observation(lofar_device): The lifecycle of instances of this device is controlled by ObservationControl """ + NUM_MAX_HBAT = MAX_NUMBER_OF_HBAT + NUM_INPUTS = DigitalBeam.NUM_INPUTS + NUM_BEAMLETS = DigitalBeam.NUM_BEAMLETS + # Attributes observation_running_R = attribute(dtype=numpy.float64, access=AttrWriteType.READ, polling_period=1000, period=1000, rel_change="1.0") observation_id_R = attribute(dtype=numpy.int64, access=AttrWriteType.READ) stop_time_R = attribute(dtype=numpy.float64, access=AttrWriteType.READ) + antenna_mask_R = attribute(dtype=(numpy.int64,), max_dim_x=NUM_MAX_HBAT, access=AttrWriteType.READ) + filter_R = attribute(dtype=numpy.str, access=AttrWriteType.READ) + saps_subband_R = attribute(dtype=((numpy.uint32,),), max_dim_x=NUM_INPUTS, max_dim_y=NUM_BEAMLETS, access=AttrWriteType.READ) + saps_pointing_R = attribute(dtype=((numpy.str,),), max_dim_x=3, max_dim_y=NUM_BEAMLETS, access=AttrWriteType.READ) + tile_beam_R = attribute(dtype=(numpy.str,), max_dim_x=3, access=AttrWriteType.READ) + first_beamlet_R = attribute(dtype=numpy.int64, access=AttrWriteType.READ) observation_settings_RW = attribute(dtype=str, access=AttrWriteType.READ_WRITE) - + def init_device(self): """Setup some class member variables for observation state""" @@ -70,6 +81,36 @@ class Observation(lofar_device): self._observation_id = parameters["observation_id"] self._stop_time = datetime.fromisoformat(parameters["stop_time"]) + self._antenna_mask = parameters["antenna_mask"] + self._filter = parameters["filter"] + self._num_saps = len(parameters["SAPs"]) + self._saps_subband = [ parameters["SAPs"][i]['subbands'] for i in range(0, self._num_saps)] + self._saps_pointing = self._build_saps_pointing(parameters) + self._tile_beam = self._build_tilebeam_pointing(parameters) + self._first_beamlet = parameters["first_beamlet"] + + # Set a reference of AntennaField device that is correlated to this device + util = Util.instance() + instance_number = self.get_name().split('/')[2] + self.antennafield_proxy = DeviceProxy( + f"{util.get_ds_inst_name()}/AntennaField/{instance_number}") + self.antennafield_proxy.set_source(DevSource.DEV) + + # Set a reference of RECV device that is correlated to this device + self.recv_proxy = DeviceProxy(f"{util.get_ds_inst_name()}/RECV/{instance_number}") + self.recv_proxy.set_source(DevSource.DEV) + + # Set a reference of Beamlet device that is correlated to this device + self.beamlet_proxy = DeviceProxy(f"{util.get_ds_inst_name()}/Beamlet/{instance_number}") + self.beamlet_proxy.set_source(DevSource.DEV) + + # Set a reference of DigitalBeam device that is correlated to this device + self.digitalbeam_proxy = DeviceProxy(f"{util.get_ds_inst_name()}/DigitalBeam/{instance_number}") + self.digitalbeam_proxy.set_source(DevSource.DEV) + + # Set a reference of TileBeam device that is correlated to this device + self.tilebeam_proxy = DeviceProxy(f"{util.get_ds_inst_name()}/Tilebeam/{instance_number}") + self.tilebeam_proxy.set_source(DevSource.DEV) logger.info( f"The observation with ID={self._observation_id} is " @@ -88,6 +129,19 @@ class Observation(lofar_device): super().configure_for_on() + # Apply Antenna Mask and Filter + HBAT_ANT_mask, RCU_band_select = self._apply_antennafield_settings(self.read_antenna_mask_R(), self.read_filter_R()) + self.antennafield_proxy.HBAT_ANT_mask_RW = HBAT_ANT_mask + self.antennafield_proxy.RCU_band_select_RW = RCU_band_select + + # Apply Beamlet configuration + self.beamlet_proxy.subband_select_RW = self._apply_saps_subbands(self.read_saps_subband_R()) + self.digitalbeam_proxy.Pointing_direction_RW = self._apply_saps_pointing(self.read_saps_pointing_R()) + self.digitalbeam_proxy.antenna_select_RW = self._apply_saps_antenna_select(self.read_antenna_mask_R()) + + # Apply Tile Beam pointing direction + self.tilebeam_proxy.Pointing_direction_RW = [tuple(self.read_tile_beam_R())] * self.antennafield_proxy.nr_tiles_R + logger.info(f"Started the observation with ID={self._observation_id}.") @only_when_on() @@ -103,6 +157,48 @@ class Observation(lofar_device): def read_stop_time_R(self): """Return the stop_time_R attribute.""" return self._stop_time.timestamp() + + @only_in_states([DevState.STANDBY, DevState.ON]) + @fault_on_error() + @log_exceptions() + def read_antenna_mask_R(self): + """Return the antenna_mask_R attribute.""" + return self._antenna_mask + + @only_in_states([DevState.STANDBY, DevState.ON]) + @fault_on_error() + @log_exceptions() + def read_filter_R(self): + """Return the filter_R attribute.""" + return self._filter + + @only_in_states([DevState.STANDBY, DevState.ON]) + @fault_on_error() + @log_exceptions() + def read_saps_subband_R(self): + """Return the saps_subband_R attribute.""" + return self._saps_subband + + @only_in_states([DevState.STANDBY, DevState.ON]) + @fault_on_error() + @log_exceptions() + def read_saps_pointing_R(self): + """Return the saps_pointing_R attribute.""" + return self._saps_pointing + + @only_in_states([DevState.STANDBY, DevState.ON]) + @fault_on_error() + @log_exceptions() + def read_tile_beam_R(self): + """Return the tile_beam_R attribute.""" + return self._tile_beam + + @only_in_states([DevState.STANDBY, DevState.ON]) + @fault_on_error() + @log_exceptions() + def read_first_beamlet_R(self): + """Return the first_beamlet_R attribute.""" + return self._first_beamlet @fault_on_error() @log_exceptions() @@ -126,6 +222,57 @@ class Observation(lofar_device): # value return time() + def _build_saps_pointing(self, parameters:dict): + """ Build the sap pointing array preserving the correct order from JSON """ + saps_pointing = [] + for i in range(0, self._num_saps): + pointing_direction = parameters["SAPs"][i]['pointing'] + saps_pointing.insert(i,(pointing_direction['direction_type'], f"{pointing_direction['angle1']}deg", f"{pointing_direction['angle2']}deg")) + return saps_pointing + + def _build_tilebeam_pointing(self, parameters:dict): + """ Build the sap pointing array preserving the correct order from JSON """ + pointing_direction = parameters["tile_beam"] + return [str(pointing_direction['direction_type']), f"{pointing_direction['angle1']}deg", f"{pointing_direction['angle2']}deg"] + + def _apply_antennafield_settings(self, antenna_mask:list, filter_name:str): + """ Convert an array of antenna indexes into a boolean mask array and + retrieve the RCU band from filter name, returning the correct format for + AntennaField device + """ + HBAT_ANT_mask_RW = [False] * self.NUM_MAX_HBAT + RCU_band_select_RW = [0] * self.NUM_MAX_HBAT + rcu_band = self.recv_proxy.get_rcu_band_from_filter(filter_name) + for a in antenna_mask: + HBAT_ANT_mask_RW[a] = True + RCU_band_select_RW[a] = rcu_band + return numpy.array(HBAT_ANT_mask_RW), numpy.array(RCU_band_select_RW) + + def _apply_saps_subbands(self, sap_subbands:list): + """ Convert an array of subbands into the correct format for Beamlet device""" + subband_select = self.beamlet_proxy.subband_select_RW + first_beamlet = numpy.array(self.read_first_beamlet_R(), dtype=numpy.int64) + # Insert subband values starting from the first beamlet + sap_subbands = numpy.array(sap_subbands).flatten() + subband_select[first_beamlet:len(sap_subbands)] = sap_subbands + return subband_select + + def _apply_saps_pointing(self, sap_pointing:list): + """ Convert an array of string directions into the correct format for DigitalBeam device""" + pointing_direction = list(self.digitalbeam_proxy.Pointing_direction_RW) # convert to list to allows item assignment + first_beamlet = numpy.array(self.read_first_beamlet_R(), dtype=numpy.int64) + # Insert pointing values starting from the first beamlet + pointing_direction[first_beamlet:len(sap_pointing)] = sap_pointing + return tuple(pointing_direction) + + def _apply_saps_antenna_select(self, antenna_mask:list): + """ Convert an array of antenna indexes into a boolean select array""" + antenna_select = numpy.array([[False] * self.NUM_BEAMLETS] * self.NUM_INPUTS) + first_beamlet = numpy.array(self.read_first_beamlet_R(), dtype=numpy.int64) + for a in antenna_mask: + for i in range(first_beamlet, self.NUM_BEAMLETS): + antenna_select[a,i] = True + return antenna_select # ---------- # Run server diff --git a/tangostationcontrol/tangostationcontrol/devices/recv.py b/tangostationcontrol/tangostationcontrol/devices/recv.py index 916ec2b1932ab605a08f4161a73fd550a4574616..4914f73134a60e746c1e38047fcae7964f423987 100644 --- a/tangostationcontrol/tangostationcontrol/devices/recv.py +++ b/tangostationcontrol/tangostationcontrol/devices/recv.py @@ -15,7 +15,7 @@ from tango import DebugIt from tango.server import command from tango.server import device_property, attribute -from tango import AttrWriteType, DevVarFloatArray +from tango import AttrWriteType, DevVarFloatArray, DevString, DevLong import numpy @@ -34,6 +34,16 @@ __all__ = ["RECV", "main"] @device_logging_to_python() class RECV(opcua_device): + + FILTER_RCU_DICT = { + "LBA_10_90": 1, + "LBA_10_70": 1, + "LBA_30_90": 2, + "LBA_30_70": 2, + "HBA_170_230": 1, + "HBA_110_190": 2, + "HBA_210_250": 4 + } # ----------------- # Device Properties @@ -263,6 +273,11 @@ class RECV(opcua_device): HBAT_bf_delay_steps = self._calculate_HBAT_bf_delay_steps(delays) return HBAT_bf_delay_steps.flatten() + + @command(dtype_in=DevString, dtype_out=DevLong) + def get_rcu_band_from_filter(self, filter_name: str): + """ return the rcu band given the filter name""" + return self.FILTER_RCU_DICT.get(filter_name, -1) @command() @DebugIt() diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py new file mode 100644 index 0000000000000000000000000000000000000000..ce09175933edc18baf555ba091bb8b72ebdb1781 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py @@ -0,0 +1,228 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +import numpy +from json import loads +from datetime import datetime +from tango import DevState, DevFailed + +from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy +from tangostationcontrol.test.devices.test_observation_base import TestObservationBase +from .base import AbstractTestBases + +class TestDeviceObservation(AbstractTestBases.TestDeviceBase): + + NUM_TILES = 48 + NUM_BEAMLETS_CTRL = 488 + NUM_INPUTS = 96 + INPUT_TO_ANTENNA_MAPPING = [ + "0", "1", "2", "3", "4", "5", + "6", "7", "8", "9", "10", "11", + "12", "13", "14", "15", "16", "17", + "18", "19", "20", "21", "22", "23", + "24", "25", "26", "27", "28", "29", + "30", "31", "32", "33", "34", "35", + "36", "37", "38", "39", "40", "41", + "42", "43", "44", "45", "46", "47", + "-1", "-1", "-1", "-1", "-1", "-1", + "-1", "-1", "-1", "-1", "-1", "-1", + "-1", "-1", "-1", "-1", "-1", "-1", + "-1", "-1", "-1", "-1", "-1", "-1", + "-1", "-1", "-1", "-1", "-1", "-1", + "-1", "-1", "-1", "-1", "-1", "-1", + "-1", "-1", "-1", "-1", "-1", "-1", + "-1", "-1", "-1", "-1", "-1", "-1" + ] + + def setUp(self): + super().setUp("STAT/Observation/1") + self.VALID_JSON = TestObservationBase.VALID_JSON + self.recv_proxy = self.setup_recv_proxy() + self.antennafield_proxy = self.setup_antennafield_proxy() + self.beamlet_proxy = self.setup_beamlet_proxy() + self.sdp_proxy = self.setup_sdp_proxy() + self.digitalbeam_proxy = self.setup_digitalbeam_proxy() + self.tilebeam_proxy = self.setup_tilebeam_proxy() + + def setup_recv_proxy(self): + # setup RECV + recv_proxy = TestDeviceProxy("STAT/RECV/1") + recv_proxy.off() + recv_proxy.warm_boot() + recv_proxy.set_defaults() + return recv_proxy + + def setup_sdp_proxy(self): + # setup SDP + sdp_proxy = TestDeviceProxy("STAT/SDP/1") + sdp_proxy.off() + sdp_proxy.warm_boot() + return sdp_proxy + + def setup_antennafield_proxy(self): + # setup AntennaField + antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1") + control_mapping = [[1,i] for i in range(self.NUM_TILES)] + antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"], + "HBAT_Power_to_RECV_mapping": numpy.array(control_mapping).flatten()}) + antennafield_proxy.off() + antennafield_proxy.warm_boot() + antennafield_proxy.set_defaults() + return antennafield_proxy + + def setup_beamlet_proxy(self): + # setup Digitalbeam + beamlet_proxy = TestDeviceProxy("STAT/Beamlet/1") + beamlet_proxy.off() + beamlet_proxy.warm_boot() + beamlet_proxy.set_defaults() + return beamlet_proxy + + def setup_digitalbeam_proxy(self): + # setup Digitalbeam + digitalbeam_proxy = TestDeviceProxy("STAT/DigitalBeam/1") + digitalbeam_proxy.put_property({"Input_to_Antenna_Mapping": numpy.array(self.INPUT_TO_ANTENNA_MAPPING).flatten()}) + digitalbeam_proxy.off() + digitalbeam_proxy.warm_boot() + digitalbeam_proxy.set_defaults() + return digitalbeam_proxy + + def setup_tilebeam_proxy(self): + # Setup Tilebeam + tilebeam_proxy = TestDeviceProxy("STAT/TileBeam/1") + tilebeam_proxy.off() + tilebeam_proxy.warm_boot() + tilebeam_proxy.set_defaults() + return tilebeam_proxy + + def test_init_valid(self): + """Initialize an observation with valid JSON""" + + self.proxy.off() + self.proxy.observation_settings_RW = self.VALID_JSON + self.proxy.Initialise() + self.assertEqual(DevState.STANDBY, self.proxy.state()) + + def test_init_invalid(self): + """Initialize an observation with _invalid_ JSON""" + + self.proxy.off() + self.proxy.observation_settings_RW = "{}" + with self.assertRaises(DevFailed): + self.proxy.Initialise() + self.assertEqual(DevState.FAULT, self.proxy.state()) + + def test_prohibit_rewriting_settings(self): + """Test that changing observation settings is disallowed once init""" + + self.proxy.off() + self.proxy.observation_settings_RW = self.VALID_JSON + self.proxy.Initialise() + + with self.assertRaises(DevFailed): + self.proxy.write_attribute( + "observation_settings_RW", self.VALID_JSON) + + def test_attribute_match(self): + """Test that JSON data is exposed to attributes""" + + data = loads(self.VALID_JSON) + stop_timestamp = datetime.fromisoformat(data["stop_time"]).timestamp() + observation_id = data["observation_id"] + antenna_mask = data["antenna_mask"] + filter = data["filter"] + num_saps = len(data["SAPs"]) + saps_subband = [ data["SAPs"][i]['subbands'] for i in range(0, num_saps)] + pointing_direction = data["SAPs"][0]['pointing'] + saps_pointing = [(pointing_direction['direction_type'], f"{pointing_direction['angle1']}deg", f"{pointing_direction['angle2']}deg")] + tile_beam = [str(data['tile_beam']['direction_type']), f"{data['tile_beam']['angle1']}deg", f"{data['tile_beam']['angle2']}deg"] + first_beamlet = data["first_beamlet"] + + self.proxy.off() + self.proxy.observation_settings_RW = self.VALID_JSON + self.proxy.Initialise() + self.proxy.On() + + self.assertEqual(DevState.ON, self.proxy.state()) + self.assertEqual(stop_timestamp, self.proxy.stop_time_R) + self.assertEqual(observation_id, self.proxy.observation_id_R) + self.assertListEqual(antenna_mask, self.proxy.antenna_mask_R.tolist()) + self.assertEqual(filter, self.proxy.filter_R) + self.assertListEqual(saps_subband, self.proxy.saps_subband_R.tolist()) + self.assertListEqual(saps_pointing, list(self.proxy.saps_pointing_R)) + self.assertListEqual(tile_beam, list(self.proxy.tile_beam_R)) + self.assertEqual(first_beamlet, self.proxy.first_beamlet_R) + + def test_apply_antennafield_settings(self): + """Test that attributes antenna_mask and filter are correctly applied""" + self.setup_recv_proxy() + antennafield_proxy = self.setup_antennafield_proxy() + antennafield_proxy.HBAT_ANT_mask_RW = [True] * 48 # set all masks to True + self.assertListEqual(antennafield_proxy.HBAT_ANT_mask_RW.tolist(), [True] * 48) + antennafield_proxy.RCU_band_select_RW = [0] * 48 + self.assertListEqual(antennafield_proxy.RCU_band_select_RW.tolist(), [0] * 48) + self.proxy.off() + self.proxy.observation_settings_RW = self.VALID_JSON + self.proxy.Initialise() + self.proxy.On() + expected_masks = [True, True, True] + [False] * 6 + [True] + [False] * 38 + self.assertListEqual(antennafield_proxy.HBAT_ANT_mask_RW.tolist(), expected_masks) + expected_bands = [2, 2, 2] + [0] * 6 + [2] + [0] * 38 + self.assertListEqual(antennafield_proxy.RCU_band_select_RW.tolist(), expected_bands) + + def test_apply_subbands(self): + """Test that attribute sap subbands is correctly applied""" + beamlet_proxy = self.setup_beamlet_proxy() + subband_select = [0] * self.NUM_BEAMLETS_CTRL + beamlet_proxy.subband_select_RW = subband_select + self.assertListEqual(beamlet_proxy.subband_select_RW.tolist(), subband_select) + self.proxy.off() + self.proxy.observation_settings_RW = self.VALID_JSON + self.proxy.Initialise() + self.proxy.On() + expected_subbands = [10,20,30] + [0] * (self.NUM_BEAMLETS_CTRL-3) + self.assertListEqual(beamlet_proxy.subband_select_RW.tolist(), expected_subbands) + + def test_apply_pointing(self): + """Test that attribute sap pointing is correctly applied""" + digitalbeam_proxy = self.setup_digitalbeam_proxy() + default_pointing = [("AZELGEO","0deg","90deg")]*488 + digitalbeam_proxy.Pointing_direction_RW = default_pointing + self.assertListEqual(list(digitalbeam_proxy.Pointing_direction_RW), default_pointing) + self.proxy.off() + self.proxy.observation_settings_RW = self.VALID_JSON + self.proxy.Initialise() + self.proxy.On() + expected_pointing = [("J2000","1.5deg","0deg")] + [("AZELGEO","0deg","90deg")] * 487 + self.assertListEqual(list(digitalbeam_proxy.Pointing_direction_RW), expected_pointing) + + def test_apply_antenna_select(self): + """Test that antenna selection is correctly applied""" + digitalbeam_proxy = self.setup_digitalbeam_proxy() + default_selection = [[False] * self.NUM_BEAMLETS_CTRL] * self.NUM_INPUTS + digitalbeam_proxy.antenna_select_RW = default_selection + self.assertListEqual(digitalbeam_proxy.antenna_select_RW.tolist()[9], default_selection[9]) + self.proxy.off() + self.proxy.observation_settings_RW = self.VALID_JSON + self.proxy.Initialise() + self.proxy.On() + self.assertListEqual(digitalbeam_proxy.antenna_select_RW.tolist()[9], [True] * self.NUM_BEAMLETS_CTRL) + self.assertListEqual(digitalbeam_proxy.antenna_select_RW.tolist()[10], [False] * self.NUM_BEAMLETS_CTRL) + + def test_apply_tilebeam(self): + """Test that attribute tilebeam is correctly applied""" + tilebeam_proxy = self.setup_tilebeam_proxy() + pointing_direction = [("J2000","0deg","0deg")] * self.NUM_TILES + tilebeam_proxy.Pointing_direction_RW = pointing_direction + self.assertListEqual(list(tilebeam_proxy.Pointing_direction_RW[0]), ["J2000","0deg","0deg"]) + self.proxy.off() + self.proxy.observation_settings_RW = self.VALID_JSON + self.proxy.Initialise() + self.proxy.On() + self.assertListEqual(list(tilebeam_proxy.Pointing_direction_RW[0]), ["J2000","1.5deg","0deg"]) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py index b27c59e0f0deedd607398ad796b60550eea06e81..48692d08b255bea2a232776737a7d17f18633606 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py @@ -43,6 +43,13 @@ class TestHBATToRecvMapper(base.TestCase): actual = mapper.map_read("ANT_mask_RW", receiver_values) numpy.testing.assert_equal(expected, actual) + + def test_rcu_band_select_no_mapping(self): + mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) + receiver_values = [[[0] * 3] * 32, [[0] * 3] * 32, [[0] * 3] * 32] + expected = [0] * 48 + actual = mapper.map_read("RCU_band_select_RW", receiver_values) + numpy.testing.assert_equal(expected, actual) def test_bf_read_delay_steps_r_no_mapping(self): mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 3) @@ -205,6 +212,30 @@ class TestHBATToRecvMapper(base.TestCase): expected = [[[False, True, False]] + [[False] * 3] * 31] actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) + + def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self): + mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1) + + set_values = [0] * 48 + expected = [[[0] * 3] * 32] + actual = mapper.map_write("RCU_band_select_RW", set_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_write_rcu_band_select_no_mapping_and_two_receivers(self): + mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 2) + + set_values = [0] * 48 + expected = [[[0] * 3] * 32] * 2 + actual = mapper.map_write("RCU_band_select_RW", set_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_write_rcu_band_select_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): + mapper = HBATToRecvMapper(self.control_hba_0_and_1_on_rcu_1_and_0_of_recv_1, self.power_not_connected, 1) + + set_values = [1, 0] + [0] * 46 + expected = [[[0, 1, 0]] + [[0] * 3] * 31] + actual = mapper.map_write("RCU_band_select_RW", set_values) + numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_no_mapping_and_one_receiver(self): mapper = HBATToRecvMapper(self.control_not_connected, self.power_not_connected, 1) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_observation_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_observation_device.py index dc04f5d07a1335190ba1c1ce1783144f36b3b734..70ecc0cfbaab093aef6a4685e111ae35f3fbd327 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_observation_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_observation_device.py @@ -7,14 +7,6 @@ # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. -from json import loads -from datetime import datetime - -from tango.test_context import DeviceTestContext -from tango import DevFailed -from tango import DevState - -from tangostationcontrol.devices import observation from tangostationcontrol.test.devices import device_base from tangostationcontrol.test.devices import test_observation_base @@ -25,49 +17,3 @@ class TestObservationDevice(device_base.DeviceTestCase, test_observation_base.Te def setUp(self): # DeviceTestCase setUp patches lofar_device DeviceProxy super(TestObservationDevice, self).setUp() - - def test_init_valid(self): - """Initialize an observation with valid JSON""" - - with DeviceTestContext(observation.Observation, process=True) as proxy: - proxy.off() - proxy.observation_settings_RW = self.VALID_JSON - proxy.Initialise() - self.assertEqual(DevState.STANDBY, proxy.state()) - - def test_init_invalid(self): - """Initialize an observation with _invalid_ JSON""" - - with DeviceTestContext(observation.Observation, process=True) as proxy: - proxy.off() - proxy.observation_settings_RW = "{}" - with self.assertRaises(DevFailed): - proxy.Initialise() - - def test_prohibit_rewriting_settings(self): - """Test that changing observation settings is disallowed once init""" - - with DeviceTestContext(observation.Observation, process=True) as proxy: - proxy.off() - proxy.observation_settings_RW = self.VALID_JSON - proxy.Initialise() - - with self.assertRaises(DevFailed): - proxy.write_attribute( - "observation_settings_RW", self.VALID_JSON) - - def test_attribute_match(self): - """Test that JSON data is exposed to attributes""" - - data = loads(self.VALID_JSON) - stop_timestamp = datetime.fromisoformat(data["stop_time"]).timestamp() - - with DeviceTestContext(observation.Observation, process=True) as proxy: - proxy.off() - proxy.observation_settings_RW = self.VALID_JSON - proxy.Initialise() - proxy.On() - - self.assertEqual(DevState.ON, proxy.state()) - self.assertEqual(stop_timestamp, proxy.stop_time_R) - self.assertEqual(data["observation_id"], proxy.observation_id_R) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py index cece1ea75344d4f74543c332ca609d93000411ab..1e1c08570f20c217aa5be54fdee43571aa886707 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py @@ -31,3 +31,10 @@ class TestRecvDevice(device_base.DeviceTestCase): delays = numpy.random.rand(96,16).flatten() HBAT_bf_delay_steps = proxy.calculate_HBAT_bf_delay_steps(delays) self.assertEqual(3072, len(HBAT_bf_delay_steps)) # 96x32=3072 + + def test_get_rcu_band_from_filter(self): + """Verify filter lookup table values are correctly retrieved""" + with DeviceTestContext(recv.RECV, properties=self.recv_properties, process=True) as proxy: + filter_name = "HBA_170_230" + self.assertEqual(1, proxy.get_rcu_band_from_filter(filter_name)) + self.assertEqual(-1, proxy.get_rcu_band_from_filter('MOCK_FILTER'))