Skip to content
Snippets Groups Projects
Commit e28916f8 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-1509: Move Grafana RPC to Opah repo

parent e57b289e
Branches
Tags
1 merge request!1054L2SS-1509: Move Grafana RPC to Opah repo
Showing
with 58 additions and 1468 deletions
...@@ -150,6 +150,7 @@ Next change the version in the following places: ...@@ -150,6 +150,7 @@ Next change the version in the following places:
through [https://git.astron.nl/lofar2.0/tango/-/tags](Deploy Tags) through [https://git.astron.nl/lofar2.0/tango/-/tags](Deploy Tags)
# Release Notes # Release Notes
* 0.47.1 Move GrafanaAPIV3 RPC interface to Opah repo
* 0.47.0 Migrate from lofar-station-client to lofar-lotus package. Update various package dependencies. * 0.47.0 Migrate from lofar-station-client to lofar-lotus package. Update various package dependencies.
* 0.46.1 Include clock_RW in metadata JSON * 0.46.1 Include clock_RW in metadata JSON
* 0.46.0 Expose latest BST/SST/XST from ZMQ over gRPC * 0.46.0 Expose latest BST/SST/XST from ZMQ over gRPC
......
0.47.0 0.47.1
\ No newline at end of file
Protobuf definitions for the RPC services in the tangostationcontrol.rpc module.
`grafana-apiv3.proto` is taken from https://raw.githubusercontent.com/innius/grafana-simple-grpc-datasource/master/pkg/proto/v3/apiv3.proto
syntax = "proto3";
service Antennafield {
rpc GetAntenna(GetAntennaRequest) returns (AntennaReply) {}
rpc SetAntennaStatus(SetAntennaStatusRequest) returns (AntennaReply) {}
rpc SetAntennaUse(SetAntennaUseRequest) returns (AntennaReply) {}
}
enum Antenna_Status {
OK = 0;
SUSPICIOUS = 1;
BROKEN = 2;
BEYOND_REPAIR = 3;
NOT_AVAILABLE = 4;
}
enum Antenna_Use {
// use antenna only if it's OK or SUSPICIOUS
AUTO = 0;
// force antenna to be on, regardless of status
ON = 1;
// force antenna to be off, regardless of status
OFF = 2;
}
message Identifier {
// e.g. "LBA"
string antennafield_name = 1;
// e.g. "LBA00"
string antenna_name = 2;
}
message SetAntennaStatusRequest {
Identifier identifier = 1;
Antenna_Status antenna_status = 2 ;
}
message GetAntennaRequest {
Identifier identifier = 1;
}
message SetAntennaUseRequest {
Identifier identifier = 1;
Antenna_Use antenna_use = 2;
}
message AntennaResult {
Identifier identifier = 1;
Antenna_Use antenna_use = 2;
Antenna_Status antenna_status = 3;
}
message AntennaReply {
bool success = 1;
string exception = 2;
AntennaResult result = 3;
}
syntax = "proto3";
option go_package = "bitbucket.org/innius/grafana-simple-grpc-datasource/v3";
import "google/protobuf/timestamp.proto";
package grafanav3;
// The GrafanaQueryAPI definition.
service GrafanaQueryAPI {
// Returns a list of all available dimensions
rpc ListDimensionKeys (ListDimensionKeysRequest) returns (ListDimensionKeysResponse) {
}
// Returns a list of all dimension values for a certain dimension
rpc ListDimensionValues (ListDimensionValuesRequest) returns (ListDimensionValuesResponse) {
}
// Returns all metrics from the system
rpc ListMetrics (ListMetricsRequest) returns (ListMetricsResponse) {
}
// Gets the options for the specified query type
rpc GetQueryOptions (GetOptionsRequest) returns (GetOptionsResponse) {
}
// Gets the last known value for one or more metrics
rpc GetMetricValue (GetMetricValueRequest) returns (GetMetricValueResponse) {
}
// Gets the history for one or more metrics
rpc GetMetricHistory (GetMetricHistoryRequest) returns (GetMetricHistoryResponse) {
}
// Gets the history for one or more metrics
rpc GetMetricAggregate(GetMetricAggregateRequest) returns (GetMetricAggregateResponse) {
}
}
message ListMetricsRequest {
repeated Dimension dimensions = 1;
string filter = 2;
}
message ListMetricsResponse {
message Metric {
string name = 1;
string description = 2;
}
repeated Metric Metrics = 1;
}
message GetMetricValueRequest {
repeated Dimension dimensions = 1;
repeated string metrics = 2;
map<string,string> options = 3 ;
google.protobuf.Timestamp startDate = 4;
google.protobuf.Timestamp endDate = 5;
}
message GetMetricValueResponse {
message Frame {
string metric = 1;
google.protobuf.Timestamp timestamp = 2;
repeated SingleValueField fields = 3;
FrameMeta meta = 4;
}
repeated Frame frames = 1;
}
message GetOptionsRequest {
enum QueryType {
GetMetricHistory = 0;
GetMetricValue=1;
GetMetricAggregate=2;
}
// the query type for which options are requested
QueryType queryType = 1;
// the query options which are currently selected
map<string,string> selectedOptions = 2 ;
}
message EnumValue {
// the id of the enum value
string id = 1;
// the description of the option
string description = 2;
// the label of the option
string label = 3;
// the default enum value
bool default = 4;
}
message Option {
// the id of the option
string id = 1;
string description = 2;
enum Type {
Enum = 0; // enum is rendered as a Select control in the frontend
Boolean = 1;
}
Type type = 3;
repeated EnumValue enumValues = 4;
bool required = 5;
// the label of the option
string label = 6;
}
message GetOptionsResponse {
repeated Option options = 1;
}
message GetMetricAggregateRequest {
// The dimensions for the query
repeated Dimension dimensions = 1;
// the metrics for which the aggregates are retrieved
repeated string metrics = 2;
google.protobuf.Timestamp startDate = 4;
google.protobuf.Timestamp endDate = 5;
int64 maxItems = 6;
TimeOrdering timeOrdering = 7;
string startingToken = 8;
int64 intervalMs = 9;
map<string,string> options = 10 ;
}
message GetMetricAggregateResponse {
repeated Frame frames = 1;
string nextToken = 2;
}
message GetMetricHistoryRequest {
repeated Dimension dimensions = 3;
repeated string metrics = 4;
google.protobuf.Timestamp startDate = 5;
google.protobuf.Timestamp endDate = 6;
int64 maxItems = 7;
TimeOrdering timeOrdering = 8;
string startingToken = 9;
map<string,string> options = 10 ;
}
message GetMetricHistoryResponse {
repeated Frame frames = 1;
string nextToken = 2;
}
message Label {
string key = 1;
string value = 2;
}
message Field {
string name = 1;
repeated Label labels = 2;
config config = 3;
repeated double values = 4;
repeated string stringValues = 5;
}
message ValueMapping {
double from = 1;
double to = 2;
string value = 3;
string text = 4;
string color = 5;
}
message config {
string unit = 1;
repeated ValueMapping Mappings = 2;
}
message SingleValueField {
string name = 1;
repeated Label labels = 2;
config config = 3;
double value = 4;
string stringValue = 5;
}
// The data frame for each metric
message Frame {
string metric = 1;
repeated google.protobuf.Timestamp timestamps = 2;
repeated Field fields = 3;
FrameMeta meta = 4;
}
// FrameMeta matches:
// https://github.com/grafana/grafana/blob/master/packages/grafana-data/src/types/data.ts#L11
// NOTE -- in javascript this can accept any `[key: string]: any;` however
// this interface only exposes the values we want to be exposed
message FrameMeta {
enum FrameType {
FrameTypeUnknown = 0;
FrameTypeTimeSeriesWide = 1;
FrameTypeTimeSeriesLong = 2;
FrameTypeTimeSeriesMany = 3;
FrameTypeDirectoryListing = 4;
FrameTypeTable = 5;
}
// Type asserts that the frame matches a known type structure
FrameType type = 1 ;
message Notice {
enum NoticeSeverity {
// NoticeSeverityInfo is informational severity.
NoticeSeverityInfo = 0;
// NoticeSeverityWarning is warning severity.
NoticeSeverityWarning = 1;
// NoticeSeverityError is error severity.
NoticeSeverityError = 3;
}
// Severity is the severity level of the notice: info, warning, or error.
NoticeSeverity Severity = 1;
// Text is freeform descriptive text for the notice.
string text = 2;
// Link is an optional link for display in the user interface and can be an
// absolute URL or a path relative to Grafana's root url.
string link = 3;
enum InspectType {
// InspectTypeNone is no suggestion for a tab of the panel editor in Grafana's user interface.
InspectTypeNone = 0;
// InspectTypeMeta suggests the "meta" tab of the panel editor in Grafana's user interface.
InspectTypeMeta = 1;
// InspectTypeError suggests the "error" tab of the panel editor in Grafana's user interface.
InspectTypeError = 2;
// InspectTypeData suggests the "data" tab of the panel editor in Grafana's user interface.
InspectTypeData = 3;
// InspectTypeStats suggests the "stats" tab of the panel editor in Grafana's user interface.
InspectTypeStats = 4;
}
// Inspect is an optional suggestion for which tab to display in the panel inspector
// in Grafana's User interface. Can be meta, error, data, or stats.
InspectType inspect = 4;
}
// Notices provide additional information about the data in the Frame that
// Grafana can display to the user in the user interface.
repeated Notice Notices = 6;
// VisType is used to indicate how the data should be visualized in explore.
enum VisType {
// VisTypeGraph indicates the response should be visualized using a graph.
VisTypeGraph = 0;
// VisTypeTable indicates the response should be visualized using a table.
VisTypeTable = 1;
// VisTypeLogs indicates the response should be visualized using a logs visualization.
VisTypeLogs = 2;
// VisTypeTrace indicates the response should be visualized using a trace view visualization.
VisTypeTrace = 3;
// VisTypeNodeGraph indicates the response should be visualized using a node graph visualization.
VisTypeNodeGraph = 4;
}
// PreferredVisualization is currently used to show results in Explore only in preferred visualisation option.
VisType PreferredVisualization = 8;
// ExecutedQueryString is the raw query sent to the underlying system. All macros and templating
// have been applied. When metadata contains this value, it will be shown in the query inspector.
string executedQueryString = 9;
}
enum TimeOrdering {
ASCENDING = 0;
DESCENDING = 1;
}
message ListDimensionKeysRequest {
string filter = 1;
repeated Dimension selected_dimensions = 2;
}
message ListDimensionKeysResponse {
message Result {
string key = 1;
string description = 2;
}
repeated Result results = 1;
}
message ListDimensionValuesRequest {
string dimension_key = 1;
string filter = 2;
repeated Dimension selected_dimensions = 3;
}
message ListDimensionValuesResponse {
message Result {
string value = 1;
string description = 2;
}
repeated Result results = 1;
}
message TimeRange {
int64 fromEpochMS = 1;
int64 toEpochMS = 2;
}
message Dimension {
string key = 1;
string value = 2;
}
message QueryRequest {
string refId = 1;
int64 maxDataPoints = 2;
int64 intervalMS = 3;
TimeRange timeRange = 4;
// The offset for the result set
int64 startKey = 5;
repeated Dimension dimensions = 6;
}
// The response message containing the greetings
message QueryResponse {
string refId = 1;
int64 nextKey = 2;
message Value {
int64 timestamp = 1;
float value = 2;
}
repeated Value values = 3;
}
syntax = "proto3";
service Observation {
rpc StartObservation(StartObservationRequest) returns (ObservationReply){}
rpc StopObservation(StopObservationRequest) returns (ObservationReply) {}
}
message StartObservationRequest {
string configuration = 1;
}
message StopObservationRequest {
int64 observation_id = 1;
}
message ObservationReply {
bool success = 1;
string exception = 2;
}
syntax = "proto3";
import "google/protobuf/timestamp.proto";
service Statistics {
rpc Bst(BstRequest) returns (BstReply) {}
rpc Sst(SstRequest) returns (SstReply) {}
rpc Xst(XstRequest) returns (XstReply) {}
}
message FrequencyBand {
string antenna_type = 1;
int32 clock = 2;
int32 nyquist_zone = 3;
}
message BstRequest {
string antenna_field = 1;
optional uint32 maxage = 2;
}
message BstResult {
google.protobuf.Timestamp timestamp = 1;
FrequencyBand frequency_band = 2;
float integration_interval = 3;
message BstBeamlet {
int32 beamlet = 1;
float x_power_db = 2;
float y_power_db = 3;
}
repeated BstBeamlet beamlets = 4;
}
message BstReply {
BstResult result = 3;
}
message SstRequest {
string antenna_field = 1;
optional uint32 maxage = 2;
}
message SstResult {
google.protobuf.Timestamp timestamp = 1;
FrequencyBand frequency_band = 2;
float integration_interval = 3;
message SstSubband {
int32 subband = 1;
message SstAntenna {
int32 antenna = 1;
float x_power_db = 2;
float y_power_db = 3;
}
repeated SstAntenna antennas = 2;
}
repeated SstSubband subbands = 4;
}
message SstReply {
SstResult result = 3;
}
message XstRequest {
string antenna_field = 1;
optional uint32 maxage = 2;
}
message XstResult {
google.protobuf.Timestamp timestamp = 1;
FrequencyBand frequency_band = 2;
float integration_interval = 3;
int32 subband = 4;
message XstBaseline {
int32 antenna1 = 1;
int32 antenna2 = 2;
message XstValue {
float power_db = 1;
float phase = 2;
}
XstValue xx = 3;
XstValue xy = 4;
XstValue yx = 5;
XstValue yy = 6;
}
repeated XstBaseline baselines = 5;
}
message XstReply {
XstResult result = 3;
}
...@@ -6,9 +6,9 @@ import tango ...@@ -6,9 +6,9 @@ import tango
from tango import DeviceProxy from tango import DeviceProxy
from tangostationcontrol.common.antennas import antenna_field_family_name from tangostationcontrol.common.antennas import antenna_field_family_name
from tangostationcontrol.rpc._proto import antennafield_pb2_grpc from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
from tangostationcontrol.rpc._proto.antennafield_pb2 import ( from lofar_sid.interface.stationcontrol.antennafield_pb2 import (
AntennaReply, AntennaReply,
AntennaResult, AntennaResult,
GetAntennaRequest, GetAntennaRequest,
......
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Exposure of the station's statistics for the innius-rpc-datasource plugin in Grafana."""
from datetime import datetime, timezone
import itertools
import math
from typing import Callable
from tangostationcontrol.common.frequency_bands import Band, bands
from tangostationcontrol.rpc._proto import grafana_apiv3_pb2
from tangostationcontrol.rpc._proto import grafana_apiv3_pb2_grpc
from tangostationcontrol.rpc._proto import statistics_pb2
from tangostationcontrol.rpc.statistics import (
Statistics,
TooOldError,
)
from tangostationcontrol.rpc.common import (
call_exception_metrics,
)
class StatisticsToGrafana:
"""Abstract base class for converting Statistics responses to Grafana Frames."""
# Meta information for our statistics
meta = grafana_apiv3_pb2.FrameMeta(
type=grafana_apiv3_pb2.FrameMeta.FrameType.FrameTypeTimeSeriesLong,
PreferredVisualization=grafana_apiv3_pb2.FrameMeta.VisType.VisTypeGraph,
)
def __init__(self, statistics: Statistics):
super().__init__()
self.statistics = statistics
@staticmethod
def _subband_frequency_func(
frequency_band: statistics_pb2.FrequencyBand,
) -> Callable[[int], float]:
"""Return a function that converts a subband number into its central frequency (in Hz).
NB: Spectral inversion is assumed to have been configured correctly at time of recording.
"""
band_name = Band.lookup_nyquist_zone(
frequency_band.antenna_type,
frequency_band.clock,
frequency_band.nyquist_zone,
)
return bands[band_name].subband_frequency
@staticmethod
def _verify_call_result(reply, time_window: tuple[datetime, datetime]):
"""Verify reply from Statistics service. Raises if verification fails."""
if not (
time_window[0]
<= reply.result.timestamp.ToDatetime(tzinfo=timezone.utc)
< time_window[1]
):
raise TooOldError(
f"Stastistics not available in time window {time_window}. Available is {reply.result.timestamp}."
)
return True
class BstToGrafana(StatisticsToGrafana):
"""Converts Statistics.BST responses to Grafana Frames."""
def _get_latest_in_window(
self, time_window: tuple[datetime, datetime], antenna_field: str
) -> statistics_pb2.BstReply:
"""Get the latest statistics in the given time window, if any."""
request = statistics_pb2.BstRequest(
antenna_field=antenna_field,
)
reply = self.statistics.Bst(request, None)
self._verify_call_result(reply, time_window)
return reply
@call_exception_metrics("StatisticsToGrafana", {"type": "bst"})
def all_frames(
self,
time_window: tuple[datetime, datetime],
antenna_field: str,
pol: str | None,
) -> list[grafana_apiv3_pb2.Frame]:
"""Return all Grafana Frames for the requested data."""
try:
reply = self._get_latest_in_window(time_window, antenna_field)
result = reply.result
except TooOldError:
return []
# Turn result into Grafana fields
#
# Each value describes the power for one beamlet.
#
# Each polarisation results in one field.
frames = [
grafana_apiv3_pb2.Frame(
metric="BST",
timestamps=[result.timestamp for _ in result.beamlets],
fields=list(
filter(
None,
[
grafana_apiv3_pb2.Field(
name="beamlet",
values=[b.beamlet for b in result.beamlets],
),
(
grafana_apiv3_pb2.Field(
name="xx",
config=grafana_apiv3_pb2.config(
unit="dB",
),
values=[b.x_power_db for b in result.beamlets],
)
if pol in ["xx", None]
else None
),
(
grafana_apiv3_pb2.Field(
name="yy",
config=grafana_apiv3_pb2.config(
unit="dB",
),
values=[b.y_power_db for b in result.beamlets],
)
if pol in ["yy", None]
else None
),
],
)
),
meta=self.meta,
)
]
return frames
class SstToGrafana(StatisticsToGrafana):
"""Converts Statistics.SST responses to Grafana Frames."""
def _get_latest_in_window(
self, time_window: tuple[datetime, datetime], antenna_field: str
) -> statistics_pb2.SstReply:
"""Get the latest statistics in the given time window, if any."""
request = statistics_pb2.SstRequest(
antenna_field=antenna_field,
)
reply = self.statistics.Sst(request, None)
self._verify_call_result(reply, time_window)
return reply
@call_exception_metrics("StatisticsToGrafana", {"type": "sst"})
def all_frames(
self,
time_window: tuple[datetime, datetime],
antenna_field: str,
selected_pol: str | None,
) -> list[grafana_apiv3_pb2.Frame]:
"""Return all Grafana Frames for the requested data."""
try:
reply = self._get_latest_in_window(time_window, antenna_field)
result = reply.result
except TooOldError:
return []
# Turn result into Grafana fields
#
# Each value describes the power of an antenna for a specific subband.
#
# Field 0 are the spectral frequencies for each value.
# Field 1+ is one field for each antenna and each requested polarisation.
antenna_nrs = [antenna.antenna for antenna in result.subbands[0].antennas]
fields_per_antenna = [
[
(
grafana_apiv3_pb2.Field(
name="power",
labels=[
grafana_apiv3_pb2.Label(
key="antenna",
value="%03d" % antenna_nr,
),
grafana_apiv3_pb2.Label(
key="pol",
value="xx",
),
],
config=grafana_apiv3_pb2.config(
unit="dB",
),
values=[
subband.antennas[antenna_nr].x_power_db
for subband in result.subbands
],
)
if selected_pol in ["xx", None]
else None
),
(
grafana_apiv3_pb2.Field(
name="power",
labels=[
grafana_apiv3_pb2.Label(
key="antenna",
value="%03d" % antenna_nr,
),
grafana_apiv3_pb2.Label(
key="pol",
value="yy",
),
],
config=grafana_apiv3_pb2.config(
unit="dB",
),
values=[
subband.antennas[antenna_nr].y_power_db
for subband in result.subbands
],
)
if selected_pol in ["yy", None]
else None
),
]
for antenna_nr in antenna_nrs
]
subband_frequency = self._subband_frequency_func(result.frequency_band)
frames = [
grafana_apiv3_pb2.Frame(
metric="SST",
timestamps=[result.timestamp for _ in result.subbands],
fields=[
grafana_apiv3_pb2.Field(
name="frequency",
config=grafana_apiv3_pb2.config(
unit="Hz",
),
values=[subband_frequency(b.subband) for b in result.subbands],
),
]
+ list(filter(None, itertools.chain(*fields_per_antenna))),
meta=self.meta,
)
]
return frames
class XstToGrafana(StatisticsToGrafana):
"""Converts Statistics.XST responses to Grafana Frames."""
def _get_latest_in_window(
self, time_window: tuple[datetime, datetime], antenna_field: str
) -> statistics_pb2.XstReply:
"""Get the latest statistics in the given time window, if any."""
request = statistics_pb2.XstRequest(
antenna_field=antenna_field,
)
reply = self.statistics.Xst(request, None)
self._verify_call_result(reply, time_window)
return reply
@call_exception_metrics("StatisticsToGrafana", {"type": "xst"})
def all_frames(
self,
time_window: tuple[datetime, datetime],
antenna_field: str,
selected_pol: str | None,
) -> list[grafana_apiv3_pb2.Frame]:
"""Return all Grafana Frames for the requested data."""
try:
reply = self._get_latest_in_window(time_window, antenna_field)
result = reply.result
except TooOldError:
return []
subband_frequency = self._subband_frequency_func(result.frequency_band)
# Turn result into Grafana fields
#
# Each value describes a baseline.
#
# Field 0 & 1 are the (antenna1, antenna2) indices describing each baseline.
# Field 2 is the central frequency of the values for each baseline.
fields = [
grafana_apiv3_pb2.Field(
name="antenna1",
values=[baseline.antenna1 for baseline in result.baselines],
),
grafana_apiv3_pb2.Field(
name="antenna2",
values=[baseline.antenna2 for baseline in result.baselines],
),
grafana_apiv3_pb2.Field(
name="frequency",
config=grafana_apiv3_pb2.config(
unit="Hz",
),
values=[subband_frequency(result.subband) for _ in result.baselines],
),
]
# Subsequent fields describe the power and phase for each baseline, and
# the requested, or all, polarisations.
for pol in ("xx", "xy", "yx", "yy"):
if selected_pol is None or pol == selected_pol:
labels = (
[
grafana_apiv3_pb2.Label(
key="pol",
value=pol,
)
]
if not selected_pol
else []
)
fields.extend(
[
grafana_apiv3_pb2.Field(
name="power",
labels=labels,
config=grafana_apiv3_pb2.config(
unit="dB",
),
values=[
getattr(baseline, pol).power_db
for baseline in result.baselines
],
),
grafana_apiv3_pb2.Field(
name="phase",
labels=labels,
config=grafana_apiv3_pb2.config(
unit="deg",
),
values=[
math.fabs(
getattr(baseline, pol).phase * 360.0 / math.pi
)
for baseline in result.baselines
],
),
]
)
frames = [
grafana_apiv3_pb2.Frame(
metric="XST",
timestamps=[result.timestamp for _ in result.baselines],
fields=fields,
meta=self.meta,
)
]
return frames
class GrafanaAPIV3(grafana_apiv3_pb2_grpc.GrafanaQueryAPIServicer):
"""Implements the Grafana interface for the innius simple-rpc-datasource,
see https://github.com/innius/grafana-simple-grpc-datasource"""
def __init__(self, statistics: Statistics, antenna_fields: list[str]):
super().__init__()
self.statistics = statistics
self.antenna_fields = antenna_fields
self.bst = BstToGrafana(self.statistics)
self.sst = SstToGrafana(self.statistics)
self.xst = XstToGrafana(self.statistics)
# self._import_test_data()
def _import_test_data(self):
"""Import test data for demo purposes."""
import json
import os
test_dir = os.path.dirname(__file__) + "/../../test/rpc/"
for type_ in ("bst", "sst", "xst"):
with open(test_dir + f"{type_}-message.json") as f:
print(f"Loading {f}")
message = json.load(f)
self.statistics.handle_statistics_message(
f"{type_}/lba/cs032", datetime.now(tz=timezone.utc), message
)
def GetQueryOptions(self, request: grafana_apiv3_pb2.GetOptionsRequest, context):
"""List options per query."""
return grafana_apiv3_pb2.GetOptionsResponse(options=[])
def ListDimensionKeys(
self, request: grafana_apiv3_pb2.ListDimensionKeysRequest, context
):
"""List available data dimensions."""
results = [
grafana_apiv3_pb2.ListDimensionKeysResponse.Result(
key="antenna_field",
description="Antenna field",
),
grafana_apiv3_pb2.ListDimensionKeysResponse.Result(
key="pol",
description="Polarisation",
),
]
return grafana_apiv3_pb2.ListDimensionKeysResponse(results=results)
def ListDimensionValues(
self, request: grafana_apiv3_pb2.ListDimensionValuesRequest, context
):
"""List possible values for each data dimension."""
results = []
if request.dimension_key == "antenna_field":
for antenna_field in self.antenna_fields:
results.append(
grafana_apiv3_pb2.ListDimensionValuesResponse.Result(
value=antenna_field,
)
)
if request.dimension_key == "pol":
for pol in ["xx", "xy", "yx", "yy"]:
results.append(
grafana_apiv3_pb2.ListDimensionValuesResponse.Result(
value=pol,
)
)
return grafana_apiv3_pb2.ListDimensionValuesResponse(results=results)
def ListMetrics(self, request: grafana_apiv3_pb2.ListMetricsRequest, context):
"""List available metrics."""
metrics = [
grafana_apiv3_pb2.ListMetricsResponse.Metric(
name="BST",
description="Beamlet statistics",
),
grafana_apiv3_pb2.ListMetricsResponse.Metric(
name="SST",
description="Subband statistics",
),
grafana_apiv3_pb2.ListMetricsResponse.Metric(
name="XST",
description="Crosslet statistics",
),
]
return grafana_apiv3_pb2.ListMetricsResponse(Metrics=metrics)
@call_exception_metrics("GrafanaAPIV3")
def GetMetricValue(self, request: grafana_apiv3_pb2.GetMetricValueRequest, context):
return grafana_apiv3_pb2.GetMetricValueResponse(frames=[])
@call_exception_metrics("GrafanaAPIV3")
def GetMetricAggregate(
self, request: grafana_apiv3_pb2.GetMetricAggregateRequest, context
):
"""Return the set of values for the request metrics and dimensions."""
frames = []
dimensions = {d.key: d.value for d in request.dimensions}
time_window = (
request.startDate.ToDatetime(tzinfo=timezone.utc),
request.endDate.ToDatetime(tzinfo=timezone.utc),
)
for metric in request.metrics:
if metric == "BST":
frames.extend(
self.bst.all_frames(
time_window,
dimensions["antenna_field"].lower(),
dimensions.get("pol"),
)
)
elif metric == "SST":
frames.extend(
self.sst.all_frames(
time_window,
dimensions["antenna_field"].lower(),
dimensions.get("pol"),
)
)
elif metric == "XST":
frames.extend(
self.xst.all_frames(
time_window,
dimensions["antenna_field"].lower(),
dimensions.get("pol"),
)
)
return grafana_apiv3_pb2.GetMetricAggregateResponse(frames=frames)
...@@ -2,8 +2,8 @@ ...@@ -2,8 +2,8 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import logging import logging
from tangostationcontrol.rpc._proto import observation_pb2 from lofar_sid.interface.stationcontrol import observation_pb2
from tangostationcontrol.rpc._proto import observation_pb2_grpc from lofar_sid.interface.stationcontrol import observation_pb2_grpc
from tangostationcontrol.rpc.common import ( from tangostationcontrol.rpc.common import (
call_exception_metrics, call_exception_metrics,
reply_on_exception, reply_on_exception,
......
...@@ -8,19 +8,17 @@ import sys ...@@ -8,19 +8,17 @@ import sys
import grpc import grpc
from grpc_reflection.v1alpha import reflection from grpc_reflection.v1alpha import reflection
from tangostationcontrol.rpc._proto import observation_pb2 from lofar_sid.interface.stationcontrol import observation_pb2
from tangostationcontrol.rpc._proto import observation_pb2_grpc from lofar_sid.interface.stationcontrol import observation_pb2_grpc
from tangostationcontrol.rpc._proto import statistics_pb2 from lofar_sid.interface.stationcontrol import statistics_pb2
from tangostationcontrol.rpc._proto import statistics_pb2_grpc from lofar_sid.interface.stationcontrol import statistics_pb2_grpc
from tangostationcontrol.rpc._proto import grafana_apiv3_pb2 from lofar_sid.interface.stationcontrol import antennafield_pb2
from tangostationcontrol.rpc._proto import grafana_apiv3_pb2_grpc from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
from tangostationcontrol.rpc.observation import Observation from tangostationcontrol.rpc.observation import Observation
from tangostationcontrol.rpc.statistics import Statistics from tangostationcontrol.rpc.statistics import Statistics
from tangostationcontrol.rpc.grafana_api import GrafanaAPIV3
from tangostationcontrol.rpc.messagehandler import MultiEndpointZMQMessageHandler from tangostationcontrol.rpc.messagehandler import MultiEndpointZMQMessageHandler
from tangostationcontrol.common.lofar_logging import configure_logger from tangostationcontrol.common.lofar_logging import configure_logger
from tangostationcontrol.metrics import start_metrics_server from tangostationcontrol.metrics import start_metrics_server
from tangostationcontrol.rpc._proto import antennafield_pb2, antennafield_pb2_grpc
from tangostationcontrol.rpc.antennafield import AntennaField from tangostationcontrol.rpc.antennafield import AntennaField
logger = logging.getLogger() logger = logging.getLogger()
...@@ -43,14 +41,10 @@ class Server: ...@@ -43,14 +41,10 @@ class Server:
statistics_pb2_grpc.add_StatisticsServicer_to_server( statistics_pb2_grpc.add_StatisticsServicer_to_server(
self.statistics_servicer, self.server self.statistics_servicer, self.server
) )
grafana_apiv3_pb2_grpc.add_GrafanaQueryAPIServicer_to_server(
GrafanaAPIV3(self.statistics_servicer, antenna_fields), self.server
)
SERVICE_NAMES = ( SERVICE_NAMES = (
observation_pb2.DESCRIPTOR.services_by_name["Observation"].full_name, observation_pb2.DESCRIPTOR.services_by_name["Observation"].full_name,
antennafield_pb2.DESCRIPTOR.services_by_name["Antennafield"].full_name, antennafield_pb2.DESCRIPTOR.services_by_name["Antennafield"].full_name,
statistics_pb2.DESCRIPTOR.services_by_name["Statistics"].full_name, statistics_pb2.DESCRIPTOR.services_by_name["Statistics"].full_name,
grafana_apiv3_pb2.DESCRIPTOR.services_by_name["GrafanaQueryAPI"].full_name,
reflection.SERVICE_NAME, # reflection is required by innius-gpc-datasource reflection.SERVICE_NAME, # reflection is required by innius-gpc-datasource
) )
reflection.enable_server_reflection(SERVICE_NAMES, self.server) reflection.enable_server_reflection(SERVICE_NAMES, self.server)
......
...@@ -3,14 +3,15 @@ ...@@ -3,14 +3,15 @@
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from math import log from math import log
from typing import Dict, Tuple from typing import Dict, Tuple, Callable
import logging import logging
import numpy import numpy
from tangostationcontrol.common.constants import N_pol, N_subbands from tangostationcontrol.common.constants import N_pol, N_subbands
from tangostationcontrol.rpc._proto import statistics_pb2 from tangostationcontrol.common.frequency_bands import Band, bands
from tangostationcontrol.rpc._proto import statistics_pb2_grpc from lofar_sid.interface.stationcontrol import statistics_pb2
from lofar_sid.interface.stationcontrol import statistics_pb2_grpc
from tangostationcontrol.rpc.common import ( from tangostationcontrol.rpc.common import (
call_exception_metrics, call_exception_metrics,
) )
...@@ -88,6 +89,23 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM ...@@ -88,6 +89,23 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM
nyquist_zone=message["source_info"]["nyquist_zone_index"], nyquist_zone=message["source_info"]["nyquist_zone_index"],
) )
@staticmethod
def _subband_frequency_func(
frequency_band: statistics_pb2.FrequencyBand,
) -> Callable[[int], float]:
"""Return a function that converts a subband number into its central frequency (in Hz).
NB: Spectral inversion is assumed to have been configured correctly at time of recording.
"""
band_name = Band.lookup_nyquist_zone(
frequency_band.antenna_type,
frequency_band.clock,
frequency_band.nyquist_zone,
)
return bands[band_name].subband_frequency
@call_exception_metrics("Statistics") @call_exception_metrics("Statistics")
def Bst(self, request: statistics_pb2.BstRequest, context): def Bst(self, request: statistics_pb2.BstRequest, context):
bst_message = self.get_last_message("bst", request.antenna_field) bst_message = self.get_last_message("bst", request.antenna_field)
...@@ -119,9 +137,13 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM ...@@ -119,9 +137,13 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM
sst_data = numpy.array(sst_message["sst_data"], dtype=numpy.float32) sst_data = numpy.array(sst_message["sst_data"], dtype=numpy.float32)
sst_data = sst_data.reshape((-1, N_pol, N_subbands)) sst_data = sst_data.reshape((-1, N_pol, N_subbands))
frequency_band = self._message_to_frequency_band(sst_message)
subband_frequency = self._subband_frequency_func(frequency_band)
subbands = [ subbands = [
statistics_pb2.SstResult.SstSubband( statistics_pb2.SstResult.SstSubband(
subband=subband_nr, subband=subband_nr,
frequency=subband_frequency(subband_nr),
antennas=[ antennas=[
statistics_pb2.SstResult.SstSubband.SstAntenna( statistics_pb2.SstResult.SstSubband.SstAntenna(
antenna=antenna_nr, antenna=antenna_nr,
...@@ -137,7 +159,7 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM ...@@ -137,7 +159,7 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM
return statistics_pb2.SstReply( return statistics_pb2.SstReply(
result=statistics_pb2.SstResult( result=statistics_pb2.SstResult(
timestamp=datetime.fromisoformat(sst_message["timestamp"]), timestamp=datetime.fromisoformat(sst_message["timestamp"]),
frequency_band=self._message_to_frequency_band(sst_message), frequency_band=frequency_band,
integration_interval=sst_message["integration_interval"], integration_interval=sst_message["integration_interval"],
subbands=subbands, subbands=subbands,
), ),
...@@ -187,12 +209,16 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM ...@@ -187,12 +209,16 @@ class Statistics(statistics_pb2_grpc.StatisticsServicer, LastStatisticsMessagesM
if antenna1 <= antenna2 if antenna1 <= antenna2
] ]
frequency_band = self._message_to_frequency_band(xst_message)
subband_frequency = self._subband_frequency_func(frequency_band)
return statistics_pb2.XstReply( return statistics_pb2.XstReply(
result=statistics_pb2.XstResult( result=statistics_pb2.XstResult(
timestamp=datetime.fromisoformat(xst_message["timestamp"]), timestamp=datetime.fromisoformat(xst_message["timestamp"]),
frequency_band=self._message_to_frequency_band(xst_message), frequency_band=frequency_band,
integration_interval=xst_message["integration_interval"], integration_interval=xst_message["integration_interval"],
subband=xst_message["subband"], subband=xst_message["subband"],
frequency=subband_frequency(xst_message["subband"]),
baselines=baselines, baselines=baselines,
), ),
) )
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
from tangostationcontrol.rpc._proto.antennafield_pb2 import ( from lofar_sid.interface.stationcontrol.antennafield_pb2 import (
Identifier, Identifier,
SetAntennaStatusRequest, SetAntennaStatusRequest,
SetAntennaUseRequest, SetAntennaUseRequest,
......
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime, timezone, timedelta
import json
from os import path
from tangostationcontrol.common.constants import (
N_subbands,
N_pol,
N_beamlets_ctrl,
N_pn,
S_pn,
A_pn,
)
from tangostationcontrol.rpc.grafana_api import (
GrafanaAPIV3,
StatisticsToGrafana,
BstToGrafana,
SstToGrafana,
XstToGrafana,
)
from tangostationcontrol.rpc._proto import grafana_apiv3_pb2
from tangostationcontrol.rpc.statistics import (
Statistics,
dB,
NotAvailableError,
TooOldError,
)
from tangostationcontrol.rpc._proto import statistics_pb2
from google.protobuf.timestamp_pb2 import Timestamp
from test import base
def field_labels(label_list: list[grafana_apiv3_pb2.Label]) -> dict[str, str]:
"""Turn a list of objects with key and value fields into a dict."""
return {x.key: x.value for x in label_list}
class TestStatisticsToGrafana(base.TestCase):
def test_verify_call_result(self):
statistics = Statistics()
sut = StatisticsToGrafana(statistics)
now = datetime.now(tz=timezone.utc)
reply = statistics_pb2.SstReply(result=statistics_pb2.SstResult(timestamp=now))
# should not raise
_ = sut._verify_call_result(
reply, (now - timedelta(seconds=1), now + timedelta(seconds=1))
)
# too old raises
with self.assertRaises(TooOldError):
_ = sut._verify_call_result(
reply, (now + timedelta(seconds=1), now + timedelta(seconds=2))
)
class TestBstToGrafana(base.TestCase):
def setUp(self):
statistics = Statistics()
self.sut = BstToGrafana(statistics)
# provide with sample statistic
with open(f"{path.dirname(__file__)}/bst-message.json") as f:
self.bst_message = json.load(f)
statistics.handle_statistics_message(
"bst/lba/cs032", datetime.now(), self.bst_message
)
self.valid_time_window = (
datetime.fromisoformat("2025-01-01T00:00+00:00"),
datetime.fromisoformat("2026-01-01T00:00+00:00"),
)
def test_all_frames(self):
frames = self.sut.all_frames(self.valid_time_window, "lba", None)
self.assertEqual(1, len(frames))
self.assertEqual("BST", frames[0].metric)
self.assertEqual(3, len(frames[0].fields))
self.assertEqual("beamlet", frames[0].fields[0].name)
self.assertEqual("xx", frames[0].fields[1].name)
self.assertEqual("yy", frames[0].fields[2].name)
# all fields must be of the right size
self.assertEqual(N_beamlets_ctrl, len(frames[0].timestamps))
self.assertEqual(N_beamlets_ctrl, len(frames[0].fields[0].values))
self.assertEqual(N_beamlets_ctrl, len(frames[0].fields[1].values))
self.assertEqual(N_beamlets_ctrl, len(frames[0].fields[2].values))
# all timestamps must be equal
self.assertEqual(1, len(set([ts.ToDatetime() for ts in frames[0].timestamps])))
# all beamlets must be there
self.assertEqual(set(range(N_beamlets_ctrl)), set(frames[0].fields[0].values))
# test values
for beamlet_idx, value in enumerate(frames[0].fields[1].values):
self.assertAlmostEqual(
dB(self.bst_message["bst_data"][beamlet_idx][0]),
value,
5,
msg=f"{beamlet_idx=}",
)
for beamlet_idx, value in enumerate(frames[0].fields[2].values):
self.assertAlmostEqual(
dB(self.bst_message["bst_data"][beamlet_idx][1]),
value,
5,
msg=f"{beamlet_idx=}",
)
class TestSstToGrafana(base.TestCase):
def setUp(self):
statistics = Statistics()
self.sut = SstToGrafana(statistics)
# provide with sample statistic
with open(f"{path.dirname(__file__)}/sst-message.json") as f:
self.sst_message = json.load(f)
statistics.handle_statistics_message(
"sst/lba/cs032", datetime.now(), self.sst_message
)
self.valid_time_window = (
datetime.fromisoformat("2025-01-01T00:00+00:00"),
datetime.fromisoformat("2026-01-01T00:00+00:00"),
)
def test_all_frames(self):
frames = self.sut.all_frames(self.valid_time_window, "lba", None)
self.assertEqual(1, len(frames))
self.assertEqual("SST", frames[0].metric)
self.assertEqual(1 + N_pn * S_pn, len(frames[0].fields))
self.assertEqual("frequency", frames[0].fields[0].name)
for idx, field in enumerate(frames[0].fields[1:], 1):
self.assertEqual("power", field.name, msg=f"{idx=}")
# all fields must be of the right size
self.assertEqual(N_subbands - 1, len(frames[0].timestamps))
self.assertEqual(N_subbands - 1, len(frames[0].fields[0].values))
for idx, field in enumerate(frames[0].fields[1:], 1):
self.assertEqual(N_subbands - 1, len(field.values), msg=f"{idx=}")
# all timestamps must be equal
self.assertEqual(1, len(set([ts.ToDatetime() for ts in frames[0].timestamps])))
# all inputs must be there
input_names = {
field_labels(field.labels)["antenna"] for field in frames[0].fields[1:]
}
self.assertEqual(N_pn * A_pn, len(input_names), msg=f"{input_names=}")
# both polarisarions must be there
input_pols = [
field_labels(field.labels)["pol"] for field in frames[0].fields[1:]
]
self.assertEqual(N_pn * A_pn, input_pols.count("xx"), msg=f"{input_pols=}")
self.assertEqual(N_pn * A_pn, input_pols.count("yy"), msg=f"{input_pols=}")
# test values
for field in frames[0].fields[1:]:
labels = field_labels(field.labels)
antenna_idx = int(labels["antenna"])
pol_indices = {"xx": 0, "yy": 1}
pol_idx = pol_indices[labels["pol"]]
input_idx = antenna_idx * N_pol + pol_idx
for idx, value in enumerate(field.values):
self.assertAlmostEqual(
dB(self.sst_message["sst_data"][input_idx][idx + 1]),
value,
5,
msg=f"{labels=}, {idx=}",
)
class TestXstToGrafana(base.TestCase):
def setUp(self):
statistics = Statistics()
self.sut = XstToGrafana(statistics)
# provide with sample statistic
with open(f"{path.dirname(__file__)}/xst-message.json") as f:
self.xst_message = json.load(f)
statistics.handle_statistics_message(
"xst/lba/cs032", datetime.now(), self.xst_message
)
self.valid_time_window = (
datetime.fromisoformat("2025-01-01T00:00+00:00"),
datetime.fromisoformat("2026-01-01T00:00+00:00"),
)
def test_all_frames(self):
frames = self.sut.all_frames(self.valid_time_window, "lba", None)
self.assertEqual(1, len(frames))
self.assertEqual("XST", frames[0].metric)
self.assertEqual(3 + 2 * N_pol**2, len(frames[0].fields))
self.assertEqual("antenna1", frames[0].fields[0].name)
self.assertEqual("antenna2", frames[0].fields[1].name)
self.assertEqual("frequency", frames[0].fields[2].name)
self.assertEqual(
{"power", "phase"}, {field.name for field in frames[0].fields[3:]}
)
N_antennas = N_pn * A_pn
N_baselines = N_antennas * (N_antennas + 1) // 2
# all fields must be of the right size
self.assertEqual(N_baselines, len(frames[0].timestamps))
for idx, field in enumerate(frames[0].fields):
self.assertEqual(N_baselines, len(field.values), msg=f"{idx=}")
# all timestamps must be equal
self.assertEqual(1, len(set([ts.ToDatetime() for ts in frames[0].timestamps])))
# all polarisarions must be there (for power and phase)
input_pols = [
field_labels(field.labels)["pol"] for field in frames[0].fields[3:]
]
self.assertEqual(2, input_pols.count("xx"))
self.assertEqual(2, input_pols.count("xy"))
self.assertEqual(2, input_pols.count("yx"))
self.assertEqual(2, input_pols.count("yy"))
# all antennas must be there
self.assertEqual(set(range(N_antennas)), set(frames[0].fields[0].values))
self.assertEqual(set(range(N_antennas)), set(frames[0].fields[1].values))
# test specific value for regression
self.assertAlmostEqual(30.7213135, frames[0].fields[3].values[99], 6)
self.assertAlmostEqual(3.88297279, frames[0].fields[4].values[99], 6)
self.assertAlmostEqual(27.2744141, frames[0].fields[5].values[99], 6)
self.assertAlmostEqual(297.7411965, frames[0].fields[6].values[99], 6)
class TestGrafanaAPIV3(base.TestCase):
def test_getqueryoptions(self):
"""Test GetQueryOptions."""
statistics = Statistics()
sut = GrafanaAPIV3(statistics, [])
request = grafana_apiv3_pb2.GetOptionsRequest()
reply = sut.GetQueryOptions(request, None)
self.assertEqual(0, len(reply.options))
def test_listdimensionkeys(self):
"""Test ListDimensionKeys."""
statistics = Statistics()
sut = GrafanaAPIV3(statistics, [])
request = grafana_apiv3_pb2.ListDimensionKeysRequest()
reply = sut.ListDimensionKeys(request, None)
dimensions = {d.key for d in reply.results}
self.assertIn("antenna_field", dimensions)
self.assertIn("pol", dimensions)
def test_listdimensionvalues(self):
"""Test ListDimensionValues."""
statistics = Statistics()
sut = GrafanaAPIV3(statistics, ["LBA", "HBA"])
expected_values = {
"antenna_field": {"LBA", "HBA"},
"pol": {"xx", "xy", "yx", "yy"},
}
for dim, expected_value in expected_values.items():
request = grafana_apiv3_pb2.ListDimensionValuesRequest(dimension_key=dim)
reply = sut.ListDimensionValues(request, None)
value = set([r.value for r in reply.results])
self.assertEqual(expected_value, value, msg=f"{dim=}")
def test_listmetrics(self):
"""Test ListMetrics."""
statistics = Statistics()
sut = GrafanaAPIV3(statistics, [])
request = grafana_apiv3_pb2.ListMetricsRequest()
reply = sut.ListMetrics(request, None)
metrics = [m.name for m in reply.Metrics]
self.assertIn("BST", metrics)
self.assertIn("SST", metrics)
self.assertIn("XST", metrics)
class TestGetMetricAggregate(base.TestCase):
def setUp(self):
statistics = Statistics()
self.sut = GrafanaAPIV3(statistics, ["LBA"])
# provide with sample statistic
with open(f"{path.dirname(__file__)}/sst-message.json") as f:
sst_message = json.load(f)
statistics.handle_statistics_message(
"sst/lba/cs032", datetime.now(), sst_message
)
def test_nometrics(self):
"""Test if there are no metrics available."""
statistics = Statistics()
sut = GrafanaAPIV3(statistics, ["LBA"])
# request/response
request = grafana_apiv3_pb2.GetMetricAggregateRequest(
metrics=["SST"],
dimensions=[grafana_apiv3_pb2.Dimension(key="antenna_field", value="LBA")],
startDate=Timestamp(seconds=0),
endDate=Timestamp(seconds=int(datetime.now().timestamp())),
)
with self.assertRaises(NotAvailableError):
_ = sut.GetMetricAggregate(request, None)
def test_validdaterange(self):
"""Test obtaining SSTs using a date range for which they are available."""
# request/response
request = grafana_apiv3_pb2.GetMetricAggregateRequest(
metrics=["SST"],
dimensions=[grafana_apiv3_pb2.Dimension(key="antenna_field", value="LBA")],
startDate=Timestamp(seconds=0),
endDate=Timestamp(seconds=int(datetime.now().timestamp())),
)
reply = self.sut.GetMetricAggregate(request, None)
# validate output
self.assertEqual(1, len(reply.frames))
self.assertEqual("SST", reply.frames[0].metric)
def test_tooold(self):
"""Test obtaining SSTs using a date range for which they are too old."""
# request/response
request = grafana_apiv3_pb2.GetMetricAggregateRequest(
metrics=["SST"],
dimensions=[grafana_apiv3_pb2.Dimension(key="antenna_field", value="LBA")],
startDate=Timestamp(seconds=int(datetime.now().timestamp()) - 1),
endDate=Timestamp(seconds=int(datetime.now().timestamp())),
)
reply = self.sut.GetMetricAggregate(request, None)
# validate output
self.assertEqual(0, len(reply.frames))
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
from unittest import mock from unittest import mock
from tangostationcontrol.rpc.observation import Observation from tangostationcontrol.rpc.observation import Observation
from tangostationcontrol.rpc._proto import observation_pb2 from lofar_sid.interface.stationcontrol import observation_pb2
from test import base from test import base
......
...@@ -8,8 +8,8 @@ from grpc_reflection.v1alpha.proto_reflection_descriptor_database import ( ...@@ -8,8 +8,8 @@ from grpc_reflection.v1alpha.proto_reflection_descriptor_database import (
ProtoReflectionDescriptorDatabase, ProtoReflectionDescriptorDatabase,
) )
from tangostationcontrol.rpc._proto import grafana_apiv3_pb2 from lofar_sid.interface.stationcontrol import antennafield_pb2
from tangostationcontrol.rpc._proto import grafana_apiv3_pb2_grpc from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
from tangostationcontrol.rpc.server import Server from tangostationcontrol.rpc.server import Server
from test import base from test import base
...@@ -36,11 +36,17 @@ class TestServer(base.TestCase): ...@@ -36,11 +36,17 @@ class TestServer(base.TestCase):
self.assertIn("Observation", services) self.assertIn("Observation", services)
self.assertIn("Antennafield", services) self.assertIn("Antennafield", services)
self.assertIn("Statistics", services) self.assertIn("Statistics", services)
self.assertIn("grafanav3.GrafanaQueryAPI", services)
def test_call(self): def test_call(self):
"""Test a basic gRPC call to the server.""" """Test a basic gRPC call to the server."""
with grpc.insecure_channel(f"localhost:{self.server.port}") as channel: with grpc.insecure_channel(f"localhost:{self.server.port}") as channel:
stub = grafana_apiv3_pb2_grpc.GrafanaQueryAPIStub(channel) stub = antennafield_pb2_grpc.AntennafieldStub(channel)
_ = stub.GetQueryOptions(grafana_apiv3_pb2.GetOptionsRequest())
identifier = antennafield_pb2.Identifier(
antennafield_name="lba",
antenna_name="LBA00",
)
_ = stub.GetAntenna(
antennafield_pb2.GetAntennaRequest(identifier=identifier)
)
...@@ -20,7 +20,7 @@ from tangostationcontrol.rpc.statistics import ( ...@@ -20,7 +20,7 @@ from tangostationcontrol.rpc.statistics import (
TooOldError, TooOldError,
NotAvailableError, NotAvailableError,
) )
from tangostationcontrol.rpc._proto import statistics_pb2 from lofar_sid.interface.stationcontrol import statistics_pb2
from test import base from test import base
......
...@@ -29,10 +29,6 @@ allowlist_externals = ...@@ -29,10 +29,6 @@ allowlist_externals =
commands_pre = commands_pre =
{envpython} --version {envpython} --version
{work_dir}/.tox/bin/python -m tox --version {work_dir}/.tox/bin/python -m tox --version
{envpython} -m grpc_tools.protoc -Itangostationcontrol/rpc/_proto=proto --python_out=. --pyi_out=. --grpc_python_out=. proto/observation.proto
{envpython} -m grpc_tools.protoc -Itangostationcontrol/rpc/_proto=proto --python_out=. --pyi_out=. --grpc_python_out=. proto/antennafield.proto
{envpython} -m grpc_tools.protoc -Itangostationcontrol/rpc/_proto=proto --python_out=. --pyi_out=. --grpc_python_out=. proto/statistics.proto
{envpython} -m grpc_tools.protoc -Itangostationcontrol/rpc/_proto=proto --python_out=. --pyi_out=. --grpc_python_out=. proto/grafana-apiv3.proto
commands = commands =
{envpython} -m pytest --version {envpython} -m pytest --version
{envpython} -m pytest -v --log-level=DEBUG --forked test/{posargs} {envpython} -m pytest -v --log-level=DEBUG --forked test/{posargs}
...@@ -124,4 +120,4 @@ commands = ...@@ -124,4 +120,4 @@ commands =
[flake8] [flake8]
filename = *.py,.stestr.conf,.txt filename = *.py,.stestr.conf,.txt
ignore = B014, B019, B028, W291, W293, W391, E111, E114, E121, E122, E123, E124, E126, E127, E128, E131, E201, E201, E202, E203, E221, E222, E225, E226, E231, E241, E251, E252, E261, E262, E265, E271, E301, E302, E303, E305, E306, E401, E402, E501, E502, E701, E712, E721, E731, F403, F523, F541, F841, H301, H306, H401, H403, H404, H405, W503 ignore = B014, B019, B028, W291, W293, W391, E111, E114, E121, E122, E123, E124, E126, E127, E128, E131, E201, E201, E202, E203, E221, E222, E225, E226, E231, E241, E251, E252, E261, E262, E265, E271, E301, E302, E303, E305, E306, E401, E402, E501, E502, E701, E712, E721, E731, F403, F523, F541, F841, H301, H306, H401, H403, H404, H405, W503
exclude = SNMP_mib_loading,output_pymibs,_proto exclude = SNMP_mib_loading,output_pymibs
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment