diff --git a/tangostationcontrol/tangostationcontrol/devices/beam.py b/tangostationcontrol/tangostationcontrol/devices/beam.py index a87808b3946db0525d35a8999d064501cd3a3425..a44a557a3fbb875d7f623ce05ed039576cdc69f7 100644 --- a/tangostationcontrol/tangostationcontrol/devices/beam.py +++ b/tangostationcontrol/tangostationcontrol/devices/beam.py @@ -7,12 +7,16 @@ """ -# PyTango imports -from tango.server import attribute, command -from tango import AttrWriteType, DebugIt -# Additional import +import numpy + +from tango.server import attribute +from tango.server import command +from tango import AttrWriteType +from tango import DevState +from tango import DebugIt from tangostationcontrol.common.entrypoint import entry +from tangostationcontrol.devices.device_decorators import only_in_states from tangostationcontrol.devices.lofar_device import lofar_device from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions from tangostationcontrol.common.measures import get_measures_directory, use_measures_directory, download_measures, restart_python, get_available_measures_directories @@ -25,6 +29,18 @@ __all__ = ["Beam", "main"] @device_logging_to_python() class Beam(lofar_device): + + _hbat_pointing_direction = numpy.zeros(96) + _hbat_pointing_epoch = numpy.zeros(96) + + @property + def hbat_pointing_direction(self): + return tuple(self._hbat_pointing_direction) + + @property + def hbat_pointing_epoch(self): + return tuple(self._hbat_pointing_epoch) + # ----------------- # Device Properties # ----------------- @@ -33,6 +49,14 @@ class Beam(lofar_device): # Attributes # ---------- + HBAT_pointing_direction_R = attribute(access=AttrWriteType.READ, + dtype=(numpy.double,), max_dim_x=96, + fget=lambda self: self.hbat_pointing_direction) + + HBAT_pointing_epoch_R = attribute(access=AttrWriteType.READ, + dtype=(numpy.double,), max_dim_x=96, + fget=lambda self: self.hbat_pointing_epoch) + # Directory where the casacore measures that we use, reside. We configure ~/.casarc to # use the symlink /opt/IERS/current, which we switch to the actual set of files to use. measures_directory_R = attribute(dtype=str, access=AttrWriteType.READ, fget = lambda self: get_measures_directory()) @@ -44,11 +68,32 @@ class Beam(lofar_device): # overloaded functions # -------- - # -------- # Commands # -------- + @DebugIt() + @command(dtype_in=(numpy.double,), dtype_out=int) + @only_in_states([DevState.STANDBY, DevState.ON]) + def set_direction_pointings(self, new_points: numpy.array): + if new_points.size != 96: + return -1 + + self._hbat_pointing_direction = new_points + + return 0 + + @DebugIt() + @command(dtype_in=(numpy.double,), dtype_out=int) + @only_in_states([DevState.STANDBY, DevState.ON]) + def set_direction_epochs(self, new_points: numpy.array): + if new_points.size != 96: + return -1 + + self._hbat_pointing_epoch = new_points + + return 0 + @command(dtype_out=str, doc_out="Name of newly installed measures directory") @DebugIt() @log_exceptions() diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_beam_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_beam_device.py new file mode 100644 index 0000000000000000000000000000000000000000..107f95293f587e632f018be68a5cf07764bc49b7 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_beam_device.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +import numpy + +from tango import DevState +from tango.test_context import DeviceTestContext + +from tangostationcontrol.devices import beam + +import mock + +from tangostationcontrol.test import base + + +class TestBeamDevice(base.TestCase): + + def setUp(self): + super(TestBeamDevice, self).setUp() + + # lofar_device init_device will launch a DeviceProxy not captured by + # the TestDeviceContext making it fail. + + # Patch init_device and force match spec + init_patcher = mock.patch.object( + beam.Beam, 'init_device', spec=beam.Beam.init_device) + self.m_init = init_patcher.start() + self.addCleanup(init_patcher.stop) + + def test_get_pointing_directions(self): + """Verify can read pointings attribute and length matches without err""" + with DeviceTestContext(beam.Beam, process=True) as proxy: + self.assertEqual(96, len(proxy.read_attribute( + "HBAT_pointing_direction_R").value)) + + def test_get_pointing_epochs(self): + """Verify can read epochs attribute and length matches without err""" + + with DeviceTestContext(beam.Beam, process=True) as proxy: + self.assertEqual(96, len(proxy.read_attribute( + "HBAT_pointing_epoch_R").value)) + + def test_set_pointing_direction(self): + """Verify can set pointings attribute without error""" + + with DeviceTestContext(beam.Beam, process=True) as proxy: + proxy.init() + proxy.Initialise() + self.assertEqual(DevState.STANDBY, proxy.state()) + self.assertEqual(0, proxy.set_direction_pointings(numpy.zeros(96))) + + def test_set_pointing_epochs(self): + """Verify can set epochs attribute without error""" + + with DeviceTestContext(beam.Beam, process=True) as proxy: + proxy.init() + proxy.Initialise() + self.assertEqual(DevState.STANDBY, proxy.state()) + self.assertEqual(0, proxy.set_direction_epochs(numpy.zeros(96))) + + def pointing(self, attribute: str, lambd): + data = numpy.arange(0, 96) + + with DeviceTestContext(beam.Beam, process=True) as proxy: + proxy.init() + proxy.Initialise() + self.assertEqual(DevState.STANDBY, proxy.state()) + + # Evaluate default all zeros are present using numpy array compare + compare_obj = numpy.zeros(96) == proxy.read_attribute( + attribute).value + self.assertTrue(compare_obj.all()) + + # Set direction pointings to range of incrementing values + self.assertEqual(0, lambd(proxy, data)) + + # Verify attribute has been updated with correct data + compare_obj = data == proxy.read_attribute(attribute).value + self.assertTrue(compare_obj.all()) + + def test_direction_pointing(self): + """Set and Get test with actual values for pointing attribute""" + + self.pointing("HBAT_pointing_direction_R", lambda x, y: + x.set_direction_pointings(y)) + + def test_direction_epochs(self): + """Set and Get test with actual values for pointing attribute""" + + self.pointing("HBAT_pointing_epoch_R", lambda x, y: + x.set_direction_epochs(y)) + + def test_pointing_invalid(self): + """Test that set pointings command refuses invalid lengths""" + + with DeviceTestContext(beam.Beam, process=True) as proxy: + proxy.init() + proxy.Initialise() + self.assertEqual(DevState.STANDBY, proxy.state()) + + # should return error due to invalid length + self.assertEqual(-1, proxy.set_direction_pointings(numpy.zeros(55))) + + def test_epoch_invalid(self): + """Test that set epochs command refuses invalid lengths""" + + with DeviceTestContext(beam.Beam, process=True) as proxy: + proxy.init() + proxy.Initialise() + self.assertEqual(DevState.STANDBY, proxy.state()) + + # should return error due to invalid length + self.assertEqual(-1, proxy.set_direction_epochs(numpy.zeros(55)))