Skip to content
Snippets Groups Projects
Select Git revision
  • d3c190d3b7a43e85794edb3cde84311da4add890
  • master default protected
  • test-pytango-10.0.3
  • revert-cs032-ccd-ip
  • deploy-components-parallel
  • fix-chrony-exporter
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • v0.55.5-r2 protected
  • v0.52.8-rc1 protected
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
41 results

test_observation_client.py

Blame
  • Jan David Mol's avatar
    L2SS-2154: Use LOTUS library for common metrics code
    Jan David Mol authored
    f0ed5f12
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_observation_client.py 3.20 KiB
    # Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
    # SPDX-License-Identifier: Apache-2.0
    
    from json import loads
    from os import environ
    
    from integration_tests import base
    from integration_tests.device_proxy import TestDeviceProxy
    from tangostationcontrol.test.dummy_observation_settings import (
        get_observation_settings_hba_core_immediate,
    )
    import grpc
    from lofar_sid.interface.stationcontrol.observation_pb2 import (
        StartObservationRequest,
        StopObservationRequest,
    )
    from lofar_sid.interface.stationcontrol.observation_pb2_grpc import ObservationStub
    from tango import DevState
    
    
    class TestObservation(base.IntegrationTestCase):
        def setUp(self):
            self.setup_stationmanager_proxy()
            self.observation_control_proxy = TestDeviceProxy("STAT/ObservationControl/1")
            self.observation_control_proxy.off()
            self.observation_control_proxy.boot()
            self.addCleanup(
                self.observation_control_proxy.test_device_turn_off,
                self.observation_control_proxy,
            )
    
            # make sure any devices we depend on are also started
            for device in [
                "STAT/RECVH/H0",
                "STAT/SDPFirmware/HBA0",
                "STAT/SDP/HBA0",
                "STAT/Beamlet/HBA0",
                "STAT/DigitalBeam/HBA0",
                "STAT/TileBeam/HBA0",
                "STAT/AFH/HBA0",
                "STAT/SST/HBA0",
                "STAT/XST/HBA0",
            ]:
                proxy = TestDeviceProxy(device)
                proxy.off()
                proxy.boot()
                self.addCleanup(proxy.test_device_turn_off, proxy)
    
        def setup_stationmanager_proxy(self):
            """Setup StationManager"""
            stationmanager_proxy = TestDeviceProxy("STAT/StationManager/1")
            stationmanager_proxy.off()
            stationmanager_proxy.boot()
            self.addCleanup(stationmanager_proxy.test_device_turn_off, stationmanager_proxy)
            self.assertEqual(stationmanager_proxy.state(), DevState.ON)
            return stationmanager_proxy
    
        def test_observation(self):
            """Test of the observation_wrapper class basic functionality"""
    
            # convert the JSON specification to a dict for this class
            specification = get_observation_settings_hba_core_immediate().to_json()
            specification_dict = loads(specification)
    
            # create an observation class using the dict and as host just get it using a
            # util function
    
            tango_host = environ["TANGO_HOST"]
            obs = TestDeviceProxy(f"tango://{tango_host}/STAT/ObservationControl/1")
    
            obs_id = specification_dict["antenna_fields"][0]["observation_id"]
    
            with grpc.insecure_channel("rpc.service.consul:50051") as channel:
                stub = ObservationStub(channel)
                stub.StartObservation(StartObservationRequest(configuration=specification))
    
            # Assert the observation is running after starting it
            self.assertTrue(obs.is_observation_running(obs_id))
    
            # Assert the observation has stopped after aborting
    
            with grpc.insecure_channel("rpc.service.consul:50051") as channel:
                stub = ObservationStub(channel)
                stub.StopObservation(StopObservationRequest(observation_id=obs_id))
    
            self.assertFalse(obs.is_observation_running(obs_id))