diff --git a/tangostationcontrol/tangostationcontrol/common/beamforming.py b/tangostationcontrol/tangostationcontrol/common/beamforming.py new file mode 100644 index 0000000000000000000000000000000000000000..768bc3bbd9bfc1f0c1834bda117983448fc78c71 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/common/beamforming.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" Helper functions for managing beamforming calculation. + +""" + +import numpy + +def calculate_HBAT_bf_delays(delays: numpy.ndarray, HBAT_signal_input_delays: numpy.ndarray): + """ + Helper function that converts a signal path delay (in seconds) to an analog beam weight, + which is a value per tile per dipole per polarisation. + """ + # Duplicate delay values per polarisation + polarised_delays = numpy.tile(delays, 2) # output dims -> 96x32 + + # Add signal input delay + summed_delays = numpy.add(polarised_delays,HBAT_signal_input_delays) + + # Divide by 0.5ns and round + HBAT_bf_delays = numpy.array(summed_delays / 0.5e-09, dtype=numpy.int64) + + return HBAT_bf_delays diff --git a/tangostationcontrol/tangostationcontrol/devices/beam.py b/tangostationcontrol/tangostationcontrol/devices/beam.py index 05373c2537d6ea07f8ebace51f95af26bfb8a978..a64471933bfa520a3598a9d42e1b15b9b4a67781 100644 --- a/tangostationcontrol/tangostationcontrol/devices/beam.py +++ b/tangostationcontrol/tangostationcontrol/devices/beam.py @@ -19,6 +19,7 @@ from tangostationcontrol.common.lofar_logging import device_logging_to_python, l from tangostationcontrol.common.measures import get_measures_directory, use_measures_directory, download_measures, restart_python, get_available_measures_directories from tangostationcontrol.beam.delays import delay_calculator from tangostationcontrol.devices.device_decorators import * +from tangostationcontrol.common.beamforming import calculate_HBAT_bf_delays import logging logger = logging.getLogger() @@ -104,22 +105,6 @@ class Beam(lofar_device): return delays - def _calculate_HBAT_bf_delays(self,delays: numpy.ndarray): - """ - Helper function that converts a signal path delay (in seconds) to an analog beam weight, - which is a value per tile per dipole per polarisation. - """ - # Duplicate delay values per polarisation - polarised_delays = numpy.tile(delays, 2) # output dims -> 96x32 - - # Add signal input delay - summed_delays = numpy.add(polarised_delays,self.HBAT_signal_input_delays) - - # Divide by 0.5ns and round - HBAT_bf_delays = numpy.array(summed_delays / 0.5e-09, dtype=numpy.int64) - - return HBAT_bf_delays - def _HBAT_set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()): """ Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) @@ -127,7 +112,7 @@ class Beam(lofar_device): # Retrieve delays from casacore delays = self._HBAT_delays(pointing_direction, timestamp) # Convert delays into beam weights - HBAT_bf_delays = self._calculate_HBAT_bf_delays(delays) + HBAT_bf_delays = calculate_HBAT_bf_delays(delays, self.HBAT_signal_input_delays) # Write weights to RECV self.recv_proxy.HBAT_BF_delays_RW = HBAT_bf_delays diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_beam_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_beam_device.py index 06a6d858114b30a1dc1f1f6ba7314aadf1a2c5c3..4a633876d77ad708a234f4cc9bafd967c07a4f57 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_beam_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_beam_device.py @@ -11,6 +11,7 @@ from tango import DevState from tango.test_context import DeviceTestContext from tangostationcontrol.devices import beam, lofar_device +from tangostationcontrol.common.beamforming import calculate_HBAT_bf_delays import numpy import mock @@ -70,5 +71,6 @@ class TestBeamDevice(base.TestCase): # verify if values are actually transformed HBAT_delays = proxy.HBAT_delays(numpy.array([["J2000","0deg","0deg"]] * 96).flatten()) - HBAT_bf_delays = beam.Beam._calculate_HBAT_bf_delays(HBAT_delays) + HBAT_signal_input_delays = numpy.random.rand(96,32) # tango properties cannot be retrieved for mock devices + HBAT_bf_delays = calculate_HBAT_bf_delays(numpy.array(HBAT_delays).reshape(96,16), HBAT_signal_input_delays) self.assertNotEqual(HBAT_delays, HBAT_bf_delays)