Skip to content
Snippets Groups Projects
Select Git revision
  • ee65fa64a327255530517b27aed9152c4bfc4db3
  • main default protected
  • master
3 results

test

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    beam_device.py 17.46 KiB
    # -*- coding: utf-8 -*-
    #
    # Distributed under the terms of the APACHE license.
    # See LICENSE.txt for more info.
    
    """Beam Abstract Device Server for LOFAR2.0
    
    """
    
    import datetime
    import numpy
    import time
    from json import loads
    from threading import Thread, Lock, Condition
    from statistics import median
    
    # PyTango imports
    from tango.server import attribute, command, device_property
    from tango import AttrWriteType, DebugIt, DevVarStringArray, DevVarDoubleArray, DevString
    
    # Additional import
    from tangostationcontrol.common.entrypoint import entry
    from tangostationcontrol.common.measures import get_measures_directory, get_available_measures_directories, download_measures, use_measures_directory, restart_python
    from tangostationcontrol.common.lofar_logging import log_exceptions
    from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
    from tangostationcontrol.devices.device_decorators import TimeIt, only_in_states, fault_on_error
    from tangostationcontrol.beam.delays import delay_calculator
    from tangostationcontrol.devices.lofar_device import lofar_device
    
    # Names exported when this module is star-imported.
    __all__ = ["beam_device", "main", "BeamTracker"]
    
    import logging
    # NOTE: getLogger() is called without a name, so this is the root logger.
    logger = logging.getLogger()
    
    
    class beam_device(lofar_device):
        """Abstract device that points beams towards a configurable set of directions.

        The requested pointings are stored in Pointing_direction_RW. While tracking
        is enabled, a BeamTracker thread calls update_pointing() to recompute and
        upload the weights for those pointings.

        Derived classes must implement _delays(), _compute_weights() and
        _apply_weights(), and must call configure_for_initialise() with the number
        of pointings they support.
        """

        # -----------------
        # Device Properties
        # -----------------

        Beam_tracking_interval = device_property(
            dtype='DevFloat',
            doc='Beam weights updating interval time [seconds]',
            mandatory=False,
            default_value = 10.0
        )

        Beam_tracking_application_offset = device_property(
            dtype='DevFloat',
            doc='Amount of time to send the weights earlier than the interval, to allow the hardware to get apply them in time [seconds]',
            mandatory=False,
            default_value = 0.05
        )

        Beam_tracking_preparation_period = device_property(
            dtype='DevFloat',
            doc='Preparation time [seconds] needed before starting update operation',
            mandatory=False,
            default_value = 0.4
        )

        Tracking_enabled_RW_default = device_property(
            dtype='DevBoolean',
            mandatory=False,
            default_value=True
        )

        # ----------
        # Attributes
        # ----------

        # Maximum array size to allocate for the pointings,
        # which must be enough for all derived classes.
        #
        # The actual number of pointings for this device
        # will be stored as self._num_pointings.
        MAX_POINTINGS = 1024

        # Pointings as last reported by _apply_weights() (one row of 3 strings each)
        Pointing_direction_R = attribute(access=AttrWriteType.READ,
            dtype=((str,),), max_dim_x=3, max_dim_y=MAX_POINTINGS,
            fget=lambda self: self._pointing_direction_r)

        # Pointings that are requested to be tracked
        Pointing_direction_RW = attribute(access=AttrWriteType.READ_WRITE,
            dtype=((str,),), max_dim_x=3, max_dim_y=MAX_POINTINGS,
            fget=lambda self: self._pointing_direction_rw)

        Pointing_direction_str_R = attribute(access=AttrWriteType.READ,
            doc='Pointing direction as a formatted string',
            dtype=(str,), max_dim_x=MAX_POINTINGS,
            fget=lambda self:  ["{0} ({1}, {2})".format(*x) for x in self._pointing_direction_r])

        Pointing_timestamp_R = attribute(access=AttrWriteType.READ,
            dtype=(numpy.double,), max_dim_x=MAX_POINTINGS,
            fget=lambda self: self._pointing_timestamp_r)

        # True only if a tracker exists and its thread is currently running
        Tracking_enabled_R = attribute(access=AttrWriteType.READ,
            doc="Whether the tile beam is updated periodically",
            dtype=bool,
            fget=lambda self: bool(self.Beam_tracker and self.Beam_tracker.is_alive()))

        Tracking_enabled_RW = attribute(access=AttrWriteType.READ_WRITE,
            doc="Whether the tile beam should be updated periodically",
            dtype=bool,
            fget=lambda self: self._tracking_enabled_rw)

        # The Duration_* attributes expose the "last" entry of the statistics that
        # the @TimeIt() decorator attaches to the wrapped method, or 0 if unset.
        Duration_compute_weights_R = attribute(access=AttrWriteType.READ,
            doc="Time it took to compute weights",
            unit="s",
            dtype=numpy.float64, fget=lambda self: self._compute_weights.statistics["last"] or 0)

        Duration_preparation_period_slack_R = attribute(access=AttrWriteType.READ,
            doc="Slack between computing and applying weights",
            unit="s",
            dtype=numpy.float64, fget=lambda self: self._wait_to_apply_weights.statistics["last"] or 0)

        Duration_apply_weights_R = attribute(access=AttrWriteType.READ,
            doc="Time it took to upload weights",
            unit="s",
            dtype=numpy.float64, fget=lambda self: self._apply_weights.statistics["last"] or 0)

        def write_Pointing_direction_RW(self, value):
            """ Setter method for attribute Pointing_direction_RW.

            Validates the number of pointings and each individual direction, stores
            the new pointings, and asks the beam tracker to apply them right away.

            Raises ValueError if the number of pointings does not match
            self._num_pointings, or if any direction is rejected by the delay calculator.
            """
            # verify whether values are valid
            if len(value) != self._num_pointings:
                raise ValueError(f"Expected {self._num_pointings} directions, got {len(value)}")

            for pointing in value:
                if not self.generic_delay_calculator.is_valid_direction(pointing):
                    raise ValueError(f"Invalid direction: {pointing}")

            # store the new values
            self._pointing_direction_rw = value

            # force update across tiles if pointing changes
            self.Beam_tracker.force_update()
            logger.info("Pointing direction update requested")

        def write_Tracking_enabled_RW(self, value):
            """ Setter method for attribute Tracking_enabled_RW.

            Stores the requested setting, and starts or stops the beam tracker accordingly.
            """
            self._tracking_enabled_rw = value

            if value:
                self.Beam_tracker.start()
            else:
                self.Beam_tracker.stop()


        def update_pointing(self, timestamp: datetime.datetime):
            """ Compute the weights of the configured pointings (Pointing_direction_RW) for the given timestamp, and apply them at that timestamp. """
            self._set_pointing(self._pointing_direction_rw, timestamp)

        # --------
        # abstract interface
        # --------

        def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime) -> numpy.array:
            """
            Calculate the delay values based on the 2D pointing list (num_pointings x 3) and the timestamp.

            Must be implemented by derived classes.
            """

            raise NotImplementedError

        @TimeIt()
        def _compute_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime) -> numpy.array:
            """
            Calculate the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp.

            Must be implemented by derived classes.
            """

            raise NotImplementedError

        @TimeIt()
        def _apply_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime, weights: numpy.array) -> numpy.array:
            """
            Upload the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp.

            Also updates _pointing_direction_r and _pointing_timestamp_r according to which antennas got their pointing updated.

            Must be implemented by derived classes.
            """

            raise NotImplementedError

        @TimeIt()
        def _wait_to_apply_weights(self, timestamp: datetime.datetime):
            """
            Sleep until it is time to start uploading the weights that need to be applied at the given timestamp.

            The upload is started early by the expected upload duration plus Beam_tracking_application_offset.

            Raises ValueError if this would require sleeping for more than 1 second.
            """
            # expected time required to upload weights to hardware (use last 10 measured durations)
            expected_application_time = median(self._apply_weights.statistics["history"][-10:] or [0.1])

            # wait until provided time occurs, but don't start sleeping long here
            sleep_time = (timestamp - datetime.datetime.now()).total_seconds() - expected_application_time - self.Beam_tracking_application_offset
            if sleep_time > 1:
                raise ValueError(f"Provided timestamp is too far into the future to apply at real time: {sleep_time} seconds from now.")

            # a negative sleep time means we are already late: proceed immediately
            time.sleep(max(0, sleep_time))

        def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime):
            """
            Calculate and upload the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp.

            The weights are computed first, after which we wait for the right moment to upload them.
            """

            # prepare weights
            weights = self._compute_weights(pointing_direction, timestamp)

            # wait until we can apply them
            self._wait_to_apply_weights(timestamp)

            # upload weights
            self._apply_weights(pointing_direction, timestamp, weights)

        # --------
        # overloaded functions
        # --------

        def init_device(self):
            """ Set up the state that must exist before the device is initialised. """
            super().init_device()

            # Tracking is off until configure_for_initialise() applies the configured default.
            # The pointing arrays are allocated there as well.
            self._tracking_enabled_rw      = False

            # thread to perform beam tracking
            self.Beam_tracker = None

            # generic delay calculator to ask about validity of settings
            self.generic_delay_calculator = delay_calculator([0, 0, 0])

        # Derived  classes will override this with a non-parameterised
        # version that lofar_device will call.
        @log_exceptions()
        def configure_for_initialise(self, num_pointings):
            """
            Allocate the pointing administration for the given number of pointings, and create the beam tracker.

            Raises ValueError if num_pointings is not in the range 1 .. MAX_POINTINGS.
            """
            super().configure_for_initialise()

            if not (0 < num_pointings <= self.MAX_POINTINGS):
                raise ValueError(f"beam_device is configured to support 1 - {self.MAX_POINTINGS} pointings, but {num_pointings} were requested")

            # Initialise tracking control
            self._num_pointings            = num_pointings
            self._pointing_timestamp_r     = numpy.zeros(num_pointings, dtype=numpy.double)
            self._pointing_direction_r     = numpy.zeros((num_pointings, 3), dtype="<U32")
            self._pointing_direction_rw    = numpy.array([["AZELGEO","0deg","90deg"]] * num_pointings, dtype="<U32")
            self._tracking_enabled_rw      = self.Tracking_enabled_RW_default

            # Create a thread object to update beam weights
            self.Beam_tracker = BeamTracker(self.Beam_tracking_interval, self.Beam_tracking_preparation_period, self.update_pointing, self.Fault)

        @log_exceptions()
        def configure_for_on(self):
            """ Start the beam tracker, if tracking is enabled. """
            super().configure_for_on()

            # Start beam tracking thread
            if self._tracking_enabled_rw:
                self.Beam_tracker.start()

        @log_exceptions()
        def configure_for_off(self):
            """ Stop the beam tracker, if one was created. """
            if self.Beam_tracker:
                # Stop thread object
                self.Beam_tracker.stop()

            super().configure_for_off()


        # --------
        # Commands
        # --------

        @command(dtype_in=DevVarStringArray)
        @DebugIt()
        @log_exceptions()
        @only_in_states(DEFAULT_COMMAND_STATES)
        def set_pointing(self, pointing_direction: list):
            """
            Compute and upload the hardware-specific delays for the current time, based on
            a given pointing direction 2D array (num_pointings x 3 parameters), provided flattened.
            """

            # Reshape the flattened input array
            pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3)

            self._set_pointing(pointing_direction, datetime.datetime.now())

        @command(dtype_in = DevString)
        @DebugIt()
        @log_exceptions()
        @only_in_states(DEFAULT_COMMAND_STATES)
        def set_pointing_for_specific_time(self, parameters: DevString = None):
            """
            Compute and upload the hardware-specific delays based on a given pointing direction
            2D array (num_pointings x 3 parameters) for the given timestamp.

            Parameters are given in a json document.
            pointing_direction: flattened array of num_pointings casacore pointings (3 string parameters each)
            timestamp: POSIX timestamp
            """
            pointing_parameters = loads(parameters)

            pointing_direction = list(pointing_parameters["pointing_direction"])
            timestamp_param = float(pointing_parameters["timestamp"])
            timestamp = datetime.datetime.fromtimestamp(timestamp_param)

            # Reshape the flattened pointing array
            pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3)

            self._set_pointing(pointing_direction, timestamp)


        @command(dtype_in=DevVarStringArray, dtype_out=DevVarDoubleArray)
        @DebugIt()
        @log_exceptions()
        @only_in_states(DEFAULT_COMMAND_STATES)
        def delays(self, pointing_direction: numpy.array):
            """
            Calculate the delays for the current time, based on the (flattened) pointing list.

            Returns the delays as a flattened array.
            """

            pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3)

            delays = self._delays(pointing_direction, datetime.datetime.now())

            return delays.flatten()

        # --------
        # Measures
        # --------

        # Directory where the casacore measures that we use, reside. We configure ~/.casarc to
        # use the symlink /opt/IERS/current, which we switch to the actual set of files to use.
        measures_directory_R = attribute(dtype=str, access=AttrWriteType.READ, fget = lambda self: get_measures_directory())

        # List of downloaded measures (the latest 64, anyway)
        measures_directories_available_R = attribute(dtype=(str,), max_dim_x=64, access=AttrWriteType.READ, fget = lambda self: sorted(get_available_measures_directories())[-64:])

        @command(dtype_out=str, doc_out="Name of newly installed measures directory")
        @DebugIt()
        @log_exceptions()
        def download_measures(self):
            """ Download new measures tables into /opt/IERS, but do not activate them.

                NOTE: This may take a while to complete. You are advised to increase
                      the timeout of the proxy using `my_device.set_timeout_millis(10000)`. """

            # resolves to the module-level download_measures(), not to this method
            return download_measures()

        @command(dtype_in=str, doc_in="Measures directory to activate")
        @DebugIt()
        @log_exceptions()
        def use_measures(self, newdir):
            """ Activate a downloaded set of measures tables.

                NOTE: This will turn off and restart this device!! """

            # switch to requested measures
            use_measures_directory(newdir)
            logger.info(f"Switched measures table to {newdir}")

            # turn off our device, to prepare for a python restart
            self.Off()

            # restart this program to force casacore to adopt
            # the new tables
            logger.warning("Restarting device to activate new measures tables")
            restart_python()
    
    # ----------
    # Run server
    # ----------
    def main(**kwargs):
        """Start the beam_device server through entry(), forwarding all keyword arguments, and return its result."""
        result = entry(beam_device, **kwargs)
        return result
    
    # ----------
    # Beam Tracker
    # ---------- 
    
    class BeamTracker():
        """ Object that encapsulates a Thread, responsible for beam tracking operations.

        The thread calls update_pointing_callback(timestamp) once immediately, and
        after that for every multiple of `interval` seconds, waking up
        `preparation_period` seconds ahead of time to prepare the update.
        An update can also be triggered right away through force_update().
        Errors raised in the thread are forwarded to fault_callback.
        """

        # Maximum time [seconds] that stop() waits for the thread to finish
        DISCONNECT_TIMEOUT = 3.0

        def __init__(self, interval, preparation_period, update_pointing_callback, fault_callback):
            """
            interval:                 time [seconds] between periodic updates
            preparation_period:       time [seconds] to wake up before each update
            update_pointing_callback: called with the datetime at which the pointing must be applied
            fault_callback:           called with a message if the tracking thread fails
            """
            self.thread = None
            # Whether the thread was asked to terminate. Reset by start().
            self.done = False
            self.interval = interval
            self.preparation_period = preparation_period
            self.update_pointing_callback = update_pointing_callback
            self.fault_callback = fault_callback

            # Condition to trigger a forced update or early abort
            self.update_lock = Lock()
            self.update_condition = Condition(self.update_lock)

            # Whether the pointing has to be forced updated
            self.stale_pointing = True

        def start(self):
            """ Starts the Beam Tracking thread. Does nothing if it was already started. """
            if self.thread:
                # already started
                return

            self.done = False
            self.thread = Thread(target=self._update_pointing_direction, name="BeamTracker")
            self.thread.start()

            logger.info("BeamTracking thread started")

        def is_alive(self):
            """ Returns True just before the Thread run() method starts until just after the Thread run() method terminates. """
            # always return a bool, also if no thread was created (yet)
            return bool(self.thread and self.thread.is_alive())

        def force_update(self):
            """ Force the pointing to be updated. """

            # NOTE(review): this flag is set without holding update_lock, while the
            # thread clears it when it starts an update. Confirm whether a request
            # arriving in between can get lost.
            self.stale_pointing = True
            self.notify_thread()

        def notify_thread(self):
            """ Wake up the thread if it is waiting for the next update. """
            # inform the thread to stop waiting
            with self.update_lock:
                self.update_condition.notify()

        def stop(self):
            """ Stops the Beam Tracking loop, waiting at most DISCONNECT_TIMEOUT seconds for the thread to finish. """

            if not self.thread:
                return

            logger.info("BeamTracking thread stopping")

            # ask the thread to terminate, and wake it up to make it notice
            self.done = True
            self.force_update()

            # wait for thread to finish
            self.thread.join(self.DISCONNECT_TIMEOUT)

            if self.is_alive():
                logger.error("BeamTracking Thread did not properly terminate")

            self.thread = None

            logger.info("BeamTracking thread stopped")

        def _get_sleep_time(self):
            """ Computes the sleep time (in seconds) that needs to be waited for the next beam tracking update """
            now = datetime.datetime.now().timestamp()

            # Computes the left seconds before the next update
            next_update_in = self.interval - (now % self.interval)

            # Computes the needed sleep time before the next update
            sleep_time = next_update_in - self.preparation_period
            # If sleep time is negative, add the tracking interval for the next update
            if sleep_time < 0:
                return sleep_time + self.interval
            else:
                return sleep_time

        # @fault_on_error routes errors here. we forward them to our device
        def Fault(self, msg):
            self.fault_callback(msg)

        @log_exceptions()
        @fault_on_error()
        def _update_pointing_direction(self):
            """ Updates the beam weights using a fixed interval of time.

            This is the body of the tracking thread. It runs until self.done is set. """

            # The lock is held throughout, except while waiting on the condition
            with self.update_lock:
                while not self.done:

                    # apply next pointing at next interval,
                    # or immediately if it is stale (just changed).
                    now = datetime.datetime.now()
                    if self.stale_pointing:
                        next_update_in = now
                    else:
                        # time left until the next multiple of the interval, using the
                        # same formula as _get_sleep_time() to stay aligned with it
                        seconds_until_next_interval = self.interval - (now.timestamp() % self.interval)
                        next_update_in = now + datetime.timedelta(seconds=seconds_until_next_interval)

                    # update pointing at requested time
                    self.stale_pointing = False
                    self.update_pointing_callback(next_update_in)

                    # sleep until the next update, or when interrupted (this releases the lock, allowing for notification)
                    # note that we need wait_for as conditions can be triggered multiple times in succession
                    self.update_condition.wait_for(lambda: self.done or self.stale_pointing, self._get_sleep_time())