Skip to content
Snippets Groups Projects
Commit c4b735e4 authored by Corné Lukken's avatar Corné Lukken
Browse files

Merge branch 'L2SS-1492' into 'main'

L2SS-1492: MultiStationObservation and StationObservationFuture change spec handling

Closes L2SS-1492

See merge request !75
parents db75f2c4 4bfc701b
No related branches found
No related tags found
1 merge request!75L2SS-1492: MultiStationObservation and StationObservationFuture change spec handling
Pipeline #62937 failed
......@@ -21,6 +21,11 @@ venv
# Packaging generations
*.egg-info
dist
build
# Setuptools SCM
lofar_station_client/_version.py
# Caches
__pycache__
......@@ -130,6 +130,8 @@ tox -e debug tests.requests.test_prometheus
```
## Release notes
- 0.18.0 - MultiStationObservation and StationFutures allow multi field observations
- 0.17.3 - Fix hosts and ports to be compatible with consul overlay network
- 0.17.2 - Fix antennafield_device naming after separation in `AFL` and `AFH`
- 0.17.1 - Add missing `subbands` field to statistics data
......
#!/bin/sh
FILE_DIR=$(dirname -- "$(readlink -f -- "${0}")")
echo "Cleaning.. ${FILE_DIR}/source/source_documentation/*"
for f in "${FILE_DIR}"/source/source_documentation/*
do
case $f in
*/index.rst) true;;
*) echo "Removing.. ${f}"; rm "${f}";;
esac
done
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Constants used across observation management functionality"""
OBSERVATION_CONTROL_DEVICE_NAME = "STAT/ObservationControl/1"
OBSERVATION_FIELD_DEVICE_NAME = "STAT/ObservationField"
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 lofar-station-client project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""
Contains class for pythonic interface with observations
......@@ -12,7 +8,9 @@ Contains class for pythonic interface with observations
import logging
import concurrent.futures
from lofar_station_client.observation.station_futures import StationFutures
from lofar_station_client.observation.station_observation_future import (
StationObservationFuture,
)
logger = logging.getLogger()
......@@ -20,20 +18,54 @@ logger = logging.getLogger()
class MultiStationObservation:
"""
This class provides a pythonic interface
to start an observation on multiple _stations.
to start an observation on multiple stations.
"""
TIMEOUT = 10
def __init__(self, specification: dict, hosts: list):
# convert specification dict to json
self._specification = specification
def __init__(self, specifications: dict, hosts: list):
"""
:param specifications: l2stationspecs as retrieved from TMSS in format:
``
{
"stations": [
{
"station": "cs002"
"antenna_fields": [
{
"observation_id": 144345,
"antenna_field": "HBA0",
"antenna_set": "all",
}
}
}
]
}
``
:param hosts: list of pytango database host:port strings
"""
self._specifications = specifications
if len(specifications["stations"]) != len(hosts):
raise ValueError(
"Should have a unique host per station in spec!, "
f"{len(specifications['stations'])} != {len(hosts)}"
)
# get the ID
self.observation_id = specification["observation_id"]
# get the ID, assumed to be unique across stations and antenna field
self.observation_id = specifications["stations"][0]["antenna_fields"][0][
"observation_id"
]
# TODO(Corne): Check obs id across stations and antenna fields, enforce that it
# is the same id.
# The list of _stations this class controls
self._stations = [StationFutures(self._specification, host) for host in hosts]
self._stations = []
for index, specification in enumerate(self._specifications["stations"]):
self._stations.append(StationObservationFuture(specification, hosts[index]))
def _get_results(self, futures):
# pylint: disable=W0703, C0103
......@@ -64,7 +96,8 @@ class MultiStationObservation:
return results
def _check_success(self, results):
@staticmethod
def _check_success(results):
"""
This function returns a bool list for all the command results
it gives a True for every command that succeeded and a
......@@ -82,9 +115,7 @@ class MultiStationObservation:
return success_list
def start(self):
"""
Start the observation or all hosts
"""
"""Start the observation or all hosts"""
# send out the command to start the observation to all _stations at once
commands = [station.start() for station in self._stations]
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 lofar-station-client project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""
Contains class for pythonic interface with observations
......@@ -11,15 +7,17 @@ Contains class for pythonic interface with observations
import logging
from lofar_station_client.observation.station_futures import StationFutures
from lofar_station_client.observation.station_observation_future import (
StationObservationFuture,
)
logger = logging.getLogger()
class StationObservation:
"""
This class provides a pythonic interface to the
ObservationControl and Observation devices on a single station.
This class provides a blocking pythonic interface to the
ObservationControl and ObservationField devices on a single station.
"""
TIMEOUT = 10
......@@ -29,7 +27,7 @@ class StationObservation:
specification: dict,
host: str = "databaseds.tangonet:10000",
):
self.station = StationFutures(specification, host)
self.station = StationObservationFuture(specification, host)
if not self.station.connected:
raise RuntimeError(f"Was not able to connect with {host}")
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 lofar-station-client project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""
Internal class for station states with regards to observations
Internal class for station states to observations
"""
import concurrent.futures
from json import dumps
import logging
from typing import List
from tango import DeviceProxy, GreenMode, DevFailed
from lofar_station_client.observation.constants import (
OBSERVATION_CONTROL_DEVICE_NAME,
OBSERVATION_FIELD_DEVICE_NAME,
)
logger = logging.getLogger()
class StationFutures:
class StationObservationFuture:
"""
Container class station and observation data and acccess
Container class to manage an observation on a specific station using Python Futures
Use :py:class:`StationObservation` as wrapper for synchronized blocking interface.
"""
def __init__(self, specification: dict, host: str):
"""Construct object to manage observation on a station using Futures
:param specification: Specification from TMSS (LOBSTER) should be of pattern:
``{
"station": "cs002"
"antenna_fields": [
{
"observation_id": 144345,
"antenna_field": "HBA0",
"antenna_set": "all",
}
}
}``
:param host: Tango database host including port
"""
# pylint: disable=C0103
# store general data
self._id = specification["observation_id"]
self._host = host
self._json_specification = dumps(specification)
# Make no attempt to check each field has the same id
self._host: str = host
self._specification: dict = specification
self._id = specification["antenna_fields"][0]["observation_id"]
self._observation_field_proxies = None
self._antenna_fields = []
for observation_field in specification["antenna_fields"]:
self._antenna_fields.append(observation_field["antenna_field"])
try:
# connects to the tangoDb and get the proxy
self._control_proxy = DeviceProxy(
f"tango://{host}/STAT/ObservationControl/1"
f"tango://{self.host}/{OBSERVATION_CONTROL_DEVICE_NAME}"
)
# gives an exception when it fails to ping the proxy
_ = self._control_proxy.ping()
......@@ -53,7 +76,8 @@ class StationFutures:
e,
)
def _failed_future(self):
@staticmethod
def _failed_future():
future = concurrent.futures.Future()
future.set_exception(Exception("Station not connected"))
return future
......@@ -63,8 +87,8 @@ class StationFutures:
if not self.connected:
return self._failed_future()
return self._control_proxy.start_observation(
self._json_specification, wait=False
return self._control_proxy.add_observation(
dumps(self._specification), wait=False
)
def stop(self) -> concurrent.futures:
......@@ -85,6 +109,7 @@ class StationFutures:
@property
def connected(self) -> bool:
"""get the connection status of this station"""
# TODO(Corne): What if connection fails between construction of object and call?
return self._control_proxy is not None
@property
......@@ -98,6 +123,23 @@ class StationFutures:
return self._control_proxy
@property
def observation_proxy(self) -> DeviceProxy:
"""get the observation proxy of this station"""
return DeviceProxy(f"tango://{self.host}/STAT/observation/{self._id}")
def observation_field_proxies(self) -> List[DeviceProxy]:
"""get the observationfield proxies of this station for the given observation"""
if not self._observation_field_proxies:
self._observation_field_proxies = []
try:
for observation_field in self._specification["antenna_fields"]:
antenna_field = observation_field["antenna_field"]
self._antenna_fields.append(antenna_field)
self._observation_field_proxies.append(
DeviceProxy(
f"tango://{self.host}/{OBSERVATION_FIELD_DEVICE_NAME}/"
f"{self._id}-{antenna_field}"
)
)
except DevFailed as ex:
self._observation_field_proxies = None
raise ex
return self._observation_field_proxies
[build-system]
requires = ['setuptools>=62.6', 'wheel']
build-backend = 'setuptools.build_meta'
requires = [
"setuptools>=62.6",
"setuptools_scm[toml]>=6.2",
"wheel"
]
build-backend = "setuptools.build_meta"
[tool.setuptools_scm]
write_to = "lofar_station_client/_version.py"
[tool.pylint]
ignore = "_version.py"
[metadata]
name = lofar-station-client
version = file: VERSION
description = Client library for using Tango Station Control
author="Mevius"
author_email="mol@astron.nl"
......@@ -24,6 +23,7 @@ classifiers =
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
Topic :: Scientific/Engineering
Topic :: Scientific/Engineering :: Astronomy
......@@ -42,4 +42,4 @@ console_scripts =
max-line-length = 88
extend-ignore = E203
filename = *.py
exclude=venv,.tox,.egg-info
exclude=build,.venv,.git,.tox,dist,docs,*lib/python*,*egg,_version.py
......@@ -10,3 +10,6 @@ asynctest>=0.13.0 # Apache-2.0
testscenarios>=0.5.0 # Apache-2.0/BSD
pytz>=2022.6 # MIT
psutil>=5.9.4 # BSD
pytest>=7.3.0 # MIT
pytest-forked>=1.6.0 # MIT
pytest-cov >= 3.0.0 # MIT
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from json import loads
SPEC_SINGLE_HOST_SINGLE_FIELD_DICT = loads(
"""
{
"stations": [{
"station": "cs002",
"antenna_fields": [{
"observation_id": 12345,
"stop_time": "2106-02-07T00:00:00",
"antenna_field": "HBA0",
"antenna_set": "ALL",
"antenna_mask": [0,1,2,9],
"filter": "HBA_110_190",
"SAPs": [{
"subbands": [10, 20, 30],
"pointing": {
"angle1": 1.5, "angle2": 0, "direction_type": "J2000"
}
}],
"tile_beam":
{ "angle1": 1.5, "angle2": 0, "direction_type": "J2000" },
"first_beamlet": 0
}]
}]
}
"""
)
SPEC_TWO_HOST_TWO_FIELD_DICT = loads(
"""
{
"stations": [{
"station": "cs002",
"antenna_fields": [{
"observation_id": 12345,
"stop_time": "2106-02-07T00:00:00",
"antenna_field": "HBA0",
"antenna_set": "ALL",
"antenna_mask": [0,1,2,9],
"filter": "HBA_110_190",
"SAPs": [{
"subbands": [10, 20, 30],
"pointing": {
"angle1": 1.5, "angle2": 0, "direction_type": "J2000"
}
}],
"tile_beam":
{ "angle1": 1.5, "angle2": 0, "direction_type": "J2000" },
"first_beamlet": 0
},
{
"observation_id": 12345,
"stop_time": "2106-02-07T00:00:00",
"antenna_field": "HBA0",
"antenna_set": "ALL",
"antenna_mask": [0,1,2,9],
"filter": "HBA_110_190",
"SAPs": [{
"subbands": [10, 20, 30],
"pointing": {
"angle1": 1.5, "angle2": 0, "direction_type": "J2000"
}
}],
"tile_beam":
{ "angle1": 1.5, "angle2": 0, "direction_type": "J2000" },
"first_beamlet": 0
}]
},
{
"station": "cs003",
"antenna_fields": [{
"observation_id": 12345,
"stop_time": "2106-02-07T00:00:00",
"antenna_field": "HBA0",
"antenna_set": "ALL",
"antenna_mask": [0,1,2,9],
"filter": "HBA_110_190",
"SAPs": [{
"subbands": [10, 20, 30],
"pointing": {
"angle1": 1.5, "angle2": 0, "direction_type": "J2000"
}
}],
"tile_beam":
{ "angle1": 1.5, "angle2": 0, "direction_type": "J2000" },
"first_beamlet": 0
},
{
"observation_id": 12345,
"stop_time": "2106-02-07T00:00:00",
"antenna_field": "HBA1",
"antenna_set": "ALL",
"antenna_mask": [0,1,2,9],
"filter": "HBA_110_190",
"SAPs": [{
"subbands": [10, 20, 30],
"pointing": {
"angle1": 1.5, "angle2": 0, "direction_type": "J2000"
}
}],
"tile_beam":
{ "angle1": 1.5, "angle2": 0, "direction_type": "J2000" },
"first_beamlet": 0
}]
}]
}
"""
)
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 lofar-station-client project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from typing import List, Callable
from unittest import mock
import concurrent.futures
from json import loads
from tests import base
from lofar_station_client.observation.multi_station_observation import (
MultiStationObservation,
import lofar_station_client
from lofar_station_client.observation import multi_station_observation
from tests import base
from tests.observation.specs import (
SPEC_SINGLE_HOST_SINGLE_FIELD_DICT,
SPEC_TWO_HOST_TWO_FIELD_DICT,
)
SPEC_DICT = loads(
"""
{
"observation_id": 12345,
"stop_time": "2106-02-07T00:00:00",
"antenna_set": "ALL",
"antenna_mask": [0,1,2,9],
"filter": "HBA_110_190",
"SAPs": [{
"subbands": [10, 20, 30],
"pointing": {
"angle1": 1.5, "angle2": 0, "direction_type": "J2000"
}
}],
"tile_beam":
{ "angle1": 1.5, "angle2": 0, "direction_type": "J2000" },
"first_beamlet": 0
}
"""
class TestMultiStationObservation(base.TestCase):
def setUp(self):
proxy_patcher = mock.patch.object(
lofar_station_client.observation.multi_station_observation,
"StationObservationFuture",
autospec=True,
)
self.m_station_obs_future = proxy_patcher.start()
self.addCleanup(proxy_patcher.stop)
def configure_obs_future_mock(
self, host: str, is_running: any = True, connected: bool = True
):
"""Configure StationObservationFuture mock and return it"""
m_obs_future = mock.Mock()
HOST_LIST = ["host1", "host2", "host3"]
future_start_stop = concurrent.futures.Future()
future_start_stop.set_result(True)
future_running = concurrent.futures.Future()
future_running.set_result(is_running)
class FakeFuture:
def __init__(self, **kwargs):
return
m_obs_future.start.return_value = future_start_stop
m_obs_future.stop.return_value = future_start_stop
type(m_obs_future).is_running = mock.PropertyMock(return_value=future_running)
type(m_obs_future).connected = mock.PropertyMock(return_value=connected)
type(m_obs_future).get_control_poxy = mock.PropertyMock(
return_value="A_real_control_proxy"
)
def done(self):
return True
type(m_obs_future).observations = mock.PropertyMock(
return_value="A_real_observation_proxy1"
)
def result(self):
return True
type(m_obs_future).host = mock.PropertyMock(return_value=host)
return m_obs_future
class TestMultiStationObservation(base.TestCase):
@mock.patch(
"lofar_station_client.observation.multi_station_observation.StationFutures",
autospec=True,
@mock.patch.object(multi_station_observation, "StationObservationFuture")
def test_construct_called_correct_arguments(self, m_station_obs_future):
m_host = "host1"
multi_station_observation.MultiStationObservation(
specifications=SPEC_SINGLE_HOST_SINGLE_FIELD_DICT, hosts=[m_host]
)
def setUp(self, M_station):
self.station_mocks = [mock.Mock(), mock.Mock(), mock.Mock()]
future = concurrent.futures.Future()
future.set_result(True)
for i in self.station_mocks:
i.start.return_value = future
i.stop.return_value = future
type(i).is_running = mock.PropertyMock(return_value=future)
type(i).connected = mock.PropertyMock(return_value=True)
type(i).get_control_poxy = mock.PropertyMock(
return_value="A_real_control_proxy"
m_station_obs_future.assert_called_once_with(
SPEC_SINGLE_HOST_SINGLE_FIELD_DICT["stations"][0], m_host
)
type(self.station_mocks[0]).observations = mock.PropertyMock(
return_value="A_real_observation_proxy1"
def test_create_station_object(self):
"""Test construction of MultiStationObs creates appropriate child objects"""
m_host = "host1"
t_multi_observation = multi_station_observation.MultiStationObservation(
specifications=SPEC_SINGLE_HOST_SINGLE_FIELD_DICT, hosts=[m_host]
)
type(self.station_mocks[1]).observations = mock.PropertyMock(
return_value="A_real_observation_proxy2"
self.assertEqual(1, len(t_multi_observation._stations))
def test_create_multiple_station_objects(self):
""""""
m_hosts = ["host1", "host2"]
t_multi_observation = multi_station_observation.MultiStationObservation(
specifications=SPEC_TWO_HOST_TWO_FIELD_DICT, hosts=m_hosts
)
type(self.station_mocks[2]).observations = mock.PropertyMock(
return_value="A_real_observation_proxy3"
self.assertEqual(2, len(t_multi_observation._stations))
def test_create_station_object_error(self):
""""""
m_host = ["host1"]
self.assertRaises(
ValueError,
multi_station_observation.MultiStationObservation,
specifications=SPEC_TWO_HOST_TWO_FIELD_DICT,
hosts=m_host,
)
type(self.station_mocks[0]).host = mock.PropertyMock(return_value="host1")
type(self.station_mocks[1]).host = mock.PropertyMock(return_value="host2")
type(self.station_mocks[2]).host = mock.PropertyMock(return_value="host3")
def test_start_observation(self):
m_host = "host1"
m_station_host = self.configure_obs_future_mock(m_host)
self.m_station_obs_future.side_effect = [m_station_host]
# apply the mock
M_station.side_effect = self.station_mocks
self.M_station = M_station
self.observation = MultiStationObservation(
specification=SPEC_DICT, hosts=HOST_LIST
t_multi_observation = multi_station_observation.MultiStationObservation(
specifications=SPEC_SINGLE_HOST_SINGLE_FIELD_DICT, hosts=[m_host]
)
def test_start(self):
self.observation.start()
self.assertEqual(self.M_station.call_count, 3)
t_multi_observation.start()
def test_stop(self):
self.observation.stop()
self.assertEqual(self.M_station.call_count, 3)
m_station_host.start.assert_called_once()
def test_proxies(self):
expected = {
"host1": self.station_mocks[0],
"host2": self.station_mocks[1],
"host3": self.station_mocks[2],
}
results = self.observation.observations
self.assertEqual(expected, results)
def test_stop_observation(self):
m_host = "host1"
m_station_host = self.configure_obs_future_mock(m_host)
self.m_station_obs_future.side_effect = [m_station_host]
def test_is_running(self):
expected = {"host1": True, "host2": True, "host3": True}
results = self.observation.is_running
self.assertEqual(expected, results)
t_multi_observation = multi_station_observation.MultiStationObservation(
specifications=SPEC_SINGLE_HOST_SINGLE_FIELD_DICT, hosts=[m_host]
)
t_multi_observation.stop()
m_station_host.stop.assert_called_once()
def setup_test(
self,
specification: dict,
hosts: List[str],
test: Callable[
[
multi_station_observation.MultiStationObservation,
List[mock.Mock],
List[str],
],
None,
],
):
"""Dynamically configure a test scenario and execute to test() for conditions"""
m_station_hosts = []
for host in hosts:
m_station_hosts.append(self.configure_obs_future_mock(host))
self.m_station_obs_future.side_effect = m_station_hosts
t_multi_observation = multi_station_observation.MultiStationObservation(
specifications=specification, hosts=hosts
)
test(t_multi_observation, m_station_hosts, hosts)
def test_is_connected(self):
expected = {"host1": True, "host2": True, "host3": True}
results = self.observation.is_connected
def case_observation_proxies(self, t_multi_observation, m_station_hosts, hosts):
"""Test case to use with :py:func:`~setup_test` to test observations property"""
expected = {}
for index, host in enumerate(hosts):
expected[host] = m_station_hosts[index]
results = t_multi_observation.observations
self.assertEqual(expected, results)
def test_no_connection(self):
"""
Tests the multiStationObservation classes ability to handle disconnected
stations by providing exceptions instead of values
"""
def test_observation_proxy(self):
self.setup_test(
SPEC_SINGLE_HOST_SINGLE_FIELD_DICT, ["host1"], self.case_observation_proxies
)
def test_observation_proxies(self):
self.setup_test(
SPEC_TWO_HOST_TWO_FIELD_DICT,
["host1", "m_host2"],
self.case_observation_proxies,
)
def case_is_running(self, t_multi_observation, m_station_hosts, hosts):
expected = {}
for index, host in enumerate(hosts):
expected[host] = True
self.assertEqual(expected, t_multi_observation.is_running)
def test_observation_is_running_true(self):
self.setup_test(
SPEC_SINGLE_HOST_SINGLE_FIELD_DICT, ["host1"], self.case_is_running
)
def test_observations_is_running_true(self):
self.setup_test(
SPEC_TWO_HOST_TWO_FIELD_DICT, ["host1", "host2"], self.case_is_running
)
def case_is_connected(self, t_multi_observation, m_station_hosts, hosts):
expected = {}
for index, host in enumerate(hosts):
expected[host] = True
self.assertEqual(expected, t_multi_observation.is_connected)
self.assertTrue(t_multi_observation.all_connected)
def test_observation_is_connected_true(self):
self.setup_test(
SPEC_SINGLE_HOST_SINGLE_FIELD_DICT, ["host1"], self.case_is_connected
)
def test_observations_is_connected_true(self):
self.setup_test(
SPEC_TWO_HOST_TWO_FIELD_DICT, ["host1", "host2"], self.case_is_connected
)
def case_no_connection(self, t_multi_observation, m_station_hosts, hosts):
future = concurrent.futures.Future()
future.set_exception(Exception("oh no"))
for i in self.station_mocks:
type(i).is_running = mock.PropertyMock(return_value=future)
for m_station in m_station_hosts:
m_station.is_running = mock.PropertyMock(return_value=future)
expected = {"host1": False, "host2": False, "host3": False}
results = self.observation.is_running
self.assertEqual(expected, results)
expected = {}
for index, host in enumerate(hosts):
expected[host] = False
self.assertEqual(expected, t_multi_observation.is_running)
def test_no_connection_is_running(self):
hosts = ["host1", "host2"]
m_station_hosts = []
for host in hosts:
m_station_hosts.append(
self.configure_obs_future_mock(host, is_running=Exception("oh no"))
)
self.m_station_obs_future.side_effect = m_station_hosts
t_multi_observation = multi_station_observation.MultiStationObservation(
specifications=SPEC_TWO_HOST_TWO_FIELD_DICT, hosts=hosts
)
expected = {}
for index, host in enumerate(hosts):
expected[host] = False
self.assertEqual(expected, t_multi_observation.is_running)
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from json import dumps
from unittest import mock
from tango import DevFailed
from lofar_station_client.observation import station_observation_future
from lofar_station_client.observation.constants import (
OBSERVATION_CONTROL_DEVICE_NAME,
OBSERVATION_FIELD_DEVICE_NAME,
)
from tests import base
from tests.observation.specs import (
SPEC_SINGLE_HOST_SINGLE_FIELD_DICT,
SPEC_TWO_HOST_TWO_FIELD_DICT,
)
class TestStationObservation(base.TestCase):
def setUp(self):
proxy_patcher = mock.patch.object(
station_observation_future,
"DeviceProxy",
autospec=True,
)
self.m_device_proxy = proxy_patcher.start()
self.addCleanup(proxy_patcher.stop)
def create_test_base(self, specification: dict, host: str):
t_obs_future = station_observation_future.StationObservationFuture(
specification, host
)
num_fields = len(specification["antenna_fields"])
# Did create correct control device
self.assertEqual(
((f"tango://{t_obs_future.host}/" f"{OBSERVATION_CONTROL_DEVICE_NAME}",),),
self.m_device_proxy.call_args_list[0],
)
self.assertEqual(num_fields, len(t_obs_future._antenna_fields))
# self.assertEqual(num_fields, len(t_obs_future._observation_field_proxies))
def test_create_single_field(self):
self.create_test_base(SPEC_SINGLE_HOST_SINGLE_FIELD_DICT["stations"][0], "henk")
def test_create_two_field(self):
self.create_test_base(SPEC_TWO_HOST_TWO_FIELD_DICT["stations"][0], "henk")
def proxies_test_base(self, specification: dict, host: str):
t_obs_future = station_observation_future.StationObservationFuture(
specification, host
)
num_fields = len(specification["antenna_fields"])
proxies = t_obs_future.observation_field_proxies
for index, antenna_field in enumerate(specification["antenna_fields"]):
observation_id = antenna_field["observation_id"]
antenna_field = antenna_field["antenna_field"]
# Did create correct observation field device
self.assertEqual(
(
(
f"tango://{t_obs_future.host}/{OBSERVATION_FIELD_DEVICE_NAME}/"
f"{observation_id}-{antenna_field}",
),
),
self.m_device_proxy.call_args_list[index + 1],
)
self.assertEqual(num_fields, len(proxies))
def test_proxies_single_field(self):
self.proxies_test_base(
SPEC_SINGLE_HOST_SINGLE_FIELD_DICT["stations"][0], "host"
)
def test_proxies_two_field(self):
self.proxies_test_base(SPEC_TWO_HOST_TWO_FIELD_DICT["stations"][0], "host")
def test_not_connected(self):
self.m_device_proxy.side_effect = [DevFailed()]
t_obs_future = station_observation_future.StationObservationFuture(
SPEC_SINGLE_HOST_SINGLE_FIELD_DICT["stations"][0], "host"
)
self.assertFalse(t_obs_future.connected)
def test_connected(self):
t_obs_future = station_observation_future.StationObservationFuture(
SPEC_SINGLE_HOST_SINGLE_FIELD_DICT["stations"][0], "host"
)
self.assertTrue(t_obs_future.connected)
def test_start(self):
m_add = mock.Mock()
self.m_device_proxy.return_value = m_add
t_obs_future = station_observation_future.StationObservationFuture(
SPEC_SINGLE_HOST_SINGLE_FIELD_DICT["stations"][0], "host"
)
t_obs_future.start()
m_add.add_observation.assert_called_with(
dumps(t_obs_future._specification), wait=False
)
......@@ -17,8 +17,8 @@ deps =
commands_pre =
{envpython} --version
commands =
{envpython} --version
stestr run {posargs}
{envpython} -m pytest --version
{envpython} -m pytest -v --log-level=DEBUG --forked tests/{posargs}
# Use generative name and command prefixes to reuse the same virtualenv
# for all linting jobs.
......@@ -27,7 +27,7 @@ usedevelop = False
envdir = {toxworkdir}/linting
commands =
pep8: {envpython} -m flake8 --version
pep8: {envpython} -m flake8
pep8: {envpython} -m flake8 lofar_station_client tests
black: {envpython} -m black --version
black: {envpython} -m black --check --diff .
pylint: {envpython} -m pylint --version
......@@ -40,29 +40,31 @@ commands =
[testenv:debug]
commands = {envpython} -m testtools.run {posargs}
[testenv:coverage]
; stestr does not natively support generating coverage reports use
; `PYTHON=python -m coverage run....` to overcome this.
[testenv:{cover,coverage}]
envdir = {toxworkdir}/coverage
setenv =
PYTHON={envpython} -m coverage run --source lofar_station_client --omit='*tests*' --parallel-mode
VIRTUAL_ENV={envdir}
commands =
{envpython} -m pytest --version
{envpython} -m coverage --version
{envpython} -m coverage erase
{envpython} -m stestr run {posargs}
{envpython} -m coverage combine
{envpython} -m coverage html -d cover --omit='*tests*'
{envpython} -m coverage xml -o coverage.xml
{envpython} -m coverage report --omit='*tests*'
{envpython} -m pytest -v --log-level=DEBUG --cov-report term --cov-report html --cov-append --cov-report xml:coverage.xml --cov=lofar_station_client --forked tests/{posargs}
[testenv:build]
usedevelop = False
commands = {envpython} -m build
[testenv:docs]
allowlist_externals =
sh
deps =
-r{toxinidir}/requirements.txt
-r{toxinidir}/docs/requirements.txt
extras = docs
commands = sphinx-build -b html -W docs/source docs/build/html
commands =
sh docs/cleanup.sh
sphinx-build --version
sphinx-build -b html -W docs/source docs/build/html
[testenv:integration]
# Do no install the lofar station client package, force packaged version install
......@@ -79,4 +81,4 @@ commands =
[flake8]
filename = *.py
exclude=.tox,.egg-info
exclude=.tox,.egg-info,dist,_version,build
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment