Skip to content
Snippets Groups Projects
Commit 6c85f209 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-1509-move-grafana-rpc-to-opah' into 'master'

Resolve L2SS-1509 "Move grafana rpc to opah"

Closes L2SS-1509

See merge request !1062
parents be530073 f2c7ffd5
Branches
Tags v0.47.1
1 merge request!1062Resolve L2SS-1509 "Move grafana rpc to opah"
......@@ -150,6 +150,7 @@ Next change the version in the following places:
through [https://git.astron.nl/lofar2.0/tango/-/tags](Deploy Tags)
# Release Notes
* 0.47.1 Move GrafanaAPIV3 RPC interface to Opah repo
* 0.47.0 Migrate from lofar-station-client to lofar-lotus package. Update various package dependencies.
* 0.46.1 Include clock_RW in metadata JSON
* 0.46.0 Expose latest BST/SST/XST from ZMQ over gRPC
......
......@@ -21,4 +21,4 @@ render: $(JOBS)
levant render $(addprefix -var-file=, $(realpath $(ENV))) -var debug_host="$(LOCAL_IP)" -var image_tag="$(TAG)" -var station="$(STATION)" -var region="$(REGION)" -out=$(realpath $(DIR_OUT))/$@ $(realpath $(DIR_SRC))/$<
# if levant is missing variables, it silently fills in <no value>
# see also https://github.com/hashicorp/levant/pull/308
! fgrep "<no value>" $@
! fgrep "<no value>" $(realpath $(DIR_OUT))/$@
......@@ -126,6 +126,11 @@ class TestDeviceObservationField(TestDeviceBase):
)
self.tilebeam_proxy = self.setup_proxy("STAT/TileBeam/HBA0", defaults=True)
# load settings so tests (including those in super()) can
# use self.proxy.initialise() and self.proxy.on()
# immediately
self.proxy.observation_field_settings_RW = self.VALID_JSON
@staticmethod
def antennafield_configure(proxy: TestDeviceProxy):
power_mapping = [[1, i * 2 + 0] for i in range(CS001_TILES)]
......@@ -178,13 +183,16 @@ class TestDeviceObservationField(TestDeviceBase):
def test_init_valid(self):
"""Initialize an observation with valid JSON"""
self.proxy.observation_field_settings_RW = self.VALID_JSON
self.proxy.Initialise()
self.assertEqual(DevState.STANDBY, self.proxy.state())
def test_init_no_settings(self):
"""Initialize an observation with _invalid_ JSON"""
# Load invalid settings
with self.assertRaises(DevFailed):
self.proxy.observation_field_settings_RW = "{}"
# Cannot start without valid settings
with self.assertRaises(DevFailed):
self.proxy.Initialise()
......@@ -193,7 +201,7 @@ class TestDeviceObservationField(TestDeviceBase):
self.assertEqual(DevState.FAULT, self.proxy.state())
def test_init_invalid(self):
"""Initialize an observation with _invalid_ JSON"""
"""Load an _invalid_ JSON"""
# Cannot write invalid settings
with self.assertRaises(DevFailed):
......@@ -204,7 +212,6 @@ class TestDeviceObservationField(TestDeviceBase):
def test_prohibit_rewriting_settings(self):
"""Test that changing observation settings is disallowed once init"""
self.proxy.observation_field_settings_RW = self.VALID_JSON
self.proxy.Initialise()
with self.assertRaises(DevFailed):
......@@ -241,7 +248,6 @@ class TestDeviceObservationField(TestDeviceBase):
dab_filter = data["HBA"]["DAB_filter"]
self.proxy.observation_field_settings_RW = self.VALID_JSON
self.proxy.Initialise()
self.proxy.On()
......@@ -270,7 +276,6 @@ class TestDeviceObservationField(TestDeviceBase):
antennafield_proxy.RCU_band_select_RW.tolist(),
[[0, 0]] * CS001_TILES,
)
self.proxy.observation_field_settings_RW = self.VALID_JSON
self.proxy.Initialise()
self.proxy.On()
expected_bands = (
......@@ -286,7 +291,6 @@ class TestDeviceObservationField(TestDeviceBase):
subband_select = [0] * N_beamlets_ctrl
beamlet_proxy.subband_select_RW = subband_select
self.assertListEqual(beamlet_proxy.subband_select_RW.tolist(), subband_select)
self.proxy.observation_field_settings_RW = self.VALID_JSON
self.proxy.Initialise()
self.proxy.On()
expected_subbands = [10, 20, 30] + [0] * (N_beamlets_ctrl - 3)
......@@ -302,7 +306,6 @@ class TestDeviceObservationField(TestDeviceBase):
self.assertListEqual(
list(digitalbeam_proxy.Pointing_direction_RW), default_pointing
)
self.proxy.observation_field_settings_RW = self.VALID_JSON
self.proxy.Initialise()
self.proxy.On()
......@@ -332,7 +335,6 @@ class TestDeviceObservationField(TestDeviceBase):
self.assertListEqual(
list(tilebeam_proxy.Pointing_direction_RW[0]), ["J2000", "0rad", "0rad"]
)
self.proxy.observation_field_settings_RW = self.VALID_JSON
self.proxy.Initialise()
self.proxy.On()
......@@ -355,7 +357,6 @@ class TestDeviceObservationField(TestDeviceBase):
(CS001_TILES, N_elements * N_pol), dtype=bool
)
self.proxy.observation_field_settings_RW = self.VALID_JSON
self.proxy.Initialise()
self.proxy.On()
......@@ -386,7 +387,6 @@ class TestDeviceObservationField(TestDeviceBase):
xst_proxy.FPGA_xst_integration_interval_RW = [10.0] * N_pn
xst_proxy.FPGA_xst_offload_nof_crosslets_RW = [0] * N_pn
self.proxy.observation_field_settings_RW = self.VALID_JSON
self.proxy.Initialise()
self.proxy.On()
......
0.47.1
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Exposure of the station's statistics for the innius-rpc-datasource plugin in Grafana."""
from datetime import datetime, timezone
import itertools
import math
from typing import Callable
from tangostationcontrol.common.frequency_bands import Band, bands
from lofar_sid.interface.opah import grafana_apiv3_pb2
from lofar_sid.interface.opah import grafana_apiv3_pb2_grpc
from lofar_sid.interface.stationcontrol import statistics_pb2
from tangostationcontrol.rpc.statistics import (
Statistics,
TooOldError,
)
from tangostationcontrol.rpc.common import (
call_exception_metrics,
)
class StatisticsToGrafana:
"""Abstract base class for converting Statistics responses to Grafana Frames."""
# Meta information for our statistics
meta = grafana_apiv3_pb2.FrameMeta(
type=grafana_apiv3_pb2.FrameMeta.FrameType.FrameTypeTimeSeriesLong,
PreferredVisualization=grafana_apiv3_pb2.FrameMeta.VisType.VisTypeGraph,
)
def __init__(self, statistics: Statistics):
super().__init__()
self.statistics = statistics
@staticmethod
def _subband_frequency_func(
frequency_band: statistics_pb2.FrequencyBand,
) -> Callable[[int], float]:
"""Return a function that converts a subband number into its central frequency (in Hz).
NB: Spectral inversion is assumed to have been configured correctly at time of recording.
"""
band_name = Band.lookup_nyquist_zone(
frequency_band.antenna_type,
frequency_band.clock,
frequency_band.nyquist_zone,
)
return bands[band_name].subband_frequency
@staticmethod
def _verify_call_result(reply, time_window: tuple[datetime, datetime]):
"""Verify reply from Statistics service. Raises if verification fails."""
if not (
time_window[0]
<= reply.result.timestamp.ToDatetime(tzinfo=timezone.utc)
< time_window[1]
):
raise TooOldError(
f"Stastistics not available in time window {time_window}. Available is {reply.result.timestamp}."
)
return True
class BstToGrafana(StatisticsToGrafana):
"""Converts Statistics.BST responses to Grafana Frames."""
def _get_latest_in_window(
self, time_window: tuple[datetime, datetime], antenna_field: str
) -> statistics_pb2.BstReply:
"""Get the latest statistics in the given time window, if any."""
request = statistics_pb2.BstRequest(
antenna_field=antenna_field,
)
reply = self.statistics.Bst(request, None)
self._verify_call_result(reply, time_window)
return reply
@call_exception_metrics("StatisticsToGrafana", {"type": "bst"})
def all_frames(
self,
time_window: tuple[datetime, datetime],
antenna_field: str,
pol: str | None,
) -> list[grafana_apiv3_pb2.Frame]:
"""Return all Grafana Frames for the requested data."""
try:
reply = self._get_latest_in_window(time_window, antenna_field)
result = reply.result
except TooOldError:
return []
# Turn result into Grafana fields
#
# Each value describes the power for one beamlet.
#
# Each polarisation results in one field.
frames = [
grafana_apiv3_pb2.Frame(
metric="BST",
timestamps=[result.timestamp for _ in result.beamlets],
fields=list(
filter(
None,
[
grafana_apiv3_pb2.Field(
name="beamlet",
values=[b.beamlet for b in result.beamlets],
),
(
grafana_apiv3_pb2.Field(
name="xx",
config=grafana_apiv3_pb2.config(
unit="dB",
),
values=[b.x_power_db for b in result.beamlets],
)
if pol in ["xx", None]
else None
),
(
grafana_apiv3_pb2.Field(
name="yy",
config=grafana_apiv3_pb2.config(
unit="dB",
),
values=[b.y_power_db for b in result.beamlets],
)
if pol in ["yy", None]
else None
),
],
)
),
meta=self.meta,
)
]
return frames
class SstToGrafana(StatisticsToGrafana):
"""Converts Statistics.SST responses to Grafana Frames."""
def _get_latest_in_window(
self, time_window: tuple[datetime, datetime], antenna_field: str
) -> statistics_pb2.SstReply:
"""Get the latest statistics in the given time window, if any."""
request = statistics_pb2.SstRequest(
antenna_field=antenna_field,
)
reply = self.statistics.Sst(request, None)
self._verify_call_result(reply, time_window)
return reply
@call_exception_metrics("StatisticsToGrafana", {"type": "sst"})
def all_frames(
self,
time_window: tuple[datetime, datetime],
antenna_field: str,
selected_pol: str | None,
) -> list[grafana_apiv3_pb2.Frame]:
"""Return all Grafana Frames for the requested data."""
try:
reply = self._get_latest_in_window(time_window, antenna_field)
result = reply.result
except TooOldError:
return []
# Turn result into Grafana fields
#
# Each value describes the power of an antenna for a specific subband.
#
# Field 0 are the spectral frequencies for each value.
# Field 1+ is one field for each antenna and each requested polarisation.
antenna_nrs = [antenna.antenna for antenna in result.subbands[0].antennas]
fields_per_antenna = [
[
(
grafana_apiv3_pb2.Field(
name="power",
labels=[
grafana_apiv3_pb2.Label(
key="antenna",
value="%03d" % antenna_nr,
),
grafana_apiv3_pb2.Label(
key="pol",
value="xx",
),
],
config=grafana_apiv3_pb2.config(
unit="dB",
),
values=[
subband.antennas[antenna_nr].x_power_db
for subband in result.subbands
],
)
if selected_pol in ["xx", None]
else None
),
(
grafana_apiv3_pb2.Field(
name="power",
labels=[
grafana_apiv3_pb2.Label(
key="antenna",
value="%03d" % antenna_nr,
),
grafana_apiv3_pb2.Label(
key="pol",
value="yy",
),
],
config=grafana_apiv3_pb2.config(
unit="dB",
),
values=[
subband.antennas[antenna_nr].y_power_db
for subband in result.subbands
],
)
if selected_pol in ["yy", None]
else None
),
]
for antenna_nr in antenna_nrs
]
subband_frequency = self._subband_frequency_func(result.frequency_band)
frames = [
grafana_apiv3_pb2.Frame(
metric="SST",
timestamps=[result.timestamp for _ in result.subbands],
fields=[
grafana_apiv3_pb2.Field(
name="frequency",
config=grafana_apiv3_pb2.config(
unit="Hz",
),
values=[subband_frequency(b.subband) for b in result.subbands],
),
]
+ list(filter(None, itertools.chain(*fields_per_antenna))),
meta=self.meta,
)
]
return frames
class XstToGrafana(StatisticsToGrafana):
"""Converts Statistics.XST responses to Grafana Frames."""
def _get_latest_in_window(
self, time_window: tuple[datetime, datetime], antenna_field: str
) -> statistics_pb2.XstReply:
"""Get the latest statistics in the given time window, if any."""
request = statistics_pb2.XstRequest(
antenna_field=antenna_field,
)
reply = self.statistics.Xst(request, None)
self._verify_call_result(reply, time_window)
return reply
@call_exception_metrics("StatisticsToGrafana", {"type": "xst"})
def all_frames(
self,
time_window: tuple[datetime, datetime],
antenna_field: str,
selected_pol: str | None,
) -> list[grafana_apiv3_pb2.Frame]:
"""Return all Grafana Frames for the requested data."""
try:
reply = self._get_latest_in_window(time_window, antenna_field)
result = reply.result
except TooOldError:
return []
subband_frequency = self._subband_frequency_func(result.frequency_band)
# Turn result into Grafana fields
#
# Each value describes a baseline.
#
# Field 0 & 1 are the (antenna1, antenna2) indices describing each baseline.
# Field 2 is the central frequency of the values for each baseline.
fields = [
grafana_apiv3_pb2.Field(
name="antenna1",
values=[baseline.antenna1 for baseline in result.baselines],
),
grafana_apiv3_pb2.Field(
name="antenna2",
values=[baseline.antenna2 for baseline in result.baselines],
),
grafana_apiv3_pb2.Field(
name="frequency",
config=grafana_apiv3_pb2.config(
unit="Hz",
),
values=[subband_frequency(result.subband) for _ in result.baselines],
),
]
# Subsequent fields describe the power and phase for each baseline, and
# the requested, or all, polarisations.
for pol in ("xx", "xy", "yx", "yy"):
if selected_pol is None or pol == selected_pol:
labels = (
[
grafana_apiv3_pb2.Label(
key="pol",
value=pol,
)
]
if not selected_pol
else []
)
fields.extend(
[
grafana_apiv3_pb2.Field(
name="power",
labels=labels,
config=grafana_apiv3_pb2.config(
unit="dB",
),
values=[
getattr(baseline, pol).power_db
for baseline in result.baselines
],
),
grafana_apiv3_pb2.Field(
name="phase",
labels=labels,
config=grafana_apiv3_pb2.config(
unit="deg",
),
values=[
math.fabs(
getattr(baseline, pol).phase * 360.0 / math.pi
)
for baseline in result.baselines
],
),
]
)
frames = [
grafana_apiv3_pb2.Frame(
metric="XST",
timestamps=[result.timestamp for _ in result.baselines],
fields=fields,
meta=self.meta,
)
]
return frames
class GrafanaAPIV3(grafana_apiv3_pb2_grpc.GrafanaQueryAPIServicer):
"""Implements the Grafana interface for the innius simple-rpc-datasource,
see https://github.com/innius/grafana-simple-grpc-datasource"""
def __init__(self, statistics: Statistics, antenna_fields: list[str]):
super().__init__()
self.statistics = statistics
self.antenna_fields = antenna_fields
self.bst = BstToGrafana(self.statistics)
self.sst = SstToGrafana(self.statistics)
self.xst = XstToGrafana(self.statistics)
# self._import_test_data()
def _import_test_data(self):
"""Import test data for demo purposes."""
import json
import os
test_dir = os.path.dirname(__file__) + "/../../test/rpc/"
for type_ in ("bst", "sst", "xst"):
with open(test_dir + f"{type_}-message.json") as f:
print(f"Loading {f}")
message = json.load(f)
self.statistics.handle_statistics_message(
f"{type_}/lba/cs032", datetime.now(tz=timezone.utc), message
)
def GetQueryOptions(self, request: grafana_apiv3_pb2.GetOptionsRequest, context):
"""List options per query."""
return grafana_apiv3_pb2.GetOptionsResponse(options=[])
def ListDimensionKeys(
self, request: grafana_apiv3_pb2.ListDimensionKeysRequest, context
):
"""List available data dimensions."""
results = [
grafana_apiv3_pb2.ListDimensionKeysResponse.Result(
key="antenna_field",
description="Antenna field",
),
grafana_apiv3_pb2.ListDimensionKeysResponse.Result(
key="pol",
description="Polarisation",
),
]
return grafana_apiv3_pb2.ListDimensionKeysResponse(results=results)
def ListDimensionValues(
self, request: grafana_apiv3_pb2.ListDimensionValuesRequest, context
):
"""List possible values for each data dimension."""
results = []
if request.dimension_key == "antenna_field":
for antenna_field in self.antenna_fields:
results.append(
grafana_apiv3_pb2.ListDimensionValuesResponse.Result(
value=antenna_field,
)
)
if request.dimension_key == "pol":
for pol in ["xx", "xy", "yx", "yy"]:
results.append(
grafana_apiv3_pb2.ListDimensionValuesResponse.Result(
value=pol,
)
)
return grafana_apiv3_pb2.ListDimensionValuesResponse(results=results)
def ListMetrics(self, request: grafana_apiv3_pb2.ListMetricsRequest, context):
"""List available metrics."""
metrics = [
grafana_apiv3_pb2.ListMetricsResponse.Metric(
name="BST",
description="Beamlet statistics",
),
grafana_apiv3_pb2.ListMetricsResponse.Metric(
name="SST",
description="Subband statistics",
),
grafana_apiv3_pb2.ListMetricsResponse.Metric(
name="XST",
description="Crosslet statistics",
),
]
return grafana_apiv3_pb2.ListMetricsResponse(Metrics=metrics)
@call_exception_metrics("GrafanaAPIV3")
def GetMetricValue(self, request: grafana_apiv3_pb2.GetMetricValueRequest, context):
return grafana_apiv3_pb2.GetMetricValueResponse(frames=[])
@call_exception_metrics("GrafanaAPIV3")
def GetMetricAggregate(
self, request: grafana_apiv3_pb2.GetMetricAggregateRequest, context
):
"""Return the set of values for the request metrics and dimensions."""
frames = []
dimensions = {d.key: d.value for d in request.dimensions}
time_window = (
request.startDate.ToDatetime(tzinfo=timezone.utc),
request.endDate.ToDatetime(tzinfo=timezone.utc),
)
for metric in request.metrics:
if metric == "BST":
frames.extend(
self.bst.all_frames(
time_window,
dimensions["antenna_field"].lower(),
dimensions.get("pol"),
)
)
elif metric == "SST":
frames.extend(
self.sst.all_frames(
time_window,
dimensions["antenna_field"].lower(),
dimensions.get("pol"),
)
)
elif metric == "XST":
frames.extend(
self.xst.all_frames(
time_window,
dimensions["antenna_field"].lower(),
dimensions.get("pol"),
)
)
return grafana_apiv3_pb2.GetMetricAggregateResponse(frames=frames)
......@@ -12,15 +12,13 @@ from lofar_sid.interface.stationcontrol import observation_pb2
from lofar_sid.interface.stationcontrol import observation_pb2_grpc
from lofar_sid.interface.stationcontrol import statistics_pb2
from lofar_sid.interface.stationcontrol import statistics_pb2_grpc
from lofar_sid.interface.opah import grafana_apiv3_pb2
from lofar_sid.interface.opah import grafana_apiv3_pb2_grpc
from lofar_sid.interface.stationcontrol import antennafield_pb2
from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
from tangostationcontrol.rpc.observation import Observation
from tangostationcontrol.rpc.statistics import Statistics
from tangostationcontrol.rpc.grafana_api import GrafanaAPIV3
from tangostationcontrol.rpc.messagehandler import MultiEndpointZMQMessageHandler
from tangostationcontrol.common.lofar_logging import configure_logger
from tangostationcontrol.metrics import start_metrics_server
from lofar_sid.interface.stationcontrol import antennafield_pb2, antennafield_pb2_grpc
from tangostationcontrol.rpc.antennafield import AntennaField
logger = logging.getLogger()
......@@ -43,14 +41,10 @@ class Server:
statistics_pb2_grpc.add_StatisticsServicer_to_server(
self.statistics_servicer, self.server
)
grafana_apiv3_pb2_grpc.add_GrafanaQueryAPIServicer_to_server(
GrafanaAPIV3(self.statistics_servicer, antenna_fields), self.server
)
SERVICE_NAMES = (
observation_pb2.DESCRIPTOR.services_by_name["Observation"].full_name,
antennafield_pb2.DESCRIPTOR.services_by_name["Antennafield"].full_name,
statistics_pb2.DESCRIPTOR.services_by_name["Statistics"].full_name,
grafana_apiv3_pb2.DESCRIPTOR.services_by_name["GrafanaQueryAPI"].full_name,
reflection.SERVICE_NAME, # reflection is required by innius-gpc-datasource
)
reflection.enable_server_reflection(SERVICE_NAMES, self.server)
......
......@@ -3,12 +3,13 @@
from datetime import datetime, timedelta, timezone
from math import log
from typing import Dict, Tuple
from typing import Dict, Tuple, Callable
import logging
import numpy
from tangostationcontrol.common.constants import N_pol, N_subbands
from tangostationcontrol.common.frequency_bands import Band, bands
from lofar_sid.interface.stationcontrol import statistics_pb2
from lofar_sid.interface.stationcontrol import statistics_pb2_grpc
from tangostationcontrol.rpc.common import (
......@@ -88,6 +89,23 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM
nyquist_zone=message["source_info"]["nyquist_zone_index"],
)
@staticmethod
def _subband_frequency_func(
frequency_band: statistics_pb2.FrequencyBand,
) -> Callable[[int], float]:
"""Return a function that converts a subband number into its central frequency (in Hz).
NB: Spectral inversion is assumed to have been configured correctly at time of recording.
"""
band_name = Band.lookup_nyquist_zone(
frequency_band.antenna_type,
frequency_band.clock,
frequency_band.nyquist_zone,
)
return bands[band_name].subband_frequency
@call_exception_metrics("Statistics")
def Bst(self, request: statistics_pb2.BstRequest, context):
bst_message = self.get_last_message("bst", request.antenna_field)
......@@ -119,9 +137,13 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM
sst_data = numpy.array(sst_message["sst_data"], dtype=numpy.float32)
sst_data = sst_data.reshape((-1, N_pol, N_subbands))
frequency_band = self._message_to_frequency_band(sst_message)
subband_frequency = self._subband_frequency_func(frequency_band)
subbands = [
statistics_pb2.SstResult.SstSubband(
subband=subband_nr,
frequency=subband_frequency(subband_nr),
antennas=[
statistics_pb2.SstResult.SstSubband.SstAntenna(
antenna=antenna_nr,
......@@ -137,7 +159,7 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM
return statistics_pb2.SstReply(
result=statistics_pb2.SstResult(
timestamp=datetime.fromisoformat(sst_message["timestamp"]),
frequency_band=self._message_to_frequency_band(sst_message),
frequency_band=frequency_band,
integration_interval=sst_message["integration_interval"],
subbands=subbands,
),
......@@ -187,12 +209,16 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM
if antenna1 <= antenna2
]
frequency_band = self._message_to_frequency_band(xst_message)
subband_frequency = self._subband_frequency_func(frequency_band)
return statistics_pb2.XstReply(
result=statistics_pb2.XstResult(
timestamp=datetime.fromisoformat(xst_message["timestamp"]),
frequency_band=self._message_to_frequency_band(xst_message),
frequency_band=frequency_band,
integration_interval=xst_message["integration_interval"],
subband=xst_message["subband"],
frequency=subband_frequency(xst_message["subband"]),
baselines=baselines,
),
)
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime, timezone, timedelta
import json
from os import path
from tangostationcontrol.common.constants import (
N_subbands,
N_pol,
N_beamlets_ctrl,
N_pn,
S_pn,
A_pn,
)
from tangostationcontrol.rpc.grafana_api import (
GrafanaAPIV3,
StatisticsToGrafana,
BstToGrafana,
SstToGrafana,
XstToGrafana,
)
from lofar_sid.interface.opah import grafana_apiv3_pb2
from tangostationcontrol.rpc.statistics import (
Statistics,
dB,
NotAvailableError,
TooOldError,
)
from lofar_sid.interface.stationcontrol import statistics_pb2
from google.protobuf.timestamp_pb2 import Timestamp
from tests import base
def field_labels(label_list: list[grafana_apiv3_pb2.Label]) -> dict[str, str]:
"""Turn a list of objects with key and value fields into a dict."""
return {x.key: x.value for x in label_list}
class TestStatisticsToGrafana(base.TestCase):
def test_verify_call_result(self):
statistics = Statistics()
sut = StatisticsToGrafana(statistics)
now = datetime.now(tz=timezone.utc)
reply = statistics_pb2.SstReply(result=statistics_pb2.SstResult(timestamp=now))
# should not raise
_ = sut._verify_call_result(
reply, (now - timedelta(seconds=1), now + timedelta(seconds=1))
)
# too old raises
with self.assertRaises(TooOldError):
_ = sut._verify_call_result(
reply, (now + timedelta(seconds=1), now + timedelta(seconds=2))
)
class TestBstToGrafana(base.TestCase):
def setUp(self):
statistics = Statistics()
self.sut = BstToGrafana(statistics)
# provide with sample statistic
with open(f"{path.dirname(__file__)}/bst-message.json") as f:
self.bst_message = json.load(f)
statistics.handle_statistics_message(
"bst/lba/cs032", datetime.now(), self.bst_message
)
self.valid_time_window = (
datetime.fromisoformat("2025-01-01T00:00+00:00"),
datetime.fromisoformat("2026-01-01T00:00+00:00"),
)
def test_all_frames(self):
frames = self.sut.all_frames(self.valid_time_window, "lba", None)
self.assertEqual(1, len(frames))
self.assertEqual("BST", frames[0].metric)
self.assertEqual(3, len(frames[0].fields))
self.assertEqual("beamlet", frames[0].fields[0].name)
self.assertEqual("xx", frames[0].fields[1].name)
self.assertEqual("yy", frames[0].fields[2].name)
# all fields must be of the right size
self.assertEqual(N_beamlets_ctrl, len(frames[0].timestamps))
self.assertEqual(N_beamlets_ctrl, len(frames[0].fields[0].values))
self.assertEqual(N_beamlets_ctrl, len(frames[0].fields[1].values))
self.assertEqual(N_beamlets_ctrl, len(frames[0].fields[2].values))
# all timestamps must be equal
self.assertEqual(1, len(set([ts.ToDatetime() for ts in frames[0].timestamps])))
# all beamlets must be there
self.assertEqual(set(range(N_beamlets_ctrl)), set(frames[0].fields[0].values))
# test values
for beamlet_idx, value in enumerate(frames[0].fields[1].values):
self.assertAlmostEqual(
dB(self.bst_message["bst_data"][beamlet_idx][0]),
value,
5,
msg=f"{beamlet_idx=}",
)
for beamlet_idx, value in enumerate(frames[0].fields[2].values):
self.assertAlmostEqual(
dB(self.bst_message["bst_data"][beamlet_idx][1]),
value,
5,
msg=f"{beamlet_idx=}",
)
class TestSstToGrafana(base.TestCase):
def setUp(self):
statistics = Statistics()
self.sut = SstToGrafana(statistics)
# provide with sample statistic
with open(f"{path.dirname(__file__)}/sst-message.json") as f:
self.sst_message = json.load(f)
statistics.handle_statistics_message(
"sst/lba/cs032", datetime.now(), self.sst_message
)
self.valid_time_window = (
datetime.fromisoformat("2025-01-01T00:00+00:00"),
datetime.fromisoformat("2026-01-01T00:00+00:00"),
)
def test_all_frames(self):
frames = self.sut.all_frames(self.valid_time_window, "lba", None)
self.assertEqual(1, len(frames))
self.assertEqual("SST", frames[0].metric)
self.assertEqual(1 + N_pn * S_pn, len(frames[0].fields))
self.assertEqual("frequency", frames[0].fields[0].name)
for idx, field in enumerate(frames[0].fields[1:], 1):
self.assertEqual("power", field.name, msg=f"{idx=}")
# all fields must be of the right size
self.assertEqual(N_subbands - 1, len(frames[0].timestamps))
self.assertEqual(N_subbands - 1, len(frames[0].fields[0].values))
for idx, field in enumerate(frames[0].fields[1:], 1):
self.assertEqual(N_subbands - 1, len(field.values), msg=f"{idx=}")
# all timestamps must be equal
self.assertEqual(1, len(set([ts.ToDatetime() for ts in frames[0].timestamps])))
# all inputs must be there
input_names = {
field_labels(field.labels)["antenna"] for field in frames[0].fields[1:]
}
self.assertEqual(N_pn * A_pn, len(input_names), msg=f"{input_names=}")
# both polarisarions must be there
input_pols = [
field_labels(field.labels)["pol"] for field in frames[0].fields[1:]
]
self.assertEqual(N_pn * A_pn, input_pols.count("xx"), msg=f"{input_pols=}")
self.assertEqual(N_pn * A_pn, input_pols.count("yy"), msg=f"{input_pols=}")
# test values
for field in frames[0].fields[1:]:
labels = field_labels(field.labels)
antenna_idx = int(labels["antenna"])
pol_indices = {"xx": 0, "yy": 1}
pol_idx = pol_indices[labels["pol"]]
input_idx = antenna_idx * N_pol + pol_idx
for idx, value in enumerate(field.values):
self.assertAlmostEqual(
dB(self.sst_message["sst_data"][input_idx][idx + 1]),
value,
5,
msg=f"{labels=}, {idx=}",
)
class TestXstToGrafana(base.TestCase):
def setUp(self):
statistics = Statistics()
self.sut = XstToGrafana(statistics)
# provide with sample statistic
with open(f"{path.dirname(__file__)}/xst-message.json") as f:
self.xst_message = json.load(f)
statistics.handle_statistics_message(
"xst/lba/cs032", datetime.now(), self.xst_message
)
self.valid_time_window = (
datetime.fromisoformat("2025-01-01T00:00+00:00"),
datetime.fromisoformat("2026-01-01T00:00+00:00"),
)
def test_all_frames(self):
frames = self.sut.all_frames(self.valid_time_window, "lba", None)
self.assertEqual(1, len(frames))
self.assertEqual("XST", frames[0].metric)
self.assertEqual(3 + 2 * N_pol**2, len(frames[0].fields))
self.assertEqual("antenna1", frames[0].fields[0].name)
self.assertEqual("antenna2", frames[0].fields[1].name)
self.assertEqual("frequency", frames[0].fields[2].name)
self.assertEqual(
{"power", "phase"}, {field.name for field in frames[0].fields[3:]}
)
N_antennas = N_pn * A_pn
N_baselines = N_antennas * (N_antennas + 1) // 2
# all fields must be of the right size
self.assertEqual(N_baselines, len(frames[0].timestamps))
for idx, field in enumerate(frames[0].fields):
self.assertEqual(N_baselines, len(field.values), msg=f"{idx=}")
# all timestamps must be equal
self.assertEqual(1, len(set([ts.ToDatetime() for ts in frames[0].timestamps])))
# all polarisarions must be there (for power and phase)
input_pols = [
field_labels(field.labels)["pol"] for field in frames[0].fields[3:]
]
self.assertEqual(2, input_pols.count("xx"))
self.assertEqual(2, input_pols.count("xy"))
self.assertEqual(2, input_pols.count("yx"))
self.assertEqual(2, input_pols.count("yy"))
# all antennas must be there
self.assertEqual(set(range(N_antennas)), set(frames[0].fields[0].values))
self.assertEqual(set(range(N_antennas)), set(frames[0].fields[1].values))
# test specific value for regression
self.assertAlmostEqual(30.7213135, frames[0].fields[3].values[99], 6)
self.assertAlmostEqual(3.88297279, frames[0].fields[4].values[99], 6)
self.assertAlmostEqual(27.2744141, frames[0].fields[5].values[99], 6)
self.assertAlmostEqual(297.7411965, frames[0].fields[6].values[99], 6)
class TestGrafanaAPIV3(base.TestCase):
def test_getqueryoptions(self):
"""Test GetQueryOptions."""
statistics = Statistics()
sut = GrafanaAPIV3(statistics, [])
request = grafana_apiv3_pb2.GetOptionsRequest()
reply = sut.GetQueryOptions(request, None)
self.assertEqual(0, len(reply.options))
def test_listdimensionkeys(self):
"""Test ListDimensionKeys."""
statistics = Statistics()
sut = GrafanaAPIV3(statistics, [])
request = grafana_apiv3_pb2.ListDimensionKeysRequest()
reply = sut.ListDimensionKeys(request, None)
dimensions = {d.key for d in reply.results}
self.assertIn("antenna_field", dimensions)
self.assertIn("pol", dimensions)
def test_listdimensionvalues(self):
"""Test ListDimensionValues."""
statistics = Statistics()
sut = GrafanaAPIV3(statistics, ["LBA", "HBA"])
expected_values = {
"antenna_field": {"LBA", "HBA"},
"pol": {"xx", "xy", "yx", "yy"},
}
for dim, expected_value in expected_values.items():
request = grafana_apiv3_pb2.ListDimensionValuesRequest(dimension_key=dim)
reply = sut.ListDimensionValues(request, None)
value = set([r.value for r in reply.results])
self.assertEqual(expected_value, value, msg=f"{dim=}")
def test_listmetrics(self):
"""Test ListMetrics."""
statistics = Statistics()
sut = GrafanaAPIV3(statistics, [])
request = grafana_apiv3_pb2.ListMetricsRequest()
reply = sut.ListMetrics(request, None)
metrics = [m.name for m in reply.Metrics]
self.assertIn("BST", metrics)
self.assertIn("SST", metrics)
self.assertIn("XST", metrics)
class TestGetMetricAggregate(base.TestCase):
def setUp(self):
statistics = Statistics()
self.sut = GrafanaAPIV3(statistics, ["LBA"])
# provide with sample statistic
with open(f"{path.dirname(__file__)}/sst-message.json") as f:
sst_message = json.load(f)
statistics.handle_statistics_message(
"sst/lba/cs032", datetime.now(), sst_message
)
def test_nometrics(self):
"""Test if there are no metrics available."""
statistics = Statistics()
sut = GrafanaAPIV3(statistics, ["LBA"])
# request/response
request = grafana_apiv3_pb2.GetMetricAggregateRequest(
metrics=["SST"],
dimensions=[grafana_apiv3_pb2.Dimension(key="antenna_field", value="LBA")],
startDate=Timestamp(seconds=0),
endDate=Timestamp(seconds=int(datetime.now().timestamp())),
)
with self.assertRaises(NotAvailableError):
_ = sut.GetMetricAggregate(request, None)
def test_validdaterange(self):
"""Test obtaining SSTs using a date range for which they are available."""
# request/response
request = grafana_apiv3_pb2.GetMetricAggregateRequest(
metrics=["SST"],
dimensions=[grafana_apiv3_pb2.Dimension(key="antenna_field", value="LBA")],
startDate=Timestamp(seconds=0),
endDate=Timestamp(seconds=int(datetime.now().timestamp())),
)
reply = self.sut.GetMetricAggregate(request, None)
# validate output
self.assertEqual(1, len(reply.frames))
self.assertEqual("SST", reply.frames[0].metric)
def test_tooold(self):
"""Test obtaining SSTs using a date range for which they are too old."""
# request/response
request = grafana_apiv3_pb2.GetMetricAggregateRequest(
metrics=["SST"],
dimensions=[grafana_apiv3_pb2.Dimension(key="antenna_field", value="LBA")],
startDate=Timestamp(seconds=int(datetime.now().timestamp()) - 1),
endDate=Timestamp(seconds=int(datetime.now().timestamp())),
)
reply = self.sut.GetMetricAggregate(request, None)
# validate output
self.assertEqual(0, len(reply.frames))
......@@ -8,8 +8,8 @@ from grpc_reflection.v1alpha.proto_reflection_descriptor_database import (
ProtoReflectionDescriptorDatabase,
)
from lofar_sid.interface.opah import grafana_apiv3_pb2
from lofar_sid.interface.opah import grafana_apiv3_pb2_grpc
from lofar_sid.interface.stationcontrol import antennafield_pb2
from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
from tangostationcontrol.rpc.server import Server
from tests import base
......@@ -36,11 +36,17 @@ class TestServer(base.TestCase):
self.assertIn("Observation", services)
self.assertIn("Antennafield", services)
self.assertIn("Statistics", services)
self.assertIn("grafanav3.GrafanaQueryAPI", services)
def test_call(self):
"""Test a basic gRPC call to the server."""
with grpc.insecure_channel(f"localhost:{self.server.port}") as channel:
stub = grafana_apiv3_pb2_grpc.GrafanaQueryAPIStub(channel)
_ = stub.GetQueryOptions(grafana_apiv3_pb2.GetOptionsRequest())
stub = antennafield_pb2_grpc.AntennafieldStub(channel)
identifier = antennafield_pb2.Identifier(
antennafield_name="lba",
antenna_name="LBA00",
)
_ = stub.GetAntenna(
antennafield_pb2.GetAntennaRequest(identifier=identifier)
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment