diff --git a/CDB/stations/DTS_Outside_ConfigDb.json b/CDB/stations/DTS_Outside_ConfigDb.json index dbff3fda62b2b76e5714a81c12f94d4b75502eea..ed3315567700d954413967c4b9338f14fe676978 100644 --- a/CDB/stations/DTS_Outside_ConfigDb.json +++ b/CDB/stations/DTS_Outside_ConfigDb.json @@ -145,7 +145,7 @@ "AntennaField": { "STAT": { "AntennaField": { - "STAT/AntennaField/1": { + "STAT/AntennaField/2": { "properties": { "RECV_devices": [ "STAT/RECV/1" @@ -188,7 +188,7 @@ ] } }, - "STAT/AntennaField/2": { + "STAT/AntennaField/1": { "properties": { "RECV_devices": [ "STAT/RECV/1" @@ -237,8 +237,11 @@ "DigitalBeam": { "STAT": { "DigitalBeam": { - "STAT/DigitalBeam/1": { + "STAT/DigitalBeam/2": { "properties": { + "AntennaField_Device": [ + "STAT/AntennaField/2" + ], "Input_to_Antenna_Mapping": [ "-1", "-1", "-1", "-1", "-1", "-1", "-1", "-1", "-1", "-1", "-1", "-1", @@ -259,10 +262,10 @@ ] } }, - "STAT/DigitalBeam/2": { + "STAT/DigitalBeam/1": { "properties": { "AntennaField_Device": [ - "STAT/AntennaField/2" + "STAT/AntennaField/1" ], "Input_to_Antenna_Mapping": [ "0", "1", "2", "3", "4", "5", diff --git a/docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py b/docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py index 82b8759b076761aa15e0b553d24356c929742fd9..256f78bf155b7a3a17bd70e426478dd56bfc8182 100644 --- a/docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py +++ b/docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py @@ -37,8 +37,33 @@ class ArchiverPolicy(object): def __init__(self, config: dict = None): self.config = config or self.EMPTY_POLICY + def device_list(self) -> list: + """ Retrieve the device list from TangoDB """ + device_list = [] + db = Database() + server_list = db.get_server_list() # e.g. SDP/STAT, RECV/STAT + for server in server_list: + # https://pytango.readthedocs.io/en/stable/database.html#tango.Database.get_device_class_list + class_list = db.get_device_class_list(server) + for cls in class_list[::2]: + if "dserver" in cls: + continue + device_list.append(cls.lower()) + return device_list + def devices(self) -> list: - return list(self.config["devices"].keys()) + """ Filter the device list from TangoDB following the lofar2-policy file """ + # Devices list from TangoDB + db_devices = self.device_list() + # Devices listed in policy file + config_devices = list(k.lower() for k in self.config["devices"].keys()) + # Match device names fetched from DB against device names in policy file + devices = [] + for config_dev in config_devices: + for db_dev in db_devices: + if fnmatch.fnmatch(db_dev, config_dev): + devices.append(db_dev) + return devices def attribute_list(self, device_name: str, attribute_list: list) -> dict: """ Return the full set of archiving policy for the given device. """ @@ -184,7 +209,7 @@ class CustomCollector(object): attribute_metrics = GaugeMetricFamily("device_attribute", 'Device attribute value', labels=['station', 'device', 'name', 'str_value', 'type', 'x', 'y', 'idx']) scraping_metrics = GaugeMetricFamily("device_scraping", 'Device scraping duration', labels=['station', 'device']) - + for device_name in self.policy.devices(): logger.debug(f"Processing device {device_name}") dev_scrape_begin = time.time() diff --git a/docker-compose/tango-prometheus-exporter/lofar2-policy.json b/docker-compose/tango-prometheus-exporter/lofar2-policy.json index 5ce4117166e120d82b85b85025371a6d4af568c8..994d9dd1877b87ab7ccecbcfe325c97333dd7f92 100644 --- a/docker-compose/tango-prometheus-exporter/lofar2-policy.json +++ b/docker-compose/tango-prometheus-exporter/lofar2-policy.json @@ -29,6 +29,10 @@ }, "STAT/Docker/1": { }, + "STAT/Observation/*":{ + }, + "STAT/ObservationControl/1":{ + }, "STAT/PSOC/1": { }, "STAT/PCON/1": { diff --git a/tangostationcontrol/docs/source/devices/tilebeam-digitalbeam.rst b/tangostationcontrol/docs/source/devices/tilebeam-digitalbeam.rst index 4089d082f4b5a62fd663673983ad03c8fa0f4710..eeccd01ea74f10ab6f1d34fbf2b64cf9e79f4059 100644 --- a/tangostationcontrol/docs/source/devices/tilebeam-digitalbeam.rst +++ b/tangostationcontrol/docs/source/devices/tilebeam-digitalbeam.rst @@ -105,6 +105,39 @@ We use `python-casacore <https://casacore.github.io/python-casacore/index.html>` :returns: ``(does not return)`` +Timing +""""""""""""""""""""""""""""""""" + +The beam tracking applies an update each *interval*, and aims to apply it at timestamps ``(now % Beam_tracking_interval) - Beam_tracking_application_offset``. To do so, it starts its computations every interval ``Beam_tracking_preparation_period`` seconds before. It then starts to compute the weights, waits to apply them, and applies them by uploading the weights to the underlying hardware. + +The following properties are used: + +:Beam_tracking_interval: Update the beam tracking at this interval (seconds). + + :type: ``float`` + +:Beam_tracking_application_offset: Update the beam tracking this amount of time before the next interval (seconds). + + :type: ``float`` + +:Beam_tracking_preparation_period: Prepare time for each period to compute and upload the weights (seconds). + + :type: ``float`` + +The following timers allow you to track the durations of each stage: + +:Duration_compute_weights_R: Amount of time it took to compute the last weights (seconds). + + :type: ``float`` + +:Duration_preparation_period_slack_R: Amount of time left in the prepration period between computing and uploading the weights (seconds). + + :type: ``float`` + +:Duration_apply_weights_R: Amount of time it took to apply (upload) the weights (seconds). + + :type: ``float`` + DigitalBeam ````````````````````` diff --git a/tangostationcontrol/tangostationcontrol/devices/beam_device.py b/tangostationcontrol/tangostationcontrol/devices/beam_device.py index 1d5e7143fbc7afd2385c063b4aab04b30347c00f..c8ccf78e649b194b9efd0a2e97231310cf091581 100644 --- a/tangostationcontrol/tangostationcontrol/devices/beam_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/beam_device.py @@ -9,8 +9,10 @@ import datetime import numpy +import time from json import loads from threading import Thread, Lock, Condition +from statistics import median # PyTango imports from tango.server import attribute, command, device_property @@ -43,11 +45,18 @@ class beam_device(lofar_device): default_value = 10.0 ) + Beam_tracking_application_offset = device_property( + dtype='DevFloat', + doc='Amount of time to send the weights earlier than the interval, to allow the hardware to get apply them in time [seconds]', + mandatory=False, + default_value = 0.05 + ) + Beam_tracking_preparation_period = device_property( dtype='DevFloat', doc='Preparation time [seconds] needed before starting update operation', mandatory=False, - default_value = 0.25 + default_value = 0.4 ) Tracking_enabled_RW_default = device_property( @@ -94,8 +103,20 @@ class beam_device(lofar_device): dtype=bool, fget=lambda self: self._tracking_enabled_rw) - Duration_update_pointing_R = attribute(access=AttrWriteType.READ, - dtype=numpy.float64, fget=lambda self: self.update_pointing.statistics["last"] or 0) + Duration_compute_weights_R = attribute(access=AttrWriteType.READ, + doc="Time it took to compute weights", + unit="s", + dtype=numpy.float64, fget=lambda self: self._compute_weights.statistics["last"] or 0) + + Duration_preparation_period_slack_R = attribute(access=AttrWriteType.READ, + doc="Slack between computing and applying weights", + unit="s", + dtype=numpy.float64, fget=lambda self: self._wait_to_apply_weights.statistics["last"] or 0) + + Duration_apply_weights_R = attribute(access=AttrWriteType.READ, + doc="Time it took to upload weights", + unit="s", + dtype=numpy.float64, fget=lambda self: self._apply_weights.statistics["last"] or 0) def write_Pointing_direction_RW(self, value): """ Setter method for attribute Pointing_direction_RW """ @@ -123,9 +144,8 @@ class beam_device(lofar_device): self.Beam_tracker.stop() - @TimeIt() def update_pointing(self, timestamp: datetime.datetime): - """ Update the weights for the configured pointings, for the given timestamp. """ + """ Update the weights for the configured pointings, for the given timestamp, at the given timestamp. """ self._set_pointing(self._pointing_direction_rw, timestamp) # -------- @@ -139,6 +159,36 @@ class beam_device(lofar_device): raise NotImplementedError + @TimeIt() + def _compute_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime) -> numpy.array: + """ + Calculate and he hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp + """ + + raise NotImplementedError + + @TimeIt() + def _apply_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime, weights: numpy.array) -> numpy.array: + """ + Upload the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp + + Also updates _pointing_direction_r and _pointing_timestamp_r according to which anetennas got their pointing updated. + """ + + raise NotImplementedError + + @TimeIt() + def _wait_to_apply_weights(self, timestamp: datetime.datetime): + # expected time required to upload weights to hardware (use last 10 measured durations) + expected_application_time = median(self._apply_weights.statistics["history"][-10:] or [0.1]) + + # wait until provided time occurs, but don't start sleeping long here + sleep_time = (timestamp - datetime.datetime.now()).total_seconds() - expected_application_time - self.Beam_tracking_application_offset + if sleep_time > 1: + raise ValueError(f"Provided timestamp is too far into the future to apply at real time: {sleep_time} seconds from now.") + + time.sleep(max(0, sleep_time)) + def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime): """ Calculate and Upload the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp @@ -146,7 +196,14 @@ class beam_device(lofar_device): Also updates _pointing_direction_r and _pointing_timestamp_r according to which anetennas got their pointing updated. """ - raise NotImplementedError + # prepare weights + weights = self._compute_weights(pointing_direction, timestamp) + + # wait until we can apply them + self._wait_to_apply_weights(timestamp) + + # upload weights + self._apply_weights(pointing_direction, timestamp, weights) # -------- # overloaded functions @@ -405,8 +462,18 @@ class BeamTracker(): # Check if flag beamtracking is true with self.update_lock: while not self.done: + + # apply next pointing at next interval, + # or immediately if it is stale (just changed). + now = datetime.datetime.now() + if self.stale_pointing: + next_update_in = now + else: + next_update_in = now + datetime.timedelta(seconds=now.timestamp() % self.interval) + + # update pointing at requested time self.stale_pointing = False - self.update_pointing_callback(datetime.datetime.now()) + self.update_pointing_callback(next_update_in) # sleep until the next update, or when interrupted (this releases the lock, allowing for notification) # note that we need wait_for as conditions can be triggered multiple times in succession diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py index 7bb139292584d117442cfff418023f100d0dcf5f..0b44a14292c9e2f0e053001a6e071adcd977f978 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py @@ -189,7 +189,8 @@ class DigitalBeam(beam_device): return result - def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime): + @TimeIt() + def _compute_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime) -> numpy.array: """ Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) """ @@ -206,6 +207,13 @@ class DigitalBeam(beam_device): # Filter out unwanted antennas beam_weights *= self._map_inputs_on_polarised_inputs(self._input_select) + return beam_weights + + @TimeIt() + def _apply_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime, beam_weights: numpy.array): + """ + Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) + """ # Write weights to SDP self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = beam_weights diff --git a/tangostationcontrol/tangostationcontrol/devices/tilebeam.py b/tangostationcontrol/tangostationcontrol/devices/tilebeam.py index 34a5a6924f99ef7bbc43c37bb1abbfabd3c3276b..d8e98ad75c550ed85b5d6a23cc0c8c7543637011 100644 --- a/tangostationcontrol/tangostationcontrol/devices/tilebeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/tilebeam.py @@ -19,6 +19,7 @@ from tangostationcontrol.common.lofar_logging import device_logging_to_python, l from tangostationcontrol.beam.delays import delay_calculator from tangostationcontrol.beam.hba_tile import NUMBER_OF_ELEMENTS_PER_TILE from tangostationcontrol.devices.beam_device import beam_device +from tangostationcontrol.devices.device_decorators import TimeIt import logging logger = logging.getLogger() @@ -88,7 +89,8 @@ class TileBeam(beam_device): return delays - def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime): + @TimeIt() + def _compute_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime) -> numpy.array: """ Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) """ @@ -98,10 +100,14 @@ class TileBeam(beam_device): # Convert delays into beam weights delays = delays.flatten() - HBAT_bf_delay_steps = self.antennafield_proxy.calculate_HBAT_bf_delay_steps(delays) + bf_delay_steps = self.antennafield_proxy.calculate_HBAT_bf_delay_steps(delays) + + return bf_delay_steps + @TimeIt() + def _apply_weights(self, pointing_direction: numpy.array, timestamp: datetime.datetime, bf_delay_steps: numpy.array): # Write weights to RECV through the HBATToRecvMapper - self.antennafield_proxy.HBAT_bf_delay_steps_RW = HBAT_bf_delay_steps.reshape(self._nr_tiles, NUMBER_OF_ELEMENTS_PER_TILE * 2) + self.antennafield_proxy.HBAT_bf_delay_steps_RW = bf_delay_steps.reshape(self._nr_tiles, NUMBER_OF_ELEMENTS_PER_TILE * 2) # Record where we now point to, now that we've updated the weights. # Only the entries within the mask have been updated diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py index d13b16a1ef4cdfbc8e2e02d22805fa92314fb3bf..c62076477d0596d2fbf9993b9e81db69a15c12d7 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py @@ -36,8 +36,18 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): beamlet_proxy.warm_boot() beamlet_proxy.set_defaults() return beamlet_proxy + + def setup_sdp_proxy(self): + # setup SDP, on which this device depends + sdp_proxy = TestDeviceProxy("STAT/SDP/1") + sdp_proxy.off() + sdp_proxy.warm_boot() + sdp_proxy.set_defaults() + return sdp_proxy def test_pointing_to_zenith(self): + self.setup_sdp_proxy() + self.setup_recv_proxy() # Setup beamlet configuration self.beamlet_proxy.clock_RW = 200 * 1000000 self.beamlet_proxy.subband_select = list(range(488)) diff --git a/tangostationcontrol/tox.ini b/tangostationcontrol/tox.ini index 62b18988c5b4289ae1c12b3283d7e1b4b52fd8f7..74e93462b8e97325c12e81b53f372b34961a05e1 100644 --- a/tangostationcontrol/tox.ini +++ b/tangostationcontrol/tox.ini @@ -46,9 +46,9 @@ commands = {envpython} -m coverage erase {envpython} -m stestr run {posargs} {envpython} -m coverage combine - {envpython} -m coverage html -d cover + {envpython} -m coverage html --omit='*test*' -d cover {envpython} -m coverage xml -o coverage.xml - {envpython} -m coverage report + {envpython} -m coverage report --omit='*test*' ; TODO(Corne): Integrate Hacking to customize pep8 rules [testenv:pep8]