Select Git revision
antennafield.py

Reinder Kraaij authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
antennafield.py 2.80 KiB
"""This module handles antenna field control via GRPC"""
import logging
from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
from tangostationcontrol.common.proxies.proxy import create_device_proxy
from lofar_sid.interface.stationcontrol.antennafield_pb2 import (
GetAntennafieldRequest,
AntennafieldIdentifier,
AntennafieldReply,
AntennafieldResult,
SetAntennafieldRequest,
)
from tangostationcontrol.rpc.common import (
call_exception_metrics,
reply_on_exception,
)
from tangostationcontrol.rpc.proxy.antennadeviceproxyfactory import (
AntennaDeviceProxyFactory,
)
# Module-level logger. getLogger() is called without a name, so this is the
# root logger rather than a logger specific to this module.
logger = logging.getLogger()
class AntennaFieldNotFoundException(ValueError):
    """Signal that a requested Antennafield does not exist in the station.

    Derives from ``ValueError``, so handlers that catch ``ValueError`` also
    catch this exception.
    """
class Antennafield(antennafield_pb2_grpc.AntennafieldServicer):
    """gRPC servicer for querying and switching the power of an antenna field.

    Each RPC resolves the antenna field named in the request to a device
    proxy and answers with an ``AntennafieldReply`` describing the resulting
    power state.
    """

    def _get_Antennafield_reply(
        self,
        identifier: AntennafieldIdentifier,
    ) -> AntennafieldReply:
        """Build the reply describing the current power state of an antenna field.

        Args:
            identifier: Identifier message of the antenna field to report on;
                its ``antennafield_id`` selects the device.

        Returns:
            An ``AntennafieldReply`` with ``success=True`` whose
            ``power_status`` is 1 when ``hardware_powered_on_fraction_R`` is
            greater than zero and 0 otherwise.

        Raises:
            ValueError: If creating the proxy or reading its attribute raises
                ``AttributeError`` or ``IndexError``.
        """
        try:
            # Use the same factory as SetAntennafieldPower. The second
            # argument is False here and True there -- presumably a
            # write-access flag; confirm against AntennaDeviceProxyFactory.
            antenna_field = (
                AntennaDeviceProxyFactory.create_device_proxy_for_antennafield(
                    identifier.antennafield_id, False
                )
            )
            # Any non-zero powered-on fraction is reported as "powered".
            antennafield_powered = (
                1 if antenna_field.hardware_powered_on_fraction_R > 0 else 0
            )
        except (AttributeError, IndexError) as ex:
            raise ValueError(
                f"Could not access device attribute for antennafield_id="
                f"{identifier.antennafield_id} "
            ) from ex

        return AntennafieldReply(
            success=True,
            exception="",
            result=AntennafieldResult(
                identifier=identifier,
                power_status=antennafield_powered,
            ),
        )

    @reply_on_exception(AntennafieldReply)
    @call_exception_metrics("Antennafield")
    def GetAntennafieldPower(
        self, request: GetAntennafieldRequest, context
    ) -> AntennafieldReply:
        """Report whether the requested antenna field is powered.

        Args:
            request: Request carrying the identifier of the antenna field.
            context: gRPC call context (unused).

        Returns:
            The reply produced by ``_get_Antennafield_reply``.
        """
        # Pass the whole identifier message, not just its id string: the
        # helper reads ``identifier.antennafield_id`` and embeds the message
        # in the reply.
        return self._get_Antennafield_reply(request.identifier)

    @reply_on_exception(AntennafieldReply)
    @call_exception_metrics("Antennafield")
    def SetAntennafieldPower(
        self, request: SetAntennafieldRequest, context
    ) -> AntennafieldReply:
        """Power the requested antenna field on or off and report its state.

        Powering on is only attempted when the station manager's
        ``antennas_powered_on_R`` is truthy; powering off is always attempted.

        Args:
            request: Request carrying the antenna field identifier and the
                desired ``power_status``.
            context: gRPC call context (unused).

        Returns:
            The reply produced by ``_get_Antennafield_reply`` after the
            power command has been issued.

        Raises:
            ValueError: If power-on is requested while the station manager
                reports that antennas are not powered on.
        """
        antenna_field = AntennaDeviceProxyFactory.create_device_proxy_for_antennafield(
            request.identifier.antennafield_id, True
        )
        stationmanager = create_device_proxy("STAT/StationManager/1")

        if request.power_status:
            # Guard clause: refuse to power on when the station state forbids it.
            if not stationmanager.antennas_powered_on_R:
                raise ValueError(
                    "Station State does not allow Antennafield to be powered ON"
                )
            antenna_field.power_hardware_on()
        else:
            antenna_field.power_hardware_off()

        return self._get_Antennafield_reply(request.identifier)