diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index de8a85e3ba87d0e659d0ecb96e6457e6c48fab46..affcc9d323ab51b3ac5eb3c2c79db344d089cc78 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -105,7 +105,6 @@ docker_build_image_all: - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh archiver-timescale latest - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh hdbppts-cm latest - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh hdbppts-es latest - docker_build_image_elk: extends: .base_docker_images rules: @@ -391,7 +390,7 @@ docker_build_image_device_docker: script: # Do not remove 'bash' or statement will be ignored by primitive docker shell - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh device-docker $tag -docker_build_image_device_ovservation_control: +docker_build_image_device_observation_control: extends: .base_docker_images rules: - if: '$CI_PIPELINE_SOURCE == "merge_request_event"' @@ -600,7 +599,7 @@ sphinx-documentation: stage: documentation before_script: - sudo apt-get update - - sudo apt-get install -y git + - sudo apt-get install -y git graphviz script: - cd tangostationcontrol - tox -e docs diff --git a/CDB/LOFAR_ConfigDb.json b/CDB/LOFAR_ConfigDb.json index 9c243aab268d9ca13f37bfe3d7af9cfbba8222de..de32af15daedc471ada0abd83ae9c1ec4e993346 100644 --- a/CDB/LOFAR_ConfigDb.json +++ b/CDB/LOFAR_ConfigDb.json @@ -90,6 +90,12 @@ "DigitalBeam": { "STAT/DigitalBeam/1": { "properties": { + "Beam_tracking_interval": [ + "1.0" + ], + "Beam_tracking_preparation_period": [ + "0.5" + ] } } } diff --git a/docker-compose/device-sdp.yml b/docker-compose/device-sdp.yml index 0e54304ed6ac5313390020ef985e051a2effc42d..653efb2ad352e63936f75dbfda758b3406dadb06 100644 --- a/docker-compose/device-sdp.yml +++ b/docker-compose/device-sdp.yml @@ -43,5 +43,5 @@ services: - bin/start-ds.sh # configure CORBA to _listen_ on 0:port, but tell others we're _reachable_ through ${HOSTNAME}:port, since CORBA # can't know about our Docker port forwarding - - l2ss-sdp SDP STAT -v -ORBendPoint giop:tcp:0:5701 -ORBendPointPublish giop:tcp:${HOSTNAME}:5701 + - l2ss-sdp SDP STAT -v -ORBendPoint giop:tcp:device-sdp:5701 -ORBendPointPublish giop:tcp:${HOSTNAME}:5701 restart: unless-stopped diff --git a/docker-compose/grafana/Dockerfile b/docker-compose/grafana/Dockerfile index 9f7ab94b12b425be552e96d737d3fb91597e6e1c..b5a8f34b370c83e05ed0f02a032da247c0fe4ccb 100644 --- a/docker-compose/grafana/Dockerfile +++ b/docker-compose/grafana/Dockerfile @@ -5,8 +5,8 @@ RUN grafana-cli plugins install briangann-datatable-panel RUN grafana-cli plugins install ae3e-plotly-panel RUN grafana-cli plugins install yesoreyeram-infinity-datasource RUN grafana-cli plugins install aceiot-svg-panel -RUN grafana-cli plugins install alexanderzobnin-zabbix-app RUN grafana-cli plugins install yesoreyeram-boomtable-panel +RUN grafana-cli plugins install orchestracities-map-panel COPY grafana.ini /etc/grafana/ diff --git a/docker-compose/integration-test.yml b/docker-compose/integration-test.yml index ad4740833efa36708a54b21a9787cb91b6a7e46d..e59ca9c92b8b3d1e20bef63b19f0ee2a0a969025 100644 --- a/docker-compose/integration-test.yml +++ b/docker-compose/integration-test.yml @@ -9,7 +9,8 @@ version: '2' services: integration-test: build: - context: itango + context: .. + dockerfile: docker-compose/lofar-device-base/Dockerfile args: SOURCE_IMAGE: ${LOCAL_DOCKER_REGISTRY_HOST}/${LOCAL_DOCKER_REGISTRY_USER}/tango-itango:${TANGO_ITANGO_VERSION} container_name: ${CONTAINER_NAME_PREFIX}integration-test diff --git a/docker-compose/lofar-device-base/Dockerfile b/docker-compose/lofar-device-base/Dockerfile index 9897528c555aa92edcdf409c1518b53132818cb4..f81503c89d51b5198a2f4da368d9dbde1b5daaad 100644 --- a/docker-compose/lofar-device-base/Dockerfile +++ b/docker-compose/lofar-device-base/Dockerfile @@ -1,7 +1,8 @@ ARG SOURCE_IMAGE FROM ${SOURCE_IMAGE} -RUN sudo apt-get update && sudo apt-get install -y git && sudo apt-get clean +RUN sudo apt-get update && sudo apt-get install -y git g++ gcc python3-dev && sudo apt-get clean + COPY docker-compose/lofar-device-base/lofar-requirements.txt /lofar-requirements.txt RUN sudo pip3 install -r /lofar-requirements.txt diff --git a/docker-compose/prometheus/prometheus.yml b/docker-compose/prometheus/prometheus.yml index 63cfabb9a0561cb66bff714f4f4e176fd43e8ff7..3a664a64b1934c93c59ae805b8a10453d9ba4bf1 100644 --- a/docker-compose/prometheus/prometheus.yml +++ b/docker-compose/prometheus/prometheus.yml @@ -26,4 +26,3 @@ scrape_configs: - targets: ["grafana:3000"] labels: "host": "localhost" -~ diff --git a/docker-compose/tango-prometheus-exporter/lofar2-policy.json b/docker-compose/tango-prometheus-exporter/lofar2-policy.json index c0922be468f277195cb469d7b6637c3761cde90a..dfa82dbdc1837afcaeb696ae0021e8f015ad6658 100644 --- a/docker-compose/tango-prometheus-exporter/lofar2-policy.json +++ b/docker-compose/tango-prometheus-exporter/lofar2-policy.json @@ -5,6 +5,14 @@ }, "devices": { + "STAT/AntennaField/1": { + "include": [ + "HBAT_ANT_mask_RW" + ], + "exclude": [ + "HBAT_BF_delay_steps_*" + ] + }, "STAT/APSCT/1": { }, "STAT/APSPU/1": { @@ -29,6 +37,9 @@ "RCU_mask_RW" ], "exclude": [ + "HBAT_LED_on_R", + "HBAT_PWR_LNA_on_R", + "HBAT_PWR_on_R", "HBAT_BF_delay_steps_*", "*_ITRF_R", "*_ITRF_offsets_R" @@ -66,6 +77,9 @@ }, "STAT/XST/1": { "exclude": [ + "FPGA_xst_ring_*", + "FPGA_xst_rx_align_*", + "FPGA_xst_aligned_*", "last_packet_R", "xst_*_R" ] diff --git a/sbin/tag_and_push_docker_image.sh b/sbin/tag_and_push_docker_image.sh index 631413235d4ba9b2f39b9b0e216d31cd21913bdf..3689a26b4f988a463e6022472abc7db73d491f51 100755 --- a/sbin/tag_and_push_docker_image.sh +++ b/sbin/tag_and_push_docker_image.sh @@ -86,12 +86,14 @@ LOCAL_IMAGES=( "device-apsct device-apsct y" "device-apspu device-apspu y" "device-boot device-boot y" "device-docker device-docker y" "device-observation_control device-observation_control y" - "device-recv device-recv y" "device-sdp device-sdp y" - "device-sst device-sst y" "device-unb2 device-unb2 y" - "device-xst device-xst y" + "device-recv device-recv y" "device-temperature-manager y" + "device-sdp device-sdp y" "device-sst device-sst y" + "device-unb2 device-unb2 y" "device-xst device-xst y" "itango docker-compose_itango y" + "archiver-timescale n" "hdbppts-cm n" "hdbppts-es n" + "grafana grafana n" "prometheus prometheus n" "jupyter docker-compose_jupyter n" "integration-test docker-compose_integration-test n" diff --git a/tangostationcontrol/docs/source/conf.py b/tangostationcontrol/docs/source/conf.py index 9ab504856ca00dbf2c934478ec60bdece9853ad0..90b156ff1ac73a80f5a2dbc2c12d0931113eb744 100644 --- a/tangostationcontrol/docs/source/conf.py +++ b/tangostationcontrol/docs/source/conf.py @@ -28,6 +28,7 @@ author = 'Stichting ASTRON' # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ + "sphinx.ext.graphviz" ] # Add any paths that contain templates here, relative to this directory. diff --git a/tangostationcontrol/docs/source/devices/using.rst b/tangostationcontrol/docs/source/devices/using.rst index 825ee74d83b5a7ad8a8e18c8813ee1ace81f5459..6eff9278eb74fc06a66b0d5b90f09d4f58eb0008 100644 --- a/tangostationcontrol/docs/source/devices/using.rst +++ b/tangostationcontrol/docs/source/devices/using.rst @@ -25,6 +25,33 @@ The state of a device is then queried with ``device.state()``. Each device can b - ``DevState.FAULT``: The device is malfunctioning. Functionality cannot be counted on. - The ``device.state()`` function can throw an error, if the device cannot be reached at all. For example, because it's docker container is not running. See the :ref:`docker` device on how to start it. + +.. graphviz:: + + digraph finite_state_machine { + fontname="Helvetica,Arial,sans-serif" + node [fontname="Helvetica,Arial,sans-serif"] + edge [fontname="Helvetica,Arial,sans-serif"] + rankdir=LR; + node [shape = doublecircle fixedsize=true width=0.7]; off; + node [shape = circle fixedsize=true width=0.9]; + init -> off [label = "user", color="red"]; + standby -> off [label = "user", color="red"]; + on -> off [label = "user", color="red"]; + alarm -> off [label = "user", color="red"]; + off -> init [label = "device", color="green"]; + init -> standby [label = "device", color="green"]; + standby -> on [label = "device", color="green"]; + on -> alarm [label = "device", color="green"]; + init -> fault [label = "device", color="green"]; + standby -> fault [label = "device", color="green"]; + on -> fault [label = "device", color="green"]; + alarm -> fault [label = "device", color="green"]; + fault -> init [label = "user", color="red"]; + fault -> off [label = "user", color="red"]; + } + + Each device provides the following commands to change the state: :boot(): Turn on the device, and initialise the hardware. Moves from ``OFF`` to ``ON``. diff --git a/tangostationcontrol/requirements.txt b/tangostationcontrol/requirements.txt index b65be92168032276a4120804d581d4bc95e6c028..d1c66a8a28d821d59aac859eb3725d6fbce12243 100644 --- a/tangostationcontrol/requirements.txt +++ b/tangostationcontrol/requirements.txt @@ -13,4 +13,7 @@ docker >= 5.0.3 # Apache 2 python-logstash-async >= 2.3.0 # MIT python-casacore >= 3.3.1 # LGPLv3 etrs-itrs@git+https://github.com/brentjens/etrs-itrs # license pending +# numpy must be manually added even though etrs-itrs requires it +numpy >= 1.21.0 # BSD lofarantpos >= 0.5.0 # Apache 2 +python-geohash >= 0.8.5 # Apache 2MIT diff --git a/tangostationcontrol/setup.cfg b/tangostationcontrol/setup.cfg index 1d52727f782ff41dd2e7e75b2d3137314bfdb82f..6af6b55d604b22877688ac119720e6709b671611 100644 --- a/tangostationcontrol/setup.cfg +++ b/tangostationcontrol/setup.cfg @@ -24,12 +24,14 @@ classifier = [options] package_dir= - =./ + =. packages=find: -python_requires = >=3.7 +python_requires => 3.7 +install_requires = + GitPython>=3.1.20 [options.packages.find] -where=./ +where=. [options.entry_points] console_scripts = diff --git a/tangostationcontrol/tangostationcontrol/beam/delays.py b/tangostationcontrol/tangostationcontrol/beam/delays.py index 850213c0cc64cdb72cb567a9b8c5334a464f4965..b1d7f9976ec5de3e75784dac484c9c33584f2806 100644 --- a/tangostationcontrol/tangostationcontrol/beam/delays.py +++ b/tangostationcontrol/tangostationcontrol/beam/delays.py @@ -1,5 +1,4 @@ import casacore.measures -from math import sin, cos import numpy import datetime @@ -29,26 +28,30 @@ class delay_calculator: result = self.measure.do_frame(frame_time) assert result == True - def get_direction_vector(self, pointing: dict): - """ Compute a direction vector for the given pointing, relative to the measure. """ - angles = self.measure.measure(pointing, "ITRF") - angle0, angle1 = angles["m0"]["value"], angles["m1"]["value"] + def get_direction_vector(self, pointing: numpy.ndarray) -> numpy.ndarray: + """ Compute direction vector for a given pointing, relative to the measure. """ - # Convert polar to carthesian coordinates - # see also https://github.com/casacore/casacore/blob/e793b3d5339d828a60339d16476bf688a19df3ec/casa/Quanta/MVDirection.cc#L67 - direction_vector = numpy.array( - (cos(angle0) * cos(angle1), - sin(angle0) * cos(angle1), - sin(angle1))) + return self.get_direction_vector_bulk([pointing]).flatten() + + def get_direction_vector_bulk(self, pointings: numpy.ndarray) -> numpy.ndarray: + """ Compute direction vectors for the given pointings, relative to the measure. """ - return direction_vector + angles0 = numpy.empty(len(pointings)) + angles1 = numpy.empty(len(pointings)) + for idx, pointing in enumerate(pointings): + angles = self.measure.measure(pointing, "ITRF") + angles0[idx] = angles["m0"]["value"] + angles1[idx] = angles["m1"]["value"] - def _get_delay(self, reference_direction_vector: numpy.array, relative_itrf: numpy.array) -> float: - """ Compute the delay to apply, in seconds, to align signals for a distance of `relative_itrf` in the - direction of `reference_direction_vector`. """ - speed_of_light = 299792458.0 + # Convert polar to carthesian coordinates + # see also https://github.com/casacore/casacore/blob/e793b3d5339d828a60339d16476bf688a19df3ec/casa/Quanta/MVDirection.cc#L67 + direction_vectors = numpy.array( + [numpy.cos(angles0) * numpy.cos(angles1), + numpy.sin(angles0) * numpy.cos(angles1), + numpy.sin(angles1)]) - return numpy.dot(reference_direction_vector, relative_itrf) / speed_of_light + # Return array [directions][angles] + return direction_vectors.T def is_valid_direction(self, direction): try: @@ -58,18 +61,34 @@ class delay_calculator: return True - def convert(self, direction, antenna_itrf: list([float])): - # explicitly check validity to get a proper error message - if not self.is_valid_direction(direction): - raise ValueError(f"Invalid direction: {direction}") + def convert(self, direction, antenna_absolute_itrf: list([float])): + """ Get the delays for a direction and *absolute* antenna positions. + + Returns delays[antenna]. """ - # obtain the direction vector for a specific pointing - pointing = self.measure.direction(*direction) + return self.convert_bulk([direction], numpy.array(antenna_absolute_itrf) - self.reference_itrf).flatten() - reference_dir_vector = self.get_direction_vector(pointing) + def convert_bulk(self, directions: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray) -> numpy.ndarray: + """ Get the delays for each direction and each *relative* antenna position. + + Returns delays[antenna][direction]. """ - # # compute the delays for an antennas w.r.t. the reference position - antenna_relative_itrf = [subtract(pos, self.reference_itrf) for pos in antenna_itrf] - delays = [self._get_delay(reference_dir_vector, relative_itrf) for relative_itrf in antenna_relative_itrf] + # obtain the direction vector for each pointing + try: + pointings = [self.measure.direction(*direction) for direction in directions] + except (RuntimeError, TypeError) as e: + raise ValueError("Invalid direction") from e + + direction_vectors = self.get_direction_vector_bulk(pointings) + + # compute the corresponding delays for all directions + def get_delay_all_directions(relative_itrf): + # Dot product between relative position and angle vector determines + # distance. Divide by speed of light to obtain delay. + return numpy.inner(relative_itrf, direction_vectors) / 299792458.0 + + # apply for each position + delays = numpy.apply_along_axis(get_delay_all_directions, 1, antenna_relative_itrfs) return delays + diff --git a/tangostationcontrol/tangostationcontrol/beam/geo.py b/tangostationcontrol/tangostationcontrol/beam/geo.py index 2c8195796811237750becb29f28fcf904f34f03d..a861ef66110bc92ea861b93ccb50b80eda41e518 100644 --- a/tangostationcontrol/tangostationcontrol/beam/geo.py +++ b/tangostationcontrol/tangostationcontrol/beam/geo.py @@ -2,6 +2,7 @@ import etrsitrs import lofarantpos.geo import numpy import math +import geohash """ LOFAR station positions are measured in ETRS89, which are the coordinates of the position as it would be in 1989. @@ -49,3 +50,11 @@ def ETRS_to_GEO(ETRS_coordinates: numpy.array) -> numpy.array: # Geo coordinates are only used for rough positioning. The difference between ITRF and ETRS matters little here ITRF_to_GEO = ETRS_to_GEO + +def GEO_to_GEOHASH(GEO_coordinates: numpy.array) -> numpy.array: + """ Convert an array of latitude/longitude (degrees) tuples to geohash strings. """ + + def GEO_to_GEOHASH_fn(geo_coords): + return geohash.encode(geo_coords[0], geo_coords[1], precision=16) + + return _apply_fn_on_one_element_or_array(GEO_to_GEOHASH_fn, GEO_coordinates) diff --git a/tangostationcontrol/tangostationcontrol/clients/attribute_wrapper.py b/tangostationcontrol/tangostationcontrol/clients/attribute_wrapper.py index 7f9c8d07fb3e5ceca540c2a03294dc9a92450d66..d3e6ee91b1177cb6e7306e07c448528cc476c8df 100644 --- a/tangostationcontrol/tangostationcontrol/clients/attribute_wrapper.py +++ b/tangostationcontrol/tangostationcontrol/clients/attribute_wrapper.py @@ -90,8 +90,11 @@ class attribute_wrapper(attribute): """ read_func_wrapper reads the attribute value, stores it and returns it" """ - device.value_dict[self] = self.read_function() - return device.value_dict[self] + try: + device.value_dict[self] = self.read_function() + return device.value_dict[self] + except Exception as e: + raise e.__class__(f"Could not read attribute {comms_annotation}") from e self.fget = read_func_wrapper diff --git a/tangostationcontrol/tangostationcontrol/devices/README.md b/tangostationcontrol/tangostationcontrol/devices/README.md index 378cece58eaa3ebe181e129e6f2f6cb6c2b8b1c3..566fb78e57ce093aefade15fc21e8d64c94d47ed 100644 --- a/tangostationcontrol/tangostationcontrol/devices/README.md +++ b/tangostationcontrol/tangostationcontrol/devices/README.md @@ -14,7 +14,8 @@ If a new device is added, it will (likely) need to be referenced in several plac - Adjust `tangostationcontrol/setup.cfg` to add an entry point for the device in the package installation, - Add to `tangostationcontrol/tangostationcontrol/integration_test/default/devices/` to add an integration test, - Adjust `sbin/run_integration_test.sh` to have the device started when running the integration tests, -- Adjust `.gitlab-ci.yml` to add the device to the `docker_build_image_all` step and to create a `docker_build_image_device_XXX` step, +- Adjust `.gitlab-ci.yml` to add the device to the `docker_build_image_all` step and to create a `docker_build_image_device_XXX` step, +- Add to `sbin/tag_and_push_docker_image.sh` the LOCAL_IMAGES devicename and build for integration boolean pair, - Add to `tangostationcontrol/docs/source/devices/` to mention the device in the end-user documentation. - Adjust `tangostationcontrol/docs/source/index.rst` to include the newly created file in `docs/source/devices/`. diff --git a/tangostationcontrol/tangostationcontrol/devices/beam_device.py b/tangostationcontrol/tangostationcontrol/devices/beam_device.py index 0b436d89b2191b33f059094a89740d66382d927b..e69a10c3b584a0275f364aa1c92eccb2aee651e3 100644 --- a/tangostationcontrol/tangostationcontrol/devices/beam_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/beam_device.py @@ -6,28 +6,255 @@ """Beam Abstract Device Server for LOFAR2.0 """ + +import datetime +import numpy +from json import loads +from threading import Thread, Lock, Condition + # PyTango imports -from tango.server import attribute, command -from tango import AttrWriteType, DebugIt +from tango.server import attribute, command, device_property +from tango import AttrWriteType, DebugIt, DevVarStringArray, DevVarDoubleArray, DevString # Additional import from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.common.measures import get_measures_directory, get_available_measures_directories, download_measures, use_measures_directory, restart_python from tangostationcontrol.common.lofar_logging import log_exceptions +from tangostationcontrol.devices.device_decorators import TimeIt, only_in_states, fault_on_error +from tangostationcontrol.beam.delays import delay_calculator from tangostationcontrol.devices.lofar_device import lofar_device -__all__ = ["beam_device", "main"] +__all__ = ["beam_device", "main", "BeamTracker"] import logging logger = logging.getLogger() class beam_device(lofar_device): + # ----------------- + # Device Properties + # ----------------- + + Beam_tracking_interval = device_property( + dtype='DevFloat', + doc='Beam weights updating interval time [seconds]', + mandatory=False, + default_value = 10.0 + ) + + Beam_tracking_preparation_period = device_property( + dtype='DevFloat', + doc='Preparation time [seconds] needed before starting update operation', + mandatory=False, + default_value = 0.25 + ) # ---------- # Attributes # ---------- + # Maximum array size to allocate for the pointings, + # which must be enough for all derived classes. + # + # The actual number of pointings for this device + # will be stored as self._num_pointings. + MAX_POINTINGS = 1024 + + Pointing_direction_R = attribute(access=AttrWriteType.READ, + dtype=((numpy.str,),), max_dim_x=3, max_dim_y=MAX_POINTINGS, + fget=lambda self: self._pointing_direction_r) + + Pointing_direction_RW = attribute(access=AttrWriteType.READ_WRITE, + dtype=((numpy.str,),), max_dim_x=3, max_dim_y=MAX_POINTINGS, + fget=lambda self: self._pointing_direction_rw) + + Pointing_direction_str_R = attribute(access=AttrWriteType.READ, + doc='Pointing direction as a formatted string', + dtype=(numpy.str,), max_dim_x=MAX_POINTINGS, + fget=lambda self: ["{0} ({1}, {2})".format(*x) for x in self._pointing_direction_r]) + + Pointing_timestamp_R = attribute(access=AttrWriteType.READ, + dtype=(numpy.double,), max_dim_x=MAX_POINTINGS, + fget=lambda self: self._pointing_timestamp_r) + + Tracking_enabled_R = attribute(access=AttrWriteType.READ, + doc="Whether the tile beam is updated periodically", + dtype=numpy.bool, + fget=lambda self: self.Beam_tracker and self.Beam_tracker.is_alive()) + + Tracking_enabled_RW = attribute(access=AttrWriteType.READ_WRITE, + doc="Whether the tile beam should be updated periodically", + dtype=numpy.bool, + fget=lambda self: self._tracking_enabled_rw) + + Duration_update_pointing_R = attribute(access=AttrWriteType.READ, + dtype=numpy.float, fget=lambda self: self.update_pointing.statistics["last"] or 0) + + def write_Pointing_direction_RW(self, value): + """ Setter method for attribute Pointing_direction_RW """ + # verify whether values are valid + if len(value) != self._num_pointings: + raise ValueError(f"Expected {self._num_pointings} directions, got {len(value)}") + + for pointing in value: + if not self.generic_delay_calculator.is_valid_direction(pointing): + raise ValueError(f"Invalid direction: {pointing}") + + # store the new values + self._pointing_direction_rw = value + + # force update across tiles if pointing changes + self.Beam_tracker.force_update() + logger.info("Pointing direction update requested") + + def write_Tracking_enabled_RW(self, value): + self._tracking_enabled_rw = value + + if value: + self.Beam_tracker.start() + else: + self.Beam_tracker.stop() + + + @TimeIt() + def update_pointing(self, timestamp: datetime.datetime): + """ Update the weights for the configured pointings, for the given timestamp. """ + self._set_pointing(self._pointing_direction_rw, timestamp) + + # -------- + # abstract interface + # -------- + + def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime) -> numpy.array: + """ + Calculate the delay values based on the 2D pointing list (num_pointings x 3) and the timestamp + """ + + raise NotImplementedError + + def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime): + """ + Calculate and Upload the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp + + Also updates _pointing_direction_r and _pointing_timestamp_r according to which anetennas got their pointing updated. + """ + + raise NotImplementedError + + # -------- + # overloaded functions + # -------- + + def init_device(self): + super().init_device() + + # Initialise pointing array data and attribute + self._tracking_enabled_rw = False + + # thread to perform beam tracking + self.Beam_tracker = None + + # generic delay calculator to ask about validity of settings + self.generic_delay_calculator = delay_calculator([0, 0, 0]) + + # Derived classes will override this with a non-parameterised + # version that lofar_device will call. + @log_exceptions() + def configure_for_initialise(self, num_pointings): + super().configure_for_initialise() + + if not (0 < num_pointings <= self.MAX_POINTINGS): + raise ValueError(f"beam_device is configured to support 0 - {self.MAX_POINTINGS} pointings, but {num_pointings} were requested") + + # Initialise tracking control + self._num_pointings = num_pointings + self._pointing_timestamp_r = numpy.zeros(num_pointings, dtype=numpy.double) + self._pointing_direction_r = numpy.zeros((num_pointings, 3), dtype="<U32") + self._pointing_direction_rw = numpy.array([["AZELGEO","0deg","90deg"]] * num_pointings, dtype="<U32") + self._tracking_enabled_rw = True + + # Create a thread object to update beam weights + self.Beam_tracker = BeamTracker(self.Beam_tracking_interval, self.Beam_tracking_preparation_period, self.update_pointing, self.Fault) + + @log_exceptions() + def configure_for_on(self): + super().configure_for_on() + + # Start beam tracking thread + if self._tracking_enabled_rw: + self.Beam_tracker.start() + + @log_exceptions() + def configure_for_off(self): + if self.Beam_tracker: + # Stop thread object + self.Beam_tracker.stop() + + super().configure_for_off() + + + # -------- + # Commands + # -------- + + @command(dtype_in=DevVarStringArray) + @DebugIt() + @log_exceptions() + @only_in_states(lofar_device.DEFAULT_COMMAND_STATES) + def set_pointing(self, pointing_direction: list): + """ + Compute and uploads the hardware-specific delays based on a given pointing direction + 2D array (num_pointings x 3 parameters). + """ + + # Reshape the flatten input array + pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3) + + self._set_pointing(pointing_direction, datetime.datetime.now()) + + @command(dtype_in = DevString) + @DebugIt() + @only_in_states(lofar_device.DEFAULT_COMMAND_STATES) + def set_pointing_for_specific_time(self, parameters: DevString = None): + """ + Compute and uploads the hardware-specific delays based on a given pointing direction + 2D array (num_pointings x 3 parameters) for the given timestamp + + Parameters are given in a json document. + pointing_direction: array of 96 casacore pointings (3 string parameters) + timestamp: POSIX timestamp + """ + pointing_parameters = loads(parameters) + + pointing_direction = list(pointing_parameters["pointing_direction"]) + timestamp_param = float(pointing_parameters["timestamp"]) + timestamp = datetime.datetime.fromtimestamp(timestamp_param) + + # Reshape the flatten pointing array + pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3) + + self._set_pointing(pointing_direction, timestamp) + + + @command(dtype_in=DevVarStringArray, dtype_out=DevVarDoubleArray) + @DebugIt() + @log_exceptions() + @only_in_states(lofar_device.DEFAULT_COMMAND_STATES) + def delays(self, pointing_direction: numpy.array): + """ + Calculate the delays based on the pointing list and the timestamp + """ + + pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3) + + delays = self._delays(pointing_direction, datetime.datetime.now()) + + return delays.flatten() + + # -------- + # Measures + # -------- + # Directory where the casacore measures that we use, reside. We configure ~/.casarc to # use the symlink /opt/IERS/current, which we switch to the actual set of files to use. measures_directory_R = attribute(dtype=str, access=AttrWriteType.READ, fget = lambda self: get_measures_directory()) @@ -35,10 +262,6 @@ class beam_device(lofar_device): # List of dowloaded measures (the latest 64, anyway) measures_directories_available_R = attribute(dtype=(str,), max_dim_x=64, access=AttrWriteType.READ, fget = lambda self: sorted(get_available_measures_directories())[-64:]) - # -------- - # Commands - # -------- - @command(dtype_out=str, doc_out="Name of newly installed measures directory") @DebugIt() @log_exceptions() @@ -76,3 +299,108 @@ class beam_device(lofar_device): def main(**kwargs): """Main function of the Docker module.""" return entry(beam_device, **kwargs) + +# ---------- +# Beam Tracker +# ---------- + +class BeamTracker(): + + DISCONNECT_TIMEOUT = 3.0 + + """ Object that encapsulates a Thread, resposible for beam tracking operations """ + def __init__(self, interval, preparation_period, update_pointing_callback, fault_callback): + self.thread = None + self.interval = interval + self.preparation_period = preparation_period + self.update_pointing_callback = update_pointing_callback + self.fault_callback = fault_callback + + # Condition to trigger a forced update or early abort + self.update_lock = Lock() + self.update_condition = Condition(self.update_lock) + + # Whether the pointing has to be forced updated + self.stale_pointing = True + + def start(self): + """ Starts the Beam Tracking thread """ + if self.thread: + # already started + return + + self.done = False + self.thread = Thread(target=self._update_pointing_direction, name="BeamTracker") + self.thread.start() + + logger.info("BeamTracking thread started") + + def is_alive(self): + """ Returns True just before the Thread run() method starts until just after the Thread run() method terminates. """ + return self.thread and self.thread.is_alive() + + def force_update(self): + """ Force the pointing to be updated. """ + + self.stale_pointing = True + self.notify_thread() + + def notify_thread(self): + # inform the thread to stop waiting + with self.update_lock: + self.update_condition.notify() + + def stop(self): + """ Stops the Beam Tracking loop """ + + if not self.thread: + return + + logger.info("BeamTracking thread stopping") + + self.done = True + self.force_update() + + # wait for thread to finish + self.thread.join(self.DISCONNECT_TIMEOUT) + + if self.is_alive(): + logger.error("BeamTracking Thread did not properly terminate") + + self.thread = None + + logger.info("BeamTracking thread stopped") + + def _get_sleep_time(self): + """ Computes the sleep time (in seconds) that needs to be waited for the next beam tracking update """ + now = datetime.datetime.now().timestamp() + + # Computes the left seconds before the next update + next_update_in = self.interval - (now % self.interval) + + # Computes the needed sleep time before the next update + sleep_time = next_update_in - self.preparation_period + # If sleep time is negative, add the tracking interval for the next update + if sleep_time < 0: + return sleep_time + self.interval + else: + return sleep_time + + # @fault_on_error routes errors here. we forward them to our device + def Fault(self, msg): + self.fault_callback(msg) + + @log_exceptions() + @fault_on_error() + def _update_pointing_direction(self): + """ Updates the beam weights using a fixed interval of time """ + + # Check if flag beamtracking is true + with self.update_lock: + while not self.done: + self.stale_pointing = False + self.update_pointing_callback(datetime.datetime.now()) + + # sleep until the next update, or when interrupted (this releases the lock, allowing for notification) + # note that we need wait_for as conditions can be triggered multiple times in succession + self.update_condition.wait_for(lambda: self.done or self.stale_pointing, self._get_sleep_time()) diff --git a/tangostationcontrol/tangostationcontrol/devices/device_decorators.py b/tangostationcontrol/tangostationcontrol/devices/device_decorators.py index 26a5c488ba0a4ae219e3bd783691491e9ff2f45c..b1dbb4b7a5b61bf80169e8945b7347f9d2e394fb 100644 --- a/tangostationcontrol/tangostationcontrol/devices/device_decorators.py +++ b/tangostationcontrol/tangostationcontrol/devices/device_decorators.py @@ -2,9 +2,10 @@ from tango import DevState from functools import wraps import logging +import time logger = logging.getLogger() -__all__ = ["only_in_states", "only_when_on", "fault_on_error"] +__all__ = ["only_in_states", "only_when_on", "fault_on_error", "TimeIt"] def only_in_states(allowed_states, log=True): """ @@ -63,3 +64,51 @@ def fault_on_error(): return error_wrapper return inner + + +def TimeIt(log_function=None): + """ + Wrapper to time calls. Stores the timing log in a + <function>.statistics property as a dict with the following + information: + + "count": number of times the function was called + "last": duration of the last invocation + "history": last 10 durations + + NOTE: If the function called throws an exception, timing information + is not logged or recorded. Those calls are expected to be + uncharacteristically cheap, after all. + """ + + def inner(func): + statistics = { + "count": 0, + "last": None, + "history": [], + } + + @wraps(func) + def timer_wrapper(self, *args, **kwargs): + # time function call + before = time.monotonic_ns() + result = func(self, *args, **kwargs) + after = time.monotonic_ns() + + # store measurement + statistics["count"] += 1 + statistics["last"] = (after - before) / 1e9 + statistics["history"] = statistics["history"][-9:] + [statistics["last"]] + + if log_function: + log_function(f"Call to {func.__name__} took {(after - before)/1e6} ms") + + # return function result (if any) + return result + + timer_wrapper.statistics = statistics + + return timer_wrapper + + return inner + diff --git a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py index 6f0f1de56b5bd0ade282aeede42fd719ce375b13..3fa662c0198b6f93b478a9fc09dbee6fd207e064 100644 --- a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py @@ -17,6 +17,7 @@ from tango import AttrWriteType, DevState, DebugIt, Attribute, DeviceProxy, Attr import time import math import numpy +import textwrap # Additional import from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper @@ -278,7 +279,7 @@ class lofar_device(Device, metaclass=DeviceMeta): default_value = getattr(self, f"{name}_default") # set the attribute to the configured default - logger.debug(f"Setting attribute {name} to {default_value}") + logger.debug(textwrap.shorten(f"Setting attribute {name} to {default_value}", 150)) self.proxy.write_attribute(name, default_value) except Exception as e: # log which attribute we're addressing diff --git a/tangostationcontrol/tangostationcontrol/devices/recv.py b/tangostationcontrol/tangostationcontrol/devices/recv.py index 090d6349c929266bf5a49d65772c36cdc3b820cf..eecbf2b1ef3dea45e6571ab7b1d499269d33c783 100644 --- a/tangostationcontrol/tangostationcontrol/devices/recv.py +++ b/tangostationcontrol/tangostationcontrol/devices/recv.py @@ -22,7 +22,7 @@ from math import pi # Additional import from tangostationcontrol.beam.hba_tile import HBATAntennaOffsets -from tangostationcontrol.beam.geo import ETRS_to_ITRF, ITRF_to_GEO +from tangostationcontrol.beam.geo import ETRS_to_ITRF, ITRF_to_GEO, GEO_to_GEOHASH from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.common.lofar_logging import device_logging_to_python from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper @@ -238,6 +238,10 @@ class RECV(opcua_device): doc='Absolute reference position of antenna field, in latitude/longitude (degrees)', dtype=(numpy.float,), max_dim_x=2) + Antenna_Field_Reference_GEOHASH_R = attribute(access=AttrWriteType.READ, + doc='Absolute reference position of antenna field, as geohash string', + dtype=numpy.str) + HBAT_antenna_ITRF_offsets_R = attribute(access=AttrWriteType.READ, doc='Offsets of the antennas within a tile, in ITRF ("iHBADeltas"). True shape: 96x16x3.', dtype=((numpy.float,),), max_dim_x=48, max_dim_y=96) @@ -250,6 +254,11 @@ class RECV(opcua_device): doc='Absolute reference position of each tile, in latitude/longitude (degrees)', dtype=((numpy.float,),), max_dim_x=2, max_dim_y=96) + HBAT_reference_GEOHASH_R = attribute(access=AttrWriteType.READ, + doc='Absolute reference position of each tile, as geohash strings', + dtype=(numpy.str,), max_dim_x=96,) + + def read_Antenna_Field_Reference_ITRF_R(self): # provide ITRF field coordinates if they were configured if self.Antenna_Field_Reference_ITRF: @@ -261,6 +270,9 @@ class RECV(opcua_device): def read_Antenna_Field_Reference_GEO_R(self): return ITRF_to_GEO(self.read_Antenna_Field_Reference_ITRF_R()) + + def read_Antenna_Field_Reference_GEOHASH_R(self): + return GEO_to_GEOHASH(self.read_Antenna_Field_Reference_GEO_R()) def read_HBAT_antenna_ITRF_offsets_R(self): base_antenna_offsets = numpy.array(self.HBAT_base_antenna_offsets).reshape(16,3) @@ -288,6 +300,9 @@ class RECV(opcua_device): def read_HBAT_reference_GEO_R(self): return ITRF_to_GEO(self.read_HBAT_reference_ITRF_R()) + def read_HBAT_reference_GEOHASH_R(self): + return GEO_to_GEOHASH(self.read_HBAT_reference_GEO_R()) + # ---------- # Summarising Attributes # ---------- diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py b/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py index ddd3e7899e354c3442b38c0df27737e267db0952..585f514bc0cbc1df43321b10474575ab78b64edd 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py @@ -8,19 +8,24 @@ """ # PyTango imports -from tango.server import device_property, command -from tango import AttrWriteType, DevVarFloatArray, DevVarULongArray +from tango.server import device_property, command, attribute +from tango import AttrWriteType, DevVarFloatArray, DevVarULongArray, DeviceProxy, DevSource, EventType # Additional import from tangostationcontrol.common.entrypoint import entry +from tangostationcontrol.common.lofar_logging import log_exceptions from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.devices.opcua_device import opcua_device from tangostationcontrol.devices.sdp.sdp import SDP import numpy +from functools import lru_cache __all__ = ["Beamlet", "main"] +import logging +logger = logging.getLogger() + class Beamlet(opcua_device): @@ -70,6 +75,12 @@ class Beamlet(opcua_device): default_value = [[0] * A_pn * N_pol * N_beamlets_ctrl] * N_pn ) + subband_select_RW_default = device_property( + dtype='DevVarULongArray', + mandatory=False, + default_value = [102] * N_beamlets_ctrl + ) + first_default_settings = [ 'FPGA_beamlet_output_hdr_eth_destination_mac_RW', 'FPGA_beamlet_output_hdr_ip_destination_address_RW', @@ -101,8 +112,8 @@ class Beamlet(opcua_device): # Select subband per dual-polarisation beamlet. # 0 for antenna polarization X in beamlet polarization X, # 1 for antenna polarization Y in beamlet polarization Y. - FPGA_beamlet_subband_select_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_R"], datatype=numpy.uint16, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn)) - FPGA_beamlet_subband_select_RW = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_RW"], datatype=numpy.uint16, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) + FPGA_beamlet_subband_select_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_R"], datatype=numpy.uint32, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn)) + FPGA_beamlet_subband_select_RW = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_RW"], datatype=numpy.uint32, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) # cint16[N_pn][A_pn][N_pol][N_beamlets_ctrl] # Co-polarization BF weights. The N_pol = 2 parameter index is: @@ -134,6 +145,20 @@ class Beamlet(opcua_device): FPGA_bf_weights_yy_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yy_R"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn)) FPGA_bf_weights_yy_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yy_RW"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) + subband_select_RW = attribute(dtype=(numpy.uint32,), max_dim_x=N_beamlets_ctrl, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_Wrapper_allowed") + + def read_subband_select_RW(self): + return self._subband_select + + def write_subband_select_RW(self, subbands): + # Use the same subband for all inputs and polarisations of a beamlet + self.proxy.FPGA_beamlet_subband_select_RW = numpy.tile(subbands, (self.N_pn, self.A_pn * self.N_pol)) + + self.cache_clear() + + # Store new value + self._subband_select = subbands + # ---------- # Summarising Attributes # ---------- @@ -142,33 +167,149 @@ class Beamlet(opcua_device): # Overloaded functions # -------- + def configure_for_initialise(self): + super().configure_for_initialise() + + self._subband_select = numpy.zeros(self.N_beamlets_ctrl, dtype=numpy.uint32) + + self.sdp_proxy = DeviceProxy("STAT/SDP/1") + self.sdp_proxy.set_source(DevSource.DEV) + + # subscribe to events to notice setting changes in SDP that determine the input frequency + self.event_subscriptions = {} + self.event_subscriptions["clock_rw"] = self.sdp_proxy.subscribe_event("clock_RW", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True) + self.event_subscriptions["nyquist_zone_r"] = self.sdp_proxy.subscribe_event("nyquist_zone_R", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True) + + def configure_for_off(self): + super().configure_for_off() + + # unsubscribe from all events + for k,v in list(self.event_subscriptions.items()): + self.sdp_proxy.unsubscribe_event(v) + del self.event_subscriptions[k] + # -------- # internal functions # -------- - def _calculate_bf_weights(self, phases: numpy.ndarray): - """ Helper function that converts a difference in phase (in radiants) - to a FPGA weight (in complex number) """ - + + @log_exceptions() + def _frequency_change_event(self, event): + """ Trigger on external changes in frequency settings. """ + + if event.err: + # little we can do here. note that errors are also thrown if the device we subscribed to is offline + return + + logger.info(f"Received attribute change event from {event.device}: {event.attr_value.name} := {event.attr_value.value}") + + # invalidate caches for frequency-dependent values + self.cache_clear() + + @command() + def cache_clear(self): + """ Explicitly clear any caches. """ + + self._beamlet_frequencies.cache_clear() + + """ + The SDP FPGAs correct for signal-delay differences by rotating the phases of the antenna signals. A delay + is converted to a phase difference as follows: + + phase = 2 * pi * frequency * delay + + where 'frequency' is the subband frequency: + + LBA: frequency = (subband_nr + 0) * clock / 1024 + HBA: frequency = (subband_nr + 512) * clock / 1024 + + The beamformer combines a set of antennas for each beamlet, and each beamlet can have a different pointing + and subband selected. + + This results in an array of phase[antenna][beamlet] adjustments to be sent to the FPGA.The FPGA accepts + weights as a 16-bit (imag,real) complex pair packed into an uint32. + + The phases, delays, and final beam weights, all have shape (fpga_nr, [input_nr][pol][beamlet_nr]). + """ + + BF_UNIT_WEIGHT = 2**14 + + @staticmethod + def _phases_to_bf_weights(phases: numpy.ndarray): + """ Convert differences in phase (in radians) into FPGA weights (complex numbers packed into uint32). """ + + # flatten array and restore its shape later, which makes running over all elements a lot easier + orig_shape = phases.shape + phases = phases.flatten() + # Convert array values in complex numbers - unit = numpy.power(2,14) - real = numpy.array(unit * numpy.cos(phases), dtype=numpy.short) - imag = numpy.array(unit * numpy.sin(phases), dtype=numpy.short) - # join 16 bits of imaginary part (MSB) with 16 bits of real part (LSB) - bf_weights = numpy.array( numpy.frombuffer( b''.join([imag,real]), dtype=numpy.uint32 ) ) + real = Beamlet.BF_UNIT_WEIGHT * numpy.cos(phases) + imag = Beamlet.BF_UNIT_WEIGHT * numpy.sin(phases) + + # Interleave into (real, imag) pairs, and store as int16 + # see also https://stackoverflow.com/questions/5347065/interweaving-two-numpy-arrays/5347492 + # Round to nearest integer instead of rounding down + real_imag = numpy.empty(phases.size * 2, dtype=numpy.int16) + real_imag[0::2] = numpy.round(real) + real_imag[1::2] = numpy.round(imag) + + # Cast each (real, imag) pair into an uint32, which brings the array size + # back to the original. + bf_weights = real_imag.view(numpy.uint32) + + return bf_weights.reshape(orig_shape) + + @staticmethod + def _subband_frequencies(subbands: numpy.ndarray, clock: int, nyquist_zone: int) -> numpy.ndarray: + """ Obtain the frequencies of each subband, given a clock and an antenna type. """ + + subband_width = clock / 1024 + base_subband = nyquist_zone * 512 + + # broadcast clock across frequencies + frequencies = (subbands + base_subband) * subband_width + + return frequencies + + @lru_cache() # this function requires large hardware reads for values that don't change often + def _beamlet_frequencies(self): + """ Obtain the frequencies (in Hz) of each subband that is selected for each input and beamlet. + + Returns shape (fpga_nr, [input_nr][pol][beamlet_nr]). + """ + + # obtain which subband is selected for each input and beamlet + beamlet_subbands = self.read_attribute("FPGA_beamlet_subband_select_RW") # (fpga_nr, [input_nr][pol][beamlet_nr]) + return self._subband_frequencies(beamlet_subbands, self.sdp_proxy.clock_RW, self.sdp_proxy.nyquist_zone_R) + + @staticmethod + def _calculate_bf_weights(delays: numpy.ndarray, beamlet_frequencies: numpy.ndarray): + """ Helper function that converts a series of delays into FPGA_bf_weights_xx_yy. + All input and output arrays have the same dimensionality. + """ + + # compute the phases + beamlet_phases = (2.0 * numpy.pi) * beamlet_frequencies * delays + + # convert to weights + bf_weights = Beamlet._phases_to_bf_weights(beamlet_phases) + return bf_weights - + # -------- # Commands # -------- + @command(dtype_in=DevVarFloatArray, dtype_out=DevVarULongArray) - def calculate_bf_weights(self, phases: numpy.ndarray): - """ converts a difference in phase (in radiants) to a FPGA weight (in complex number) """ - + def calculate_bf_weights(self, delays: numpy.ndarray): + """ converts a difference in delays (in seconds) to a FPGA weight (in complex number) """ + # Calculate the FPGA weight array - bf_weights = self._calculate_bf_weights(phases) - - return bf_weights + delays = delays.reshape(self.N_pn, self.A_pn * self.N_pol * self.N_beamlets_ctrl) + beamlet_frequencies = self._beamlet_frequencies() + bf_weights = self._calculate_bf_weights(delays, beamlet_frequencies) + + return bf_weights.flatten() # ---------- # Run server diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py index 256bd5b2cccd8531b2e9ff9aeedcc1acf25cd425..2e985b8f10d66341a76bbe6b192dc3bf69677c8d 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py @@ -8,29 +8,56 @@ """ # PyTango imports -#from tango.server import device_property -#from tango import AttrWriteType -# Additional import +from tango import Util, DeviceProxy, DevSource +from tango.server import attribute, AttrWriteType, device_property +# Additional import from tangostationcontrol.common.entrypoint import entry -#from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.devices.beam_device import beam_device +from tangostationcontrol.devices.sdp.beamlet import Beamlet +from tangostationcontrol.devices.device_decorators import TimeIt +from tangostationcontrol.common.lofar_logging import log_exceptions +from tangostationcontrol.beam.delays import delay_calculator + +import numpy +import datetime -#import numpy +import logging +logger = logging.getLogger() __all__ = ["DigitalBeam", "main"] class DigitalBeam(beam_device): - pass + NUM_ANTENNAS = 96 + NUM_BEAMLETS = 488 + # ----------------- # Device Properties # ----------------- + antenna_select_RW_default = device_property( + dtype='DevVarBooleanArray', + mandatory=False, + default_value = [[True] * NUM_BEAMLETS] * NUM_ANTENNAS + ) + # ---------- # Attributes # ---------- + Duration_delays_R = attribute(access=AttrWriteType.READ, + dtype=numpy.float, fget=lambda self: self._delays.statistics["last"] or 0) + + antenna_select_RW = attribute(doc='Selection of antennas to use for forming each beamlet', + dtype=((numpy.bool_,),), max_dim_x=NUM_BEAMLETS, max_dim_y=NUM_ANTENNAS, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_Wrapper_allowed") + + def read_antenna_select_RW(self): + return self._antenna_select + + def write_antenna_select_RW(self, antennas): + self._antenna_select = antennas + # ---------- # Summarising Attributes # ---------- @@ -39,6 +66,106 @@ class DigitalBeam(beam_device): # Overloaded functions # -------- + @log_exceptions() + def configure_for_initialise(self): + super().configure_for_initialise(self.NUM_BEAMLETS) + + # Set a reference of RECV device that is correlated to this BEAM device + util = Util.instance() + instance_number = self.get_name().split('/')[2] + + self.recv_proxy = DeviceProxy( + f"{util.get_ds_inst_name()}/RECV/{instance_number}") + self.recv_proxy.set_source(DevSource.DEV) + + self.beamlet_proxy = DeviceProxy( + f"{util.get_ds_inst_name()}/Beamlet/{instance_number}") + self.beamlet_proxy.set_source(DevSource.DEV) + + # Retrieve positions from RECV device + reference_itrf = self.recv_proxy.Antenna_Field_Reference_ITRF_R + antenna_itrf = self.recv_proxy.HBAT_reference_ITRF_R.reshape(self.NUM_ANTENNAS, 3) + + # a delay calculator + self.delay_calculator = delay_calculator(reference_itrf) + + # relatvie positions of each antenna + self.relative_antenna_positions = antenna_itrf - reference_itrf + + # use all antennas unless specified otherwise + self._antenna_select = numpy.ones((self.NUM_ANTENNAS, self.NUM_BEAMLETS), dtype=numpy.bool_) + + # -------- + # internal functions + # -------- + + @TimeIt() + def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime): + """ + Calculate the delays (in seconds) based on the pointing list and the timestamp, + + Returns delays[antenna][beamlet] + """ + + delays = numpy.zeros((self.NUM_ANTENNAS, self.NUM_BEAMLETS), dtype=numpy.float64) + + d = self.delay_calculator + d.set_measure_time(timestamp) + + # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions + delays = d.convert_bulk(pointing_direction, self.relative_antenna_positions) + + return delays + + def _map_antennas_on_fpga_inputs(self, arr): + """ Converts an array with dimensions [antenna][beamlet] -> [fpga_nr][input_nr][pol_nr][beamlet] + by repeating the values for both polarisations. """ + + assert arr.shape == (self.NUM_ANTENNAS, self.NUM_BEAMLETS) + + # Each antenna maps on [fpga_nr][input_nr][0] and [fpga_nr][input_nr][1], and we work + # with [antenna][beamlet], so we have to interleave copies of the input array per beamlet + + # double the delays to cover both polarisations + # [[1,2,3], [4,5,6]] -> [[1,2,3,1,2,3], [4,5,6,4,5,6]] + result = numpy.hstack((arr,arr)) + + # move doubling of last dimension into first + # [[1,2,3,1,2,3], [4,5,6,4,5,6]] -> [[1,2,3], [1,2,3], [4,5,6], [4,5,6]] + result = result.reshape(result.shape[0] * 2, result.shape[1] // 2) + + # cast into (fpga_nr, [input_nr][pol_nr][beamlet]) + result = result.reshape((Beamlet.N_pn, Beamlet.A_pn * Beamlet.N_pol * Beamlet.N_beamlets_ctrl)) + + return result + + def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime): + """ + Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) + """ + + # Retrieve delays from casacore + antenna_delays = self._delays(pointing_direction, timestamp) + + # Map them onto the FPGA inputs + fpga_delays = self._map_antennas_on_fpga_inputs(antenna_delays) + + beam_weights = self.beamlet_proxy.calculate_bf_weights(fpga_delays.flatten()) + beam_weights = beam_weights.reshape((Beamlet.N_pn, Beamlet.A_pn * Beamlet.N_pol * Beamlet.N_beamlets_ctrl)) + + # Filter out unwanted antennas + beam_weights *= self._map_antennas_on_fpga_inputs(self._antenna_select) + + # Write weights to SDP + self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = beam_weights + + # Record where we now point to, now that we've updated the weights. + # Only record pointings per beamlet, not which antennas took part + self._pointing_direction_r = pointing_direction + self._pointing_timestamp_r[:] = timestamp.timestamp() + + logger.info("Pointing direction updated") + # -------- # Commands # -------- diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py index 914c84441b2170a68cba9b0372639ea25bc76e4d..c5b2968d9715779907834de2d561fda82c842b90 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py @@ -32,6 +32,13 @@ class SDP(opcua_device): # Device Properties # ----------------- + AntennaType = device_property( + doc='Antenna type (LBA or HBA) we control', + dtype='DevString', + mandatory=False, + default_value = "HBA" + ) + TR_fpga_mask_RW_default = device_property( dtype='DevVarBooleanArray', mandatory=False, @@ -89,6 +96,12 @@ class SDP(opcua_device): default_value=[[8192] * 12 * 512] * 16 ) + clock_RW_default = device_property( + dtype='DevULong', + mandatory=False, + default_value = 200 * 1000000 + ) + translator_default_settings = [ 'TR_fpga_mask_RW' ] @@ -173,6 +186,63 @@ class SDP(opcua_device): FPGA_bst_offload_bsn_R = attribute_wrapper(comms_annotation=["FPGA_bst_offload_bsn_R"], datatype=numpy.int64, dims=(N_pn, N_beamsets_ctrl)) + antenna_type_R = attribute(doc='Type of antenna (LBA or HBA) attached to the FPGAs', + dtype=numpy.str, fget=lambda self: self.AntennaType) + nyquist_zone_R = attribute(doc='Nyquist zone of the input frequencies', + dtype=numpy.uint32, fisallowed="is_attribute_wrapper_allowed", + polling_period=1000, abs_change=1) + clock_RW = attribute(doc='Configured sampling clock (Hz)', + dtype=numpy.uint32, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_wrapper_allowed", + polling_period=1000, abs_change=1) + + def _nyquist_zone(self, clock): + """ Return the Nyquist zone for the given clock (in Hz). + + The Nyquist zone determines the frequency offset of + the antennas. + + NOTE: Only 160 and 200 MHz clocks are supported. """ + + # (AntennaType, clockMHz) -> Nyquist zone + nyquist_zones = { + ("LBA", 160): 0, + ("LBA", 200): 0, + ("HBA", 160): 1, + ("HBA", 200): 2, + } + + try: + return nyquist_zones[(self.AntennaType), clock // 1000000] + except KeyError: + raise ValueError(f"Could not determine Nyquist zone for antenna type {self.AntennaType} with clock {clock} Hz") + + def read_nyquist_zone_R(self): + try: + return self._nyquist_zone(self.read_attribute("clock_RW")) + except ValueError: + # Supply a sane default for computations in tests until L2SDP-725 allows us to read back the set clock + return 0 + + def read_clock_RW(self): + # We can only return a single value, so we assume the FPGA is configured coherently. Which is something + # that is to be checked by an independent monitoring system anyway. + mask = self.read_attribute("TR_fpga_mask_RW") + clocks = self.read_attribute("FPGA_pps_expected_cnt_RW") + clocks_in_mask = [clock for idx,clock in enumerate(clocks) if mask[idx]] + + # We return first setting within the mask. If there are no FPGAs selected at all, just return a sane default. + return numpy.uint32(clocks_in_mask[0]) if clocks_in_mask else self.clock_RW_default + + def write_clock_RW(self, clock): + if clock not in (160*1000000, 200*1000000): + raise ValueError(f"Unsupported clock frequency: {clock}") + + # Tell all FPGAs to use this clock + self.proxy.FPGA_pps_expected_cnt_RW = [clock] * self.N_pn + + # Also update the packet headers + self.proxy.FPGA_sdp_info_nyquist_sampling_zone_index_RW = [self._nyquist_zone(clock)] * self.N_pn + # ---------- # Summarising Attributes # ---------- diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py index 784750cd0b01a2d7e144858af9f4eca39c6aee14..3a067aa9f6d090c0bdcb0d84f5f9766b6415791c 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py @@ -235,9 +235,8 @@ class XSTCollector(StatisticsCollector): # log if we're replacing a subband we were once recording previous_subband_in_slot = self.parameters["xst_subbands"][subband_slot] if previous_subband_in_slot != fields.subband_index: - previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot]) - - if previous_subband_timestamp.timestamp() > 0: + if self.parameters["xst_timestamps"][subband_slot] > 0: + previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot]) logger.info(f"Stopped recording XSTs for subband {previous_subband_in_slot}. Last data for this subband was received at {previous_subband_timestamp}.") def xst_values(self, subband_indices = None): diff --git a/tangostationcontrol/tangostationcontrol/devices/tilebeam.py b/tangostationcontrol/tangostationcontrol/devices/tilebeam.py index 0c8eb401f9fd6be41677d0295e9463c6d1f8ebe9..ae86554f71871a5c7dd3774393677f27cf856eff 100644 --- a/tangostationcontrol/tangostationcontrol/devices/tilebeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/tilebeam.py @@ -9,100 +9,41 @@ import numpy import datetime -from json import loads -from threading import Thread, Lock, Condition -from tango import AttrWriteType, DebugIt, DeviceProxy, DevVarStringArray, DevVarDoubleArray, DevString, DevSource -from tango.server import attribute, command, device_property +from tango import DeviceProxy, DevSource from tango import Util # Additional import from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions from tangostationcontrol.beam.delays import delay_calculator -from tangostationcontrol.devices.device_decorators import only_in_states, fault_on_error from tangostationcontrol.devices.beam_device import beam_device import logging logger = logging.getLogger() - -__all__ = ["TileBeam", "main", "BeamTracker"] +__all__ = ["TileBeam", "main"] @device_logging_to_python() class TileBeam(beam_device): + NUM_TILES = 96 # ----------------- # Device Properties # ----------------- - Beam_tracking_interval = device_property( - dtype='DevFloat', - doc='Beam weights updating interval time [seconds]', - mandatory=False, - default_value = 10.0 - ) - - Beam_tracking_preparation_period = device_property( - dtype='DevFloat', - doc='Preparation time [seconds] needed before starting update operation', - mandatory=False, - default_value = 0.25 - ) - # ---------- # Attributes # ---------- - Pointing_direction_R = attribute(access=AttrWriteType.READ, - dtype=((numpy.str,),), max_dim_x=3, max_dim_y=96, - fget=lambda self: self._pointing_direction_r) - - Pointing_direction_RW = attribute(access=AttrWriteType.READ_WRITE, - dtype=((numpy.str,),), max_dim_x=3, max_dim_y=96, - fget=lambda self: self._pointing_direction_rw) - - Pointing_direction_str_R = attribute(access=AttrWriteType.READ, - doc='Pointing direction as a formatted string', - dtype=(numpy.str,), max_dim_x=96, - fget=lambda self: ["{0} ({1}, {2})".format(*x) for x in self._pointing_direction_r]) - - Pointing_timestamp_R = attribute(access=AttrWriteType.READ, - dtype=(numpy.double,), max_dim_x=96, - fget=lambda self: self._pointing_timestamp_r) - - Tracking_enabled_R = attribute(access=AttrWriteType.READ, - doc="Whether the tile beam is updated periodically", - dtype=numpy.bool, - fget=lambda self: self.Beam_tracker.is_alive()) - - Tracking_enabled_RW = attribute(access=AttrWriteType.READ_WRITE, - doc="Whether the tile beam should be updated periodically", - dtype=numpy.bool, - fget=lambda self: self._tracking_enabled_rw) - # -------- # overloaded functions # -------- - def init_device(self): - super().init_device() - - # thread to perform beam tracking - self.Beam_tracker = None - @log_exceptions() def configure_for_initialise(self): - super().configure_for_initialise() - - # Initialise pointing array data and attribute - self._pointing_timestamp_r = numpy.zeros(96, dtype=numpy.double) - self._pointing_direction_r = numpy.zeros((96,3), dtype="<U32") - self._pointing_direction_rw = numpy.array([["AZELGEO","0deg","90deg"]] * 96, dtype="<U32") - - # Initialise tracking control - self._tracking_enabled_rw = True + super().configure_for_initialise(self.NUM_TILES) # Set a reference of RECV device that is correlated to this BEAM device util = Util.instance() @@ -113,71 +54,28 @@ class TileBeam(beam_device): # Retrieve positions from RECV device HBAT_reference_itrf = self.recv_proxy.HBAT_reference_itrf_R - HBAT_antenna_itrf_offsets = self.recv_proxy.HBAT_antenna_itrf_offsets_R.reshape(96,16,3) + HBAT_antenna_itrf_offsets = self.recv_proxy.HBAT_antenna_itrf_offsets_R.reshape(self.NUM_TILES, 16, 3) # a delay calculator for each tile self.HBAT_delay_calculators = [delay_calculator(reference_itrf) for reference_itrf in HBAT_reference_itrf] # absolute positions of each antenna element - self.HBAT_antenna_positions = [HBAT_reference_itrf[tile] + HBAT_antenna_itrf_offsets[tile] for tile in range(96)] - - # Create a thread object to update beam weights - self.Beam_tracker = BeamTracker(self) - - @log_exceptions() - def configure_for_on(self): - super().configure_for_on() - - # Start beam tracking thread - if self._tracking_enabled_rw: - self.Beam_tracker.start() - - @log_exceptions() - def configure_for_off(self): - if self.Beam_tracker: - # Stop thread object - self.Beam_tracker.stop() - - super().configure_for_off() + self.HBAT_antenna_positions = [HBAT_reference_itrf[tile] + HBAT_antenna_itrf_offsets[tile] for tile in range(self.NUM_TILES)] # -------- # internal functions # -------- - def write_Pointing_direction_RW(self, value): - """ Setter method for attribute Pointing_direction_RW """ - # verify whether values are valid - for tile in range(96): - if not self.HBAT_delay_calculators[tile].is_valid_direction(value[tile]): - raise ValueError(f"Invalid direction: {value[tile]}") - - self._pointing_direction_rw = value - - # force update across tiles if pointing changes - self.Beam_tracker.force_update() - logger.info("Pointing direction update requested") - - def write_Tracking_enabled_RW(self, value): - self._tracking_enabled_rw = value - - if value: - self.Beam_tracker.start() - else: - self.Beam_tracker.stop() - - def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None): - """ - Calculate the delays (in seconds) based on the pointing list and the timestamp + def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime): """ + Calculate the delays (in seconds) based on the pointing list and the timestamp. - if not timestamp: - "B008 Do not perform function calls in argument defaults. The call" - "is performed only once at function definition time." - timestamp = datetime.datetime.now() + Returns delays[tile][element] + """ - delays = numpy.zeros((96,16), dtype=numpy.float64) + delays = numpy.zeros((self.NUM_TILES, 16), dtype=numpy.float64) - for tile in range(96): + for tile in range(self.NUM_TILES): # initialise delay calculator d = self.HBAT_delay_calculators[tile] d.set_measure_time(timestamp) @@ -187,23 +85,18 @@ class TileBeam(beam_device): return delays - def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None): + def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime): """ Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) """ - if not timestamp: - "B008 Do not perform function calls in argument defaults. The call" - "is performed only once at function definition time." - timestamp = datetime.datetime.now() - # Retrieve delays from casacore delays = self._delays(pointing_direction, timestamp) # Convert delays into beam weights delays = delays.flatten() HBAT_bf_delay_steps = self.recv_proxy.calculate_HBAT_bf_delay_steps(delays) - HBAT_bf_delay_steps = numpy.array(HBAT_bf_delay_steps, dtype=numpy.int64).reshape(96,32) + HBAT_bf_delay_steps = numpy.array(HBAT_bf_delay_steps, dtype=numpy.int64).reshape(self.NUM_TILES, 32) # Write weights to RECV self.recv_proxy.HBAT_BF_delay_steps_RW = HBAT_bf_delay_steps @@ -211,7 +104,7 @@ class TileBeam(beam_device): # Record where we now point to, now that we've updated the weights. # Only the entries within the mask have been updated mask = self.recv_proxy.ANT_mask_RW.flatten() - for rcu in range(96): + for rcu in range(self.NUM_TILES): if mask[rcu]: self._pointing_direction_r[rcu] = pointing_direction[rcu] self._pointing_timestamp_r[rcu] = timestamp.timestamp() @@ -221,174 +114,9 @@ class TileBeam(beam_device): # Commands # -------- - @command(dtype_in=DevVarStringArray, dtype_out=DevVarDoubleArray) - @DebugIt() - @log_exceptions() - @only_in_states(beam_device.DEFAULT_COMMAND_STATES) - def delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None): - """ - Calculate the delays (in seconds) based on the pointing list and the timestamp - TBD: antenna and reference positions will be retrieved from RECV and not stored as BEAM device properties - """ - - if not timestamp: - "B008 Do not perform function calls in argument defaults. The call" - "is performed only once at function definition time." - timestamp = datetime.datetime.now() - - pointing_direction = numpy.array(pointing_direction).reshape(96,3) - - delays = self._delays(pointing_direction, timestamp) - - return delays.flatten() - - @command(dtype_in=DevVarStringArray) - @DebugIt() - @log_exceptions() - @only_in_states(beam_device.DEFAULT_COMMAND_STATES) - def set_pointing(self, pointing_direction: list, timestamp: datetime.datetime = None): - """ - Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) - """ - - if not timestamp: - "B008 Do not perform function calls in argument defaults. The call" - "is performed only once at function definition time." - timestamp = datetime.datetime.now() - - # Reshape the flatten input array - pointing_direction = numpy.array(pointing_direction).reshape(96,3) - - self._set_pointing(pointing_direction, timestamp) - - @command(dtype_in = DevString) - @DebugIt() - @only_in_states(beam_device.DEFAULT_COMMAND_STATES) - def set_pointing_for_specific_time(self, parameters: DevString = None): - """ - Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) - for the given timestamp - - Parameters are given in a json document. - pointing_direction: array of 96 casacore pointings (3 string parameters) - timestamp: POSIX timestamp - """ - pointing_parameters = loads(parameters) - - pointing_direction = list(pointing_parameters["pointing_direction"]) - timestamp_param = float(pointing_parameters["timestamp"]) - timestamp = datetime.datetime.fromtimestamp(timestamp_param) - - # Reshape the flatten pointing array - pointing_direction = numpy.array(pointing_direction).reshape(96,3) - - self._set_pointing(pointing_direction, timestamp) - - # ---------- # Run server # ---------- def main(**kwargs): """Main function of the ObservationControl module.""" return entry(TileBeam, **kwargs) - -# ---------- -# Beam Tracker -# ---------- -class BeamTracker(): - - DISCONNECT_TIMEOUT = 3.0 - - """ Object that encapsulates a Thread, resposible for beam tracking operations """ - def __init__(self, device: beam_device): - self.thread = None - self.device = device - - # Condition to trigger a forced update or early abort - self.update_lock = Lock() - self.update_condition = Condition(self.update_lock) - - # Whether the pointing has to be forced updated - self.stale_pointing = True - - def start(self): - """ Starts the Beam Tracking thread """ - if self.thread: - # already started - return - - self.done = False - self.thread = Thread(target=self._update_pointing_direction, name=f"BeamTracker of {self.device.get_name()}") - self.thread.start() - - logger.info("BeamTracking thread started") - - def is_alive(self): - """ Returns True just before the Thread run() method starts until just after the Thread run() method terminates. """ - return self.thread and self.thread.is_alive() - - def force_update(self): - """ Force the pointing to be updated. """ - - self.stale_pointing = True - self.notify_thread() - - def notify_thread(self): - # inform the thread to stop waiting - with self.update_lock: - self.update_condition.notify() - - def stop(self): - """ Stops the Beam Tracking loop """ - - if not self.thread: - return - - logger.info("BeamTracking thread stopping") - - self.done = True - self.force_update() - - # wait for thread to finish - self.thread.join(self.DISCONNECT_TIMEOUT) - - if self.is_alive(): - logger.error("BeamTracking Thread did not properly terminate") - - self.thread = None - - logger.info("BeamTracking thread stopped") - - def _get_sleep_time(self): - """ Computes the sleep time (in seconds) that needs to be waited for the next beam tracking update """ - now = datetime.datetime.now().timestamp() - - # Computes the left seconds before the next update - next_update_in = self.device.Beam_tracking_interval - (now % self.device.Beam_tracking_interval) - - # Computes the needed sleep time before the next update - sleep_time = next_update_in - self.device.Beam_tracking_preparation_period - # If sleep time is negative, add the tracking interval for the next update - if sleep_time < 0: - return sleep_time + self.device.Beam_tracking_interval - else: - return sleep_time - - # @fault_on_error routes errors here. we forward them to our device - def Fault(self, msg): - self.device.Fault(msg) - - @log_exceptions() - @fault_on_error() - def _update_pointing_direction(self): - """ Updates the beam weights using a fixed interval of time """ - - # Check if flag beamtracking is true - with self.update_lock: - while not self.done: - self.stale_pointing = False - self.device._set_pointing(self.device._pointing_direction_rw, datetime.datetime.now()) - - # sleep until the next update, or when interrupted (this releases the lock, allowing for notification) - # note that we need wait_for as conditions can be triggered multiple times in succession - self.update_condition.wait_for(lambda: self.done or self.stale_pointing, self._get_sleep_time()) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py index dcabcafc1f5e6f8dead16ffc4f913068b4309943..d76ee10e4f15664f176175a34c031c3008872b61 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py @@ -11,6 +11,11 @@ from .base import AbstractTestBases from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy +import numpy +import numpy.testing +import time +from ctypes import c_short + class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase): def setUp(self): @@ -30,3 +35,55 @@ class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase): sdp_proxy.warm_boot() sdp_proxy.set_defaults() return sdp_proxy + + def test_pointing_to_zenith(self): + # Setup configuration + sdp_proxy = self.setup_sdp() + + self.proxy.initialise() + self.proxy.subband_select = [0] * 488 # no other value will be read back from SDP yet, see L2SDP-725 + self.proxy.on() + + # The subband frequency of HBA subband 0 is 200 MHz, + # so its period is 5 ns. A delay of 2.5e-9 seconds is thus half a period, + # and result in a 180 degree phase offset. + delays = numpy.array([[2.5e-9] * 192] * 488) + + calculated_bf_weights = self.proxy.calculate_bf_weights(delays.flatten()) + + # With a unit weight of 2**14, we thus expect beamformer weights of -2**14 + 0j, + # which is 49152 when read as an uint32. + self.assertEqual(-2**14, c_short(49152).value) # check our calculations + expected_bf_weights = numpy.array([49152] * 192 * 488, dtype=numpy.uint32) + + # disabled until L2SDP-725 is fixed, as the read back clock cannot be anything else than 0 + #numpy.testing.assert_almost_equal(expected_bf_weights, calculated_bf_weights) + + def test_sdp_clock_change(self): + # Setup configuration + sdp_proxy = self.setup_sdp() + + self.proxy.initialise() + self.proxy.subband_select = [0] * 488 # no other value will be read back from SDP yet, see L2SDP-725 + self.proxy.on() + + # any non-zero delay should result in different weights for different clocks + delays = numpy.array([[2.5e-9] * 192] * 488) + + sdp_proxy.clock_RW = 200 * 1000000 + time.sleep(1) # wait for beamlet device to process change event + calculated_bf_weights_200 = self.proxy.calculate_bf_weights(delays.flatten()) + + sdp_proxy.clock_RW = 160 * 1000000 + time.sleep(1) # wait for beamlet device to process change event + calculated_bf_weights_160 = self.proxy.calculate_bf_weights(delays.flatten()) + + sdp_proxy.clock_RW = 200 * 1000000 + time.sleep(1) # wait for beamlet device to process change event + calculated_bf_weights_200_v2 = self.proxy.calculate_bf_weights(delays.flatten()) + + # outcome should be changed back and forth across clock changes + # disabled until L2SDP-725 is fixed, as the read back clock cannot be anything else than 0 + #numpy.testing.assert_((calculated_bf_weights_200 != calculated_bf_weights_160).all()) + #numpy.testing.assert_((calculated_bf_weights_200 == calculated_bf_weights_200_v2).all()) + diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py index c023f256b1bdcb8cf5a935053fcba493fe2546dd..3a3fa57eff087ac3a04650731ab2ef3700104f8a 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py @@ -7,10 +7,47 @@ # # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. + +from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy + from .base import AbstractTestBases +import numpy + class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): def setUp(self): """Intentionally recreate the device object in each test""" super().setUp("STAT/DigitalBeam/1") + + self.recv_proxy = self.setup_recv_proxy() + self.beamlet_proxy = self.setup_beamlet_proxy() + + def setup_recv_proxy(self): + recv_proxy = TestDeviceProxy("STAT/RECV/1") + recv_proxy.off() + recv_proxy.warm_boot() + recv_proxy.set_defaults() + return recv_proxy + + def setup_beamlet_proxy(self): + beamlet_proxy = TestDeviceProxy("STAT/Beamlet/1") + beamlet_proxy.off() + beamlet_proxy.warm_boot() + beamlet_proxy.set_defaults() + return beamlet_proxy + + def test_pointing_to_zenith(self): + # Setup beamlet configuration + self.beamlet_proxy.clock_RW = 200 * 1000000 + self.beamlet_proxy.subband_select = list(range(488)) + + self.proxy.initialise() + self.proxy.Tracking_enabled_RW = False + self.proxy.on() + + # Point to Zenith + # + # Cannot test what was written to SDP due to L2SDP-725 + # so we just call the command, checking whether no exception is thrown + self.proxy.set_pointing(numpy.array([["AZELGEO","0deg","90deg"]] * 488).flatten()) diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py b/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py index c481d085e00170cfa4771c4af0c79dcc675460b5..b70446c30d21714b46d1e96f8cf62ac65a509103 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py @@ -27,6 +27,12 @@ class receiver: # add payload to the header, and return the full packet return header + payload + def _read(self, length: int) -> bytes: + """ Low-level read function to fetch at most "length" (>1) bytes. Returns + nothing if there is no data left. """ + + return os.read(self.fd, length) + def read_data(self, data_length: int) -> bytes: """ Read exactly data_length bytes from the TCP connection. """ @@ -35,7 +41,7 @@ class receiver: # try to read the remainder. # NOTE: recv() may return less data than requested, and returns 0 # if there is nothing left to read (end of stream) - more_data = os.read(self.fd, data_length - len(data)) + more_data = self._read(data_length - len(data)) if not more_data: # connection got dropped raise EOFError("End of stream") @@ -54,6 +60,10 @@ class tcp_receiver(receiver): super().__init__(fd=self.sock.fileno()) + def _read(self, length): + # On Windows, we cannot use os.read to read from sockets + return self.sock.recv(length) + def reconnect(self): self.fd = None self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) diff --git a/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py b/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py index b984660d677c81349c134d9be7c7441924253fc6..f2c537aafea96a8c28388bff40637bd8ca5eea73 100644 --- a/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py +++ b/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py @@ -1,12 +1,13 @@ import datetime +import time +import logging +import numpy +import numpy.testing from tangostationcontrol.beam.delays import delay_calculator from tangostationcontrol.test import base - - - class TestDelays(base.TestCase): def test_init(self): """ @@ -94,7 +95,27 @@ class TestDelays(base.TestCase): # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions. delays = d.convert(direction, antenna_itrf) - self.assertListEqual(delays, [0.0], msg=f"delays = {delays}") + self.assertListEqual(delays.tolist(), [0.0], msg=f"delays = {delays}") + + def test_regression(self): + reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5 + d = delay_calculator(reference_itrf) + + # set the antenna position identical to the reference position + antenna_itrf = [[3826923.503, 460915.488, 5064643.517]] # CS001LBA, in ITRF2005 epoch 2012.5 + + # # set the timestamp to solve for + timestamp = datetime.datetime(2000, 1, 1, 0, 0, 0) + d.set_measure_time(timestamp) + + # # obtain the direction vector for a specific pointing + direction = "J2000", "0deg", "90deg" + + # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions. + delays = d.convert(direction, antenna_itrf) + + # check for regression + self.assertAlmostEqual(-8.31467564781444e-07, delays[0], delta=1.0e-10) def test_light_second_delay(self): """ @@ -123,3 +144,39 @@ class TestDelays(base.TestCase): self.assertAlmostEqual(0.1, delays[0], 6, f"delays[0] = {delays[0]}") + + def test_convert_bulk(self): + d = delay_calculator([0, 0, 0]) + timestamp = datetime.datetime(2022, 3, 1, 0, 0, 0) # timestamp does not actually matter, but casacore doesn't know that. + d.set_measure_time(timestamp) + + # generate different positions and directions + positions = numpy.array([[i,2,3] for i in range(5)]) + directions = numpy.array([["J2000", f"{i}deg", f"{i}deg"] for i in range(90)]) + + bulk_result = d.convert_bulk(directions, positions) + + # verify parallellisation along direction axis + for i, single_dir in enumerate(directions): + single_dir_result = d.convert_bulk([single_dir], positions) + numpy.testing.assert_equal(single_dir_result[:, 0], bulk_result[:, i]) + + # verify parallellisation along position axis + for i, single_pos in enumerate(positions): + single_pos_result = d.convert_bulk(directions, [single_pos]) + numpy.testing.assert_equal(single_pos_result[0, :], bulk_result[i, :]) + + def test_convert_bulk_speed(self): + d = delay_calculator([0, 0, 0]) + timestamp = datetime.datetime(2022, 3, 1, 0, 0, 0) # timestamp does not actually matter, but casacore doesn't know that. + d.set_measure_time(timestamp) + + positions = numpy.array([[1,2,3]] * 96) + directions = numpy.array([["J2000", "0deg", "0deg"]] * 488) + + count = 10 + before = time.monotonic_ns() + for _ in range(count): + _ = d.convert_bulk(directions, positions) + after = time.monotonic_ns() + logging.error(f"convert_bulk averaged {(after - before)/count/1e6} ms to convert 488 directions for 96 antennas.") diff --git a/tangostationcontrol/tangostationcontrol/test/beam/test_geo.py b/tangostationcontrol/tangostationcontrol/test/beam/test_geo.py index 5694376be684b96e74a6197e5e8c3d13f3df6d39..22def29f3a0ba17b627bd1e6308256f826bc1c9a 100644 --- a/tangostationcontrol/tangostationcontrol/test/beam/test_geo.py +++ b/tangostationcontrol/tangostationcontrol/test/beam/test_geo.py @@ -7,7 +7,7 @@ # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. -from tangostationcontrol.beam.geo import ETRS_to_ITRF, ETRS_to_GEO +from tangostationcontrol.beam.geo import ETRS_to_ITRF, ETRS_to_GEO, GEO_to_GEOHASH from tangostationcontrol.test import base @@ -70,3 +70,29 @@ class TestETRS_to_GEO(base.TestCase): LOFAR1_CS001_LBA_GEO = [52.911, 6.868] numpy.testing.assert_almost_equal(CS001_LBA_GEO, LOFAR1_CS001_LBA_GEO, decimal=3) + +class TestGEO_to_GEOHASH(base.TestCase): + def test_convert_single_coordinate(self): + """ Convert a single coordinate. """ + GEO_coords = numpy.array([1.0, 1.0]) + GEOHASH_coords = GEO_to_GEOHASH(GEO_coords) + + self.assertEqual(str, type(GEOHASH_coords)) + + def test_convert_array(self): + """ Convert an array of coordinates. """ + GEO_coords = numpy.array([ [1.0, 1.0], [2.0, 2.0], [3.0, 3.0] ]) + GEOHASH_coords = GEO_to_GEOHASH(GEO_coords) + + self.assertEqual((3,), GEOHASH_coords.shape) + + def test_CS001_LBA_regression(self): + """ Verify if the calculated CS001LBA phase center match fixed values, to detect changes in computation. """ + + CS001_LBA_GEO = [52.911, 6.868] + + # Convert to GEO + CS001_LBA_GEOHASH = GEO_to_GEOHASH(numpy.array(CS001_LBA_GEO)) + + # verify against precomputed value + self.assertEqual('u1kvh21hgvrcpm28', CS001_LBA_GEOHASH) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_beamlet_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_beamlet_device.py new file mode 100644 index 0000000000000000000000000000000000000000..81b34f00e8135563b22af10bb72d53ea7fceab13 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_beamlet_device.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +from tangostationcontrol.devices.sdp.beamlet import Beamlet + +import numpy +import numpy.testing +from ctypes import c_short + +from tangostationcontrol.test import base + +# unpack into 16-bit complex +def to_complex(uint32): + imag = c_short(uint32 >> 16).value + real = c_short(uint32 & 0xFFFF).value + unit = Beamlet.BF_UNIT_WEIGHT + + return (real + 1j * imag) / unit + +class TestBeamletDevice(base.TestCase): + def test_phases_to_bf_weights(self): + # offer nice 0, 90, 180, 270, 360 degrees + phases = numpy.array([0.0, numpy.pi / 2, numpy.pi, numpy.pi * 1.5, numpy.pi * 2]) + unit = Beamlet.BF_UNIT_WEIGHT + + bf_weights = Beamlet._phases_to_bf_weights(phases) + + # check whether the complex representation is also along the right axes and + # has the right amplitude + self.assertEqual(to_complex(bf_weights[0]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[1]), 0 + 1j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[2]), -1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[3]), 0 - 1j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[4]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + + def test_calculate_bf_weights_small_numbers(self): + # 2 beamlets, 3 antennas. The antennas are 1 second apart. + delays = numpy.array([ + [1.0, 2.0, 3.0], + [1.0, 2.0, 3.0], + ]) + + # the frequency of the signal is 1.0 Hz and 0.5 Hz respectively, + # so the antennas will be either in phase or in opposite phase + beamlet_frequencies = numpy.array([ + [1.0, 1.0, 1.0], + [0.5, 0.5, 0.5], + ]) + + bf_weights = Beamlet._calculate_bf_weights(delays, beamlet_frequencies) + + self.assertEqual(delays.shape, bf_weights.shape) + + self.assertEqual(to_complex(bf_weights[0][0]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[0][1]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[0][2]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[1][0]), -1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[1][1]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[1][2]), -1 + 0j, msg=f"bf_weights = {bf_weights}") + + def test_calculate_bf_weights_actual_numbers(self): + # 2 beamlets, 3 antennas. The antennas are 1 second apart. + delays = numpy.array([ + [0.0, 2.5e-9, 5.0e-9] + ]) + + # the frequency of the signal is 1.0 Hz and 0.5 Hz respectively, + # so the antennas will be either in phase or in opposite phase + beamlet_frequencies = numpy.array([ + [200e6, 200e6, 200e6] + ]) + + bf_weights = Beamlet._calculate_bf_weights(delays, beamlet_frequencies) + + self.assertEqual(delays.shape, bf_weights.shape) + + self.assertEqual(to_complex(bf_weights[0][0]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[0][1]), -1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[0][2]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + + def test_subband_frequencies(self): + subbands = numpy.array([ + [0, 1, 102], + ]) + + clocks = numpy.array([ + 200 * 1000000, + 160 * 1000000 + ]) + + subband_width = 200e6 / 1024 + + # for reference values, see https://proxy.lofar.eu/rtsm/tests/ + + lba_frequencies = Beamlet._subband_frequencies(subbands, 160 * 1000000, 0) + self.assertAlmostEqual(lba_frequencies[0][0], 0.0000000e6) + self.assertAlmostEqual(lba_frequencies[0][1], 0.1562500e6) + self.assertAlmostEqual(lba_frequencies[0][2], 15.9375000e6) + + lba_frequencies = Beamlet._subband_frequencies(subbands, 200 * 1000000, 0) + self.assertAlmostEqual(lba_frequencies[0][0], 0.0000000e6) + self.assertAlmostEqual(lba_frequencies[0][1], 0.1953125e6) + self.assertAlmostEqual(lba_frequencies[0][2], 19.9218750e6) + + # Nyquist zone 1 is not used in 160 MHz + + hba_low_frequencies = Beamlet._subband_frequencies(subbands, 200 * 1000000, 1) + self.assertAlmostEqual(hba_low_frequencies[0][0], 100.0000000e6) + self.assertAlmostEqual(hba_low_frequencies[0][1], 100.1953125e6) + self.assertAlmostEqual(hba_low_frequencies[0][2], 119.9218750e6) + + hba_high_frequencies = Beamlet._subband_frequencies(subbands, 160 * 1000000, 2) + self.assertAlmostEqual(hba_high_frequencies[0][0], 160.0000000e6) + self.assertAlmostEqual(hba_high_frequencies[0][1], 160.1562500e6) + self.assertAlmostEqual(hba_high_frequencies[0][2], 175.9375000e6) + + hba_high_frequencies = Beamlet._subband_frequencies(subbands, 200 * 1000000, 2) + self.assertAlmostEqual(hba_high_frequencies[0][0], 200.0000000e6) + self.assertAlmostEqual(hba_high_frequencies[0][1], 200.1953125e6) + self.assertAlmostEqual(hba_high_frequencies[0][2], 219.9218750e6) +