Select Git revision
t_scheduling.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
server.py 4.28 KiB
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import argparse
from concurrent import futures
import logging
import sys
import grpc
from grpc_reflection.v1alpha import reflection
from lofar_sid.interface.stationcontrol import observation_pb2
from lofar_sid.interface.stationcontrol import observation_pb2_grpc
from lofar_sid.interface.stationcontrol import statistics_pb2
from lofar_sid.interface.stationcontrol import statistics_pb2_grpc
from lofar_sid.interface.stationcontrol import antennafield_pb2
from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
from tangostationcontrol.rpc.observation import Observation
from tangostationcontrol.rpc.statistics import Statistics
from tangostationcontrol.rpc.messagehandler import MultiEndpointZMQMessageHandler
from tangostationcontrol.common.lofar_logging import configure_logger
from tangostationcontrol.metrics import start_metrics_server
from tangostationcontrol.rpc.antennafield import AntennaField
logger = logging.getLogger()
class Server:
def __init__(self, antenna_fields: list[str], port: int = 50051):
# Initialise Statistics service
self.statistics_servicer = Statistics()
# Initialise gRPC server
logger.info("Initialising grpc server")
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
observation_pb2_grpc.add_ObservationServicer_to_server(
Observation(), self.server
)
antennafield_pb2_grpc.add_AntennafieldServicer_to_server(
AntennaField(), self.server
)
statistics_pb2_grpc.add_StatisticsServicer_to_server(
self.statistics_servicer, self.server
)
SERVICE_NAMES = (
observation_pb2.DESCRIPTOR.services_by_name["Observation"].full_name,
antennafield_pb2.DESCRIPTOR.services_by_name["Antennafield"].full_name,
statistics_pb2.DESCRIPTOR.services_by_name["Statistics"].full_name,
reflection.SERVICE_NAME, # reflection is required by innius-gpc-datasource
)
reflection.enable_server_reflection(SERVICE_NAMES, self.server)
self.port = self.server.add_insecure_port(f"0.0.0.0:{port}")
logger.info(f"Server initialised, listening on port {self.port}")
def handle_statistics_message(self, topic, timestamp, message):
self.statistics_servicer.handle_statistics_message(topic, timestamp, message)
def run(self):
self.server.start()
logger.info(f"Server running on port {self.port}")
self.server.wait_for_termination()
def stop(self):
logger.info("Server stopping.")
self.server.stop(grace=1.0)
logger.info("Server stopped.")
def _create_parser():
"""Define the parser"""
parser = argparse.ArgumentParser(description="Serve the station gRPC interface.")
parser.add_argument(
"--port",
default=50051,
type=int,
help="HTTP port to listen on.",
)
parser.add_argument(
"--metrics-port",
default=8001,
type=int,
help="Prometheus metrics HTTP port.",
)
parser.add_argument(
"--station",
required=True,
help="Station name",
)
parser.add_argument(
"-Z",
"--statistics-zmq",
action="append",
help="ZMQ end point to receive statistics messages form.",
)
parser.add_argument(
"-A",
"--antenna-field",
action="append",
help="Antenna field to expose.",
)
return parser
def main(argv=None):
parser = _create_parser()
args = parser.parse_args(argv or sys.argv[1:])
# Initialise simple subsystems
configure_logger()
start_metrics_server(args.metrics_port)
def register_handle_message(func: callable):
last_message_cache.handle_message = func
# Create gRPC server
server = Server(args.antenna_field, port=args.port)
# Connect to ZMQ endpoints
last_message_cache = MultiEndpointZMQMessageHandler(
server.handle_statistics_message
)
for endpoint in args.statistics_zmq or []:
logger.info(f"Subscribing to ZMQ endpoint {endpoint}")
last_message_cache.add_receiver(endpoint, [""])
# Serve indefinitely
server.run()
if __name__ == "__main__":
main()