Skip to content
Snippets Groups Projects
Commit d1002a43 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'master' into L2SS-827-implement-antenna-state

parents 222d8451 e16b2d1d
No related branches found
No related tags found
1 merge request!389Resolve L2SS-827 "Implement antenna state"
......@@ -145,7 +145,7 @@
"AntennaField": {
"STAT": {
"AntennaField": {
"STAT/AntennaField/1": {
"STAT/AntennaField/2": {
"properties": {
"RECV_devices": [
"STAT/RECV/1"
......@@ -188,7 +188,7 @@
]
}
},
"STAT/AntennaField/2": {
"STAT/AntennaField/1": {
"properties": {
"RECV_devices": [
"STAT/RECV/1"
......@@ -237,8 +237,11 @@
"DigitalBeam": {
"STAT": {
"DigitalBeam": {
"STAT/DigitalBeam/1": {
"STAT/DigitalBeam/2": {
"properties": {
"AntennaField_Device": [
"STAT/AntennaField/2"
],
"Input_to_Antenna_Mapping": [
"-1", "-1", "-1", "-1", "-1", "-1",
"-1", "-1", "-1", "-1", "-1", "-1",
......@@ -259,10 +262,10 @@
]
}
},
"STAT/DigitalBeam/2": {
"STAT/DigitalBeam/1": {
"properties": {
"AntennaField_Device": [
"STAT/AntennaField/2"
"STAT/AntennaField/1"
],
"Input_to_Antenna_Mapping": [
"0", "1", "2", "3", "4", "5",
......
......@@ -37,8 +37,33 @@ class ArchiverPolicy(object):
def __init__(self, config: dict = None):
self.config = config or self.EMPTY_POLICY
def device_list(self) -> list:
""" Retrieve the device list from TangoDB """
device_list = []
db = Database()
server_list = db.get_server_list() # e.g. SDP/STAT, RECV/STAT
for server in server_list:
# https://pytango.readthedocs.io/en/stable/database.html#tango.Database.get_device_class_list
class_list = db.get_device_class_list(server)
for cls in class_list[::2]:
if "dserver" in cls:
continue
device_list.append(cls.lower())
return device_list
def devices(self) -> list:
return list(self.config["devices"].keys())
""" Filter the device list from TangoDB following the lofar2-policy file """
# Devices list from TangoDB
db_devices = self.device_list()
# Devices listed in policy file
config_devices = list(k.lower() for k in self.config["devices"].keys())
# Match device names fetched from DB against device names in policy file
devices = []
for config_dev in config_devices:
for db_dev in db_devices:
if fnmatch.fnmatch(db_dev, config_dev):
devices.append(db_dev)
return devices
def attribute_list(self, device_name: str, attribute_list: list) -> dict:
""" Return the full set of archiving policy for the given device. """
......@@ -184,7 +209,7 @@ class CustomCollector(object):
attribute_metrics = GaugeMetricFamily("device_attribute", 'Device attribute value', labels=['station', 'device', 'name', 'str_value', 'type', 'x', 'y', 'idx'])
scraping_metrics = GaugeMetricFamily("device_scraping", 'Device scraping duration', labels=['station', 'device'])
for device_name in self.policy.devices():
logger.debug(f"Processing device {device_name}")
dev_scrape_begin = time.time()
......
......@@ -29,6 +29,10 @@
},
"STAT/Docker/1": {
},
"STAT/Observation/*":{
},
"STAT/ObservationControl/1":{
},
"STAT/PSOC/1": {
},
"STAT/PCON/1": {
......
......@@ -105,6 +105,39 @@ We use `python-casacore <https://casacore.github.io/python-casacore/index.html>`
:returns: ``(does not return)``
Timing
"""""""""""""""""""""""""""""""""
The beam tracking applies an update each *interval*, and aims to apply it at timestamps ``(now % Beam_tracking_interval) - Beam_tracking_application_offset``. To do so, it starts its computations every interval ``Beam_tracking_preparation_period`` seconds before. It then starts to compute the weights, waits to apply them, and applies them by uploading the weights to the underlying hardware.
The following properties are used:
:Beam_tracking_interval: Update the beam tracking at this interval (seconds).
:type: ``float``
:Beam_tracking_application_offset: Update the beam tracking this amount of time before the next interval (seconds).
:type: ``float``
:Beam_tracking_preparation_period: Prepare time for each period to compute and upload the weights (seconds).
:type: ``float``
The following timers allow you to track the durations of each stage:
:Duration_compute_weights_R: Amount of time it took to compute the last weights (seconds).
:type: ``float``
:Duration_preparation_period_slack_R: Amount of time left in the prepration period between computing and uploading the weights (seconds).
:type: ``float``
:Duration_apply_weights_R: Amount of time it took to apply (upload) the weights (seconds).
:type: ``float``
DigitalBeam
`````````````````````
......
......@@ -9,8 +9,10 @@
import datetime
import numpy
import time
from json import loads
from threading import Thread, Lock, Condition
from statistics import median
# PyTango imports
from tango.server import attribute, command, device_property
......@@ -43,11 +45,18 @@ class beam_device(lofar_device):
default_value = 10.0
)
Beam_tracking_application_offset = device_property(
dtype='DevFloat',
doc='Amount of time to send the weights earlier than the interval, to allow the hardware to get apply them in time [seconds]',
mandatory=False,
default_value = 0.05
)
Beam_tracking_preparation_period = device_property(
dtype='DevFloat',
doc='Preparation time [seconds] needed before starting update operation',
mandatory=False,
default_value = 0.25
default_value = 0.4
)
Tracking_enabled_RW_default = device_property(
......@@ -94,8 +103,20 @@ class beam_device(lofar_device):
dtype=bool,
fget=lambda self: self._tracking_enabled_rw)
Duration_update_pointing_R = attribute(access=AttrWriteType.READ,
dtype=numpy.float64, fget=lambda self: self.update_pointing.statistics["last"] or 0)
Duration_compute_weights_R = attribute(access=AttrWriteType.READ,
doc="Time it took to compute weights",
unit="s",
dtype=numpy.float64, fget=lambda self: self._compute_weights.statistics["last"] or 0)
Duration_preparation_period_slack_R = attribute(access=AttrWriteType.READ,
doc="Slack between computing and applying weights",
unit="s",
dtype=numpy.float64, fget=lambda self: self._wait_to_apply_weights.statistics["last"] or 0)
Duration_apply_weights_R = attribute(access=AttrWriteType.READ,
doc="Time it took to upload weights",
unit="s",
dtype=numpy.float64, fget=lambda self: self._apply_weights.statistics["last"] or 0)
def write_Pointing_direction_RW(self, value):
""" Setter method for attribute Pointing_direction_RW """
......@@ -123,9 +144,8 @@ class beam_device(lofar_device):
self.Beam_tracker.stop()
@TimeIt()
def update_pointing(self, timestamp: datetime.datetime):
""" Update the weights for the configured pointings, for the given timestamp. """
""" Update the weights for the configured pointings, for the given timestamp, at the given timestamp. """
self._set_pointing(self._pointing_direction_rw, timestamp)
# --------
......@@ -139,6 +159,36 @@ class beam_device(lofar_device):
raise NotImplementedError
@TimeIt()
def _compute_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime) -> numpy.array:
"""
Calculate and he hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp
"""
raise NotImplementedError
@TimeIt()
def _apply_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime, weights: numpy.array) -> numpy.array:
"""
Upload the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp
Also updates _pointing_direction_r and _pointing_timestamp_r according to which anetennas got their pointing updated.
"""
raise NotImplementedError
@TimeIt()
def _wait_to_apply_weights(self, timestamp: datetime.datetime):
# expected time required to upload weights to hardware (use last 10 measured durations)
expected_application_time = median(self._apply_weights.statistics["history"][-10:] or [0.1])
# wait until provided time occurs, but don't start sleeping long here
sleep_time = (timestamp - datetime.datetime.now()).total_seconds() - expected_application_time - self.Beam_tracking_application_offset
if sleep_time > 1:
raise ValueError(f"Provided timestamp is too far into the future to apply at real time: {sleep_time} seconds from now.")
time.sleep(max(0, sleep_time))
def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime):
"""
Calculate and Upload the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp
......@@ -146,7 +196,14 @@ class beam_device(lofar_device):
Also updates _pointing_direction_r and _pointing_timestamp_r according to which anetennas got their pointing updated.
"""
raise NotImplementedError
# prepare weights
weights = self._compute_weights(pointing_direction, timestamp)
# wait until we can apply them
self._wait_to_apply_weights(timestamp)
# upload weights
self._apply_weights(pointing_direction, timestamp, weights)
# --------
# overloaded functions
......@@ -405,8 +462,18 @@ class BeamTracker():
# Check if flag beamtracking is true
with self.update_lock:
while not self.done:
# apply next pointing at next interval,
# or immediately if it is stale (just changed).
now = datetime.datetime.now()
if self.stale_pointing:
next_update_in = now
else:
next_update_in = now + datetime.timedelta(seconds=now.timestamp() % self.interval)
# update pointing at requested time
self.stale_pointing = False
self.update_pointing_callback(datetime.datetime.now())
self.update_pointing_callback(next_update_in)
# sleep until the next update, or when interrupted (this releases the lock, allowing for notification)
# note that we need wait_for as conditions can be triggered multiple times in succession
......
......@@ -191,7 +191,8 @@ class DigitalBeam(beam_device):
return result
def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime):
@TimeIt()
def _compute_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime) -> numpy.array:
"""
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
"""
......@@ -208,6 +209,13 @@ class DigitalBeam(beam_device):
# Filter out unwanted antennas
beam_weights *= self._map_inputs_on_polarised_inputs(self._input_select)
return beam_weights
@TimeIt()
def _apply_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime, beam_weights: numpy.array):
"""
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
"""
# Write weights to SDP
self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = beam_weights
......
......@@ -19,6 +19,7 @@ from tangostationcontrol.common.lofar_logging import device_logging_to_python, l
from tangostationcontrol.beam.delays import delay_calculator
from tangostationcontrol.beam.hba_tile import NUMBER_OF_ELEMENTS_PER_TILE
from tangostationcontrol.devices.beam_device import beam_device
from tangostationcontrol.devices.device_decorators import TimeIt
import logging
logger = logging.getLogger()
......@@ -88,7 +89,8 @@ class TileBeam(beam_device):
return delays
def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime):
@TimeIt()
def _compute_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime) -> numpy.array:
"""
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
"""
......@@ -98,10 +100,14 @@ class TileBeam(beam_device):
# Convert delays into beam weights
delays = delays.flatten()
HBAT_bf_delay_steps = self.antennafield_proxy.calculate_HBAT_bf_delay_steps(delays)
bf_delay_steps = self.antennafield_proxy.calculate_HBAT_bf_delay_steps(delays)
return bf_delay_steps
@TimeIt()
def _apply_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime, bf_delay_steps: numpy.array):
# Write weights to RECV through the HBATToRecvMapper
self.antennafield_proxy.HBAT_bf_delay_steps_RW = HBAT_bf_delay_steps.reshape(self._nr_tiles, NUMBER_OF_ELEMENTS_PER_TILE * 2)
self.antennafield_proxy.HBAT_bf_delay_steps_RW = bf_delay_steps.reshape(self._nr_tiles, NUMBER_OF_ELEMENTS_PER_TILE * 2)
# Record where we now point to, now that we've updated the weights.
# Only the entries within the mask have been updated
......
......@@ -46,9 +46,9 @@ commands =
{envpython} -m coverage erase
{envpython} -m stestr run {posargs}
{envpython} -m coverage combine
{envpython} -m coverage html -d cover
{envpython} -m coverage html --omit='*test*' -d cover
{envpython} -m coverage xml -o coverage.xml
{envpython} -m coverage report
{envpython} -m coverage report --omit='*test*'
; TODO(Corne): Integrate Hacking to customize pep8 rules
[testenv:pep8]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment