Skip to content
Snippets Groups Projects
Select Git revision
  • 4e58fcc0eb7151d3838542b1c1ead06c2b0a7d20
  • master default protected
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • L2SS-2153--Improve-Error-Handling
  • L2SS-2153-Add-Grpc-Gateway-support
  • L2SS-1970-apsct-lol
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
  • v0.52.1-rc1 protected
  • v0.51.9-6 protected
  • v0.51.9-5 protected
41 results

observation.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    observation.py 2.94 KiB
    # Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
    # SPDX-License-Identifier: Apache-2.0
    
    import logging
    from lofar_sid.interface.stationcontrol import observation_pb2
    from lofar_sid.interface.stationcontrol import observation_pb2_grpc
    from tangostationcontrol.rpc.common import (
        call_exception_metrics,
        reply_on_exception,
    )
    from tangostationcontrol.common.proxies.proxy import create_device_proxy
    from tangostationcontrol.configuration import ObservationSettings
    
    logger = logging.getLogger()
    
    
    class Observation(observation_pb2_grpc.ObservationServicer):
        TIMEOUT = 5000
    
        @reply_on_exception(observation_pb2.ObservationReply)
        @call_exception_metrics("Observation")
        def StartObservation(
            self, request: observation_pb2.StartObservationRequest, context
        ):
            logger.info("Start observation request incomming  %s", request.configuration)
            # Validate before handing over to Observation Control
    
            observation_id = ""
            # guard against empty jsons incomming
            if request.configuration is not None and request.configuration.strip() != "{}" :
                observationsettings = ObservationSettings.from_json(request.configuration)
                logger.info("Converted ObservationSettings   %s", observationsettings)
                if  observationsettings.antenna_fields:
                    primary_antenna = observationsettings.antenna_fields[0]
                    observation_id = primary_antenna.observation_id
                    logger.info(
                        "Creating device proxy. ",
                        extra={
                            "observation_id": observation_id,
                            "start_time": primary_antenna.start_time,
                            "stop_time": primary_antenna.stop_time,
                        },
                    )
                else:
                    logger.error(
                        "No Observation Id found in observation  %s", request.configuration
                    )
            else:
                logger.error(
                   "No Observation configuration found  %s", request.configuration
                 )
    
    
            observation_control = create_device_proxy(
                "STAT/ObservationControl/1", self.TIMEOUT
            )
            observation_control.add_observation(request.configuration)
            logger.info(
                "Start observation request added to Add observation.",
                extra={"observation_id": observation_id},
            )
            return observation_pb2.ObservationReply(success=True)
    
        @reply_on_exception(observation_pb2.ObservationReply)
        @call_exception_metrics("Observation")
        def StopObservation(self, request: observation_pb2.StopObservationRequest, context):
            logger.info(f"Stopping observation {request.observation_id}")
            observation_control = create_device_proxy(
                "STAT/ObservationControl/1", self.TIMEOUT
            )
            observation_control.stop_observation_now(request.observation_id)
            return observation_pb2.ObservationReply(success=True)