diff --git a/tangostationcontrol/tangostationcontrol/devices/beam.py b/tangostationcontrol/tangostationcontrol/devices/beam.py index e00f9581d664fe8111fcf5620f23e1c7eeb62dca..aff312deeeffe8f83f095012609d7550601b8382 100644 --- a/tangostationcontrol/tangostationcontrol/devices/beam.py +++ b/tangostationcontrol/tangostationcontrol/devices/beam.py @@ -95,6 +95,78 @@ class Beam(lofar_device): logger.warning("RECV device offline") # avoid error raising in unittest super().configure_for_initialise() + # -------- + # internal functions + # -------- + + def _set_pointing_directions(self, new_points: numpy.array): + if new_points.shape != (96,3): + raise ValueError(f"New pointing directions given as shape {new_points.shape}, must be (96,3)") + + self._hbat_pointing_direction = new_points + + def _set_pointing_epochs(self, new_points: numpy.array): + if new_points.shape != (96,): + raise ValueError(f"New pointing epochs given as shape {new_points.shape}, must be (96,)") + + self._hbat_pointing_epoch = new_points + + def _HBAT_delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()): + """ + Calculate the delays (in seconds) based on the pointing list and the timestamp + TBD: antenna and reference positions will be retrieved from RECV and not stored as BEAM device properties + """ + + delays = numpy.zeros((96,16), dtype=numpy.float64) + + for tile in range(96): + # initialise delay calculator + d = delay_calculator(self.reference_itrf[tile]) + d.set_measure_time(timestamp) + + # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions + delays[tile] = d.convert(pointing_direction[tile], self.antenna_itrf[tile]) + + return delays + + def _calculate_HBAT_bf_delays(self, delays: numpy.ndarray): + """ + Helper function that converts a signal path delay (in seconds) to an analog beam weight, + which is a value per tile per dipole per polarisation. + """ + # Duplicate delay values per polarisation + polarised_delays = numpy.tile(delays, 2) # output dims -> 96x32 + + # Divide by 0.5ns and round + HBAT_bf_delays = numpy.array(polarised_delays / 0.5e-09, dtype=numpy.int64) + + return HBAT_bf_delays + + def _HBAT_set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()): + """ + Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) + """ + # Retrieve delays from casacore + delays = self._HBAT_delays(pointing_direction, timestamp) + # Convert delays into beam weights + HBAT_bf_delays = self._calculate_HBAT_bf_delays(delays) + + # Write weights to RECV + self.recv_proxy.HBAT_BF_delays_RW = HBAT_bf_delays + + # Record where we now point to, now that we've updated the weights. + # Only the entries within the mask have been updated: + # mask = False -> retain value, mask = True -> use new value + mask = self.recv_proxy.RCU_mask_RW + pointing_epochs = numpy.tile([timestamp], 96) + for tile in range(96): + if not mask[tile]: + pointing_direction[tile] = self.hbat_pointing_direction[tile] + pointing_epochs[tile] = self.hbat_pointing_epoch[tile] + + self._set_pointing_directions(pointing_direction) + self._set_pointing_epochs(pointing_epochs) + # -------- # Commands # -------- @@ -103,16 +175,15 @@ class Beam(lofar_device): @command(dtype_in=(numpy.str,)) @only_in_states([DevState.STANDBY, DevState.ON]) def set_pointing_directions(self, new_points: numpy.array): - self._hbat_pointing_direction = numpy.array(new_points).reshape(96,3) + new_points = numpy.array(new_points).reshape(96,3) + + self._set_pointing_directions(new_points) @DebugIt() @command(dtype_in=(numpy.double,)) @only_in_states([DevState.STANDBY, DevState.ON]) def set_pointing_epochs(self, new_points: numpy.array): - if new_points.shape != (96,): - raise ValueError(f"New direction epochs given as shape {new_points.shape}, must be (96,)") - - self._hbat_pointing_epoch = new_points + self._set_pointing_epochs(new_points) @command(dtype_out=str, doc_out="Name of newly installed measures directory") @DebugIt() @@ -156,13 +227,7 @@ class Beam(lofar_device): """ pointing_direction = numpy.array(pointing_direction).reshape(96,3) - delays = numpy.zeros((96,16),dtype=numpy.float64) - for tile in range(0,96): - # initialise delay calculator - d = delay_calculator(self.reference_itrf[tile]) - d.set_measure_time(timestamp) - # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions - delays[tile] = d.convert(pointing_direction[tile], self.antenna_itrf[tile]) + delays = self._HBAT_delays(pointing_direction, timestamp) return delays.flatten() @@ -175,11 +240,12 @@ class Beam(lofar_device): Helper function that converts a signal path delay (in seconds) to an analog beam weight, which is a value per tile per dipole per polarisation. """ - # Duplicate delay values per polarisation - polarised_delays = numpy.tile(delays,2) # output dims -> 96x32 - # Divide by 0.5ns and round - HBAT_bf_delays = numpy.array(polarised_delays / 0.5e-09, dtype=numpy.int64) - return HBAT_bf_delays + + delays = delays.reshape(96,16) + + result = self._calculate_HBAT_bf_delays(delays) + + return result.flatten() @command(dtype_in=DevVarStringArray) @DebugIt() @@ -188,19 +254,9 @@ class Beam(lofar_device): """ Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) """ - # Retrieve delays from casacore - delays = self.HBAT_delays(pointing_direction,timestamp) - # Convert delays into beam weights - HBAT_bf_delays = self.calculate_HBAT_bf_delays(delays) - # Write weights to RECV - self.recv_proxy.write_attribute('HBAT_BF_delays_RW',HBAT_bf_delays) + pointing_direction = numpy.array(pointing_direction).reshape(96,3) - # Record where we now point to, now that we've updated the weights. - # Only the entries within the mask have been updated: - # mask = False -> retain value, mask = True -> use new value - mask = self.recv_proxy.RCU_mask_RW - self.set_pointing_directions((self.hbat_pointing_direction * ~mask) + (pointing_direction * mask)) - self.set_pointing_epochs((self.hbat_pointing_epoch * ~mask) + (numpy.tile([timestamp],96) * mask)) + self._HBAT_set_pointing(pointing_direction, timestamp) # ---------- # Run server