Select Git revision
observation.py

Reinder Kraaij authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
observation.py 2.94 KiB
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import logging
from lofar_sid.interface.stationcontrol import observation_pb2
from lofar_sid.interface.stationcontrol import observation_pb2_grpc
from tangostationcontrol.rpc.common import (
call_exception_metrics,
reply_on_exception,
)
from tangostationcontrol.common.proxies.proxy import create_device_proxy
from tangostationcontrol.configuration import ObservationSettings
# Module-level logger. getLogger() is called without a name, so this is the
# root logger rather than a logger named after this module.
logger = logging.getLogger()
class Observation(observation_pb2_grpc.ObservationServicer):
    """gRPC servicer that forwards observation requests to ObservationControl.

    Both RPCs open a device proxy to ``OBSERVATION_CONTROL_DEVICE`` and
    delegate the actual work to it. Each RPC is wrapped by
    ``reply_on_exception`` and ``call_exception_metrics`` from
    ``tangostationcontrol.rpc.common``; presumably these turn a raised
    exception into an ``ObservationReply`` and record it as a metric --
    confirm against their implementation.
    """

    # Timeout handed to create_device_proxy.
    # NOTE(review): presumably milliseconds -- confirm against create_device_proxy.
    TIMEOUT = 5000

    # Name of the device every request is forwarded to.
    OBSERVATION_CONTROL_DEVICE = "STAT/ObservationControl/1"

    def _observation_control(self):
        """Return a device proxy for the ObservationControl device."""
        return create_device_proxy(self.OBSERVATION_CONTROL_DEVICE, self.TIMEOUT)

    def _inspect_configuration(self, configuration) -> str:
        """Parse ``configuration`` for validation and logging purposes.

        Args:
            configuration: the JSON string received in the start request.

        Returns:
            The ``observation_id`` of the first antenna field, or ``""`` when
            the configuration is considered empty or has no antenna fields.

        Raises:
            Whatever ``ObservationSettings.from_json`` raises on input it
            cannot convert.
        """
        # Guard against empty JSON documents coming in.
        # NOTE(review): an unset protobuf string field is "" rather than None,
        # so it is NOT caught here and goes on to from_json -- confirm whether
        # that is the intended way of rejecting it.
        if configuration is None or configuration.strip() == "{}":
            logger.error("No Observation configuration found %s", configuration)
            return ""

        settings = ObservationSettings.from_json(configuration)
        logger.info("Converted ObservationSettings %s", settings)

        if not settings.antenna_fields:
            logger.error("No Observation Id found in observation %s", configuration)
            return ""

        # The first antenna field supplies the id and times used for logging.
        primary_antenna = settings.antenna_fields[0]
        logger.info(
            "Creating device proxy. ",
            extra={
                "observation_id": primary_antenna.observation_id,
                "start_time": primary_antenna.start_time,
                "stop_time": primary_antenna.stop_time,
            },
        )
        return primary_antenna.observation_id

    @reply_on_exception(observation_pb2.ObservationReply)
    @call_exception_metrics("Observation")
    def StartObservation(
        self, request: observation_pb2.StartObservationRequest, context
    ):
        """Hand the requested observation over to ObservationControl.

        The configuration is parsed first so that a malformed one fails
        before the device is contacted. An empty configuration, or one
        without antenna fields, is only logged as an error; the request is
        still forwarded unchanged.

        Args:
            request: carries the observation settings as JSON in
                ``configuration``.
            context: gRPC call context (unused).

        Returns:
            ``ObservationReply(success=True)`` once the observation was added.
        """
        logger.info("Start observation request incomming %s", request.configuration)

        # Validate before handing over to Observation Control.
        observation_id = self._inspect_configuration(request.configuration)

        self._observation_control().add_observation(request.configuration)
        logger.info(
            "Start observation request added to Add observation.",
            extra={"observation_id": observation_id},
        )
        return observation_pb2.ObservationReply(success=True)

    @reply_on_exception(observation_pb2.ObservationReply)
    @call_exception_metrics("Observation")
    def StopObservation(self, request: observation_pb2.StopObservationRequest, context):
        """Ask ObservationControl to stop an observation immediately.

        Args:
            request: carries the ``observation_id`` to stop.
            context: gRPC call context (unused).

        Returns:
            ``ObservationReply(success=True)`` once the stop call returned.
        """
        # Lazy %-style arguments, consistent with the other log calls here.
        logger.info("Stopping observation %s", request.observation_id)
        self._observation_control().stop_observation_now(request.observation_id)
        return observation_pb2.ObservationReply(success=True)