diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index e2176cfed59fae5f751239ae89910d4ec04c5c0f..b46d8eb28323be9801f1f9dad55d21bea7ab44a8 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -471,6 +471,7 @@ docker_build_device_base: - docker_build_device_base tags: - integration_tests + - dind image: docker:latest variables: JUMPPAD_HOME: $CI_PROJECT_DIR @@ -499,8 +500,13 @@ docker_build_device_base: - . setup.sh || true # TANGO_HOST must be unset our databaseds will be unreachable - unset TANGO_HOST + # do not conflate STATION gitlab variable with STATION setting for levant rendering + - unset STATION - export TAG="$tag" - bash $CI_PROJECT_DIR/sbin/tag_and_push_docker_image.sh pull $tag + after_script: + # show infrastructure logs + - grep ERROR log/* .jumppad/logs/* || true artifacts: when: always paths: diff --git a/README.md b/README.md index db0315e908d37d4a07e73d713c2470077f17e7fb..871bfe24ef0a2bf2a1b16e3ac9108ebcb7929305 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,7 @@ Next change the version in the following places: through [https://git.astron.nl/lofar2.0/tango/-/tags](Deploy Tags) # Release Notes +* 0.51.3 Mirror central calibration tables into Calibration device using a volume and sync nomad job. * 0.51.2 Allow automated deployment from a multi-project pipeline in Gitlab CI/CD * 0.51.1 Generate caltables from LOFAR1 data or by generating dummies * 0.51.0 Add Calibration service to gRPC server diff --git a/infra/dev/nomad/config/nomad/client.hcl b/infra/dev/nomad/config/nomad/client.hcl index d68fc99e0ba6043cc7cbf5eaa5091ada095b027f..00faa22d82e6c1d98fdb49216a74f3d846078592 100644 --- a/infra/dev/nomad/config/nomad/client.hcl +++ b/infra/dev/nomad/config/nomad/client.hcl @@ -47,6 +47,10 @@ client { path = "/localdata/volumes/object-storage-data" } + host_volume "caltables" { + path = "/localdata/volumes/caltables" + } + host_volume "IERS" { path = "/localdata/volumes/IERS-data" } diff --git a/infra/dev/tango/tango.hcl b/infra/dev/tango/tango.hcl index 7f3a610d9c6777286807653d601f158ae2a27dce..dc6387640260982c9bfedcf9317cff4806b335ba 100644 --- a/infra/dev/tango/tango.hcl +++ b/infra/dev/tango/tango.hcl @@ -48,10 +48,10 @@ resource "exec" "init-object-storage" { minio/mc \ -c "mc alias set object-storage http://s3.service.consul:9000 minioadmin minioadmin echo 'Initialising caltables' - mc mb object-storage/caltables + mc mb -p object-storage/caltables mc cp --recursive /opt/seed-data/object-storage/caltables/ object-storage/caltables/ echo 'Initialising IERS tables' - mc mb object-storage/iers + mc mb -p object-storage/iers mc cp --recursive /opt/seed-data/object-storage/iers/ object-storage/iers/ date +'%F %T' echo 'Initialisation completed'" diff --git a/infra/jobs/station/device-server.levant.nomad b/infra/jobs/station/device-server.levant.nomad index b02b2c7c1e2bfb61ce8369790572226851ce9799..83f2401b0a576c3f967f36476ae9911de1ff365e 100644 --- a/infra/jobs/station/device-server.levant.nomad +++ b/infra/jobs/station/device-server.levant.nomad @@ -9,22 +9,39 @@ job "device-servers" { delay_function = "constant" } - group "sync-IERS" { + group "sync-from-minio" { count = 1 network { mode = "bridge" - port "metrics" { + port "metrics-sync-caltables" { to = 8081 host_network = "station" } + + port "metrics-sync-IERS" { + to = 8082 + host_network = "station" + } + } + + service { + tags = ["scrape"] + name = "sync-caltables" + port = "metrics-sync-caltables" } service { tags = ["scrape"] name = "sync-iers" - port = "metrics" + port = "metrics-sync-IERS" + } + + volume "caltables" { + type = "host" + read_only = false + source = "caltables" } volume "IERS" { @@ -33,6 +50,28 @@ job "device-servers" { source = "IERS" } + task "sync-caltables" { + driver = "docker" + + volume_mount { + volume = "caltables" + destination = "/opt/caltables" + read_only = false + } + + config { + image = "[[ $.registry.astron.url ]]/mc:latest" + entrypoint = [""] + command = "/bin/bash" + args = ["-c", "mc alias set object-storage http://s3.service.consul:9000 [[.object_storage.user.name]] [[.object_storage.user.pass]] && mc mirror --preserve --watch object-storage/caltables/ /opt/caltables/ --monitoring-address 0.0.0.0:8082" ] + } + + resources { + cpu = 10 + memory = 128 + } + } + task "sync-IERS" { driver = "docker" @@ -65,6 +104,14 @@ job "device-servers" { mode = "cni/station" } + [[ if eq $name "calibration" ]] + volume "caltables" { + type = "host" + read_only = true + source = "caltables" + } + [[ end ]] + volume "IERS" { type = "host" read_only = true @@ -109,6 +156,14 @@ job "device-servers" { task "device-[[ $name ]]" { driver = "docker" + [[ if eq $name "calibration" ]] + volume_mount { + volume = "caltables" + destination = "/opt/caltables" + read_only = true + } + [[ end ]] + volume_mount { volume = "IERS" destination = "/opt/IERS" @@ -129,8 +184,6 @@ job "device-servers" { TANGO_HOST = "tango.service.consul:10000" TANGO_ZMQ_EVENT_PORT = "4505" TANGO_ZMQ_HEARTBEAT_PORT = "4506" - MINIO_ROOT_USER = "minioadmin" - MINIO_ROOT_PASSWORD = "minioadmin" [[ if eq $.station "dev" ]] DEBUG_HOST = "[[ $.debug_host ]]" [[ end ]] diff --git a/infra/station/nomad.yml b/infra/station/nomad.yml index c35323ac87dc5828a1077744432f03050e6f3c82..f326e101eee73fa09b19e5f88381a715ceed504f 100644 --- a/infra/station/nomad.yml +++ b/infra/station/nomad.yml @@ -41,6 +41,7 @@ - monitoring-loki-data - tango-database - object-storage-data + - caltables - IERS-data - jupyter-notebooks diff --git a/infra/station/nomad/nomad.hcl.j2 b/infra/station/nomad/nomad.hcl.j2 index 45b83a6b0a2caf6698d113e1f6f1c613d279d77e..171adbeb00ca48dc20710f4e850a7763e273cc21 100644 --- a/infra/station/nomad/nomad.hcl.j2 +++ b/infra/station/nomad/nomad.hcl.j2 @@ -73,6 +73,10 @@ client { path = "/var/lib/station-data/IERS-data" } + host_volume "caltables" { + path = "/var/lib/station-data/caltables" + } + host_volume "jupyter-notebooks" { path = "/var/lib/station-data/jupyter-notebooks" } diff --git a/requirements.txt b/requirements.txt index 46838872a74d30834220a3436c4073a4fc8d5721..d9ef5aae97dbe2ff992a1a9e2733f5502742f7ba 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,7 +19,6 @@ etrs-itrs@git+https://github.com/brentjens/etrs-itrs # Apache 2 lofarantpos >= 0.5.0 # Apache 2 python-geohash >= 0.8.5 # Apache 2 / MIT attributewrapper@git+https://git.astron.nl/lofar2.0/attributewrapper # Apache2 -minio >= 7.1.14 # Apache 2 prometheus-client # Apache 2 dnspython # ISC logfmter # MIT diff --git a/sbin/prepare_dev_env.sh b/sbin/prepare_dev_env.sh index 7e1d51cbf8eb6ffe6807f0a8a5b157efcf22f8aa..cf1f737eb2bb22c61a103dfa5bc7b72ba7f5b04f 100755 --- a/sbin/prepare_dev_env.sh +++ b/sbin/prepare_dev_env.sh @@ -80,6 +80,7 @@ docker run --rm -i -v "$docker_volume":/mnt bash bash <<- EOM mkdir -p /mnt/volumes/monitoring-prometheus-data mkdir -p /mnt/volumes/object-storage-data mkdir -p /mnt/volumes/jupyter-notebooks + mkdir -p /mnt/volumes/caltables mkdir -p /mnt/volumes/IERS-data mkdir -p /mnt/volumes/caltables chmod 0777 -R /mnt/volumes diff --git a/sbin/run_integration_test.sh b/sbin/run_integration_test.sh index 1fe7928131e9cd2a0f0e770e15ffbb1233d34dab..9bc76b9b6ce2128adc852859351ee20bd4e1b455 100755 --- a/sbin/run_integration_test.sh +++ b/sbin/run_integration_test.sh @@ -163,6 +163,25 @@ function save_logs { echo "Failed to save dsconfig dump" } + echo "Saving job status" + { + NAMESPACES=$(docker exec server.station.nomad.nomad-cluster.local.jmpd.in nomad namespace list -json | jq -r ".[].Name") + + for namespace in ${NAMESPACES}; do + echo "---- Jobs in namespace ${namespace}" + docker exec server.station.nomad.nomad-cluster.local.jmpd.in nomad job status -namespace "${namespace}" + done + } > log/nomad-job-status.log || { + echo "Obtaining nomad job status failed" + } + + echo "Saving service health status" + { + docker exec consul.nomad.container.local.jmpd.in wget -O - http://127.0.0.1:8500/v1/health/state/any | jq '.[] | {ServiceName, Status, Output}' -c + } > log/consul-health-status.log || { + echo "Obtaining consul service status failed" + } + { mkdir -p log/allocations # obtain (and save!) the list of tasks, but only those that have actually started. diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION index 969eb252497ceec1593a4b037de294d89fa0996e..cf2529d2b3aac5e8dedb7aa71021e35893139ddb 100644 --- a/tangostationcontrol/VERSION +++ b/tangostationcontrol/VERSION @@ -1 +1 @@ -0.51.2 +0.51.3 diff --git a/tangostationcontrol/common/calibration.py b/tangostationcontrol/common/calibration.py index 2fe724d4472e39eca8f8e738bc6e0c15ab082e28..d0172781a41966a04fb4f380afedc333fdd2e3b3 100644 --- a/tangostationcontrol/common/calibration.py +++ b/tangostationcontrol/common/calibration.py @@ -1,17 +1,13 @@ # Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 import logging -import os -import tempfile +from pathlib import Path from typing import Dict -from urllib.parse import urlparse import numpy from lofar_lotus.file_access import member, attribute, read_hdf5 -from minio import Minio from tango import DeviceProxy -from tangostationcontrol.common import consul from tangostationcontrol.common.constants import ( N_subbands, N_pol, @@ -57,56 +53,9 @@ class CalibrationManager: def url(self, new_url): self._url = new_url - def __init__(self, url: str, station_name: str): - self._url = url + def __init__(self, caltable_dir: Path, station_name: str): + self._caltable_dir = caltable_dir self._station_name = station_name - self._tmp_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True) - self.bucket_name = "caltables" - self.prefix = self._station_name - self._init_minio() - self.sync_calibration_tables() - - def _init_minio(self): - result = urlparse(self._url) - bucket_name, prefix, *_ = result.path[1:].split("/", 1) + [""] - if len(prefix) > 0: - self.prefix = "/".join([prefix.rstrip("/"), self._station_name]) - if len(bucket_name) > 0: - self.bucket_name = bucket_name - - logger.info(f"Derived {self.prefix=} {self.bucket_name=} from {self._url=}") - - service: consul.Service = next(consul.lookup_service(result.netloc)) - logger.info("Use service %s:%s", service.host, service.port) - self._storage = Minio( - f"{service.addr}:{service.port}", - access_key=os.getenv("MINIO_ROOT_USER"), - secret_key=os.getenv("MINIO_ROOT_PASSWORD"), - secure=result.scheme == "https", - ) - - def sync_calibration_tables(self): - """Download calibration tables from Minio server""" - logger.debug( - "Sync calibration tables from bucket %s with prefix %s/", - self.bucket_name, - self.prefix, - ) - objects = self._storage.list_objects(self.bucket_name, prefix=f"{self.prefix}/") - for obj in objects: - filename = os.path.basename(obj.object_name) - try: - self._storage.fget_object( - self.bucket_name, - obj.object_name, - os.path.join(self._tmp_dir.name, filename), - ) - except Exception as ex: - raise IOError( - f"Failed to download {self.bucket=} {obj.object_name=}" - ) from ex - - logger.info("Downloaded %s from %s", filename, obj.object_name) @staticmethod def _band_to_reference_frequency(is_hba, rcu_band): @@ -146,13 +95,13 @@ class CalibrationManager: def get_antenna_calibration(antenna_name: str, rcu_band: int): """Return the calibration values for the given antenna and RCU band.""" - calibration_filename = os.path.join( - self._tmp_dir.name, + calibration_filename: Path = self._caltable_dir.joinpath( + self._station_name, f"CalTable-{self._station_name}-{antenna_type}" f"-{self._band_to_reference_frequency(is_hba, rcu_band)}MHz.h5", ) logging.debug(f"Load calibration file {calibration_filename}") - f = read_hdf5(calibration_filename, CalibrationTable) + f = read_hdf5(str(calibration_filename), CalibrationTable) with f as table: # Retrieve data and convert them in the correct Tango attr shape if not device_name_matches( diff --git a/tangostationcontrol/devices/calibration.py b/tangostationcontrol/devices/calibration.py index a29f46c8dc98ad096385efcff88a7e24f4f895b7..aa349894adbdfb9b162b344f0acc880f01cc8072 100644 --- a/tangostationcontrol/devices/calibration.py +++ b/tangostationcontrol/devices/calibration.py @@ -4,6 +4,7 @@ """Calibration Device Server for LOFAR2.0""" import logging +from pathlib import Path from prometheus_client import Counter from tango import Database @@ -85,10 +86,6 @@ class Calibration(LOFARDevice): logger.warning("Device not active. Ignore AntennaField changed event") return - # make sure we have the latest tables - logger.debug("Syncing calibration tables") - self._calibration_manager.sync_calibration_tables() - # frequencies changed, so we need to recalibrate self._calibrate_antenna_field(device.name()) @@ -99,10 +96,6 @@ class Calibration(LOFARDevice): logger.warning("Device not active. Ignore clock changed event") return - # make sure we have the latest tables - logger.debug("Syncing calibration tables") - self._calibration_manager.sync_calibration_tables() - found = False for k, ant in self.ant_proxies.items(): # Recalibrate associated AntennaField @@ -118,12 +111,12 @@ class Calibration(LOFARDevice): f"Could not find any AntennaField to calibrate for clock change event from {device}" ) - Calibration_Table_Base_URL = device_property( - doc="Base URL of the calibration tables", + Calibration_Table_Dir = device_property( + doc="Directory holding the calibration tables", dtype="DevString", mandatory=False, update_db=True, - default_value="http://s3/caltables", + default_value="/opt/caltables", ) @attribute(dtype=(str,), max_dim_x=20) @@ -134,17 +127,6 @@ class Calibration(LOFARDevice): def SDPs_Monitored_R(self): return list(self.sdp_proxies.keys()) - @debugit() - @command() - @only_in_states(DEFAULT_COMMAND_STATES) - def download_calibration_tables(self): - """Download the latest calibration tables and apply them.""" - - self._calibration_manager.sync_calibration_tables() - - # Apply downloaded tables - self.calibrate_all() - @debugit() @command(dtype_in=str) @only_in_states(DEFAULT_COMMAND_STATES) @@ -224,7 +206,7 @@ class Calibration(LOFARDevice): station_name = self.control.read_parent_attribute("station_name_R") self._calibration_manager = CalibrationManager( - self.Calibration_Table_Base_URL, station_name + Path(self.Calibration_Table_Dir), station_name ) db = Database() @@ -279,6 +261,6 @@ class Calibration(LOFARDevice): def configure_for_on(self): # (Re)calibrate all antennafields, as we did not receive # any events yet. - self.download_calibration_tables() + self.calibrate_all() super().configure_for_on() diff --git a/tests/common/test_calibration.py b/tests/common/test_calibration.py index 8e7091bac789a9d802433e84e51657d763cef2ec..e416407d1891a364d8d59328105ee8b894ca4e8c 100644 --- a/tests/common/test_calibration.py +++ b/tests/common/test_calibration.py @@ -1,14 +1,12 @@ # Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 -import os -from os import path +from pathlib import Path from unittest.mock import patch, Mock, call, PropertyMock from tests import base import numpy from numpy.testing import assert_array_equal -from tangostationcontrol.common import consul from tangostationcontrol.common.calibration import ( delay_compensation, loss_compensation, @@ -19,61 +17,9 @@ from tangostationcontrol.common.constants import S_pn, N_subbands, N_pn, SDP_UNI from tangostationcontrol.common.sdp import complex_to_weights -class MockMinio: - def __init__(self, **kwargs): - self.args = kwargs - - -def new_lookup_service(*args, **kwargs): - yield consul.Service(host="test", port=9000, addr="test") - - -@patch("tangostationcontrol.common.calibration.Minio") -@patch("tangostationcontrol.common.consul.lookup_service", new=new_lookup_service) -@patch.dict( - os.environ, - {"MINIO_ROOT_USER": "my_user", "MINIO_ROOT_PASSWORD": "my_passwd"}, - clear=True, -) class TestCalibrationManager(base.TestCase): """Test class for Calibration Manager""" - def test_sync_calibration_tables(self, minio): - """Test whether calibration table files are correctly retrieved""" - minio.return_value.list_objects.return_value = [ - Mock(object_name="/unittest-station/file1.h5"), - Mock(object_name="/unittest-station/file2.h5"), - Mock(object_name="/unittest-station/file3.h5"), - ] - sut = CalibrationManager( - "http://server:1234/test_bucket/test_prefix", "unittest-station" - ) - minio.has_call_with( - "server:1234", access_key="my_user", secret_key="my_passwd", secure=False - ) - minio.return_value.list_objects.has_call_with( - "test_bucket", prefix="test_prefix/unittest-station/" - ) - minio.return_value.fget_object.assert_has_calls( - [ - call( - "test_bucket", - "/unittest-station/file1.h5", - path.join(sut._tmp_dir.name, "file1.h5"), - ), - call( - "test_bucket", - "/unittest-station/file2.h5", - path.join(sut._tmp_dir.name, "file2.h5"), - ), - call( - "test_bucket", - "/unittest-station/file3.h5", - path.join(sut._tmp_dir.name, "file3.h5"), - ), - ] - ) - def _setup_mock_antennafield(self): return Mock( Antenna_to_SDP_Mapping_R=numpy.array( @@ -114,19 +60,19 @@ class TestCalibrationManager(base.TestCase): caltable_mock = self._setup_mock_caltable() hdf_reader.return_value.__enter__.return_value = caltable_mock - sut = CalibrationManager("http://server:1234", "unittest-station") + sut = CalibrationManager(Path("/tmp"), "unittest-station") sut.calibrate_subband_weights(antenna_field_mock, sdp_mock) hdf_reader.assert_has_calls( [ call( - f"{sut._tmp_dir.name}/CalTable-unittest-station-HBA-200MHz.h5", + f"{sut._caltable_dir}/unittest-station/CalTable-unittest-station-HBA-200MHz.h5", CalibrationTable, ), call().__enter__(), call().load(caltable_mock.antennas["T1"]), call().__exit__(None, None, None), call( - f"{sut._tmp_dir.name}/CalTable-unittest-station-HBA-150MHz.h5", + f"{sut._caltable_dir}/unittest-station/CalTable-unittest-station-HBA-150MHz.h5", CalibrationTable, ), call().__enter__(), @@ -137,7 +83,7 @@ class TestCalibrationManager(base.TestCase): return subband_weights @patch("tangostationcontrol.common.calibration.read_hdf5") - def test_calibrate_subband_weights(self, hdf_reader, _): + def test_calibrate_subband_weights(self, hdf_reader): """Test whether calibration values are correctly applied to weights""" nyquist_zone = numpy.array([[0] * N_pn] * S_pn) subband_weights = self._test_calibration(nyquist_zone, hdf_reader) @@ -160,7 +106,7 @@ class TestCalibrationManager(base.TestCase): ) @patch("tangostationcontrol.common.calibration.read_hdf5") - def test_calibrate_reverse_order(self, hdf_reader, _): + def test_calibrate_reverse_order(self, hdf_reader): """Test whether calibration values are applied in decreasing order when subband frequencies are decreasing""" nyquist_zone = numpy.array(