Skip to content
Snippets Groups Projects
Commit 6be6d3e3 authored by Reinder Kraaij's avatar Reinder Kraaij :eye:
Browse files

Resolve L2TSS-2109 "Add central rest"

parent 6155add6
No related branches found
No related tags found
1 merge request!4Resolve L2TSS-2109 "Add central rest"
# Opah
![Build status](http://git.astron.nl/lofar2.0/opah/badges/main/pipeline.svg)
![Test coverage](http://git.astron.nl/lofar2.0/opah/badges/main/coverage.svg)
<!-- ![Latest release](https://git.astron.nl/templates/python-package/badges/main/release.svg) -->
## GRPC
An example repository of an CI/CD pipeline for building, testing and publishing a python package.
## Rest
The Rest Service is reachable by default under <servername>:50052 the swagger documentation Rest helper can be found under /apidocs/
## Installation
```
pip install .
```
## Setup
One time template setup should include configuring the docker registry to regularly cleanup old images of
the CI/CD pipelines. And you can consider creating protected version tags for software releases:
1. [Cleanup Docker Registry Images](https://git.astron.nl/groups/templates/-/wikis/Cleanup-Docker-Registry-Images)
2. [Setup Protected Verson Tags](https://git.astron.nl/groups/templates/-/wikis/Setting-up-Protected-Version-Tags)
Once the cleanup policy for docker registry is setup you can uncomment the `docker push` comment in the `.gitlab-ci.yml`
file from the `docker_build` job. This will allow to download minimal docker images with your Python package installed.
## Usage
```python
from opah import cool_module
cool_module.greeter() # prints "Hello World"
```
## Development
### Development environment
To setup and activte the develop environment run ```source ./setup.sh``` from within the source directory.
If PyCharm is used, this only needs to be done once.
Afterward the Python virtual env can be setup within PyCharm.
### Contributing
To contribute, please create a feature branch and a "Draft" merge request.
Upon completion, the merge request should be marked as ready and a reviewer
should be assigned.
Verify your changes locally and be sure to add tests. Verifying local
changes is done through `tox`.
```pip install tox```
With tox the same jobs as run on the CI/CD pipeline can be ran. These
include unit tests and linting.
```tox```
To automatically apply most suggested linting changes execute:
```tox -e format```
## License
This project is licensed under the Apache License Version 2.0
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from functools import wraps
from flask import jsonify
from grpc import RpcError
import logging
logger = logging.getLogger()
def grpc_error_handler(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except RpcError as e:
logger.error(f"gRPC error: {e.details()}")
return (
jsonify(
{
"error": "gRPC service unavailable",
"exception": e.details(),
"success": False,
}
),
503,
)
return wrapper
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from flask import Flask, jsonify, request, redirect, url_for
from flask_cors import CORS
from flasgger import Swagger
from waitress import serve
import grpc
from lofar_sid.interface.stationcontrol import antennafield_pb2, antennafield_pb2_grpc
from lofar_opah.control_restapi._decorators import grpc_error_handler
from http import HTTPStatus
app = Flask(__name__)
def get_grpc_stub(logger, station_name, station_suffix, remote_grpc_port):
"""Create a grpc Stub to the station"""
grpc_endpoint = f"{station_name}{station_suffix}:{remote_grpc_port}"
logger.debug("REST API Will Remotely connect to %s", grpc_endpoint)
channel = grpc.insecure_channel(grpc_endpoint)
return antennafield_pb2_grpc.AntennafieldStub(channel)
def start_rest_server(logger, rest_port, station_suffix, remote_grpc_port):
"""Starts a REST API server that acts as a proxy to gRPC."""
logger.debug(
'start_rest_server(rest_port:%s, station_suffix:"%s",remotegrpcport:%s) ',
rest_port,
station_suffix,
remote_grpc_port,
)
CORS(app)
swagger_template = {
"swagger": "2.0",
"info": {
"title": "The Lofar Control API",
"description": "API for controlling Lofar Antennas",
},
"basePath": "/v1",
}
Swagger(app, template=swagger_template)
@app.after_request
def log_failed_requests(response):
"""Log requests that resulted in client or server errors."""
logmessage = f"Method: {request.method} | Path: {request.path}"
f" | Status: {response.status_code} "
f" | IP: {request.remote_addr} | User-Agent: {request.user_agent}"
if response.status_code >= 400:
logger.error(logmessage)
else:
logger.debug(logmessage)
return response
def cast_antennareply_to_json(response):
"""Clear Cast, gets rid of additional grpc fields"""
return jsonify(
{
"success": response.success,
"exception": response.exception,
"result": {
"antenna_use": response.result.antenna_use,
"antenna_status": response.result.antenna_status,
},
"identifier": {
"antennafield_name": response.result.identifier.antennafield_name,
"antenna_name": response.result.identifier.antenna_name,
},
}
)
@app.route("/")
def redirect_to_apidocs():
return redirect(url_for("flasgger.apidocs"))
@app.route(
"/v1/<station_name>/antenna/<antennafield_name>/<antenna_name>", methods=["GET"]
)
@grpc_error_handler
def get_antenna(station_name, antennafield_name, antenna_name):
"""Get Antenna Information
---
parameters:
- name: station_name
description : Use localhost for localstation
in: path
type: string
required: true
- name: antennafield_name
in: path
type: string
required: true
- name: antenna_name
in: path
type: string
required: true
responses:
200:
description: Antenna information retrieved successfully
"""
antenna_request = antennafield_pb2.GetAntennaRequest(
identifier=antennafield_pb2.Identifier(
antennafield_name=antennafield_name,
antenna_name=antenna_name,
)
)
stub = get_grpc_stub(logger, station_name, station_suffix, remote_grpc_port)
response = stub.GetAntenna(antenna_request)
return cast_antennareply_to_json(response), (
HTTPStatus.OK if response.success else HTTPStatus.BAD_GATEWAY
)
@app.route(
"/v1/<station_name>/antenna/"
"<antennafield_name>/<antenna_name>"
"/status/<int:status>",
methods=["POST"],
)
@grpc_error_handler
def set_antenna_status(station_name, antennafield_name, antenna_name, status):
"""Set Antenna Status
---
parameters:
- name: station_name
description : Use localhost for localstation
in: path
type: string
required: true
- name: antennafield_name
in: path
type: string
required: true
- name: antenna_name
in: path
type: string
required: true
- name: status
in: path
type: integer
required: true
responses:
200:
description: Antenna status updated
"""
set_antenna_status_request = antennafield_pb2.SetAntennaStatusRequest(
identifier=antennafield_pb2.Identifier(
antennafield_name=antennafield_name,
antenna_name=antenna_name,
),
antenna_status=status,
)
stub = get_grpc_stub(logger, station_name, station_suffix, remote_grpc_port)
response = stub.SetAntennaStatus(set_antenna_status_request)
return cast_antennareply_to_json(response), (
HTTPStatus.OK if response.success else HTTPStatus.BAD_GATEWAY
)
@app.route(
"/v1/<station_name>/antenna/<antennafield_name>/<antenna_name>/use/<int:use>",
methods=["POST"],
)
@grpc_error_handler
def set_antenna_use(station_name, antennafield_name, antenna_name, use):
"""Set Antenna Use
---
parameters:
- name: station_name
in: path
type: string
required: true
- name: antennafield_name
in: path
type: string
required: true
- name: antenna_name
in: path
type: string
required: true
- name: use
in: path
type: integer
required: true
responses:
200:
description: Antenna use updated
"""
set_antenna_use_request = antennafield_pb2.SetAntennaUseRequest(
identifier=antennafield_pb2.Identifier(
antennafield_name=antennafield_name,
antenna_name=antenna_name,
),
antenna_use=use,
)
stub = get_grpc_stub(logger, station_name, station_suffix, remote_grpc_port)
response = stub.SetAntennaUse(set_antenna_use_request)
return cast_antennareply_to_json(response), (
HTTPStatus.OK if response.success else HTTPStatus.BAD_GATEWAY
)
logger.info("Control REST API server started on port %s", rest_port)
serve(app, host="0.0.0.0", port=rest_port)
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Main class for the API Server"""
import logging
import argparse
from rest_server import start_rest_server
from lofar_opah.metrics import start_metrics_server
import sys
logger = logging.getLogger()
REST_PORT = 50052
STATION_SUFFIX = ".control.lofar" # This is added to the stationname
logging.basicConfig(level=logging.DEBUG)
def _create_parser():
"""Define the parser"""
parser = argparse.ArgumentParser(description="Serve the station rest interface.")
parser.add_argument(
"--port",
default=50053,
help="HTTP port to listen on. Defaults to 50053",
)
parser.add_argument(
"--metrics-port",
default=8002,
help="Prometheus metrics HTTP port. Defaults to 8002",
)
parser.add_argument(
"--stationsuffix",
default=".control.lofar",
help=(
"Append this to all station_name e.g. .control.lofar."
"Leave empty for rest on localserver. Defaults to .control.lofar"
),
)
parser.add_argument(
"--remotegrpcport",
default="50051",
help="The port the remotegrpc service is listening on. defaults to 50051",
)
return parser
def main(argv=None):
parser = _create_parser()
args = parser.parse_args(argv or sys.argv[1:])
start_metrics_server(args.metrics_port)
logging.info(
"Launching Control Rest Server port:%s, stationsuffix:%s,remotegrpcport:%s",
args.port,
args.stationsuffix,
args.remotegrpcport,
)
# Create gRPC server
start_rest_server(logger, args.port, args.stationsuffix, args.remotegrpcport)
if __name__ == "__main__":
main()
......@@ -12,7 +12,6 @@ from lofar_sid.interface.opah import grafana_apiv3_pb2
from lofar_sid.interface.opah import grafana_apiv3_pb2_grpc
from .grafana_api import GrafanaAPIV3
# from tangostationcontrol.common.lofar_logging import configure_logger
from lofar_opah.metrics import start_metrics_server
logger = logging.getLogger()
......
......@@ -6,4 +6,9 @@ prometheus-client # Apache 2
grpcio # Apache 2
grpcio-reflection
grpcio-tools # Apache 2
flask
waitress
flask_cors
flasgger
types-flask-cors
types-waitress
\ No newline at end of file
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""test the control rest api"""
import unittest
from unittest.mock import MagicMock, patch
from http import HTTPStatus
from lofar_sid.interface.stationcontrol import antennafield_pb2
from lofar_opah.control_restapi.rest_server import start_rest_server
from lofar_opah.control_restapi.rest_server import app
class TestRestAPI(unittest.TestCase):
"""Test Rest Api Class"""
@classmethod
def setUpClass(cls):
"""Have a test server instead of a real server"""
with patch("lofar_opah.control_restapi.rest_server.serve") as mock_serve:
mock_serve.return_value = None # Mocking `serve` to do nothing
logger = MagicMock()
start_rest_server(
logger, rest_port=5000, station_suffix="-test", remote_grpc_port=50051
)
# Use the actual Flask app instance for testing
cls.client = app.test_client()
@patch("lofar_opah.control_restapi.rest_server.get_grpc_stub")
def test_get_antenna_success(self, mock_get_grpc_stub):
"""test getting antenna"""
mock_stub = MagicMock()
mock_response = self._generate_antenna_reply()
mock_stub.GetAntenna.return_value = mock_response
mock_get_grpc_stub.return_value = mock_stub
response = self.client.get("/v1/localhost/antenna/field1/antenna1")
self.assertEqual(response.status_code, HTTPStatus.OK)
self.assertTrue(response.json["success"])
def _generate_antenna_reply(self, success=True):
"""generate anteanna reply"""
mock_response = antennafield_pb2.AntennaReply(
success=success,
exception="",
result=antennafield_pb2.AntennaResult(
identifier=antennafield_pb2.Identifier(
antenna_name="A", antennafield_name="LBA"
),
antenna_use=antennafield_pb2.Antenna_Use.ON,
antenna_status=antennafield_pb2.Antenna_Status.BROKEN,
),
)
if not success:
mock_response.exception = "Bad"
return mock_response
@patch("lofar_opah.control_restapi.rest_server.get_grpc_stub")
def test_get_antenna_failure(self, mock_get_grpc_stub):
"""Test bad path of getting status"""
mock_stub = MagicMock()
mock_response = self._generate_antenna_reply(False)
mock_stub.GetAntenna.return_value = mock_response
mock_get_grpc_stub.return_value = mock_stub
response = self.client.get("/v1/localhost/antenna/field1/antenna1")
self.assertEqual(response.status_code, HTTPStatus.BAD_GATEWAY)
@patch("lofar_opah.control_restapi.rest_server.get_grpc_stub")
def test_set_antenna_status_success(self, mock_get_grpc_stub):
"""Test good path of setting status"""
mock_stub = MagicMock()
mock_response = self._generate_antenna_reply()
mock_stub.SetAntennaStatus.return_value = mock_response
mock_get_grpc_stub.return_value = mock_stub
response = self.client.post("/v1/localhost/antenna/field1/antenna1/status/1")
self.assertEqual(response.status_code, HTTPStatus.OK)
@patch("lofar_opah.control_restapi.rest_server.get_grpc_stub")
def test_set_antenna_use_success(self, mock_get_grpc_stub):
"""Test good path of setting use"""
mock_stub = MagicMock()
mock_response = self._generate_antenna_reply()
mock_stub.SetAntennaUse.return_value = mock_response
mock_get_grpc_stub.return_value = mock_stub
response = self.client.post("/v1/localhost/antenna/field1/antenna1/use/1")
self.assertEqual(response.status_code, HTTPStatus.OK)
@patch("lofar_opah.control_restapi.rest_server.get_grpc_stub")
def test_set_antenna_use_failure(self, mock_get_grpc_stub):
"""Test bad path of setting use"""
mock_stub = MagicMock()
mock_response = self._generate_antenna_reply(False)
mock_stub.SetAntennaUse.return_value = mock_response
mock_get_grpc_stub.return_value = mock_stub
response = self.client.post("/v1/localhost/antenna/field1/antenna1/use/1")
self.assertEqual(response.status_code, HTTPStatus.BAD_GATEWAY)
if __name__ == "__main__":
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment