Skip to content
Snippets Groups Projects
Commit 6aa2c6a3 authored by Reinder Kraaij's avatar Reinder Kraaij :eye:
Browse files

Merge branch 'L2SS-2129--Add-GRPC' into 'master'

Resolve L2SS-2129 "Add grpc"

Closes L2SS-2129

See merge request !1076
parents 08655762 19553888
Branches
Tags
1 merge request!1076Resolve L2SS-2129 "Add grpc"
......@@ -55,3 +55,4 @@ infra/dev/nomad/tmp/*
Lib/*
Scripts/*
pyvenv.cfg
bin/*
\ No newline at end of file
......@@ -56,6 +56,8 @@ run_shellcheck:
- shellcheck **/*.sh
run_CDB_correctness:
needs:
- trigger_prepare
stage: test
script:
- cd $CI_PROJECT_DIR/CDB/stations
......@@ -100,6 +102,7 @@ secret_detection:
# Run all unit tests for Python versions except the base image
run_unit_tests:
extends: .run_unit_test_version_base
needs: []
stage: test
allow_failure: true
image: python:3.${PY_VERSION}
......@@ -111,6 +114,7 @@ run_unit_tests:
run_unit_tests_coverage:
extends: .run_unit_test_version_base
needs: []
stage: test
script:
- tox -e coverage
......@@ -120,6 +124,7 @@ run_unit_tests_coverage:
coverage_report:
coverage_format: cobertura
path: coverage.xml
junit: tango_report.xml
paths:
- cover/*
- .coverage
......
......@@ -62,7 +62,7 @@ commands = [
["python", "-m", "coverage", "--version"],
["python", "-m", "coverage", "erase"],
["python", "-m", "coverage", "run", "-m", "pytest", "-v", "tests/{posargs}"],
["python", "-m", "pytest", "-v", "--log-level=DEBUG", "--cov-report", "term", "--cov-report", "html", "--cov-append", "--cov-report", "xml:coverage.xml", "--cov=tangostationcontrol", "tests/{posargs}"]
["python", "-m", "pytest", "-v", "--log-level=DEBUG", "--cov-report", "term", "--cov-report", "html", "--cov-append", "--cov-report", "xml:coverage.xml", "--cov=tangostationcontrol", "tests/{posargs}", "--junitxml=tango_report.xml"]
]
[tool.tox.env.integration]
......
......@@ -2,7 +2,7 @@
# order of appearance. Changing the order has an impact on the overall
# integration process, which may cause wedges in the gate later.
lofar-sid >= 1.0.11 # Apache 2
lofar-sid >= 1.0.16 # Apache 2
lofar-lotus>=0.0.4 # Apache 2
PyTango>=10.0.0 # LGPL v3
numpy>=1.21.6 # BSD3
......
......@@ -957,6 +957,17 @@ class AF(LOFARDevice):
self.__setup_recv_mapper()
self.__setup_sdp_mapper()
def _read_hardware_powered_fraction_R(self):
"""Read attribute which monitors the power"""
mask = self.read_attribute("ANT_mask_RW")
powered = self.read_attribute("RCU_PWR_ANT_on_R")
try:
return numpy.count_nonzero(powered & mask) / numpy.count_nonzero(mask)
except ZeroDivisionError:
return 1.0
def _power_hardware_on(self):
# Configure the devices that process our antennas
self.configure_recv()
......@@ -968,20 +979,33 @@ class AF(LOFARDevice):
[False] * self.nr_antennas,
)
def _power_hardware_off(self):
def power_antennas_off(self, mask: numpy.ndarray):
# Power the specified antennas off, do not touch other antennas.
#
# Mask: a boolean array indicating which antennas should be powered OFF
# Save actual mask values
ANT_mask_RW = self.read_attribute("ANT_mask_RW")
# Enable controlling all antennas
self.proxy.write_attribute("ANT_mask_RW", [True] * len(ANT_mask_RW))
# Enable controlling all requested antennas
self.proxy.write_attribute("ANT_mask_RW", mask)
try:
# Turn off power to all antennas
self.proxy.write_attribute("RCU_PWR_ANT_on_RW", [False] * len(ANT_mask_RW))
# make sure we update RCU_PWR_ANT_on_RW neatly with our mask
RCU_PWR_ANT_on_RW = numpy.logical_and(
self.read_attribute("RCU_PWR_ANT_on_RW"), numpy.logical_not(mask)
)
# Turn off power to all antennas (in the requested mask)
self.proxy.write_attribute("RCU_PWR_ANT_on_RW", RCU_PWR_ANT_on_RW)
finally:
# Restore original mask
self.proxy.write_attribute("ANT_mask_RW", ANT_mask_RW)
def _power_hardware_off(self):
# Power all antennas off
self.power_antennas_off([True] * self.nr_antennas())
# --------
# Commands
# --------
......@@ -993,6 +1017,12 @@ class AF(LOFARDevice):
def configure_recv(self):
"""Configure RECV to process our antennas."""
# Power off what should be off, f.e. if they got turned off in the mask
# but are still powered.
self.power_antennas_off(
numpy.logical_not(self.read_attribute("Antenna_Usage_Mask_R"))
)
# Disable controlling the tiles that fall outside the mask
self.proxy.write_attribute(
"ANT_mask_RW", self.read_attribute("Antenna_Usage_Mask_R")
......
......@@ -717,9 +717,9 @@ class RecvDeviceWalker:
if recv <= 0:
continue
recv_ant_masks[recv - 1][rcu_input // N_rcu_inp][rcu_input % N_rcu_inp] = (
True
)
recv_ant_masks[recv - 1][rcu_input // N_rcu_inp][
rcu_input % N_rcu_inp
] = True
return recv_ant_masks
......
"""This module handles antenna field control via GRPC"""
import logging
from tango import DeviceProxy, DevState
from lofar_sid.interface.stationcontrol import antenna_pb2_grpc
from lofar_sid.interface.stationcontrol.antenna_pb2 import (
AntennaReply,
AntennaResult,
GetAntennaRequest,
Identifier,
SetAntennaStatusRequest,
SetAntennaUseRequest,
)
from tangostationcontrol.rpc.common import (
call_exception_metrics,
reply_on_exception,
)
from tangostationcontrol.rpc.proxy.antennadeviceproxyfactory import (
AntennaDeviceProxyFactory,
)
logger = logging.getLogger()
class AntennaNotFoundException(ValueError):
"""Exception raised when an antenna is not found in the device."""
class Antenna(antenna_pb2_grpc.AntennaServicer):
"""Represents an antenna field in the TangoStationControl system.
This class provides methods to control and manage antennas remotely.
"""
def _get_antenna_index(
self, identifier: Identifier, antennafield_device: DeviceProxy
) -> int:
"""Returns the current antenna index."""
try:
return antennafield_device.Antenna_Names_R.index(identifier.antenna_name)
except ValueError as exc:
logger.warning(
"Antenna not found",
extra={
"antenna_name": identifier.antenna_name,
"antennafield_name": identifier.antennafield_name,
},
)
raise AntennaNotFoundException(
f"Antenna '{identifier.antenna_name}' "
f"not found in device '{identifier.antennafield_name}'."
) from exc
def _get_antenna_reply(
self,
identifier: Identifier,
antenna_index: int,
antennafield_device: DeviceProxy,
):
"""Returns the Message for the caller of the GRPC."""
try:
antenna_status = antennafield_device.Antenna_Status_R[antenna_index]
antenna_use = antennafield_device.Antenna_Use_R[antenna_index]
except (AttributeError, IndexError) as ex:
raise ValueError(
f"Could not access device attribute for antenna "
f"{identifier.antenna_name=} "
f"{identifier.antenna_index}"
) from ex
return AntennaReply(
success=True,
exception="",
result=AntennaResult(
identifier=identifier,
antenna_use=antenna_use,
antenna_status=antenna_status,
),
)
@reply_on_exception(AntennaReply)
@call_exception_metrics("AntennaField")
def GetAntenna(self, request: GetAntennaRequest, context):
"""Gets Correct Device, then returns the Message for the caller of the GRPC."""
antennafield_device = (
AntennaDeviceProxyFactory.create_device_proxy_for_antennafield(
request.identifier.antennafield_name
)
)
antenna_index = self._get_antenna_index(request.identifier, antennafield_device)
return self._get_antenna_reply(
request.identifier, antenna_index, antennafield_device
)
def _apply_changes(self, antenna_field):
"""Apply Changes to the System. It is required to bring it off first"""
# If the antenna_field is ON, we need to reload the changes
if antenna_field.state() == DevState.ON:
antenna_field.Off()
antenna_field.Initialise()
antenna_field.On()
# If the antenna_field is ON, the antennas should also receive power
# as per the station transitions.
antenna_field.power_hardware_on()
def _update_antenna_property(
self,
identifier: Identifier,
attribute_name: str,
property_name: str,
new_value: int,
) -> AntennaReply:
"""Update the antenna Property"""
antennafield_device = (
AntennaDeviceProxyFactory.create_device_proxy_for_antennafield(
identifier.antennafield_name, True
)
)
antenna_index = self._get_antenna_index(identifier, antennafield_device)
property_value = list(antennafield_device.read_attribute(attribute_name).value)
property_value[antenna_index] = new_value
antennafield_device.put_property({property_name: property_value})
self._apply_changes(antennafield_device)
return self._get_antenna_reply(identifier, antenna_index, antennafield_device)
@reply_on_exception(AntennaReply)
@call_exception_metrics("AntennaField")
def SetAntennaStatus(self, request: SetAntennaStatusRequest, context):
"""Set the new Antenna Status"""
return self._update_antenna_property(
request.identifier,
"Antenna_Status_R",
"Antenna_Status",
request.antenna_status,
)
@reply_on_exception(AntennaReply)
@call_exception_metrics("AntennaField")
def SetAntennaUse(self, request: SetAntennaUseRequest, context):
"""Set the new Antenna Use"""
return self._update_antenna_property(
request.identifier, "Antenna_Use_R", "Antenna_Use", request.antenna_use
)
......@@ -2,187 +2,85 @@
import logging
import tango
from tango import DeviceProxy
from tangostationcontrol.common.antennas import antenna_field_family_name
from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
from tangostationcontrol.common.proxies.proxy import create_device_proxy
from lofar_sid.interface.stationcontrol.antennafield_pb2 import (
AntennaReply,
AntennaResult,
GetAntennaRequest,
Identifier,
SetAntennaStatusRequest,
SetAntennaUseRequest,
GetAntennafieldRequest,
AntennafieldIdentifier,
AntennafieldReply,
AntennafieldResult,
SetAntennafieldRequest,
)
from tangostationcontrol.common.proxies.proxy import create_device_proxy
from tangostationcontrol.rpc.common import (
call_exception_metrics,
reply_on_exception,
)
from tangostationcontrol.rpc.proxy.antennadeviceproxyfactory import (
AntennaDeviceProxyFactory,
)
logger = logging.getLogger()
class AntennaNotFoundException(ValueError):
"""Exception raised when an antenna is not found in the device."""
class AntennaFieldNotFoundException(ValueError):
"""Exception raised when an Antennafield is not found in the station."""
class AntennaField(antennafield_pb2_grpc.AntennafieldServicer):
"""Represents an antenna field in the TangoStationControl system.
class Antennafield(antennafield_pb2_grpc.AntennafieldServicer):
"""Represents an Antennafield in the station
This class provides methods to control and manage antennas remotely.
"""
def _create_antennafield_device_proxy(
self, identifier: Identifier, write_access: bool = False
) -> DeviceProxy:
"""Get the device proxy for a antenna in a antennafield, given that it exist."""
try:
family = antenna_field_family_name(identifier.antennafield_name)
db = tango.Database()
device_exported = db.get_device_exported_for_class(family)
logger.info(
"Create_antennafield_device_proxy ANTENNA=%s FIELD=%s FAMILY=%s",
identifier.antenna_name,
identifier.antennafield_name,
family,
)
for device_name in device_exported:
_, _, exported_antennafield_name = device_name.split("/")
logger.info(
"Found a device name in device_exported ANTENNA=%s FIELD=%s FAMILY=%s DEVICE_NAME=%s EXPORTED_ANTENNAFIELD_NAME=%s",
identifier.antenna_name,
identifier.antennafield_name,
family,
device_name,
exported_antennafield_name,
)
if (
exported_antennafield_name.casefold()
== identifier.antennafield_name.casefold()
):
logger.info(
"We have a Match. ANTENNA=%s FIELD=%s FAMILY=%s DEVICE_NAME=%s",
identifier.antenna_name,
identifier.antennafield_name,
family,
device_name,
)
return create_device_proxy(device_name, write_access=write_access)
except Exception as ex:
logger.exception(
"Failed to create device proxy",
extra={
"antennafield_name": identifier.antennafield_name,
"antenna_name": identifier.antenna_name,
},
)
raise IOError(
f"Failed to create device proxy to '{identifier.antennafield_name}'"
) from ex
raise ValueError(
f"Antenna '{identifier.antenna_name}' in Antenna field '{identifier.antennafield_name}' not found "
)
def _get_antenna_index(
self, identifier: Identifier, antennafield_device: DeviceProxy
) -> int:
"""Returns the current antenna index."""
try:
return antennafield_device.Antenna_Names_R.index(identifier.antenna_name)
except ValueError as exc:
logger.warning(
"Antenna not found",
extra={
"antenna_name": identifier.antenna_name,
"antennafield_name": identifier.antennafield_name,
},
)
raise AntennaNotFoundException(
f"Antenna '{identifier.antenna_name}' "
f"not found in device '{identifier.antennafield_name}'."
) from exc
def _get_antenna_reply(
def _get_Antennafield_reply(
self,
identifier: Identifier,
antenna_index: int,
antennafield_device: DeviceProxy,
identifier: AntennafieldIdentifier,
):
"""Returns the Message for the caller of the GRPC."""
try:
antenna_status = antennafield_device.Antenna_Status_R[antenna_index]
antenna_use = antennafield_device.Antenna_Use_R[antenna_index]
antenna_field = self._create_rcu_device_proxy(identifier, False)
antannafield_powered = (
1 if antenna_field.hardware_powered_on_fraction_R > 0 else 0
)
except (AttributeError, IndexError) as ex:
raise ValueError(
f"Could not access device attribute for antenna "
f"{identifier.antenna_name=} "
f"{identifier.antenna_index}"
f"Could not access device attribute for Antennafield_id="
f"{identifier.Antennafield_id=} "
) from ex
return AntennaReply(
return AntennafieldReply(
success=True,
exception="",
result=AntennaResult(
result=AntennafieldResult(
identifier=identifier,
antenna_use=antenna_use,
antenna_status=antenna_status,
power_status=antannafield_powered,
),
)
@reply_on_exception(AntennaReply)
@call_exception_metrics("AntennaField")
def GetAntenna(self, request: GetAntennaRequest, context):
"""Gets Correct Device, then returns the Message for the caller of the GRPC."""
antennafield_device = self._create_antennafield_device_proxy(request.identifier)
antenna_index = self._get_antenna_index(request.identifier, antennafield_device)
@reply_on_exception(AntennafieldReply)
@call_exception_metrics("Antennafield")
def GetAntennafieldPower(self, request: GetAntennafieldRequest, context):
return self._get_Antennafield_reply(request.identifier.Antennafield_id)
return self._get_antenna_reply(
request.identifier, antenna_index, antennafield_device
@reply_on_exception(AntennafieldReply)
@call_exception_metrics("Antennafield")
def SetAntennafieldPower(self, request: SetAntennafieldRequest, context):
antenna_field = AntennaDeviceProxyFactory.create_device_proxy_for_antennafield(
request.identifier.Antennafield_id, True
)
def _apply_changes(self, antenna_field):
"""Apply Changes to the System. It is required to bring it off first"""
antenna_field.Off()
antenna_field.Initialise()
antenna_field.On()
stationmanager = create_device_proxy("STAT/StationManager/1")
def _update_antenna_property(
self,
identifier: Identifier,
attribute_name: str,
property_name: str,
new_value: int,
) -> AntennaReply:
"""Update the antenna Property"""
antennafield_device = self._create_antennafield_device_proxy(identifier, True)
antenna_index = self._get_antenna_index(identifier, antennafield_device)
property_value = list(antennafield_device.read_attribute(attribute_name).value)
property_value[antenna_index] = new_value
antennafield_device.put_property({property_name: property_value})
self._apply_changes(antennafield_device)
return self._get_antenna_reply(identifier, antenna_index, antennafield_device)
@reply_on_exception(AntennaReply)
@call_exception_metrics("AntennaField")
def SetAntennaStatus(self, request: SetAntennaStatusRequest, context):
"""Set the new Antenna Status"""
return self._update_antenna_property(
request.identifier,
"Antenna_Status_R",
"Antenna_Status",
request.antenna_status,
if request.power_status:
if stationmanager.antennas_powered_on_R:
antenna_field.power_hardware_on()
else:
raise ValueError(
"Station State does not allow Antennafield to be powered ON"
)
else:
antenna_field.power_hardware_off()
@reply_on_exception(AntennaReply)
@call_exception_metrics("AntennaField")
def SetAntennaUse(self, request: SetAntennaUseRequest, context):
"""Set the new Antenna Use"""
return self._update_antenna_property(
request.identifier, "Antenna_Use_R", "Antenna_Use", request.antenna_use
)
return self._get_Antennafield_reply(request.identifier)
......@@ -50,7 +50,6 @@ class Observation(observation_pb2_grpc.ObservationServicer):
"No Observation configuration found %s", request.configuration
)
observation_control = create_device_proxy(
"STAT/ObservationControl/1", self.TIMEOUT
)
......
import tango
from tango import DeviceProxy
from tangostationcontrol.common.antennas import antenna_field_family_name
from tangostationcontrol.common.proxies.proxy import create_device_proxy
import logging
logger = logging.getLogger()
class AntennaDeviceProxyFactory:
@staticmethod
def create_device_proxy_for_antennafield(
antennafield_name: str, write_access: bool = False
) -> DeviceProxy:
"""Get the device proxy for a given antenna field, if it exists."""
try:
family = antenna_field_family_name(antennafield_name)
db = tango.Database()
device_exported = db.get_device_exported_for_class(family)
logger.info(
"Creating device proxy for FIELD=%s FAMILY=%s",
antennafield_name,
family,
)
for device_name in device_exported:
_, _, exported_antennafield_name = device_name.split("/")
logger.info(
"Checking device FIELD=%s FAMILY=%s DEVICE_NAME=%s EXPORTED_FIELD=%s",
antennafield_name,
family,
device_name,
exported_antennafield_name,
)
if (
exported_antennafield_name.casefold()
== antennafield_name.casefold()
):
logger.info(
"Match found for FIELD=%s FAMILY=%s DEVICE_NAME=%s",
antennafield_name,
family,
device_name,
)
return create_device_proxy(device_name, write_access=write_access)
except Exception as ex:
logger.exception(
"Failed to create device proxy",
extra={"antennafield_name": antennafield_name},
)
raise IOError(
f"Failed to create device proxy to '{antennafield_name}'"
) from ex
raise ValueError(f"Antenna field '{antennafield_name}' not found")
......@@ -14,12 +14,16 @@ from lofar_sid.interface.stationcontrol import statistics_pb2
from lofar_sid.interface.stationcontrol import statistics_pb2_grpc
from lofar_sid.interface.stationcontrol import antennafield_pb2
from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
from lofar_sid.interface.stationcontrol import antenna_pb2
from lofar_sid.interface.stationcontrol import antenna_pb2_grpc
from tangostationcontrol.rpc.observation import Observation
from tangostationcontrol.rpc.statistics import Statistics
from tangostationcontrol.rpc.messagehandler import MultiEndpointZMQMessageHandler
from tangostationcontrol.common.lofar_logging import configure_logger
from tangostationcontrol.metrics import start_metrics_server
from tangostationcontrol.rpc.antennafield import AntennaField
from tangostationcontrol.rpc.antenna import Antenna
from tangostationcontrol.rpc.antennafield import Antennafield
logger = logging.getLogger()
......@@ -36,14 +40,17 @@ class Server:
Observation(), self.server
)
antennafield_pb2_grpc.add_AntennafieldServicer_to_server(
AntennaField(), self.server
Antennafield(), self.server
)
statistics_pb2_grpc.add_StatisticsServicer_to_server(
self.statistics_servicer, self.server
)
antenna_pb2_grpc.add_AntennaServicer_to_server(Antenna(), self.server)
SERVICE_NAMES = (
observation_pb2.DESCRIPTOR.services_by_name["Observation"].full_name,
antennafield_pb2.DESCRIPTOR.services_by_name["Antennafield"].full_name,
antenna_pb2.DESCRIPTOR.services_by_name["Antenna"].full_name,
statistics_pb2.DESCRIPTOR.services_by_name["Statistics"].full_name,
reflection.SERVICE_NAME, # reflection is required by innius-gpc-datasource
)
......
......@@ -2,17 +2,18 @@
# SPDX-License-Identifier: Apache-2.0
from unittest.mock import MagicMock, patch
from lofar_sid.interface.stationcontrol.antennafield_pb2 import (
from lofar_sid.interface.stationcontrol.antenna_pb2 import (
Identifier,
SetAntennaStatusRequest,
SetAntennaUseRequest,
)
from tangostationcontrol.rpc.antennafield import AntennaField, AntennaNotFoundException
from tangostationcontrol.rpc.antenna import Antenna, AntennaNotFoundException
from tangostationcontrol.rpc.proxy.antennadeviceproxyfactory import AntennaDeviceProxyFactory
from tests import base
class TestAntennaField(base.TestCase):
class TestAntenna(base.TestCase):
def mock_tango_db_response(self, mock_tango_database):
"""Helper function to mock Tango database response."""
mock_db = MagicMock()
......@@ -23,9 +24,9 @@ class TestAntennaField(base.TestCase):
]
return mock_db
@patch("tangostationcontrol.rpc.antennafield.create_device_proxy")
@patch("tangostationcontrol.rpc.antennafield.tango.Database")
def test_create_antennafield_device_proxy_success(
@patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.create_device_proxy")
@patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.tango.Database")
def test_create_antenna_device_proxy_success(
self, mock_tango_database, mock_create_device_proxy
):
mock_device = MagicMock()
......@@ -34,10 +35,8 @@ class TestAntennaField(base.TestCase):
self.mock_tango_db_response(mock_tango_database)
# Act
antenna_field = AntennaField()
result = antenna_field._create_antennafield_device_proxy(
identifier, write_access=True
result = AntennaDeviceProxyFactory.create_device_proxy_for_antennafield(
identifier.antennafield_name, write_access=True
)
# Assert
......@@ -46,11 +45,9 @@ class TestAntennaField(base.TestCase):
)
self.assertEqual(result, mock_device)
@patch(
"tangostationcontrol.rpc.antennafield.create_device_proxy"
) # Mocking create_device_proxy
@patch("tangostationcontrol.rpc.antennafield.tango.Database") # Mock tango Database
def test_create_antennafield_device_proxy_failure(
@patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.create_device_proxy")
@patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.tango.Database") # Mock tango Database
def test_create_antenna_device_proxy_failure(
self, mock_tango_database, mock_create_device_proxy
):
# Arrange
......@@ -58,10 +55,8 @@ class TestAntennaField(base.TestCase):
identifier = Identifier(antennafield_name="LBA", antenna_name="Antenna2")
self.mock_tango_db_response(mock_tango_database)
# Act & Assert
antenna_field = AntennaField()
with self.assertRaises(IOError):
antenna_field._create_antennafield_device_proxy(identifier)
AntennaDeviceProxyFactory.create_device_proxy_for_antennafield(identifier.antennafield_name)
def test_get_antenna_index_found(self):
# Arrange
......@@ -70,8 +65,8 @@ class TestAntennaField(base.TestCase):
identifier = Identifier(antennafield_name="LBA", antenna_name="Antenna2")
# Act
antenna_field = AntennaField()
result = antenna_field._get_antenna_index(identifier, mock_device)
antenna = Antenna()
result = antenna._get_antenna_index(identifier, mock_device)
# Assert
self.assertEqual(result, 1)
......@@ -83,11 +78,11 @@ class TestAntennaField(base.TestCase):
identifier = Identifier(antennafield_name="LBA", antenna_name="Antenna2")
# Act
antenna_field = AntennaField()
antenna = Antenna()
# Assert
with self.assertRaises(AntennaNotFoundException):
antenna_field._get_antenna_index(identifier, mock_device)
antenna._get_antenna_index(identifier, mock_device)
def test_get_antenna_reply(self):
# Arrange
......@@ -97,20 +92,21 @@ class TestAntennaField(base.TestCase):
identifier = Identifier(antennafield_name="LBA", antenna_name="Antenna1")
antenna_field = AntennaField()
antenna = Antenna()
antenna_index = 0
# Act
reply = antenna_field._get_antenna_reply(identifier, antenna_index, mock_device)
reply = antenna._get_antenna_reply(identifier, antenna_index, mock_device)
# Assert
self.assertTrue(reply.success, msg=f"{reply.exception}")
self.assertEqual(reply.result.antenna_status, True)
self.assertEqual(reply.result.antenna_use, 1)
@patch("tangostationcontrol.rpc.antennafield.create_device_proxy")
@patch("tangostationcontrol.rpc.antennafield.tango.Database")
def test_set_antenna_status(self, mock_tango_database, mock_create_device_proxy):
#@patch("tangostationcontrol.rpc.antenna.create_device_proxy")
@patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.create_device_proxy")
@patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.tango.Database")
def test_set_antenna_status(self, mock_tango_database, mock_create_device_proxy): # ,mock_create_device_proxy_antenna
# Arrange
self.mock_tango_db_response(mock_tango_database)
......@@ -118,15 +114,14 @@ class TestAntennaField(base.TestCase):
mock_create_device_proxy.return_value.Antenna_Status_R = [1, 0]
mock_create_device_proxy.return_value.Antenna_Use_R = [1, 0]
mock_create_device_proxy.return_value.Antenna_Names_R = ["Antenna0", "Antenna1"]
request = SetAntennaStatusRequest(
antenna_status=True,
identifier=Identifier(antennafield_name="LBA", antenna_name="Antenna1"),
)
# Act
antenna_field = AntennaField()
reply = antenna_field.SetAntennaStatus(request, None)
antenna = Antenna()
reply = antenna.SetAntennaStatus(request, None)
# Assert
self.assertTrue(reply.success, msg=f"{reply.exception}")
......@@ -134,15 +129,19 @@ class TestAntennaField(base.TestCase):
{"Antenna_Status": [1, 1]}
)
@patch("tangostationcontrol.rpc.antennafield.create_device_proxy")
@patch("tangostationcontrol.rpc.antennafield.tango.Database")
def test_set_antenna_use(self, mock_tango_database, mock_create_device_proxy):
@patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.create_device_proxy")
@patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.tango.Database")
def test_set_antenna_use(self, mock_tango_database, mock_create_device_proxy_factory ): # ,mock_create_device_proxy_antenna
self.mock_tango_db_response(mock_tango_database)
mock_create_device_proxy.return_value.read_attribute.return_value.value = [1, 0]
mock_create_device_proxy.return_value.Antenna_Status_R = [1, 0]
mock_create_device_proxy.return_value.Antenna_Use_R = [1, 0]
mock_create_device_proxy.return_value.Antenna_Names_R = ["Antenna0", "Antenna1"]
mock_create_device_proxy_factory.return_value.read_attribute.return_value.value = [1, 0]
mock_create_device_proxy_factory.return_value.Antenna_Status_R = [1, 0]
mock_create_device_proxy_factory.return_value.Antenna_Use_R = [1, 0]
mock_create_device_proxy_factory.return_value.Antenna_Names_R = ["Antenna0", "Antenna1"]
request = SetAntennaUseRequest(
antenna_use=1,
......@@ -150,11 +149,11 @@ class TestAntennaField(base.TestCase):
)
# Act
antenna_field = AntennaField()
reply = antenna_field.SetAntennaUse(request, None)
antenna = Antenna()
reply = antenna.SetAntennaUse(request, None)
# Assert
self.assertTrue(reply.success, msg=f"{reply.exception}")
mock_create_device_proxy.return_value.put_property.assert_called_once_with(
mock_create_device_proxy_factory.return_value.put_property.assert_called_once_with(
{"Antenna_Use": [1, 1]}
)
......@@ -8,8 +8,8 @@ from grpc_reflection.v1alpha.proto_reflection_descriptor_database import (
ProtoReflectionDescriptorDatabase,
)
from lofar_sid.interface.stationcontrol import antennafield_pb2
from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc
from lofar_sid.interface.stationcontrol import antenna_pb2
from lofar_sid.interface.stationcontrol import antenna_pb2_grpc
from tangostationcontrol.rpc.server import Server
from tests import base
......@@ -41,12 +41,12 @@ class TestServer(base.TestCase):
"""Test a basic gRPC call to the server."""
with grpc.insecure_channel(f"localhost:{self.server.port}") as channel:
stub = antennafield_pb2_grpc.AntennafieldStub(channel)
stub = antenna_pb2_grpc.AntennaStub(channel)
identifier = antennafield_pb2.Identifier(
identifier = antenna_pb2.Identifier(
antennafield_name="lba",
antenna_name="LBA00",
)
_ = stub.GetAntenna(
antennafield_pb2.GetAntennaRequest(identifier=identifier)
antenna_pb2.GetAntennaRequest(identifier=identifier)
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment