Skip to content
Snippets Groups Projects
Select Git revision
  • 4df02a6f5d5e1f3eb3705767d8c1454c47c01130
  • master default protected
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • L2SS-2153--Improve-Error-Handling
  • L2SS-2153-Add-Grpc-Gateway-support
  • L2SS-1970-apsct-lol
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
  • v0.52.1-rc1 protected
  • v0.51.9-6 protected
41 results

antennafield.py

  • Reinder Kraaij's avatar
    Reinder Kraaij authored
    4df02a6f
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    antennafield.py 2.80 KiB
    """This module handles antenna field control via GRPC"""
    
    import logging
    
    from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
    from tangostationcontrol.common.proxies.proxy import create_device_proxy
    from lofar_sid.interface.stationcontrol.antennafield_pb2 import (
        GetAntennafieldRequest,
        AntennafieldIdentifier,
        AntennafieldReply,
        AntennafieldResult,
        SetAntennafieldRequest,
    )
    
    from tangostationcontrol.rpc.common import (
        call_exception_metrics,
        reply_on_exception,
    )
    from tangostationcontrol.rpc.proxy.antennadeviceproxyfactory import (
        AntennaDeviceProxyFactory,
    )
    
    # No name is passed to getLogger(), so this is the root logger rather than
    # a logger named after this module.
    logger = logging.getLogger()
    
    
    class AntennaFieldNotFoundException(ValueError):
        """Signal that a requested Antennafield is not found in the station.

        Subclasses ValueError, so handlers that catch ValueError also catch this.
        """
    
    
    class Antennafield(antennafield_pb2_grpc.AntennafieldServicer):
        """GRPC servicer exposing the power state of an Antennafield in the station.

        GetAntennafieldPower reports whether the antenna field hardware is powered,
        SetAntennafieldPower switches it on or off. Both answer with an
        AntennafieldReply; exceptions are handed to the ``reply_on_exception``
        decorator instead of propagating to the GRPC caller.
        """

        def _get_Antennafield_reply(
            self,
            identifier: AntennafieldIdentifier,
        ) -> AntennafieldReply:
            """Build the reply describing the current power state of an antenna field.

            Args:
                identifier: the AntennafieldIdentifier message of the antenna field
                    (the whole message, not just its ``antennafield_id``).

            Returns:
                An AntennafieldReply with ``success=True`` whose result echoes the
                identifier and carries ``power_status`` 1 when
                ``hardware_powered_on_fraction_R`` is above zero, else 0.

            Raises:
                ValueError: if creating the proxy or reading the attribute raises
                    AttributeError or IndexError.
            """
            try:
                # Use the same factory as SetAntennafieldPower; this class defines
                # no `_create_rcu_device_proxy` helper of its own.
                # NOTE(review): the second argument is kept at False as in the
                # original read path; presumably it selects read-only access --
                # confirm against AntennaDeviceProxyFactory.
                antenna_field = (
                    AntennaDeviceProxyFactory.create_device_proxy_for_antennafield(
                        identifier.antennafield_id, False
                    )
                )
                # Any non-zero powered-on fraction is reported as "powered".
                antennafield_powered = (
                    1 if antenna_field.hardware_powered_on_fraction_R > 0 else 0
                )

            except (AttributeError, IndexError) as ex:
                raise ValueError(
                    "Could not access device attribute for antennafield_id="
                    f"{identifier.antennafield_id} "
                ) from ex

            return AntennafieldReply(
                success=True,
                exception="",
                result=AntennafieldResult(
                    identifier=identifier,
                    power_status=antennafield_powered,
                ),
            )

        @reply_on_exception(AntennafieldReply)
        @call_exception_metrics("Antennafield")
        def GetAntennafieldPower(self, request: GetAntennafieldRequest, context):
            """Return the current power state of the requested antenna field."""
            # Pass the identifier message itself: the helper reads its
            # `antennafield_id` and embeds the message in the result.
            return self._get_Antennafield_reply(request.identifier)

        @reply_on_exception(AntennafieldReply)
        @call_exception_metrics("Antennafield")
        def SetAntennafieldPower(self, request: SetAntennafieldRequest, context):
            """Power the requested antenna field on or off and return its new state.

            Powering on is only performed when the StationManager device reports
            ``antennas_powered_on_R``; otherwise a ValueError is raised. Powering
            off is performed unconditionally.
            """
            antenna_field = AntennaDeviceProxyFactory.create_device_proxy_for_antennafield(
                request.identifier.antennafield_id, True
            )

            stationmanager = create_device_proxy("STAT/StationManager/1")

            if request.power_status:
                if stationmanager.antennas_powered_on_R:
                    antenna_field.power_hardware_on()
                else:
                    raise ValueError(
                        "Station State does not allow Antennafield to be powered ON"
                    )
            else:
                antenna_field.power_hardware_off()

            # Re-read the device so the reply reflects the state after the command.
            return self._get_Antennafield_reply(request.identifier)