diff --git a/.gitignore b/.gitignore index 5e9041cc27593f8d3206ffc4a68d4f71b2908cdf..57c7e7cb01cb714ff2d27a9eed643825e333ebeb 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,4 @@ infra/dev/nomad/tmp/* Lib/* Scripts/* pyvenv.cfg +bin/* \ No newline at end of file diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 5b2722363c521506f17ff7981ad9640748cf8639..73edba9d4d4279c2246a0ae4c61d5d9e1824ffc5 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -56,6 +56,8 @@ run_shellcheck: - shellcheck **/*.sh run_CDB_correctness: + needs: + - trigger_prepare stage: test script: - cd $CI_PROJECT_DIR/CDB/stations @@ -100,6 +102,7 @@ secret_detection: # Run all unit tests for Python versions except the base image run_unit_tests: extends: .run_unit_test_version_base + needs: [] stage: test allow_failure: true image: python:3.${PY_VERSION} @@ -111,6 +114,7 @@ run_unit_tests: run_unit_tests_coverage: extends: .run_unit_test_version_base + needs: [] stage: test script: - tox -e coverage @@ -120,6 +124,7 @@ run_unit_tests_coverage: coverage_report: coverage_format: cobertura path: coverage.xml + junit: tango_report.xml paths: - cover/* - .coverage diff --git a/pyproject.toml b/pyproject.toml index 5ee79070141b3421b1bbe0746a23256a2e0d8f77..832377cfea268c75a6b3827f5eef8301ea01cdb5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,7 +62,7 @@ commands = [ ["python", "-m", "coverage", "--version"], ["python", "-m", "coverage", "erase"], ["python", "-m", "coverage", "run", "-m", "pytest", "-v", "tests/{posargs}"], - ["python", "-m", "pytest", "-v", "--log-level=DEBUG", "--cov-report", "term", "--cov-report", "html", "--cov-append", "--cov-report", "xml:coverage.xml", "--cov=tangostationcontrol", "tests/{posargs}"] + ["python", "-m", "pytest", "-v", "--log-level=DEBUG", "--cov-report", "term", "--cov-report", "html", "--cov-append", "--cov-report", "xml:coverage.xml", "--cov=tangostationcontrol", "tests/{posargs}", "--junitxml=tango_report.xml"] ] [tool.tox.env.integration] diff --git a/requirements.txt b/requirements.txt index 63ef8fefebe0b5105e49983b26ee38a0248359cd..ae76051b5d0ed8a9ef1e201c00fd8430cfc321fb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ # order of appearance. Changing the order has an impact on the overall # integration process, which may cause wedges in the gate later. -lofar-sid >= 1.0.11 # Apache 2 +lofar-sid >= 1.0.16 # Apache 2 lofar-lotus>=0.0.4 # Apache 2 PyTango>=10.0.0 # LGPL v3 numpy>=1.21.6 # BSD3 diff --git a/tangostationcontrol/devices/base_device_classes/antennafield_device.py b/tangostationcontrol/devices/base_device_classes/antennafield_device.py index 28f95e0e3b45ea8eee0be5642c92ad59ff3734f5..a3910e6e439dc4244612d7b40eae4508ab72200b 100644 --- a/tangostationcontrol/devices/base_device_classes/antennafield_device.py +++ b/tangostationcontrol/devices/base_device_classes/antennafield_device.py @@ -957,6 +957,17 @@ class AF(LOFARDevice): self.__setup_recv_mapper() self.__setup_sdp_mapper() + def _read_hardware_powered_fraction_R(self): + """Read attribute which monitors the power""" + + mask = self.read_attribute("ANT_mask_RW") + powered = self.read_attribute("RCU_PWR_ANT_on_R") + + try: + return numpy.count_nonzero(powered & mask) / numpy.count_nonzero(mask) + except ZeroDivisionError: + return 1.0 + def _power_hardware_on(self): # Configure the devices that process our antennas self.configure_recv() @@ -968,20 +979,33 @@ class AF(LOFARDevice): [False] * self.nr_antennas, ) - def _power_hardware_off(self): + def power_antennas_off(self, mask: numpy.ndarray): + # Power the specified antennas off, do not touch other antennas. + # + # Mask: a boolean array indicating which antennas should be powered OFF + # Save actual mask values ANT_mask_RW = self.read_attribute("ANT_mask_RW") - # Enable controlling all antennas - self.proxy.write_attribute("ANT_mask_RW", [True] * len(ANT_mask_RW)) + # Enable controlling all requested antennas + self.proxy.write_attribute("ANT_mask_RW", mask) try: - # Turn off power to all antennas - self.proxy.write_attribute("RCU_PWR_ANT_on_RW", [False] * len(ANT_mask_RW)) + # make sure we update RCU_PWR_ANT_on_RW neatly with our mask + RCU_PWR_ANT_on_RW = numpy.logical_and( + self.read_attribute("RCU_PWR_ANT_on_RW"), numpy.logical_not(mask) + ) + + # Turn off power to all antennas (in the requested mask) + self.proxy.write_attribute("RCU_PWR_ANT_on_RW", RCU_PWR_ANT_on_RW) finally: # Restore original mask self.proxy.write_attribute("ANT_mask_RW", ANT_mask_RW) + def _power_hardware_off(self): + # Power all antennas off + self.power_antennas_off([True] * self.nr_antennas()) + # -------- # Commands # -------- @@ -993,6 +1017,12 @@ class AF(LOFARDevice): def configure_recv(self): """Configure RECV to process our antennas.""" + # Power off what should be off, f.e. if they got turned off in the mask + # but are still powered. + self.power_antennas_off( + numpy.logical_not(self.read_attribute("Antenna_Usage_Mask_R")) + ) + # Disable controlling the tiles that fall outside the mask self.proxy.write_attribute( "ANT_mask_RW", self.read_attribute("Antenna_Usage_Mask_R") diff --git a/tangostationcontrol/devices/base_device_classes/mapper.py b/tangostationcontrol/devices/base_device_classes/mapper.py index 6c984cb17573a6660e798d498fbd293f4656a031..e7f6bc16fcc1c249999fb36c9f2db70e755a9708 100644 --- a/tangostationcontrol/devices/base_device_classes/mapper.py +++ b/tangostationcontrol/devices/base_device_classes/mapper.py @@ -717,9 +717,9 @@ class RecvDeviceWalker: if recv <= 0: continue - recv_ant_masks[recv - 1][rcu_input // N_rcu_inp][rcu_input % N_rcu_inp] = ( - True - ) + recv_ant_masks[recv - 1][rcu_input // N_rcu_inp][ + rcu_input % N_rcu_inp + ] = True return recv_ant_masks diff --git a/tangostationcontrol/rpc/antenna.py b/tangostationcontrol/rpc/antenna.py new file mode 100644 index 0000000000000000000000000000000000000000..d6b761cb9b8981cf9b50076a3cbd07f4a2f9ebdb --- /dev/null +++ b/tangostationcontrol/rpc/antenna.py @@ -0,0 +1,154 @@ +"""This module handles antenna field control via GRPC""" + +import logging + +from tango import DeviceProxy, DevState + + +from lofar_sid.interface.stationcontrol import antenna_pb2_grpc + +from lofar_sid.interface.stationcontrol.antenna_pb2 import ( + AntennaReply, + AntennaResult, + GetAntennaRequest, + Identifier, + SetAntennaStatusRequest, + SetAntennaUseRequest, +) + +from tangostationcontrol.rpc.common import ( + call_exception_metrics, + reply_on_exception, +) +from tangostationcontrol.rpc.proxy.antennadeviceproxyfactory import ( + AntennaDeviceProxyFactory, +) + +logger = logging.getLogger() + + +class AntennaNotFoundException(ValueError): + """Exception raised when an antenna is not found in the device.""" + + +class Antenna(antenna_pb2_grpc.AntennaServicer): + """Represents an antenna field in the TangoStationControl system. + + This class provides methods to control and manage antennas remotely. + """ + + def _get_antenna_index( + self, identifier: Identifier, antennafield_device: DeviceProxy + ) -> int: + """Returns the current antenna index.""" + try: + return antennafield_device.Antenna_Names_R.index(identifier.antenna_name) + except ValueError as exc: + logger.warning( + "Antenna not found", + extra={ + "antenna_name": identifier.antenna_name, + "antennafield_name": identifier.antennafield_name, + }, + ) + raise AntennaNotFoundException( + f"Antenna '{identifier.antenna_name}' " + f"not found in device '{identifier.antennafield_name}'." + ) from exc + + def _get_antenna_reply( + self, + identifier: Identifier, + antenna_index: int, + antennafield_device: DeviceProxy, + ): + """Returns the Message for the caller of the GRPC.""" + try: + antenna_status = antennafield_device.Antenna_Status_R[antenna_index] + antenna_use = antennafield_device.Antenna_Use_R[antenna_index] + except (AttributeError, IndexError) as ex: + raise ValueError( + f"Could not access device attribute for antenna " + f"{identifier.antenna_name=} " + f"{identifier.antenna_index}" + ) from ex + + return AntennaReply( + success=True, + exception="", + result=AntennaResult( + identifier=identifier, + antenna_use=antenna_use, + antenna_status=antenna_status, + ), + ) + + @reply_on_exception(AntennaReply) + @call_exception_metrics("AntennaField") + def GetAntenna(self, request: GetAntennaRequest, context): + """Gets Correct Device, then returns the Message for the caller of the GRPC.""" + antennafield_device = ( + AntennaDeviceProxyFactory.create_device_proxy_for_antennafield( + request.identifier.antennafield_name + ) + ) + antenna_index = self._get_antenna_index(request.identifier, antennafield_device) + + return self._get_antenna_reply( + request.identifier, antenna_index, antennafield_device + ) + + def _apply_changes(self, antenna_field): + """Apply Changes to the System. It is required to bring it off first""" + + # If the antenna_field is ON, we need to reload the changes + if antenna_field.state() == DevState.ON: + antenna_field.Off() + antenna_field.Initialise() + antenna_field.On() + + # If the antenna_field is ON, the antennas should also receive power + # as per the station transitions. + antenna_field.power_hardware_on() + + def _update_antenna_property( + self, + identifier: Identifier, + attribute_name: str, + property_name: str, + new_value: int, + ) -> AntennaReply: + """Update the antenna Property""" + antennafield_device = ( + AntennaDeviceProxyFactory.create_device_proxy_for_antennafield( + identifier.antennafield_name, True + ) + ) + antenna_index = self._get_antenna_index(identifier, antennafield_device) + + property_value = list(antennafield_device.read_attribute(attribute_name).value) + property_value[antenna_index] = new_value + + antennafield_device.put_property({property_name: property_value}) + self._apply_changes(antennafield_device) + + return self._get_antenna_reply(identifier, antenna_index, antennafield_device) + + @reply_on_exception(AntennaReply) + @call_exception_metrics("AntennaField") + def SetAntennaStatus(self, request: SetAntennaStatusRequest, context): + """Set the new Antenna Status""" + return self._update_antenna_property( + request.identifier, + "Antenna_Status_R", + "Antenna_Status", + request.antenna_status, + ) + + @reply_on_exception(AntennaReply) + @call_exception_metrics("AntennaField") + def SetAntennaUse(self, request: SetAntennaUseRequest, context): + """Set the new Antenna Use""" + return self._update_antenna_property( + request.identifier, "Antenna_Use_R", "Antenna_Use", request.antenna_use + ) diff --git a/tangostationcontrol/rpc/antennafield.py b/tangostationcontrol/rpc/antennafield.py index f8ea954ffb268ba9cf06a140682213d0b7192645..9776320a40cf16cc3f6fa50b7a25bb7bd62093b4 100644 --- a/tangostationcontrol/rpc/antennafield.py +++ b/tangostationcontrol/rpc/antennafield.py @@ -2,187 +2,85 @@ import logging -import tango -from tango import DeviceProxy - -from tangostationcontrol.common.antennas import antenna_field_family_name from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc - +from tangostationcontrol.common.proxies.proxy import create_device_proxy from lofar_sid.interface.stationcontrol.antennafield_pb2 import ( - AntennaReply, - AntennaResult, - GetAntennaRequest, - Identifier, - SetAntennaStatusRequest, - SetAntennaUseRequest, + GetAntennafieldRequest, + AntennafieldIdentifier, + AntennafieldReply, + AntennafieldResult, + SetAntennafieldRequest, ) -from tangostationcontrol.common.proxies.proxy import create_device_proxy + from tangostationcontrol.rpc.common import ( call_exception_metrics, reply_on_exception, ) +from tangostationcontrol.rpc.proxy.antennadeviceproxyfactory import ( + AntennaDeviceProxyFactory, +) logger = logging.getLogger() -class AntennaNotFoundException(ValueError): - """Exception raised when an antenna is not found in the device.""" +class AntennaFieldNotFoundException(ValueError): + """Exception raised when an Antennafield is not found in the station.""" -class AntennaField(antennafield_pb2_grpc.AntennafieldServicer): - """Represents an antenna field in the TangoStationControl system. +class Antennafield(antennafield_pb2_grpc.AntennafieldServicer): + """Represents an Antennafield in the station This class provides methods to control and manage antennas remotely. """ - def _create_antennafield_device_proxy( - self, identifier: Identifier, write_access: bool = False - ) -> DeviceProxy: - """Get the device proxy for a antenna in a antennafield, given that it exist.""" - try: - family = antenna_field_family_name(identifier.antennafield_name) - db = tango.Database() - device_exported = db.get_device_exported_for_class(family) - logger.info( - "Create_antennafield_device_proxy ANTENNA=%s FIELD=%s FAMILY=%s", - identifier.antenna_name, - identifier.antennafield_name, - family, - ) - for device_name in device_exported: - _, _, exported_antennafield_name = device_name.split("/") - logger.info( - "Found a device name in device_exported ANTENNA=%s FIELD=%s FAMILY=%s DEVICE_NAME=%s EXPORTED_ANTENNAFIELD_NAME=%s", - identifier.antenna_name, - identifier.antennafield_name, - family, - device_name, - exported_antennafield_name, - ) - if ( - exported_antennafield_name.casefold() - == identifier.antennafield_name.casefold() - ): - logger.info( - "We have a Match. ANTENNA=%s FIELD=%s FAMILY=%s DEVICE_NAME=%s", - identifier.antenna_name, - identifier.antennafield_name, - family, - device_name, - ) - return create_device_proxy(device_name, write_access=write_access) - except Exception as ex: - logger.exception( - "Failed to create device proxy", - extra={ - "antennafield_name": identifier.antennafield_name, - "antenna_name": identifier.antenna_name, - }, - ) - raise IOError( - f"Failed to create device proxy to '{identifier.antennafield_name}'" - ) from ex - raise ValueError( - f"Antenna '{identifier.antenna_name}' in Antenna field '{identifier.antennafield_name}' not found " - ) - - def _get_antenna_index( - self, identifier: Identifier, antennafield_device: DeviceProxy - ) -> int: - """Returns the current antenna index.""" - try: - return antennafield_device.Antenna_Names_R.index(identifier.antenna_name) - except ValueError as exc: - logger.warning( - "Antenna not found", - extra={ - "antenna_name": identifier.antenna_name, - "antennafield_name": identifier.antennafield_name, - }, - ) - raise AntennaNotFoundException( - f"Antenna '{identifier.antenna_name}' " - f"not found in device '{identifier.antennafield_name}'." - ) from exc - - def _get_antenna_reply( + def _get_Antennafield_reply( self, - identifier: Identifier, - antenna_index: int, - antennafield_device: DeviceProxy, + identifier: AntennafieldIdentifier, ): """Returns the Message for the caller of the GRPC.""" try: - antenna_status = antennafield_device.Antenna_Status_R[antenna_index] - antenna_use = antennafield_device.Antenna_Use_R[antenna_index] + antenna_field = self._create_rcu_device_proxy(identifier, False) + antannafield_powered = ( + 1 if antenna_field.hardware_powered_on_fraction_R > 0 else 0 + ) + except (AttributeError, IndexError) as ex: raise ValueError( - f"Could not access device attribute for antenna " - f"{identifier.antenna_name=} " - f"{identifier.antenna_index}" + f"Could not access device attribute for Antennafield_id=" + f"{identifier.Antennafield_id=} " ) from ex - return AntennaReply( + return AntennafieldReply( success=True, exception="", - result=AntennaResult( + result=AntennafieldResult( identifier=identifier, - antenna_use=antenna_use, - antenna_status=antenna_status, + power_status=antannafield_powered, ), ) - @reply_on_exception(AntennaReply) - @call_exception_metrics("AntennaField") - def GetAntenna(self, request: GetAntennaRequest, context): - """Gets Correct Device, then returns the Message for the caller of the GRPC.""" - antennafield_device = self._create_antennafield_device_proxy(request.identifier) - antenna_index = self._get_antenna_index(request.identifier, antennafield_device) + @reply_on_exception(AntennafieldReply) + @call_exception_metrics("Antennafield") + def GetAntennafieldPower(self, request: GetAntennafieldRequest, context): + return self._get_Antennafield_reply(request.identifier.Antennafield_id) - return self._get_antenna_reply( - request.identifier, antenna_index, antennafield_device + @reply_on_exception(AntennafieldReply) + @call_exception_metrics("Antennafield") + def SetAntennafieldPower(self, request: SetAntennafieldRequest, context): + antenna_field = AntennaDeviceProxyFactory.create_device_proxy_for_antennafield( + request.identifier.Antennafield_id, True ) - def _apply_changes(self, antenna_field): - """Apply Changes to the System. It is required to bring it off first""" - antenna_field.Off() - antenna_field.Initialise() - antenna_field.On() + stationmanager = create_device_proxy("STAT/StationManager/1") - def _update_antenna_property( - self, - identifier: Identifier, - attribute_name: str, - property_name: str, - new_value: int, - ) -> AntennaReply: - """Update the antenna Property""" - antennafield_device = self._create_antennafield_device_proxy(identifier, True) - antenna_index = self._get_antenna_index(identifier, antennafield_device) - - property_value = list(antennafield_device.read_attribute(attribute_name).value) - property_value[antenna_index] = new_value - - antennafield_device.put_property({property_name: property_value}) - self._apply_changes(antennafield_device) - - return self._get_antenna_reply(identifier, antenna_index, antennafield_device) - - @reply_on_exception(AntennaReply) - @call_exception_metrics("AntennaField") - def SetAntennaStatus(self, request: SetAntennaStatusRequest, context): - """Set the new Antenna Status""" - return self._update_antenna_property( - request.identifier, - "Antenna_Status_R", - "Antenna_Status", - request.antenna_status, - ) + if request.power_status: + if stationmanager.antennas_powered_on_R: + antenna_field.power_hardware_on() + else: + raise ValueError( + "Station State does not allow Antennafield to be powered ON" + ) + else: + antenna_field.power_hardware_off() - @reply_on_exception(AntennaReply) - @call_exception_metrics("AntennaField") - def SetAntennaUse(self, request: SetAntennaUseRequest, context): - """Set the new Antenna Use""" - return self._update_antenna_property( - request.identifier, "Antenna_Use_R", "Antenna_Use", request.antenna_use - ) + return self._get_Antennafield_reply(request.identifier) diff --git a/tangostationcontrol/rpc/observation.py b/tangostationcontrol/rpc/observation.py index c769b43aeb6ccb14d4bf4555677539feece9db24..9df3e284ed1deb77c5a0878f1bb814a7854dc7fe 100644 --- a/tangostationcontrol/rpc/observation.py +++ b/tangostationcontrol/rpc/observation.py @@ -27,10 +27,10 @@ class Observation(observation_pb2_grpc.ObservationServicer): observation_id = "" # guard against empty jsons incomming - if request.configuration is not None and request.configuration.strip() != "{}" : + if request.configuration is not None and request.configuration.strip() != "{}": observationsettings = ObservationSettings.from_json(request.configuration) logger.info("Converted ObservationSettings %s", observationsettings) - if observationsettings.antenna_fields: + if observationsettings.antenna_fields: primary_antenna = observationsettings.antenna_fields[0] observation_id = primary_antenna.observation_id logger.info( @@ -47,9 +47,8 @@ class Observation(observation_pb2_grpc.ObservationServicer): ) else: logger.error( - "No Observation configuration found %s", request.configuration - ) - + "No Observation configuration found %s", request.configuration + ) observation_control = create_device_proxy( "STAT/ObservationControl/1", self.TIMEOUT diff --git a/tangostationcontrol/rpc/proxy/antennadeviceproxyfactory.py b/tangostationcontrol/rpc/proxy/antennadeviceproxyfactory.py new file mode 100644 index 0000000000000000000000000000000000000000..e7caa6a77b3b07338d0a84f58d48e2d9f11a12af --- /dev/null +++ b/tangostationcontrol/rpc/proxy/antennadeviceproxyfactory.py @@ -0,0 +1,55 @@ +import tango +from tango import DeviceProxy +from tangostationcontrol.common.antennas import antenna_field_family_name +from tangostationcontrol.common.proxies.proxy import create_device_proxy +import logging + +logger = logging.getLogger() + + +class AntennaDeviceProxyFactory: + + @staticmethod + def create_device_proxy_for_antennafield( + antennafield_name: str, write_access: bool = False + ) -> DeviceProxy: + """Get the device proxy for a given antenna field, if it exists.""" + try: + family = antenna_field_family_name(antennafield_name) + db = tango.Database() + device_exported = db.get_device_exported_for_class(family) + logger.info( + "Creating device proxy for FIELD=%s FAMILY=%s", + antennafield_name, + family, + ) + for device_name in device_exported: + _, _, exported_antennafield_name = device_name.split("/") + logger.info( + "Checking device FIELD=%s FAMILY=%s DEVICE_NAME=%s EXPORTED_FIELD=%s", + antennafield_name, + family, + device_name, + exported_antennafield_name, + ) + if ( + exported_antennafield_name.casefold() + == antennafield_name.casefold() + ): + logger.info( + "Match found for FIELD=%s FAMILY=%s DEVICE_NAME=%s", + antennafield_name, + family, + device_name, + ) + return create_device_proxy(device_name, write_access=write_access) + except Exception as ex: + logger.exception( + "Failed to create device proxy", + extra={"antennafield_name": antennafield_name}, + ) + raise IOError( + f"Failed to create device proxy to '{antennafield_name}'" + ) from ex + + raise ValueError(f"Antenna field '{antennafield_name}' not found") diff --git a/tangostationcontrol/rpc/server.py b/tangostationcontrol/rpc/server.py index 383efb5c27544c8ca56a8ba0d47835bf1833eafe..1647ba64bf18392763451dbaa43f21aab64356c8 100644 --- a/tangostationcontrol/rpc/server.py +++ b/tangostationcontrol/rpc/server.py @@ -14,12 +14,16 @@ from lofar_sid.interface.stationcontrol import statistics_pb2 from lofar_sid.interface.stationcontrol import statistics_pb2_grpc from lofar_sid.interface.stationcontrol import antennafield_pb2 from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc +from lofar_sid.interface.stationcontrol import antenna_pb2 +from lofar_sid.interface.stationcontrol import antenna_pb2_grpc + from tangostationcontrol.rpc.observation import Observation from tangostationcontrol.rpc.statistics import Statistics from tangostationcontrol.rpc.messagehandler import MultiEndpointZMQMessageHandler from tangostationcontrol.common.lofar_logging import configure_logger from tangostationcontrol.metrics import start_metrics_server -from tangostationcontrol.rpc.antennafield import AntennaField +from tangostationcontrol.rpc.antenna import Antenna +from tangostationcontrol.rpc.antennafield import Antennafield logger = logging.getLogger() @@ -36,14 +40,17 @@ class Server: Observation(), self.server ) antennafield_pb2_grpc.add_AntennafieldServicer_to_server( - AntennaField(), self.server + Antennafield(), self.server ) statistics_pb2_grpc.add_StatisticsServicer_to_server( self.statistics_servicer, self.server ) + antenna_pb2_grpc.add_AntennaServicer_to_server(Antenna(), self.server) + SERVICE_NAMES = ( observation_pb2.DESCRIPTOR.services_by_name["Observation"].full_name, antennafield_pb2.DESCRIPTOR.services_by_name["Antennafield"].full_name, + antenna_pb2.DESCRIPTOR.services_by_name["Antenna"].full_name, statistics_pb2.DESCRIPTOR.services_by_name["Statistics"].full_name, reflection.SERVICE_NAME, # reflection is required by innius-gpc-datasource ) diff --git a/tests/rpc/test_antennafield.py b/tests/rpc/test_antenna.py similarity index 61% rename from tests/rpc/test_antennafield.py rename to tests/rpc/test_antenna.py index a1674732ce39cd88aadb4c4d9dcb8ee42b1c4fa6..b08c8f95a9f8cdcb8590bb12f7ea4abe3c3e6ea0 100644 --- a/tests/rpc/test_antennafield.py +++ b/tests/rpc/test_antenna.py @@ -2,17 +2,18 @@ # SPDX-License-Identifier: Apache-2.0 from unittest.mock import MagicMock, patch -from lofar_sid.interface.stationcontrol.antennafield_pb2 import ( +from lofar_sid.interface.stationcontrol.antenna_pb2 import ( Identifier, SetAntennaStatusRequest, SetAntennaUseRequest, ) -from tangostationcontrol.rpc.antennafield import AntennaField, AntennaNotFoundException +from tangostationcontrol.rpc.antenna import Antenna, AntennaNotFoundException +from tangostationcontrol.rpc.proxy.antennadeviceproxyfactory import AntennaDeviceProxyFactory from tests import base -class TestAntennaField(base.TestCase): +class TestAntenna(base.TestCase): def mock_tango_db_response(self, mock_tango_database): """Helper function to mock Tango database response.""" mock_db = MagicMock() @@ -23,9 +24,9 @@ class TestAntennaField(base.TestCase): ] return mock_db - @patch("tangostationcontrol.rpc.antennafield.create_device_proxy") - @patch("tangostationcontrol.rpc.antennafield.tango.Database") - def test_create_antennafield_device_proxy_success( + @patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.create_device_proxy") + @patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.tango.Database") + def test_create_antenna_device_proxy_success( self, mock_tango_database, mock_create_device_proxy ): mock_device = MagicMock() @@ -34,10 +35,8 @@ class TestAntennaField(base.TestCase): self.mock_tango_db_response(mock_tango_database) - # Act - antenna_field = AntennaField() - result = antenna_field._create_antennafield_device_proxy( - identifier, write_access=True + result = AntennaDeviceProxyFactory.create_device_proxy_for_antennafield( + identifier.antennafield_name, write_access=True ) # Assert @@ -46,11 +45,9 @@ class TestAntennaField(base.TestCase): ) self.assertEqual(result, mock_device) - @patch( - "tangostationcontrol.rpc.antennafield.create_device_proxy" - ) # Mocking create_device_proxy - @patch("tangostationcontrol.rpc.antennafield.tango.Database") # Mock tango Database - def test_create_antennafield_device_proxy_failure( + @patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.create_device_proxy") + @patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.tango.Database") # Mock tango Database + def test_create_antenna_device_proxy_failure( self, mock_tango_database, mock_create_device_proxy ): # Arrange @@ -58,10 +55,8 @@ class TestAntennaField(base.TestCase): identifier = Identifier(antennafield_name="LBA", antenna_name="Antenna2") self.mock_tango_db_response(mock_tango_database) - # Act & Assert - antenna_field = AntennaField() with self.assertRaises(IOError): - antenna_field._create_antennafield_device_proxy(identifier) + AntennaDeviceProxyFactory.create_device_proxy_for_antennafield(identifier.antennafield_name) def test_get_antenna_index_found(self): # Arrange @@ -70,8 +65,8 @@ class TestAntennaField(base.TestCase): identifier = Identifier(antennafield_name="LBA", antenna_name="Antenna2") # Act - antenna_field = AntennaField() - result = antenna_field._get_antenna_index(identifier, mock_device) + antenna = Antenna() + result = antenna._get_antenna_index(identifier, mock_device) # Assert self.assertEqual(result, 1) @@ -83,11 +78,11 @@ class TestAntennaField(base.TestCase): identifier = Identifier(antennafield_name="LBA", antenna_name="Antenna2") # Act - antenna_field = AntennaField() + antenna = Antenna() # Assert with self.assertRaises(AntennaNotFoundException): - antenna_field._get_antenna_index(identifier, mock_device) + antenna._get_antenna_index(identifier, mock_device) def test_get_antenna_reply(self): # Arrange @@ -97,20 +92,21 @@ class TestAntennaField(base.TestCase): identifier = Identifier(antennafield_name="LBA", antenna_name="Antenna1") - antenna_field = AntennaField() + antenna = Antenna() antenna_index = 0 # Act - reply = antenna_field._get_antenna_reply(identifier, antenna_index, mock_device) + reply = antenna._get_antenna_reply(identifier, antenna_index, mock_device) # Assert self.assertTrue(reply.success, msg=f"{reply.exception}") self.assertEqual(reply.result.antenna_status, True) self.assertEqual(reply.result.antenna_use, 1) - @patch("tangostationcontrol.rpc.antennafield.create_device_proxy") - @patch("tangostationcontrol.rpc.antennafield.tango.Database") - def test_set_antenna_status(self, mock_tango_database, mock_create_device_proxy): + #@patch("tangostationcontrol.rpc.antenna.create_device_proxy") + @patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.create_device_proxy") + @patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.tango.Database") + def test_set_antenna_status(self, mock_tango_database, mock_create_device_proxy): # ,mock_create_device_proxy_antenna # Arrange self.mock_tango_db_response(mock_tango_database) @@ -118,15 +114,14 @@ class TestAntennaField(base.TestCase): mock_create_device_proxy.return_value.Antenna_Status_R = [1, 0] mock_create_device_proxy.return_value.Antenna_Use_R = [1, 0] mock_create_device_proxy.return_value.Antenna_Names_R = ["Antenna0", "Antenna1"] - request = SetAntennaStatusRequest( antenna_status=True, identifier=Identifier(antennafield_name="LBA", antenna_name="Antenna1"), ) # Act - antenna_field = AntennaField() - reply = antenna_field.SetAntennaStatus(request, None) + antenna = Antenna() + reply = antenna.SetAntennaStatus(request, None) # Assert self.assertTrue(reply.success, msg=f"{reply.exception}") @@ -134,15 +129,19 @@ class TestAntennaField(base.TestCase): {"Antenna_Status": [1, 1]} ) - @patch("tangostationcontrol.rpc.antennafield.create_device_proxy") - @patch("tangostationcontrol.rpc.antennafield.tango.Database") - def test_set_antenna_use(self, mock_tango_database, mock_create_device_proxy): + + @patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.create_device_proxy") + @patch("tangostationcontrol.rpc.proxy.antennadeviceproxyfactory.tango.Database") + + def test_set_antenna_use(self, mock_tango_database, mock_create_device_proxy_factory ): # ,mock_create_device_proxy_antenna self.mock_tango_db_response(mock_tango_database) - mock_create_device_proxy.return_value.read_attribute.return_value.value = [1, 0] - mock_create_device_proxy.return_value.Antenna_Status_R = [1, 0] - mock_create_device_proxy.return_value.Antenna_Use_R = [1, 0] - mock_create_device_proxy.return_value.Antenna_Names_R = ["Antenna0", "Antenna1"] + + mock_create_device_proxy_factory.return_value.read_attribute.return_value.value = [1, 0] + mock_create_device_proxy_factory.return_value.Antenna_Status_R = [1, 0] + mock_create_device_proxy_factory.return_value.Antenna_Use_R = [1, 0] + mock_create_device_proxy_factory.return_value.Antenna_Names_R = ["Antenna0", "Antenna1"] + request = SetAntennaUseRequest( antenna_use=1, @@ -150,11 +149,11 @@ class TestAntennaField(base.TestCase): ) # Act - antenna_field = AntennaField() - reply = antenna_field.SetAntennaUse(request, None) + antenna = Antenna() + reply = antenna.SetAntennaUse(request, None) # Assert self.assertTrue(reply.success, msg=f"{reply.exception}") - mock_create_device_proxy.return_value.put_property.assert_called_once_with( + mock_create_device_proxy_factory.return_value.put_property.assert_called_once_with( {"Antenna_Use": [1, 1]} ) diff --git a/tests/rpc/test_server.py b/tests/rpc/test_server.py index 723aaeabb339ffc2360dae907a0eac42618538ff..35d4ad3d51f41fd1e9a3886c71b5506c1e544e25 100644 --- a/tests/rpc/test_server.py +++ b/tests/rpc/test_server.py @@ -8,8 +8,8 @@ from grpc_reflection.v1alpha.proto_reflection_descriptor_database import ( ProtoReflectionDescriptorDatabase, ) -from lofar_sid.interface.stationcontrol import antennafield_pb2 -from lofar_sid.interface.stationcontrol import antennafield_pb2_grpc +from lofar_sid.interface.stationcontrol import antenna_pb2 +from lofar_sid.interface.stationcontrol import antenna_pb2_grpc from tangostationcontrol.rpc.server import Server from tests import base @@ -41,12 +41,12 @@ class TestServer(base.TestCase): """Test a basic gRPC call to the server.""" with grpc.insecure_channel(f"localhost:{self.server.port}") as channel: - stub = antennafield_pb2_grpc.AntennafieldStub(channel) + stub = antenna_pb2_grpc.AntennaStub(channel) - identifier = antennafield_pb2.Identifier( + identifier = antenna_pb2.Identifier( antennafield_name="lba", antenna_name="LBA00", ) _ = stub.GetAntenna( - antennafield_pb2.GetAntennaRequest(identifier=identifier) + antenna_pb2.GetAntennaRequest(identifier=identifier) )