Skip to content
Snippets Groups Projects
Select Git revision
  • 1a71c3e9c700bfe6cf21e169afddc7884777e275
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

solver.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    server.py 4.28 KiB
    #  Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    
    import argparse
    from concurrent import futures
    import logging
    import sys
    
    import grpc
    from grpc_reflection.v1alpha import reflection
    from lofar_sid.interface.stationcontrol import observation_pb2
    from lofar_sid.interface.stationcontrol import observation_pb2_grpc
    from lofar_sid.interface.stationcontrol import statistics_pb2
    from lofar_sid.interface.stationcontrol import statistics_pb2_grpc
    from lofar_sid.interface.stationcontrol import antennafield_pb2
    from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
    from tangostationcontrol.rpc.observation import Observation
    from tangostationcontrol.rpc.statistics import Statistics
    from tangostationcontrol.rpc.messagehandler import MultiEndpointZMQMessageHandler
    from tangostationcontrol.common.lofar_logging import configure_logger
    from tangostationcontrol.metrics import start_metrics_server
    from tangostationcontrol.rpc.antennafield import AntennaField
    
    logger = logging.getLogger()
    
    
    class Server:
        def __init__(self, antenna_fields: list[str], port: int = 50051):
            # Initialise Statistics service
            self.statistics_servicer = Statistics()
    
            # Initialise gRPC server
            logger.info("Initialising grpc server")
            self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
            observation_pb2_grpc.add_ObservationServicer_to_server(
                Observation(), self.server
            )
            antennafield_pb2_grpc.add_AntennafieldServicer_to_server(
                AntennaField(), self.server
            )
            statistics_pb2_grpc.add_StatisticsServicer_to_server(
                self.statistics_servicer, self.server
            )
            SERVICE_NAMES = (
                observation_pb2.DESCRIPTOR.services_by_name["Observation"].full_name,
                antennafield_pb2.DESCRIPTOR.services_by_name["Antennafield"].full_name,
                statistics_pb2.DESCRIPTOR.services_by_name["Statistics"].full_name,
                reflection.SERVICE_NAME,  # reflection is required by innius-gpc-datasource
            )
            reflection.enable_server_reflection(SERVICE_NAMES, self.server)
    
            self.port = self.server.add_insecure_port(f"0.0.0.0:{port}")
            logger.info(f"Server initialised, listening on port {self.port}")
    
        def handle_statistics_message(self, topic, timestamp, message):
            self.statistics_servicer.handle_statistics_message(topic, timestamp, message)
    
        def run(self):
            self.server.start()
            logger.info(f"Server running on port {self.port}")
            self.server.wait_for_termination()
    
        def stop(self):
            logger.info("Server stopping.")
            self.server.stop(grace=1.0)
            logger.info("Server stopped.")
    
    
    def _create_parser():
        """Define the parser"""
        parser = argparse.ArgumentParser(description="Serve the station gRPC interface.")
        parser.add_argument(
            "--port",
            default=50051,
            type=int,
            help="HTTP port to listen on.",
        )
        parser.add_argument(
            "--metrics-port",
            default=8001,
            type=int,
            help="Prometheus metrics HTTP port.",
        )
        parser.add_argument(
            "--station",
            required=True,
            help="Station name",
        )
        parser.add_argument(
            "-Z",
            "--statistics-zmq",
            action="append",
            help="ZMQ end point to receive statistics messages form.",
        )
        parser.add_argument(
            "-A",
            "--antenna-field",
            action="append",
            help="Antenna field to expose.",
        )
        return parser
    
    
    def main(argv=None):
        parser = _create_parser()
        args = parser.parse_args(argv or sys.argv[1:])
    
        # Initialise simple subsystems
        configure_logger()
        start_metrics_server(args.metrics_port)
    
        def register_handle_message(func: callable):
            last_message_cache.handle_message = func
    
        # Create gRPC server
        server = Server(args.antenna_field, port=args.port)
    
        # Connect to ZMQ endpoints
        last_message_cache = MultiEndpointZMQMessageHandler(
            server.handle_statistics_message
        )
    
        for endpoint in args.statistics_zmq or []:
            logger.info(f"Subscribing to ZMQ endpoint {endpoint}")
            last_message_cache.add_receiver(endpoint, [""])
    
        # Serve indefinitely
        server.run()
    
    
    if __name__ == "__main__":
        main()