Skip to content
Snippets Groups Projects
Commit 68df4aa1 authored by Taya Snijder's avatar Taya Snijder
Browse files

Merge branch 'L2SS-826_create_central_observation_controller' into 'main'

Resolve L2SS-826 "Create central observation controller"

Closes L2SS-826

See merge request !22
parents 29f5478b 9825d221
No related branches found
No related tags found
1 merge request!22Resolve L2SS-826 "Create central observation controller"
Pipeline #40499 passed
......@@ -7,9 +7,9 @@
[//]: # (TODO Corne, Update badges to use dynamic types such as those from pypi once available)
# Station Client Library
# StationFutures Client Library
Client library for using Tango Station Control.
Client library for using Tango StationFutures Control.
Table of Contents:
......@@ -105,8 +105,13 @@ tox -e debug tests.requests.test_prometheus
```
## Releasenotes
<<<<<<< HEAD
- 0.10.0 - Added `MultiStationObservation` class for a pythonic interface with observations across multiple stations.
- 0.9.2 - Added `StationObservation` class for pythonic interface with observations
=======
- 0.10. - Added generic HDF5 file reader
- 0.9.2 - Added `Observation` class for pythonic interface with observations
>>>>>>> 29f5478b2364ddc7cc181acc3b5cbddadaec0d41
- 0.9.1 - Add `last_invalid_packet_exception` parameter to `StatisticsCollector`
- 0.9. - Added `devices.LofarDeviceProxy` that transparently exposes >2D attributes
- 0.8. - Fixed XST packet parsing.
......
0.10.0
0.11.0
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""
Contains classes to interact with observations
"""
from .multi_station_observation import MultiStationObservation
from .station_observation import StationObservation
__all__ = ["MultiStationObservation", "StationObservation"]
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 StationFutures Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
"""
Contains class for pythonic interface with observations
"""
import logging
import concurrent.futures
from lofar_station_client.observation.stationfutures import StationFutures
logger = logging.getLogger()
class MultiStationObservation:
"""
This class provides a pythonic interface
to start an observation on multiple _stations.
"""
TIMEOUT = 10
def __init__(self, specification: dict, hosts: list):
# convert specification dict to json
self._specification = specification
# get the ID
self.observation_id = specification["observation_id"]
# The list of _stations this class controls
self._stations = [StationFutures(self._specification, host) for host in hosts]
def _get_results(self, futures):
# pylint: disable=W0703, C0103
"""
This function takes the futures it is given and obtains the results.
It checks whether the future got done in the first place or if it timed out.
And it checks whether the result we got was an actual result or an exception.
"""
# wait for the futures to complete in the given timeout
done_set, _ = concurrent.futures.wait(
futures, timeout=self.TIMEOUT, return_when=concurrent.futures.ALL_COMPLETED
)
results = []
for future in futures:
# if the future completed: check whether we got good results or an exception
if future in done_set:
try:
result = future.result()
except Exception as e:
result = e
results.append(result)
else:
# if we timed out, append a timeout error
results.append(concurrent.futures.TimeoutError())
return results
def _check_success(self, results):
"""
This function returns a bool list for all the command results
it gives a True for every command that succeeded and a
false for every command that failed
"""
success_list = []
# determine whether we were able to successfully start all observations
for i in results:
if isinstance(i, Exception):
success_list.append(False)
else:
success_list.append(True)
return success_list
def start(self):
"""
Start the observation or all hosts
"""
# send out the command to start the observation to all _stations at once
commands = [station.start() for station in self._stations]
# wait for the commands to get processed
results = self._get_results(commands)
if not all(self._check_success(results)):
logger.warning("Warning: Unable to start one or more observations")
def stop(self):
"""
stops this observation in all connected stations
"""
# send out the command to abort the observation to all _stations at once
commands = [station.stop() for station in self._stations]
# wait for the commands to get processed
results = self._get_results(commands)
if not all(self._check_success(results)):
logger.warning("Warning: Unable to abort one or more observations")
@property
def observations(self) -> dict:
"""
Returns a dict of host -> station for all the different hosts
Value is None if the host is not connected
"""
return {
station.host: station if station.connected else None
for station in self._stations
}
@property
def is_running(self) -> dict:
"""
Returns a dict whether the observation is running per host.
Returns "{hostname}": False if the host is not connected or if
the command returned an exception
"""
# send out all the commands at once without waiting until each one is finished
commands = [station.is_running for station in self._stations]
# wait for the commands to get processed
results = self._get_results(commands)
# create the dict
return {
station.host: result
if station.connected and not isinstance(result, Exception)
else False
for station, result in zip(self._stations, results)
}
@property
def is_connected(self) -> dict:
"""
Returns a dict for all hosts whether they are _connected or not.
Returns "{hostname}": bool per station
"""
return {station.host: station.connected for station in self._stations}
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
# This file is part of the LOFAR2.0 StationFutures Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
......@@ -9,70 +9,53 @@
Contains class for pythonic interface with observations
"""
from json import dumps
import logging
from tango import DeviceProxy
from lofar_station_client.observation.stationfutures import StationFutures
logger = logging.getLogger()
class Observation:
class StationObservation:
"""
This class provides a pythonic interface to the
ObservationControl and Observation devices on a station.
ObservationControl and Observation devices on a single station.
"""
TIMEOUT = 10
def __init__(
self,
specification: dict,
host: str = "databaseds.tangonet:10000",
station="STAT",
):
self.station = StationFutures(specification, host)
# create device proxies to station at "host"
self._observation_control_dev = DeviceProxy(
f"tango://{host}/{station}/ObservationControl/1"
)
self.host = host
self.station = station
# convert specification dict to json
self._specification = dumps(specification)
# get the ID
self._id = int(specification["observation_id"])
def observation_id(self) -> int:
"""
Returns the observation ID of this observation
"""
return self._id
if not self.station.connected:
raise Exception(f"Was not able to connect with {host}")
def start(self):
"""
Starts this observation
"""
self._observation_control_dev.start_observation(self._specification)
self.station.start().result(timeout=self.TIMEOUT)
def abort(self):
def stop(self):
"""
Aborts this observation
stops this observation
"""
self._observation_control_dev.stop_observation(self._id)
self.station.stop().result(timeout=self.TIMEOUT)
def observation_proxy(self):
@property
def observation(self):
"""
Returns the DeviceProxy of this observation
"""
# return a proxy to the correct Observation device on the station (
# "STAT/Observation/{observation_id}")
return DeviceProxy(f"tango://{self.host}/{self.station}/Observation/{self._id}")
return self.station
@property
def is_running(self):
"""
Returns whether this observation is currently running or not
"""
# return whether the observation is still running (
# ObservationControl.is_observation_running command)
return self._observation_control_dev.is_observation_running(self._id)
return self.station.is_running.result(timeout=self.TIMEOUT)
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 StationFutures Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
"""
Internal class for station states with regards to observations
"""
import concurrent.futures
from json import dumps
import logging
from tango import DeviceProxy, GreenMode, DevFailed
logger = logging.getLogger()
class StationFutures:
"""
Container class station and observation data and acccess
"""
def __init__(self, specification: dict, host: str):
# pylint: disable=C0103
# store general data
self._id = specification["observation_id"]
self._host = host
self._json_specification = dumps(specification)
try:
# connects to the tangoDb and get the proxies
self._control_proxy = DeviceProxy(
f"tango://{host}/STAT/ObservationControl/1"
)
self._observation_proxy = DeviceProxy(
f"tango://{host}/STAT/observation/{self._id}"
)
# gives an exception when it fails to ping the proxies
_ = self._control_proxy.ping()
_ = self._observation_proxy.ping()
# set station to green mode
self._control_proxy.set_green_mode(GreenMode.Futures)
self._observation_proxy.set_green_mode(GreenMode.Futures)
except DevFailed as e:
self._control_proxy = None
self._observation_proxy = None
logger.warning(
"Failed to connect to device on host %s: %s: %s",
host,
e.__class__.__name__,
e,
)
def _failed_future(self):
future = concurrent.futures.Future()
future.set_exception(Exception("Station not connected"))
return future
def start(self) -> concurrent.futures:
"""Start the observation with the given specification on this station"""
if not self.connected:
return self._failed_future()
return self._control_proxy.start_observation(
self._json_specification, wait=False
)
def stop(self) -> concurrent.futures:
"""Stop the observation with the given ID on this station"""
if not self.connected:
return self._failed_future()
return self._control_proxy.stop_observation(self._id, wait=False)
@property
def is_running(self) -> concurrent.futures:
"""get whether the observation with the given ID is running on this station"""
if not self.connected:
return self._failed_future()
return self._control_proxy.is_observation_running(self._id, wait=False)
@property
def connected(self) -> bool:
"""get the connection status of this station"""
return self._control_proxy is not None
@property
def host(self) -> str:
"""get the host name of this station"""
return self._host
@property
def control_proxy(self) -> DeviceProxy:
"""get the control proxy of this station"""
return self._control_proxy
@property
def observation_proxy(self) -> DeviceProxy:
"""get the observation proxy of this station"""
return self._observation_proxy
from unittest import mock
from tests import base
from json import loads
from lofar_station_client.observation.multi_station_observation import (
MultiStationObservation,
)
import concurrent.futures
SPEC_DICT = loads(
"""
{
"observation_id": 12345,
"stop_time": "2106-02-07T00:00:00",
"antenna_mask": [0,1,2,9],
"filter": "HBA_110_190",
"SAPs": [{
"subbands": [10, 20, 30],
"pointing": {
"angle1": 1.5, "angle2": 0, "direction_type": "J2000"
}
}],
"tile_beam":
{ "angle1": 1.5, "angle2": 0, "direction_type": "J2000" },
"first_beamlet": 0
}
"""
)
HOST_LIST = ["host1", "host2", "host3"]
class FakeFuture:
def __init__(self, **kwargs):
return
def done(self):
return True
def result(self):
return True
class TestMultiStationObservation(base.TestCase):
@mock.patch(
"lofar_station_client.observation.multi_station_observation.StationFutures",
autospec=True,
)
def setUp(self, M_station):
self.station_mocks = [mock.Mock(), mock.Mock(), mock.Mock()]
future = concurrent.futures.Future()
future.set_result(True)
for i in self.station_mocks:
i.start.return_value = future
i.stop.return_value = future
type(i).is_running = mock.PropertyMock(return_value=future)
type(i).connected = mock.PropertyMock(return_value=True)
type(i).get_control_poxy = mock.PropertyMock(
return_value="A_real_control_proxy"
)
type(self.station_mocks[0]).observations = mock.PropertyMock(
return_value="A_real_observation_proxy1"
)
type(self.station_mocks[1]).observations = mock.PropertyMock(
return_value="A_real_observation_proxy2"
)
type(self.station_mocks[2]).observations = mock.PropertyMock(
return_value="A_real_observation_proxy3"
)
type(self.station_mocks[0]).host = mock.PropertyMock(return_value="host1")
type(self.station_mocks[1]).host = mock.PropertyMock(return_value="host2")
type(self.station_mocks[2]).host = mock.PropertyMock(return_value="host3")
# apply the mock
M_station.side_effect = self.station_mocks
self.M_station = M_station
self.observation = MultiStationObservation(
specification=SPEC_DICT, hosts=HOST_LIST
)
def test_start(self):
self.observation.start()
self.assertEqual(self.M_station.call_count, 3)
def test_stop(self):
self.observation.stop()
self.assertEqual(self.M_station.call_count, 3)
def test_proxies(self):
expected = {
"host1": self.station_mocks[0],
"host2": self.station_mocks[1],
"host3": self.station_mocks[2],
}
results = self.observation.observations
self.assertEqual(expected, results)
def test_is_running(self):
expected = {"host1": True, "host2": True, "host3": True}
results = self.observation.is_running
self.assertEqual(expected, results)
def test_is_connected(self):
expected = {"host1": True, "host2": True, "host3": True}
results = self.observation.is_connected
self.assertEqual(expected, results)
def test_no_connection(self):
"""
Tests the multiStationObservation classes ability to handle disconnected
stations by providing exceptions instead of values
"""
future = concurrent.futures.Future()
future.set_exception(Exception("oh no"))
for i in self.station_mocks:
type(i).is_running = mock.PropertyMock(return_value=future)
expected = {"host1": False, "host2": False, "host3": False}
results = self.observation.is_running
self.assertEqual(expected, results)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment