diff --git a/README.md b/README.md index 3ad9b99583d7a1111367abd5ef430951be00502a..bfec2769a302e45020a91b0dd41d2644fac6580e 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,7 @@ Next change the version in the following places: through [https://git.astron.nl/lofar2.0/tango/-/tags](Deploy Tags) # Release Notes +* 0.47.1 Move GrafanaAPIV3 RPC interface to Opah repo * 0.47.0 Migrate from lofar-station-client to lofar-lotus package. Update various package dependencies. * 0.46.1 Include clock_RW in metadata JSON * 0.46.0 Expose latest BST/SST/XST from ZMQ over gRPC diff --git a/infra/jobs/station/Makefile b/infra/jobs/station/Makefile index a149954983a7cb6a6cc83ae1ec2b66cfd5b1f84a..286429a1b9fd011f80fbd493a7d11252c2e63388 100644 --- a/infra/jobs/station/Makefile +++ b/infra/jobs/station/Makefile @@ -21,4 +21,4 @@ render: $(JOBS) levant render $(addprefix -var-file=, $(realpath $(ENV))) -var debug_host="$(LOCAL_IP)" -var image_tag="$(TAG)" -var station="$(STATION)" -var region="$(REGION)" -out=$(realpath $(DIR_OUT))/$@ $(realpath $(DIR_SRC))/$< # if levant is missing variables, it silently fills in <no value> # see also https://github.com/hashicorp/levant/pull/308 - ! fgrep "<no value>" $@ + ! fgrep "<no value>" $(realpath $(DIR_OUT))/$@ diff --git a/integration_tests/default/devices/test_device_observation_field.py b/integration_tests/default/devices/test_device_observation_field.py index 79ea931cb85c8ca5f9acc5703bdbaf439aaa356f..f235787b480bc6e4b784bd1239acc4b04aa305a2 100644 --- a/integration_tests/default/devices/test_device_observation_field.py +++ b/integration_tests/default/devices/test_device_observation_field.py @@ -126,6 +126,11 @@ class TestDeviceObservationField(TestDeviceBase): ) self.tilebeam_proxy = self.setup_proxy("STAT/TileBeam/HBA0", defaults=True) + # load settings so tests (including those in super()) can + # use self.proxy.initialise() and self.proxy.on() + # immediately + self.proxy.observation_field_settings_RW = self.VALID_JSON + @staticmethod def antennafield_configure(proxy: TestDeviceProxy): power_mapping = [[1, i * 2 + 0] for i in range(CS001_TILES)] @@ -178,13 +183,16 @@ class TestDeviceObservationField(TestDeviceBase): def test_init_valid(self): """Initialize an observation with valid JSON""" - self.proxy.observation_field_settings_RW = self.VALID_JSON self.proxy.Initialise() self.assertEqual(DevState.STANDBY, self.proxy.state()) def test_init_no_settings(self): """Initialize an observation with _invalid_ JSON""" + # Load invalid settings + with self.assertRaises(DevFailed): + self.proxy.observation_field_settings_RW = "{}" + # Cannot start without valid settings with self.assertRaises(DevFailed): self.proxy.Initialise() @@ -193,7 +201,7 @@ class TestDeviceObservationField(TestDeviceBase): self.assertEqual(DevState.FAULT, self.proxy.state()) def test_init_invalid(self): - """Initialize an observation with _invalid_ JSON""" + """Load an _invalid_ JSON""" # Cannot write invalid settings with self.assertRaises(DevFailed): @@ -204,7 +212,6 @@ class TestDeviceObservationField(TestDeviceBase): def test_prohibit_rewriting_settings(self): """Test that changing observation settings is disallowed once init""" - self.proxy.observation_field_settings_RW = self.VALID_JSON self.proxy.Initialise() with self.assertRaises(DevFailed): @@ -241,7 +248,6 @@ class TestDeviceObservationField(TestDeviceBase): dab_filter = data["HBA"]["DAB_filter"] - self.proxy.observation_field_settings_RW = self.VALID_JSON self.proxy.Initialise() self.proxy.On() @@ -270,7 +276,6 @@ class TestDeviceObservationField(TestDeviceBase): antennafield_proxy.RCU_band_select_RW.tolist(), [[0, 0]] * CS001_TILES, ) - self.proxy.observation_field_settings_RW = self.VALID_JSON self.proxy.Initialise() self.proxy.On() expected_bands = ( @@ -286,7 +291,6 @@ class TestDeviceObservationField(TestDeviceBase): subband_select = [0] * N_beamlets_ctrl beamlet_proxy.subband_select_RW = subband_select self.assertListEqual(beamlet_proxy.subband_select_RW.tolist(), subband_select) - self.proxy.observation_field_settings_RW = self.VALID_JSON self.proxy.Initialise() self.proxy.On() expected_subbands = [10, 20, 30] + [0] * (N_beamlets_ctrl - 3) @@ -302,7 +306,6 @@ class TestDeviceObservationField(TestDeviceBase): self.assertListEqual( list(digitalbeam_proxy.Pointing_direction_RW), default_pointing ) - self.proxy.observation_field_settings_RW = self.VALID_JSON self.proxy.Initialise() self.proxy.On() @@ -332,7 +335,6 @@ class TestDeviceObservationField(TestDeviceBase): self.assertListEqual( list(tilebeam_proxy.Pointing_direction_RW[0]), ["J2000", "0rad", "0rad"] ) - self.proxy.observation_field_settings_RW = self.VALID_JSON self.proxy.Initialise() self.proxy.On() @@ -355,7 +357,6 @@ class TestDeviceObservationField(TestDeviceBase): (CS001_TILES, N_elements * N_pol), dtype=bool ) - self.proxy.observation_field_settings_RW = self.VALID_JSON self.proxy.Initialise() self.proxy.On() @@ -386,7 +387,6 @@ class TestDeviceObservationField(TestDeviceBase): xst_proxy.FPGA_xst_integration_interval_RW = [10.0] * N_pn xst_proxy.FPGA_xst_offload_nof_crosslets_RW = [0] * N_pn - self.proxy.observation_field_settings_RW = self.VALID_JSON self.proxy.Initialise() self.proxy.On() diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION new file mode 100644 index 0000000000000000000000000000000000000000..650298f4f7d46565519bcded86a3b999d6176547 --- /dev/null +++ b/tangostationcontrol/VERSION @@ -0,0 +1 @@ +0.47.1 diff --git a/tangostationcontrol/rpc/grafana_api.py b/tangostationcontrol/rpc/grafana_api.py deleted file mode 100644 index 8955c28c0f2a5ce1215f75283661083cee543bb8..0000000000000000000000000000000000000000 --- a/tangostationcontrol/rpc/grafana_api.py +++ /dev/null @@ -1,525 +0,0 @@ -# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy) -# SPDX-License-Identifier: Apache-2.0 - -"""Exposure of the station's statistics for the innius-rpc-datasource plugin in Grafana.""" - -from datetime import datetime, timezone -import itertools -import math -from typing import Callable - -from tangostationcontrol.common.frequency_bands import Band, bands -from lofar_sid.interface.opah import grafana_apiv3_pb2 -from lofar_sid.interface.opah import grafana_apiv3_pb2_grpc -from lofar_sid.interface.stationcontrol import statistics_pb2 -from tangostationcontrol.rpc.statistics import ( - Statistics, - TooOldError, -) -from tangostationcontrol.rpc.common import ( - call_exception_metrics, -) - - -class StatisticsToGrafana: - """Abstract base class for converting Statistics responses to Grafana Frames.""" - - # Meta information for our statistics - meta = grafana_apiv3_pb2.FrameMeta( - type=grafana_apiv3_pb2.FrameMeta.FrameType.FrameTypeTimeSeriesLong, - PreferredVisualization=grafana_apiv3_pb2.FrameMeta.VisType.VisTypeGraph, - ) - - def __init__(self, statistics: Statistics): - super().__init__() - - self.statistics = statistics - - @staticmethod - def _subband_frequency_func( - frequency_band: statistics_pb2.FrequencyBand, - ) -> Callable[[int], float]: - """Return a function that converts a subband number into its central frequency (in Hz). - - NB: Spectral inversion is assumed to have been configured correctly at time of recording. - """ - - band_name = Band.lookup_nyquist_zone( - frequency_band.antenna_type, - frequency_band.clock, - frequency_band.nyquist_zone, - ) - - return bands[band_name].subband_frequency - - @staticmethod - def _verify_call_result(reply, time_window: tuple[datetime, datetime]): - """Verify reply from Statistics service. Raises if verification fails.""" - - if not ( - time_window[0] - <= reply.result.timestamp.ToDatetime(tzinfo=timezone.utc) - < time_window[1] - ): - raise TooOldError( - f"Stastistics not available in time window {time_window}. Available is {reply.result.timestamp}." - ) - - return True - - -class BstToGrafana(StatisticsToGrafana): - """Converts Statistics.BST responses to Grafana Frames.""" - - def _get_latest_in_window( - self, time_window: tuple[datetime, datetime], antenna_field: str - ) -> statistics_pb2.BstReply: - """Get the latest statistics in the given time window, if any.""" - - request = statistics_pb2.BstRequest( - antenna_field=antenna_field, - ) - reply = self.statistics.Bst(request, None) - - self._verify_call_result(reply, time_window) - - return reply - - @call_exception_metrics("StatisticsToGrafana", {"type": "bst"}) - def all_frames( - self, - time_window: tuple[datetime, datetime], - antenna_field: str, - pol: str | None, - ) -> list[grafana_apiv3_pb2.Frame]: - """Return all Grafana Frames for the requested data.""" - - try: - reply = self._get_latest_in_window(time_window, antenna_field) - result = reply.result - except TooOldError: - return [] - - # Turn result into Grafana fields - # - # Each value describes the power for one beamlet. - # - # Each polarisation results in one field. - frames = [ - grafana_apiv3_pb2.Frame( - metric="BST", - timestamps=[result.timestamp for _ in result.beamlets], - fields=list( - filter( - None, - [ - grafana_apiv3_pb2.Field( - name="beamlet", - values=[b.beamlet for b in result.beamlets], - ), - ( - grafana_apiv3_pb2.Field( - name="xx", - config=grafana_apiv3_pb2.config( - unit="dB", - ), - values=[b.x_power_db for b in result.beamlets], - ) - if pol in ["xx", None] - else None - ), - ( - grafana_apiv3_pb2.Field( - name="yy", - config=grafana_apiv3_pb2.config( - unit="dB", - ), - values=[b.y_power_db for b in result.beamlets], - ) - if pol in ["yy", None] - else None - ), - ], - ) - ), - meta=self.meta, - ) - ] - - return frames - - -class SstToGrafana(StatisticsToGrafana): - """Converts Statistics.SST responses to Grafana Frames.""" - - def _get_latest_in_window( - self, time_window: tuple[datetime, datetime], antenna_field: str - ) -> statistics_pb2.SstReply: - """Get the latest statistics in the given time window, if any.""" - - request = statistics_pb2.SstRequest( - antenna_field=antenna_field, - ) - reply = self.statistics.Sst(request, None) - - self._verify_call_result(reply, time_window) - - return reply - - @call_exception_metrics("StatisticsToGrafana", {"type": "sst"}) - def all_frames( - self, - time_window: tuple[datetime, datetime], - antenna_field: str, - selected_pol: str | None, - ) -> list[grafana_apiv3_pb2.Frame]: - """Return all Grafana Frames for the requested data.""" - - try: - reply = self._get_latest_in_window(time_window, antenna_field) - result = reply.result - except TooOldError: - return [] - - # Turn result into Grafana fields - # - # Each value describes the power of an antenna for a specific subband. - # - # Field 0 are the spectral frequencies for each value. - # Field 1+ is one field for each antenna and each requested polarisation. - antenna_nrs = [antenna.antenna for antenna in result.subbands[0].antennas] - fields_per_antenna = [ - [ - ( - grafana_apiv3_pb2.Field( - name="power", - labels=[ - grafana_apiv3_pb2.Label( - key="antenna", - value="%03d" % antenna_nr, - ), - grafana_apiv3_pb2.Label( - key="pol", - value="xx", - ), - ], - config=grafana_apiv3_pb2.config( - unit="dB", - ), - values=[ - subband.antennas[antenna_nr].x_power_db - for subband in result.subbands - ], - ) - if selected_pol in ["xx", None] - else None - ), - ( - grafana_apiv3_pb2.Field( - name="power", - labels=[ - grafana_apiv3_pb2.Label( - key="antenna", - value="%03d" % antenna_nr, - ), - grafana_apiv3_pb2.Label( - key="pol", - value="yy", - ), - ], - config=grafana_apiv3_pb2.config( - unit="dB", - ), - values=[ - subband.antennas[antenna_nr].y_power_db - for subband in result.subbands - ], - ) - if selected_pol in ["yy", None] - else None - ), - ] - for antenna_nr in antenna_nrs - ] - - subband_frequency = self._subband_frequency_func(result.frequency_band) - - frames = [ - grafana_apiv3_pb2.Frame( - metric="SST", - timestamps=[result.timestamp for _ in result.subbands], - fields=[ - grafana_apiv3_pb2.Field( - name="frequency", - config=grafana_apiv3_pb2.config( - unit="Hz", - ), - values=[subband_frequency(b.subband) for b in result.subbands], - ), - ] - + list(filter(None, itertools.chain(*fields_per_antenna))), - meta=self.meta, - ) - ] - - return frames - - -class XstToGrafana(StatisticsToGrafana): - """Converts Statistics.XST responses to Grafana Frames.""" - - def _get_latest_in_window( - self, time_window: tuple[datetime, datetime], antenna_field: str - ) -> statistics_pb2.XstReply: - """Get the latest statistics in the given time window, if any.""" - - request = statistics_pb2.XstRequest( - antenna_field=antenna_field, - ) - reply = self.statistics.Xst(request, None) - - self._verify_call_result(reply, time_window) - - return reply - - @call_exception_metrics("StatisticsToGrafana", {"type": "xst"}) - def all_frames( - self, - time_window: tuple[datetime, datetime], - antenna_field: str, - selected_pol: str | None, - ) -> list[grafana_apiv3_pb2.Frame]: - """Return all Grafana Frames for the requested data.""" - - try: - reply = self._get_latest_in_window(time_window, antenna_field) - result = reply.result - except TooOldError: - return [] - - subband_frequency = self._subband_frequency_func(result.frequency_band) - - # Turn result into Grafana fields - # - # Each value describes a baseline. - # - # Field 0 & 1 are the (antenna1, antenna2) indices describing each baseline. - # Field 2 is the central frequency of the values for each baseline. - fields = [ - grafana_apiv3_pb2.Field( - name="antenna1", - values=[baseline.antenna1 for baseline in result.baselines], - ), - grafana_apiv3_pb2.Field( - name="antenna2", - values=[baseline.antenna2 for baseline in result.baselines], - ), - grafana_apiv3_pb2.Field( - name="frequency", - config=grafana_apiv3_pb2.config( - unit="Hz", - ), - values=[subband_frequency(result.subband) for _ in result.baselines], - ), - ] - - # Subsequent fields describe the power and phase for each baseline, and - # the requested, or all, polarisations. - for pol in ("xx", "xy", "yx", "yy"): - if selected_pol is None or pol == selected_pol: - labels = ( - [ - grafana_apiv3_pb2.Label( - key="pol", - value=pol, - ) - ] - if not selected_pol - else [] - ) - - fields.extend( - [ - grafana_apiv3_pb2.Field( - name="power", - labels=labels, - config=grafana_apiv3_pb2.config( - unit="dB", - ), - values=[ - getattr(baseline, pol).power_db - for baseline in result.baselines - ], - ), - grafana_apiv3_pb2.Field( - name="phase", - labels=labels, - config=grafana_apiv3_pb2.config( - unit="deg", - ), - values=[ - math.fabs( - getattr(baseline, pol).phase * 360.0 / math.pi - ) - for baseline in result.baselines - ], - ), - ] - ) - - frames = [ - grafana_apiv3_pb2.Frame( - metric="XST", - timestamps=[result.timestamp for _ in result.baselines], - fields=fields, - meta=self.meta, - ) - ] - - return frames - - -class GrafanaAPIV3(grafana_apiv3_pb2_grpc.GrafanaQueryAPIServicer): - """Implements the Grafana interface for the innius simple-rpc-datasource, - see https://github.com/innius/grafana-simple-grpc-datasource""" - - def __init__(self, statistics: Statistics, antenna_fields: list[str]): - super().__init__() - - self.statistics = statistics - self.antenna_fields = antenna_fields - self.bst = BstToGrafana(self.statistics) - self.sst = SstToGrafana(self.statistics) - self.xst = XstToGrafana(self.statistics) - - # self._import_test_data() - - def _import_test_data(self): - """Import test data for demo purposes.""" - - import json - import os - - test_dir = os.path.dirname(__file__) + "/../../test/rpc/" - - for type_ in ("bst", "sst", "xst"): - with open(test_dir + f"{type_}-message.json") as f: - print(f"Loading {f}") - message = json.load(f) - - self.statistics.handle_statistics_message( - f"{type_}/lba/cs032", datetime.now(tz=timezone.utc), message - ) - - def GetQueryOptions(self, request: grafana_apiv3_pb2.GetOptionsRequest, context): - """List options per query.""" - - return grafana_apiv3_pb2.GetOptionsResponse(options=[]) - - def ListDimensionKeys( - self, request: grafana_apiv3_pb2.ListDimensionKeysRequest, context - ): - """List available data dimensions.""" - - results = [ - grafana_apiv3_pb2.ListDimensionKeysResponse.Result( - key="antenna_field", - description="Antenna field", - ), - grafana_apiv3_pb2.ListDimensionKeysResponse.Result( - key="pol", - description="Polarisation", - ), - ] - return grafana_apiv3_pb2.ListDimensionKeysResponse(results=results) - - def ListDimensionValues( - self, request: grafana_apiv3_pb2.ListDimensionValuesRequest, context - ): - """List possible values for each data dimension.""" - - results = [] - - if request.dimension_key == "antenna_field": - for antenna_field in self.antenna_fields: - results.append( - grafana_apiv3_pb2.ListDimensionValuesResponse.Result( - value=antenna_field, - ) - ) - - if request.dimension_key == "pol": - for pol in ["xx", "xy", "yx", "yy"]: - results.append( - grafana_apiv3_pb2.ListDimensionValuesResponse.Result( - value=pol, - ) - ) - - return grafana_apiv3_pb2.ListDimensionValuesResponse(results=results) - - def ListMetrics(self, request: grafana_apiv3_pb2.ListMetricsRequest, context): - """List available metrics.""" - - metrics = [ - grafana_apiv3_pb2.ListMetricsResponse.Metric( - name="BST", - description="Beamlet statistics", - ), - grafana_apiv3_pb2.ListMetricsResponse.Metric( - name="SST", - description="Subband statistics", - ), - grafana_apiv3_pb2.ListMetricsResponse.Metric( - name="XST", - description="Crosslet statistics", - ), - ] - - return grafana_apiv3_pb2.ListMetricsResponse(Metrics=metrics) - - @call_exception_metrics("GrafanaAPIV3") - def GetMetricValue(self, request: grafana_apiv3_pb2.GetMetricValueRequest, context): - return grafana_apiv3_pb2.GetMetricValueResponse(frames=[]) - - @call_exception_metrics("GrafanaAPIV3") - def GetMetricAggregate( - self, request: grafana_apiv3_pb2.GetMetricAggregateRequest, context - ): - """Return the set of values for the request metrics and dimensions.""" - - frames = [] - - dimensions = {d.key: d.value for d in request.dimensions} - time_window = ( - request.startDate.ToDatetime(tzinfo=timezone.utc), - request.endDate.ToDatetime(tzinfo=timezone.utc), - ) - - for metric in request.metrics: - if metric == "BST": - frames.extend( - self.bst.all_frames( - time_window, - dimensions["antenna_field"].lower(), - dimensions.get("pol"), - ) - ) - elif metric == "SST": - frames.extend( - self.sst.all_frames( - time_window, - dimensions["antenna_field"].lower(), - dimensions.get("pol"), - ) - ) - elif metric == "XST": - frames.extend( - self.xst.all_frames( - time_window, - dimensions["antenna_field"].lower(), - dimensions.get("pol"), - ) - ) - - return grafana_apiv3_pb2.GetMetricAggregateResponse(frames=frames) diff --git a/tangostationcontrol/rpc/server.py b/tangostationcontrol/rpc/server.py index ab25553e6ab0a8fe2923d050621ad1e2eff7f6c7..383efb5c27544c8ca56a8ba0d47835bf1833eafe 100644 --- a/tangostationcontrol/rpc/server.py +++ b/tangostationcontrol/rpc/server.py @@ -12,15 +12,13 @@ from lofar_sid.interface.stationcontrol import observation_pb2 from lofar_sid.interface.stationcontrol import observation_pb2_grpc from lofar_sid.interface.stationcontrol import statistics_pb2 from lofar_sid.interface.stationcontrol import statistics_pb2_grpc -from lofar_sid.interface.opah import grafana_apiv3_pb2 -from lofar_sid.interface.opah import grafana_apiv3_pb2_grpc +from lofar_sid.interface.stationcontrol import antennafield_pb2 +from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc from tangostationcontrol.rpc.observation import Observation from tangostationcontrol.rpc.statistics import Statistics -from tangostationcontrol.rpc.grafana_api import GrafanaAPIV3 from tangostationcontrol.rpc.messagehandler import MultiEndpointZMQMessageHandler from tangostationcontrol.common.lofar_logging import configure_logger from tangostationcontrol.metrics import start_metrics_server -from lofar_sid.interface.stationcontrol import antennafield_pb2, antennafield_pb2_grpc from tangostationcontrol.rpc.antennafield import AntennaField logger = logging.getLogger() @@ -43,14 +41,10 @@ class Server: statistics_pb2_grpc.add_StatisticsServicer_to_server( self.statistics_servicer, self.server ) - grafana_apiv3_pb2_grpc.add_GrafanaQueryAPIServicer_to_server( - GrafanaAPIV3(self.statistics_servicer, antenna_fields), self.server - ) SERVICE_NAMES = ( observation_pb2.DESCRIPTOR.services_by_name["Observation"].full_name, antennafield_pb2.DESCRIPTOR.services_by_name["Antennafield"].full_name, statistics_pb2.DESCRIPTOR.services_by_name["Statistics"].full_name, - grafana_apiv3_pb2.DESCRIPTOR.services_by_name["GrafanaQueryAPI"].full_name, reflection.SERVICE_NAME, # reflection is required by innius-gpc-datasource ) reflection.enable_server_reflection(SERVICE_NAMES, self.server) diff --git a/tangostationcontrol/rpc/statistics.py b/tangostationcontrol/rpc/statistics.py index 13f66f67daf0013471d0f667390f413176434f1d..c856c8f8e09de243246062ca15e57fcb47394da6 100644 --- a/tangostationcontrol/rpc/statistics.py +++ b/tangostationcontrol/rpc/statistics.py @@ -3,12 +3,13 @@ from datetime import datetime, timedelta, timezone from math import log -from typing import Dict, Tuple +from typing import Dict, Tuple, Callable import logging import numpy from tangostationcontrol.common.constants import N_pol, N_subbands +from tangostationcontrol.common.frequency_bands import Band, bands from lofar_sid.interface.stationcontrol import statistics_pb2 from lofar_sid.interface.stationcontrol import statistics_pb2_grpc from tangostationcontrol.rpc.common import ( @@ -88,6 +89,23 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM nyquist_zone=message["source_info"]["nyquist_zone_index"], ) + @staticmethod + def _subband_frequency_func( + frequency_band: statistics_pb2.FrequencyBand, + ) -> Callable[[int], float]: + """Return a function that converts a subband number into its central frequency (in Hz). + + NB: Spectral inversion is assumed to have been configured correctly at time of recording. + """ + + band_name = Band.lookup_nyquist_zone( + frequency_band.antenna_type, + frequency_band.clock, + frequency_band.nyquist_zone, + ) + + return bands[band_name].subband_frequency + @call_exception_metrics("Statistics") def Bst(self, request: statistics_pb2.BstRequest, context): bst_message = self.get_last_message("bst", request.antenna_field) @@ -119,9 +137,13 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM sst_data = numpy.array(sst_message["sst_data"], dtype=numpy.float32) sst_data = sst_data.reshape((-1, N_pol, N_subbands)) + frequency_band = self._message_to_frequency_band(sst_message) + subband_frequency = self._subband_frequency_func(frequency_band) + subbands = [ statistics_pb2.SstResult.SstSubband( subband=subband_nr, + frequency=subband_frequency(subband_nr), antennas=[ statistics_pb2.SstResult.SstSubband.SstAntenna( antenna=antenna_nr, @@ -137,7 +159,7 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM return statistics_pb2.SstReply( result=statistics_pb2.SstResult( timestamp=datetime.fromisoformat(sst_message["timestamp"]), - frequency_band=self._message_to_frequency_band(sst_message), + frequency_band=frequency_band, integration_interval=sst_message["integration_interval"], subbands=subbands, ), @@ -187,12 +209,16 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM if antenna1 <= antenna2 ] + frequency_band = self._message_to_frequency_band(xst_message) + subband_frequency = self._subband_frequency_func(frequency_band) + return statistics_pb2.XstReply( result=statistics_pb2.XstResult( timestamp=datetime.fromisoformat(xst_message["timestamp"]), - frequency_band=self._message_to_frequency_band(xst_message), + frequency_band=frequency_band, integration_interval=xst_message["integration_interval"], subband=xst_message["subband"], + frequency=subband_frequency(xst_message["subband"]), baselines=baselines, ), ) diff --git a/tests/rpc/test_grafana_apiv3.py b/tests/rpc/test_grafana_apiv3.py deleted file mode 100644 index 2251a834bcfb5f9bcf6cbbb2a039e659d7ac1ed2..0000000000000000000000000000000000000000 --- a/tests/rpc/test_grafana_apiv3.py +++ /dev/null @@ -1,369 +0,0 @@ -# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy) -# SPDX-License-Identifier: Apache-2.0 - -from datetime import datetime, timezone, timedelta -import json -from os import path - -from tangostationcontrol.common.constants import ( - N_subbands, - N_pol, - N_beamlets_ctrl, - N_pn, - S_pn, - A_pn, -) -from tangostationcontrol.rpc.grafana_api import ( - GrafanaAPIV3, - StatisticsToGrafana, - BstToGrafana, - SstToGrafana, - XstToGrafana, -) -from lofar_sid.interface.opah import grafana_apiv3_pb2 -from tangostationcontrol.rpc.statistics import ( - Statistics, - dB, - NotAvailableError, - TooOldError, -) -from lofar_sid.interface.stationcontrol import statistics_pb2 - -from google.protobuf.timestamp_pb2 import Timestamp - -from tests import base - - -def field_labels(label_list: list[grafana_apiv3_pb2.Label]) -> dict[str, str]: - """Turn a list of objects with key and value fields into a dict.""" - return {x.key: x.value for x in label_list} - - -class TestStatisticsToGrafana(base.TestCase): - def test_verify_call_result(self): - statistics = Statistics() - sut = StatisticsToGrafana(statistics) - - now = datetime.now(tz=timezone.utc) - - reply = statistics_pb2.SstReply(result=statistics_pb2.SstResult(timestamp=now)) - - # should not raise - _ = sut._verify_call_result( - reply, (now - timedelta(seconds=1), now + timedelta(seconds=1)) - ) - - # too old raises - with self.assertRaises(TooOldError): - _ = sut._verify_call_result( - reply, (now + timedelta(seconds=1), now + timedelta(seconds=2)) - ) - - -class TestBstToGrafana(base.TestCase): - def setUp(self): - statistics = Statistics() - self.sut = BstToGrafana(statistics) - - # provide with sample statistic - with open(f"{path.dirname(__file__)}/bst-message.json") as f: - self.bst_message = json.load(f) - statistics.handle_statistics_message( - "bst/lba/cs032", datetime.now(), self.bst_message - ) - - self.valid_time_window = ( - datetime.fromisoformat("2025-01-01T00:00+00:00"), - datetime.fromisoformat("2026-01-01T00:00+00:00"), - ) - - def test_all_frames(self): - frames = self.sut.all_frames(self.valid_time_window, "lba", None) - - self.assertEqual(1, len(frames)) - self.assertEqual("BST", frames[0].metric) - self.assertEqual(3, len(frames[0].fields)) - self.assertEqual("beamlet", frames[0].fields[0].name) - self.assertEqual("xx", frames[0].fields[1].name) - self.assertEqual("yy", frames[0].fields[2].name) - - # all fields must be of the right size - self.assertEqual(N_beamlets_ctrl, len(frames[0].timestamps)) - self.assertEqual(N_beamlets_ctrl, len(frames[0].fields[0].values)) - self.assertEqual(N_beamlets_ctrl, len(frames[0].fields[1].values)) - self.assertEqual(N_beamlets_ctrl, len(frames[0].fields[2].values)) - - # all timestamps must be equal - self.assertEqual(1, len(set([ts.ToDatetime() for ts in frames[0].timestamps]))) - - # all beamlets must be there - self.assertEqual(set(range(N_beamlets_ctrl)), set(frames[0].fields[0].values)) - - # test values - for beamlet_idx, value in enumerate(frames[0].fields[1].values): - self.assertAlmostEqual( - dB(self.bst_message["bst_data"][beamlet_idx][0]), - value, - 5, - msg=f"{beamlet_idx=}", - ) - for beamlet_idx, value in enumerate(frames[0].fields[2].values): - self.assertAlmostEqual( - dB(self.bst_message["bst_data"][beamlet_idx][1]), - value, - 5, - msg=f"{beamlet_idx=}", - ) - - -class TestSstToGrafana(base.TestCase): - def setUp(self): - statistics = Statistics() - self.sut = SstToGrafana(statistics) - - # provide with sample statistic - with open(f"{path.dirname(__file__)}/sst-message.json") as f: - self.sst_message = json.load(f) - statistics.handle_statistics_message( - "sst/lba/cs032", datetime.now(), self.sst_message - ) - - self.valid_time_window = ( - datetime.fromisoformat("2025-01-01T00:00+00:00"), - datetime.fromisoformat("2026-01-01T00:00+00:00"), - ) - - def test_all_frames(self): - frames = self.sut.all_frames(self.valid_time_window, "lba", None) - - self.assertEqual(1, len(frames)) - self.assertEqual("SST", frames[0].metric) - self.assertEqual(1 + N_pn * S_pn, len(frames[0].fields)) - self.assertEqual("frequency", frames[0].fields[0].name) - - for idx, field in enumerate(frames[0].fields[1:], 1): - self.assertEqual("power", field.name, msg=f"{idx=}") - - # all fields must be of the right size - self.assertEqual(N_subbands - 1, len(frames[0].timestamps)) - self.assertEqual(N_subbands - 1, len(frames[0].fields[0].values)) - - for idx, field in enumerate(frames[0].fields[1:], 1): - self.assertEqual(N_subbands - 1, len(field.values), msg=f"{idx=}") - - # all timestamps must be equal - self.assertEqual(1, len(set([ts.ToDatetime() for ts in frames[0].timestamps]))) - - # all inputs must be there - input_names = { - field_labels(field.labels)["antenna"] for field in frames[0].fields[1:] - } - self.assertEqual(N_pn * A_pn, len(input_names), msg=f"{input_names=}") - # both polarisarions must be there - input_pols = [ - field_labels(field.labels)["pol"] for field in frames[0].fields[1:] - ] - self.assertEqual(N_pn * A_pn, input_pols.count("xx"), msg=f"{input_pols=}") - self.assertEqual(N_pn * A_pn, input_pols.count("yy"), msg=f"{input_pols=}") - - # test values - for field in frames[0].fields[1:]: - labels = field_labels(field.labels) - - antenna_idx = int(labels["antenna"]) - pol_indices = {"xx": 0, "yy": 1} - pol_idx = pol_indices[labels["pol"]] - input_idx = antenna_idx * N_pol + pol_idx - - for idx, value in enumerate(field.values): - self.assertAlmostEqual( - dB(self.sst_message["sst_data"][input_idx][idx + 1]), - value, - 5, - msg=f"{labels=}, {idx=}", - ) - - -class TestXstToGrafana(base.TestCase): - def setUp(self): - statistics = Statistics() - self.sut = XstToGrafana(statistics) - - # provide with sample statistic - with open(f"{path.dirname(__file__)}/xst-message.json") as f: - self.xst_message = json.load(f) - statistics.handle_statistics_message( - "xst/lba/cs032", datetime.now(), self.xst_message - ) - - self.valid_time_window = ( - datetime.fromisoformat("2025-01-01T00:00+00:00"), - datetime.fromisoformat("2026-01-01T00:00+00:00"), - ) - - def test_all_frames(self): - frames = self.sut.all_frames(self.valid_time_window, "lba", None) - - self.assertEqual(1, len(frames)) - self.assertEqual("XST", frames[0].metric) - self.assertEqual(3 + 2 * N_pol**2, len(frames[0].fields)) - self.assertEqual("antenna1", frames[0].fields[0].name) - self.assertEqual("antenna2", frames[0].fields[1].name) - self.assertEqual("frequency", frames[0].fields[2].name) - self.assertEqual( - {"power", "phase"}, {field.name for field in frames[0].fields[3:]} - ) - - N_antennas = N_pn * A_pn - N_baselines = N_antennas * (N_antennas + 1) // 2 - - # all fields must be of the right size - self.assertEqual(N_baselines, len(frames[0].timestamps)) - - for idx, field in enumerate(frames[0].fields): - self.assertEqual(N_baselines, len(field.values), msg=f"{idx=}") - - # all timestamps must be equal - self.assertEqual(1, len(set([ts.ToDatetime() for ts in frames[0].timestamps]))) - - # all polarisarions must be there (for power and phase) - input_pols = [ - field_labels(field.labels)["pol"] for field in frames[0].fields[3:] - ] - self.assertEqual(2, input_pols.count("xx")) - self.assertEqual(2, input_pols.count("xy")) - self.assertEqual(2, input_pols.count("yx")) - self.assertEqual(2, input_pols.count("yy")) - - # all antennas must be there - self.assertEqual(set(range(N_antennas)), set(frames[0].fields[0].values)) - self.assertEqual(set(range(N_antennas)), set(frames[0].fields[1].values)) - - # test specific value for regression - self.assertAlmostEqual(30.7213135, frames[0].fields[3].values[99], 6) - self.assertAlmostEqual(3.88297279, frames[0].fields[4].values[99], 6) - self.assertAlmostEqual(27.2744141, frames[0].fields[5].values[99], 6) - self.assertAlmostEqual(297.7411965, frames[0].fields[6].values[99], 6) - - -class TestGrafanaAPIV3(base.TestCase): - def test_getqueryoptions(self): - """Test GetQueryOptions.""" - - statistics = Statistics() - sut = GrafanaAPIV3(statistics, []) - - request = grafana_apiv3_pb2.GetOptionsRequest() - reply = sut.GetQueryOptions(request, None) - - self.assertEqual(0, len(reply.options)) - - def test_listdimensionkeys(self): - """Test ListDimensionKeys.""" - - statistics = Statistics() - sut = GrafanaAPIV3(statistics, []) - - request = grafana_apiv3_pb2.ListDimensionKeysRequest() - reply = sut.ListDimensionKeys(request, None) - - dimensions = {d.key for d in reply.results} - - self.assertIn("antenna_field", dimensions) - self.assertIn("pol", dimensions) - - def test_listdimensionvalues(self): - """Test ListDimensionValues.""" - - statistics = Statistics() - sut = GrafanaAPIV3(statistics, ["LBA", "HBA"]) - - expected_values = { - "antenna_field": {"LBA", "HBA"}, - "pol": {"xx", "xy", "yx", "yy"}, - } - - for dim, expected_value in expected_values.items(): - request = grafana_apiv3_pb2.ListDimensionValuesRequest(dimension_key=dim) - reply = sut.ListDimensionValues(request, None) - - value = set([r.value for r in reply.results]) - self.assertEqual(expected_value, value, msg=f"{dim=}") - - def test_listmetrics(self): - """Test ListMetrics.""" - - statistics = Statistics() - sut = GrafanaAPIV3(statistics, []) - - request = grafana_apiv3_pb2.ListMetricsRequest() - reply = sut.ListMetrics(request, None) - - metrics = [m.name for m in reply.Metrics] - - self.assertIn("BST", metrics) - self.assertIn("SST", metrics) - self.assertIn("XST", metrics) - - -class TestGetMetricAggregate(base.TestCase): - def setUp(self): - statistics = Statistics() - self.sut = GrafanaAPIV3(statistics, ["LBA"]) - - # provide with sample statistic - with open(f"{path.dirname(__file__)}/sst-message.json") as f: - sst_message = json.load(f) - statistics.handle_statistics_message( - "sst/lba/cs032", datetime.now(), sst_message - ) - - def test_nometrics(self): - """Test if there are no metrics available.""" - - statistics = Statistics() - sut = GrafanaAPIV3(statistics, ["LBA"]) - - # request/response - request = grafana_apiv3_pb2.GetMetricAggregateRequest( - metrics=["SST"], - dimensions=[grafana_apiv3_pb2.Dimension(key="antenna_field", value="LBA")], - startDate=Timestamp(seconds=0), - endDate=Timestamp(seconds=int(datetime.now().timestamp())), - ) - - with self.assertRaises(NotAvailableError): - _ = sut.GetMetricAggregate(request, None) - - def test_validdaterange(self): - """Test obtaining SSTs using a date range for which they are available.""" - - # request/response - request = grafana_apiv3_pb2.GetMetricAggregateRequest( - metrics=["SST"], - dimensions=[grafana_apiv3_pb2.Dimension(key="antenna_field", value="LBA")], - startDate=Timestamp(seconds=0), - endDate=Timestamp(seconds=int(datetime.now().timestamp())), - ) - - reply = self.sut.GetMetricAggregate(request, None) - - # validate output - self.assertEqual(1, len(reply.frames)) - self.assertEqual("SST", reply.frames[0].metric) - - def test_tooold(self): - """Test obtaining SSTs using a date range for which they are too old.""" - - # request/response - request = grafana_apiv3_pb2.GetMetricAggregateRequest( - metrics=["SST"], - dimensions=[grafana_apiv3_pb2.Dimension(key="antenna_field", value="LBA")], - startDate=Timestamp(seconds=int(datetime.now().timestamp()) - 1), - endDate=Timestamp(seconds=int(datetime.now().timestamp())), - ) - - reply = self.sut.GetMetricAggregate(request, None) - - # validate output - self.assertEqual(0, len(reply.frames)) diff --git a/tests/rpc/test_server.py b/tests/rpc/test_server.py index 0d3f4e2d4e066aae3000518e167cecc1a0371f39..723aaeabb339ffc2360dae907a0eac42618538ff 100644 --- a/tests/rpc/test_server.py +++ b/tests/rpc/test_server.py @@ -8,8 +8,8 @@ from grpc_reflection.v1alpha.proto_reflection_descriptor_database import ( ProtoReflectionDescriptorDatabase, ) -from lofar_sid.interface.opah import grafana_apiv3_pb2 -from lofar_sid.interface.opah import grafana_apiv3_pb2_grpc +from lofar_sid.interface.stationcontrol import antennafield_pb2 +from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc from tangostationcontrol.rpc.server import Server from tests import base @@ -36,11 +36,17 @@ class TestServer(base.TestCase): self.assertIn("Observation", services) self.assertIn("Antennafield", services) self.assertIn("Statistics", services) - self.assertIn("grafanav3.GrafanaQueryAPI", services) def test_call(self): """Test a basic gRPC call to the server.""" with grpc.insecure_channel(f"localhost:{self.server.port}") as channel: - stub = grafana_apiv3_pb2_grpc.GrafanaQueryAPIStub(channel) - _ = stub.GetQueryOptions(grafana_apiv3_pb2.GetOptionsRequest()) + stub = antennafield_pb2_grpc.AntennafieldStub(channel) + + identifier = antennafield_pb2.Identifier( + antennafield_name="lba", + antenna_name="LBA00", + ) + _ = stub.GetAntenna( + antennafield_pb2.GetAntennaRequest(identifier=identifier) + )