Skip to content
Snippets Groups Projects
Select Git revision
  • 6c85f209aa2ee95ed493e4e2b7b037e7b33c71c9
  • master default protected
  • test-pytango-10.0.3
  • revert-cs032-ccd-ip
  • deploy-components-parallel
  • fix-chrony-exporter
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • v0.55.5-r2 protected
  • v0.52.8-rc1 protected
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
41 results

server.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    server.py 4.28 KiB
    #  Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    
    import argparse
    from concurrent import futures
    import logging
    import sys
    
    import grpc
    from grpc_reflection.v1alpha import reflection
    from lofar_sid.interface.stationcontrol import observation_pb2
    from lofar_sid.interface.stationcontrol import observation_pb2_grpc
    from lofar_sid.interface.stationcontrol import statistics_pb2
    from lofar_sid.interface.stationcontrol import statistics_pb2_grpc
    from lofar_sid.interface.stationcontrol import antennafield_pb2
    from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
    from tangostationcontrol.rpc.observation import Observation
    from tangostationcontrol.rpc.statistics import Statistics
    from tangostationcontrol.rpc.messagehandler import MultiEndpointZMQMessageHandler
    from tangostationcontrol.common.lofar_logging import configure_logger
    from tangostationcontrol.metrics import start_metrics_server
    from tangostationcontrol.rpc.antennafield import AntennaField
    
    # Root logger: getLogger() is called without a name.
    logger = logging.getLogger()
    
    
    class Server:
        """gRPC server hosting the Observation, Antennafield and Statistics services."""

        def __init__(self, antenna_fields: list[str], port: int = 50051):
            """Create the gRPC server, register all servicers and bind the port.

            Args:
                antenna_fields: Accepted for callers; not read by this constructor.
                port: Port to bind on ``0.0.0.0`` (insecure, no TLS credentials).
            """
            # Kept on the instance: handle_statistics_message() forwards to it.
            self.statistics_servicer = Statistics()

            logger.info("Initialising grpc server")
            worker_pool = futures.ThreadPoolExecutor(max_workers=10)
            grpc_server = grpc.server(worker_pool)
            self.server = grpc_server

            # Register the servicers in a fixed order.
            observation_pb2_grpc.add_ObservationServicer_to_server(
                Observation(), grpc_server
            )
            antennafield_pb2_grpc.add_AntennafieldServicer_to_server(
                AntennaField(), grpc_server
            )
            statistics_pb2_grpc.add_StatisticsServicer_to_server(
                self.statistics_servicer, grpc_server
            )

            # Collect the fully qualified names of every service to expose
            # through server reflection.
            described_services = (
                (observation_pb2, "Observation"),
                (antennafield_pb2, "Antennafield"),
                (statistics_pb2, "Statistics"),
            )
            service_names = [
                module.DESCRIPTOR.services_by_name[service].full_name
                for module, service in described_services
            ]
            # reflection is required by innius-gpc-datasource
            service_names.append(reflection.SERVICE_NAME)
            reflection.enable_server_reflection(tuple(service_names), grpc_server)

            # Remember the port value reported back by the bind call.
            bind_address = f"0.0.0.0:{port}"
            self.port = grpc_server.add_insecure_port(bind_address)
            logger.info(f"Server initialised, listening on port {self.port}")

        def handle_statistics_message(self, topic, timestamp, message):
            """Forward a statistics message to the Statistics servicer."""
            forward = self.statistics_servicer.handle_statistics_message
            forward(topic, timestamp, message)

        def run(self):
            """Start the gRPC server and block until it terminates."""
            grpc_server = self.server
            grpc_server.start()
            logger.info(f"Server running on port {self.port}")
            grpc_server.wait_for_termination()

        def stop(self):
            """Ask the gRPC server to stop, using a one second grace period."""
            logger.info("Server stopping.")
            grace_seconds = 1.0
            self.server.stop(grace=grace_seconds)
            logger.info("Server stopped.")
    
    
    def _create_parser():
        """Define the parser"""
        parser = argparse.ArgumentParser(description="Serve the station gRPC interface.")
        parser.add_argument(
            "--port",
            default=50051,
            type=int,
            help="HTTP port to listen on.",
        )
        parser.add_argument(
            "--metrics-port",
            default=8001,
            type=int,
            help="Prometheus metrics HTTP port.",
        )
        parser.add_argument(
            "--station",
            required=True,
            help="Station name",
        )
        parser.add_argument(
            "-Z",
            "--statistics-zmq",
            action="append",
            help="ZMQ end point to receive statistics messages form.",
        )
        parser.add_argument(
            "-A",
            "--antenna-field",
            action="append",
            help="Antenna field to expose.",
        )
        return parser
    
    
    def main(argv=None):
        """Command-line entry point: configure subsystems and serve gRPC.

        Parses the arguments, calls ``configure_logger`` and
        ``start_metrics_server``, creates the ``Server``, subscribes a
        ``MultiEndpointZMQMessageHandler`` to every ``--statistics-zmq``
        endpoint and then blocks in ``Server.run``.

        Args:
            argv: Command-line arguments without the program name. When
                ``None``, ``sys.argv[1:]`` is used. An explicitly passed
                empty list is parsed as-is.
        """
        parser = _create_parser()
        # Fall back to the process arguments only when no list was given at
        # all; "argv or sys.argv[1:]" would also discard an explicit [].
        args = parser.parse_args(sys.argv[1:] if argv is None else argv)

        # Initialise simple subsystems
        configure_logger()
        start_metrics_server(args.metrics_port)

        # Create gRPC server. "--antenna-field" is None when never given, so
        # normalise it to a list, as is done for the ZMQ endpoints below.
        server = Server(args.antenna_field or [], port=args.port)

        # Connect to ZMQ endpoints; every message is handed to the server.
        # The handler stays referenced by this local while run() blocks.
        message_handler = MultiEndpointZMQMessageHandler(
            server.handle_statistics_message
        )

        for endpoint in args.statistics_zmq or []:
            logger.info(f"Subscribing to ZMQ endpoint {endpoint}")
            message_handler.add_receiver(endpoint, [""])

        # Serve indefinitely
        server.run()
    
    
    # Run the command-line entry point only when executed as a script.
    if __name__ == "__main__":
        main()