Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
beam_device.py 17.46 KiB
# -*- coding: utf-8 -*-
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
"""Beam Abstract Device Server for LOFAR2.0
"""
import datetime
import numpy
import time
from json import loads
from threading import Thread, Lock, Condition
from statistics import median
# PyTango imports
from tango.server import attribute, command, device_property
from tango import AttrWriteType, DebugIt, DevVarStringArray, DevVarDoubleArray, DevString
# Additional import
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.measures import get_measures_directory, get_available_measures_directories, download_measures, use_measures_directory, restart_python
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
from tangostationcontrol.devices.device_decorators import TimeIt, only_in_states, fault_on_error
from tangostationcontrol.beam.delays import delay_calculator
from tangostationcontrol.devices.lofar_device import lofar_device
# Names exported by this module.
__all__ = ["beam_device", "main", "BeamTracker"]
import logging
# Logger used throughout this module. NOTE: getLogger() without a name
# returns the root logger, not a module-specific one.
logger = logging.getLogger()
class beam_device(lofar_device):
    """Abstract Tango device that computes and uploads beam-steering weights.

    Derived classes are expected to implement ``_delays``, ``_compute_weights``
    and ``_apply_weights`` (they raise ``NotImplementedError`` here). This class
    provides the attributes and commands built on top of them, and drives a
    ``BeamTracker`` that periodically re-applies the configured pointings.
    """

    # -----------------
    # Device Properties
    # -----------------

    Beam_tracking_interval = device_property(
        dtype='DevFloat',
        doc='Beam weights updating interval time [seconds]',
        mandatory=False,
        default_value = 10.0
    )

    Beam_tracking_application_offset = device_property(
        dtype='DevFloat',
        doc='Amount of time to send the weights earlier than the interval, to allow the hardware to get apply them in time [seconds]',
        mandatory=False,
        default_value = 0.05
    )

    Beam_tracking_preparation_period = device_property(
        dtype='DevFloat',
        doc='Preparation time [seconds] needed before starting update operation',
        mandatory=False,
        default_value = 0.4
    )

    Tracking_enabled_RW_default = device_property(
        dtype='DevBoolean',
        mandatory=False,
        default_value=True
    )

    # ----------
    # Attributes
    # ----------

    # Maximum array size to allocate for the pointings,
    # which must be enough for all derived classes.
    #
    # The actual number of pointings for this device
    # will be stored as self._num_pointings.
    MAX_POINTINGS = 1024

    Pointing_direction_R = attribute(access=AttrWriteType.READ,
        dtype=((str,),), max_dim_x=3, max_dim_y=MAX_POINTINGS,
        fget=lambda self: self._pointing_direction_r)

    Pointing_direction_RW = attribute(access=AttrWriteType.READ_WRITE,
        dtype=((str,),), max_dim_x=3, max_dim_y=MAX_POINTINGS,
        fget=lambda self: self._pointing_direction_rw)

    Pointing_direction_str_R = attribute(access=AttrWriteType.READ,
        doc='Pointing direction as a formatted string',
        dtype=(str,), max_dim_x=MAX_POINTINGS,
        fget=lambda self: ["{0} ({1}, {2})".format(*x) for x in self._pointing_direction_r])

    Pointing_timestamp_R = attribute(access=AttrWriteType.READ,
        dtype=(numpy.double,), max_dim_x=MAX_POINTINGS,
        fget=lambda self: self._pointing_timestamp_r)

    Tracking_enabled_R = attribute(access=AttrWriteType.READ,
        doc="Whether the tile beam is updated periodically",
        dtype=bool,
        fget=lambda self: bool(self.Beam_tracker and self.Beam_tracker.is_alive()))

    Tracking_enabled_RW = attribute(access=AttrWriteType.READ_WRITE,
        doc="Whether the tile beam should be updated periodically",
        dtype=bool,
        fget=lambda self: self._tracking_enabled_rw)

    # The Duration_* attributes expose the "last" entry of the statistics that
    # the @TimeIt() decorator attaches to the corresponding method; 0 is
    # reported while that entry is still falsy (e.g. nothing measured yet).
    Duration_compute_weights_R = attribute(access=AttrWriteType.READ,
        doc="Time it took to compute weights",
        unit="s",
        dtype=numpy.float64, fget=lambda self: self._compute_weights.statistics["last"] or 0)

    Duration_preparation_period_slack_R = attribute(access=AttrWriteType.READ,
        doc="Slack between computing and applying weights",
        unit="s",
        dtype=numpy.float64, fget=lambda self: self._wait_to_apply_weights.statistics["last"] or 0)

    Duration_apply_weights_R = attribute(access=AttrWriteType.READ,
        doc="Time it took to upload weights",
        unit="s",
        dtype=numpy.float64, fget=lambda self: self._apply_weights.statistics["last"] or 0)

    def write_Pointing_direction_RW(self, value):
        """ Setter method for attribute Pointing_direction_RW.

        Raises ValueError if the number of directions does not match
        self._num_pointings, or if any direction is rejected by the
        generic delay calculator.
        """
        # verify whether values are valid
        if len(value) != self._num_pointings:
            raise ValueError(f"Expected {self._num_pointings} directions, got {len(value)}")

        for pointing in value:
            if not self.generic_delay_calculator.is_valid_direction(pointing):
                raise ValueError(f"Invalid direction: {pointing}")

        # store the new values
        self._pointing_direction_rw = value

        # force update across tiles if pointing changes
        self.Beam_tracker.force_update()
        logger.info("Pointing direction update requested")

    def write_Tracking_enabled_RW(self, value):
        """ Setter method for attribute Tracking_enabled_RW: starts or stops the beam tracker. """
        self._tracking_enabled_rw = value

        if value:
            self.Beam_tracker.start()
        else:
            self.Beam_tracker.stop()

    def update_pointing(self, timestamp: datetime.datetime):
        """ Update the weights for the configured pointings, for the given timestamp, at the given timestamp. """
        self._set_pointing(self._pointing_direction_rw, timestamp)

    # --------
    # abstract interface
    # --------

    def _delays(self, pointing_direction: numpy.ndarray, timestamp: datetime.datetime) -> numpy.ndarray:
        """
        Calculate the delay values based on the 2D pointing list (num_pointings x 3) and the timestamp
        """
        raise NotImplementedError

    @TimeIt()
    def _compute_weights(self, pointing_direction: numpy.ndarray, timestamp: datetime.datetime) -> numpy.ndarray:
        """
        Calculate the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp
        """
        raise NotImplementedError

    @TimeIt()
    def _apply_weights(self, pointing_direction: numpy.ndarray, timestamp: datetime.datetime, weights: numpy.ndarray) -> numpy.ndarray:
        """
        Upload the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp

        Also updates _pointing_direction_r and _pointing_timestamp_r according to which antennas got their pointing updated.
        """
        raise NotImplementedError

    @TimeIt()
    def _wait_to_apply_weights(self, timestamp: datetime.datetime):
        """
        Sleep until it is time to start uploading the weights for the given timestamp.

        The upload is started early by the expected upload duration plus
        Beam_tracking_application_offset. Raises ValueError if that moment
        is more than one second away, to avoid blocking for long here.
        """
        # expected time required to upload weights to hardware (use last 10 measured durations)
        expected_application_time = median(self._apply_weights.statistics["history"][-10:] or [0.1])

        # wait until provided time occurs, but don't start sleeping long here
        sleep_time = (timestamp - datetime.datetime.now()).total_seconds() - expected_application_time - self.Beam_tracking_application_offset
        if sleep_time > 1:
            raise ValueError(f"Provided timestamp is too far into the future to apply at real time: {sleep_time} seconds from now.")

        # a timestamp that already passed results in no sleep at all
        time.sleep(max(0, sleep_time))

    def _set_pointing(self, pointing_direction: numpy.ndarray, timestamp: datetime.datetime):
        """
        Calculate and upload the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp

        Also updates _pointing_direction_r and _pointing_timestamp_r according to which antennas got their pointing updated.
        """
        # prepare weights
        weights = self._compute_weights(pointing_direction, timestamp)

        # wait until we can apply them
        self._wait_to_apply_weights(timestamp)

        # upload weights
        self._apply_weights(pointing_direction, timestamp, weights)

    # --------
    # overloaded functions
    # --------

    def init_device(self):
        """ Set up the state that must exist before the device is initialised. """
        super().init_device()

        # Initialise pointing array data and attribute
        self._tracking_enabled_rw = False

        # thread to perform beam tracking
        self.Beam_tracker = None

        # generic delay calculator to ask about validity of settings
        self.generic_delay_calculator = delay_calculator([0, 0, 0])

    # Derived classes will override this with a non-parameterised
    # version that lofar_device will call.
    @log_exceptions()
    def configure_for_initialise(self, num_pointings):
        """
        Allocate the pointing bookkeeping for num_pointings pointings and create the beam tracker.

        Raises ValueError if num_pointings is not in 1..MAX_POINTINGS.
        """
        super().configure_for_initialise()

        if not (0 < num_pointings <= self.MAX_POINTINGS):
            # the message states the range that is actually accepted (0 is rejected)
            raise ValueError(f"beam_device is configured to support 1 - {self.MAX_POINTINGS} pointings, but {num_pointings} were requested")

        # Initialise tracking control
        self._num_pointings = num_pointings
        self._pointing_timestamp_r = numpy.zeros(num_pointings, dtype=numpy.double)
        self._pointing_direction_r = numpy.zeros((num_pointings, 3), dtype="<U32")
        self._pointing_direction_rw = numpy.array([["AZELGEO","0deg","90deg"]] * num_pointings, dtype="<U32")
        self._tracking_enabled_rw = self.Tracking_enabled_RW_default

        # Create a thread object to update beam weights
        self.Beam_tracker = BeamTracker(self.Beam_tracking_interval, self.Beam_tracking_preparation_period, self.update_pointing, self.Fault)

    @log_exceptions()
    def configure_for_on(self):
        """ Start beam tracking, if it is enabled. """
        super().configure_for_on()

        # Start beam tracking thread
        if self._tracking_enabled_rw:
            self.Beam_tracker.start()

    @log_exceptions()
    def configure_for_off(self):
        """ Stop beam tracking, if a tracker was created. """
        if self.Beam_tracker:
            # Stop thread object
            self.Beam_tracker.stop()

        super().configure_for_off()

    # --------
    # Commands
    # --------

    @command(dtype_in=DevVarStringArray)
    @DebugIt()
    @log_exceptions()
    @only_in_states(DEFAULT_COMMAND_STATES)
    def set_pointing(self, pointing_direction: list):
        """
        Compute and uploads the hardware-specific delays based on a given pointing direction
        2D array (num_pointings x 3 parameters).
        """
        # Reshape the flatten input array
        pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3)

        self._set_pointing(pointing_direction, datetime.datetime.now())

    @command(dtype_in = DevString)
    @DebugIt()
    @log_exceptions()
    @only_in_states(DEFAULT_COMMAND_STATES)
    def set_pointing_for_specific_time(self, parameters: DevString = None):
        """
        Compute and uploads the hardware-specific delays based on a given pointing direction
        2D array (num_pointings x 3 parameters) for the given timestamp

        Parameters are given in a json document.
        pointing_direction: flattened array of num_pointings casacore pointings (3 string parameters each)
        timestamp: POSIX timestamp
        """
        pointing_parameters = loads(parameters)

        pointing_direction = list(pointing_parameters["pointing_direction"])
        timestamp_param = float(pointing_parameters["timestamp"])
        timestamp = datetime.datetime.fromtimestamp(timestamp_param)

        # Reshape the flatten pointing array
        pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3)

        self._set_pointing(pointing_direction, timestamp)

    @command(dtype_in=DevVarStringArray, dtype_out=DevVarDoubleArray)
    @DebugIt()
    @log_exceptions()
    @only_in_states(DEFAULT_COMMAND_STATES)
    def delays(self, pointing_direction: numpy.ndarray):
        """
        Calculate the delays based on the pointing list and the current time.

        Returns the delays as a flattened array.
        """
        pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3)

        delays = self._delays(pointing_direction, datetime.datetime.now())

        return delays.flatten()

    # --------
    # Measures
    # --------

    # Directory where the casacore measures that we use, reside. We configure ~/.casarc to
    # use the symlink /opt/IERS/current, which we switch to the actual set of files to use.
    measures_directory_R = attribute(dtype=str, access=AttrWriteType.READ, fget = lambda self: get_measures_directory())

    # List of downloaded measures (the latest 64, anyway)
    measures_directories_available_R = attribute(dtype=(str,), max_dim_x=64, access=AttrWriteType.READ, fget = lambda self: sorted(get_available_measures_directories())[-64:])

    @command(dtype_out=str, doc_out="Name of newly installed measures directory")
    @DebugIt()
    @log_exceptions()
    def download_measures(self):
        """ Download new measures tables into /opt/IERS, but do not activate them.

            NOTE: This may take a while to complete. You are advised to increase
                  the timeout of the proxy using `my_device.set_timeout_millis(10000)`. """

        # inside this method, the name resolves to the module-level
        # download_measures() helper, not to this command
        return download_measures()

    @command(dtype_in=str, doc_in="Measures directory to activate")
    @DebugIt()
    @log_exceptions()
    def use_measures(self, newdir):
        """ Activate a downloaded set of measures tables.

            NOTE: This will turn off and restart this device!! """

        # switch to requested measures
        use_measures_directory(newdir)
        logger.info(f"Switched measures table to {newdir}")

        # turn off our device, to prepare for a python restart
        self.Off()

        # restart this program to force casacore to adopt
        # the new tables
        logger.warning("Restarting device to activate new measures tables")
        restart_python()
# ----------
# Run server
# ----------
def main(**kwargs):
    """Entry point: run the beam_device server and hand back the result of entry().

    All keyword arguments are forwarded unchanged to entry().
    """
    result = entry(beam_device, **kwargs)
    return result
# ----------
# Beam Tracker
# ----------
class BeamTracker():
    """ Object that encapsulates a Thread, responsible for beam tracking operations.

    The thread calls update_pointing_callback(timestamp) once when started,
    whenever an update is forced, and otherwise shortly before every multiple
    of `interval` seconds (`preparation_period` seconds ahead of it).
    Errors are forwarded to fault_callback through Fault().
    """

    # Maximum time [seconds] stop() waits for the thread to finish
    DISCONNECT_TIMEOUT = 3.0

    def __init__(self, interval, preparation_period, update_pointing_callback, fault_callback):
        """
        interval:                 time [seconds] between periodic updates
        preparation_period:       time [seconds] to wake up ahead of each update
        update_pointing_callback: called with the datetime the update is meant for
        fault_callback:           called with a message if the thread fails
        """
        self.thread = None
        self.interval = interval
        self.preparation_period = preparation_period
        self.update_pointing_callback = update_pointing_callback
        self.fault_callback = fault_callback

        # Whether the thread has been asked to terminate. Initialised here so
        # the attribute exists even before start() is called.
        self.done = False

        # Condition to trigger a forced update or early abort
        self.update_lock = Lock()
        self.update_condition = Condition(self.update_lock)

        # Whether the pointing has to be forced updated
        self.stale_pointing = True

    def start(self):
        """ Starts the Beam Tracking thread """
        if self.thread:
            # already started
            return

        self.done = False
        self.thread = Thread(target=self._update_pointing_direction, name="BeamTracker")
        self.thread.start()

        logger.info("BeamTracking thread started")

    def is_alive(self):
        """ Returns True just before the Thread run() method starts until just after the Thread run() method terminates. """
        # always return a real boolean, also if no thread was created yet
        return bool(self.thread and self.thread.is_alive())

    def force_update(self):
        """ Force the pointing to be updated. """
        self.stale_pointing = True
        self.notify_thread()

    def notify_thread(self):
        """ Wake up the thread if it is waiting for the next update.

        NOTE: the thread holds update_lock except while it is waiting,
        so this call blocks for as long as an update is in progress.
        """
        # inform the thread to stop waiting
        with self.update_lock:
            self.update_condition.notify()

    def stop(self):
        """ Stops the Beam Tracking loop """
        if not self.thread:
            return

        logger.info("BeamTracking thread stopping")

        self.done = True
        self.force_update()

        # wait for thread to finish
        self.thread.join(self.DISCONNECT_TIMEOUT)

        if self.is_alive():
            logger.error("BeamTracking Thread did not properly terminate")

        self.thread = None

        logger.info("BeamTracking thread stopped")

    def _get_sleep_time(self):
        """ Computes the sleep time (in seconds) that needs to be waited for the next beam tracking update """
        now = datetime.datetime.now().timestamp()

        # Computes the left seconds before the next update
        next_update_in = self.interval - (now % self.interval)

        # Computes the needed sleep time before the next update
        sleep_time = next_update_in - self.preparation_period

        # If sleep time is negative, add the tracking interval for the next update
        if sleep_time < 0:
            return sleep_time + self.interval
        else:
            return sleep_time

    # @fault_on_error routes errors here. we forward them to our device
    def Fault(self, msg):
        self.fault_callback(msg)

    @log_exceptions()
    @fault_on_error()
    def _update_pointing_direction(self):
        """ Updates the beam weights using a fixed interval of time """

        # The lock is held for the whole loop; it is only released
        # while waiting on the condition at the end of each iteration.
        with self.update_lock:
            while not self.done:
                # apply next pointing at next interval,
                # or immediately if it is stale (just changed).
                now = datetime.datetime.now()
                if self.stale_pointing:
                    next_update_in = now
                else:
                    # (now % interval) is the time elapsed since the previous
                    # interval boundary, so the time left until the next one
                    # is the interval minus that (cf. _get_sleep_time).
                    seconds_to_next_interval = self.interval - (now.timestamp() % self.interval)
                    next_update_in = now + datetime.timedelta(seconds=seconds_to_next_interval)

                # update pointing at requested time
                self.stale_pointing = False
                self.update_pointing_callback(next_update_in)

                # sleep until the next update, or when interrupted (this releases the lock, allowing for notification)
                # note that we need wait_for as conditions can be triggered multiple times in succession
                self.update_condition.wait_for(lambda: self.done or self.stale_pointing, self._get_sleep_time())