diff --git a/README.md b/README.md index 0f522293e759c3a2cb3c1ef6fc20f92b38cf095f..b3524c55e9d02b1b2d9a6fe382e3357a02b1ee42 100644 --- a/README.md +++ b/README.md @@ -7,9 +7,9 @@ [//]: # (TODO Corne, Update badges to use dynamic types such as those from pypi once available) -# Station Client Library +# StationFutures Client Library -Client library for using Tango Station Control. +Client library for using Tango StationFutures Control. Table of Contents: @@ -105,8 +105,13 @@ tox -e debug tests.requests.test_prometheus ``` ## Releasenotes +<<<<<<< HEAD +- 0.10.0 - Added `MultiStationObservation` class for a pythonic interface with observations across multiple stations. +- 0.9.2 - Added `StationObservation` class for pythonic interface with observations +======= - 0.10. - Added generic HDF5 file reader - 0.9.2 - Added `Observation` class for pythonic interface with observations +>>>>>>> 29f5478b2364ddc7cc181acc3b5cbddadaec0d41 - 0.9.1 - Add `last_invalid_packet_exception` parameter to `StatisticsCollector` - 0.9. - Added `devices.LofarDeviceProxy` that transparently exposes >2D attributes - 0.8. - Fixed XST packet parsing. diff --git a/VERSION b/VERSION index 78bc1abd14f2c1f6330989d876c4ee7d5daf7ff6..d9df1bbc0c7befdbc28d61efc28ed3e5c08d015f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.10.0 +0.11.0 diff --git a/lofar_station_client/observation/__init__.py b/lofar_station_client/observation/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..6717505b1f3ee39e9756a6c3882672ceac4771c3 100644 --- a/lofar_station_client/observation/__init__.py +++ b/lofar_station_client/observation/__init__.py @@ -0,0 +1,12 @@ +# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + + +""" +Contains classes to interact with observations +""" + +from .multi_station_observation import MultiStationObservation +from .station_observation import StationObservation + +__all__ = ["MultiStationObservation", "StationObservation"] diff --git a/lofar_station_client/observation/multi_station_observation.py b/lofar_station_client/observation/multi_station_observation.py new file mode 100644 index 0000000000000000000000000000000000000000..d55832cbf0ac9dde9b30542d269a30972ff588d4 --- /dev/null +++ b/lofar_station_client/observation/multi_station_observation.py @@ -0,0 +1,152 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR2.0 StationFutures Control project. +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" +Contains class for pythonic interface with observations +""" + +import logging +import concurrent.futures + +from lofar_station_client.observation.stationfutures import StationFutures + +logger = logging.getLogger() + + +class MultiStationObservation: + """ + This class provides a pythonic interface + to start an observation on multiple _stations. + """ + + TIMEOUT = 10 + + def __init__(self, specification: dict, hosts: list): + + # convert specification dict to json + self._specification = specification + + # get the ID + self.observation_id = specification["observation_id"] + + # The list of _stations this class controls + self._stations = [StationFutures(self._specification, host) for host in hosts] + + def _get_results(self, futures): + # pylint: disable=W0703, C0103 + """ + This function takes the futures it is given and obtains the results. + It checks whether the future got done in the first place or if it timed out. + And it checks whether the result we got was an actual result or an exception. + """ + + # wait for the futures to complete in the given timeout + done_set, _ = concurrent.futures.wait( + futures, timeout=self.TIMEOUT, return_when=concurrent.futures.ALL_COMPLETED + ) + + results = [] + + for future in futures: + # if the future completed: check whether we got good results or an exception + if future in done_set: + try: + result = future.result() + except Exception as e: + result = e + results.append(result) + else: + # if we timed out, append a timeout error + results.append(concurrent.futures.TimeoutError()) + + return results + + def _check_success(self, results): + """ + This function returns a bool list for all the command results + it gives a True for every command that succeeded and a + false for every command that failed + """ + success_list = [] + + # determine whether we were able to successfully start all observations + for i in results: + if isinstance(i, Exception): + success_list.append(False) + else: + success_list.append(True) + + return success_list + + def start(self): + """ + Start the observation or all hosts + """ + + # send out the command to start the observation to all _stations at once + commands = [station.start() for station in self._stations] + + # wait for the commands to get processed + results = self._get_results(commands) + + if not all(self._check_success(results)): + logger.warning("Warning: Unable to start one or more observations") + + def stop(self): + """ + stops this observation in all connected stations + """ + # send out the command to abort the observation to all _stations at once + commands = [station.stop() for station in self._stations] + + # wait for the commands to get processed + results = self._get_results(commands) + + if not all(self._check_success(results)): + logger.warning("Warning: Unable to abort one or more observations") + + @property + def observations(self) -> dict: + """ + Returns a dict of host -> station for all the different hosts + Value is None if the host is not connected + """ + + return { + station.host: station if station.connected else None + for station in self._stations + } + + @property + def is_running(self) -> dict: + """ + Returns a dict whether the observation is running per host. + Returns "{hostname}": False if the host is not connected or if + the command returned an exception + """ + + # send out all the commands at once without waiting until each one is finished + commands = [station.is_running for station in self._stations] + + # wait for the commands to get processed + results = self._get_results(commands) + + # create the dict + return { + station.host: result + if station.connected and not isinstance(result, Exception) + else False + for station, result in zip(self._stations, results) + } + + @property + def is_connected(self) -> dict: + """ + Returns a dict for all hosts whether they are _connected or not. + Returns "{hostname}": bool per station + """ + return {station.host: station.connected for station in self._stations} diff --git a/lofar_station_client/observation/observation.py b/lofar_station_client/observation/observation.py deleted file mode 100644 index 4e8ff48f1d39cb8969b1d0681f6c06c33b3c9b71..0000000000000000000000000000000000000000 --- a/lofar_station_client/observation/observation.py +++ /dev/null @@ -1,78 +0,0 @@ -# -*- coding: utf-8 -*- -# -# This file is part of the LOFAR2.0 Station Control project. -# -# Distributed under the terms of the APACHE license. -# See LICENSE.txt for more info. - -""" -Contains class for pythonic interface with observations -""" - -from json import dumps -import logging - -from tango import DeviceProxy - -logger = logging.getLogger() - - -class Observation: - """ - This class provides a pythonic interface to the - ObservationControl and Observation devices on a station. - """ - - def __init__( - self, - specification: dict, - host: str = "databaseds.tangonet:10000", - station="STAT", - ): - - # create device proxies to station at "host" - self._observation_control_dev = DeviceProxy( - f"tango://{host}/{station}/ObservationControl/1" - ) - self.host = host - self.station = station - - # convert specification dict to json - self._specification = dumps(specification) - - # get the ID - self._id = int(specification["observation_id"]) - - def observation_id(self) -> int: - """ - Returns the observation ID of this observation - """ - return self._id - - def start(self): - """ - Starts this observation - """ - self._observation_control_dev.start_observation(self._specification) - - def abort(self): - """ - Aborts this observation - """ - self._observation_control_dev.stop_observation(self._id) - - def observation_proxy(self): - """ - Returns the DeviceProxy of this observation - """ - # return a proxy to the correct Observation device on the station ( - # "STAT/Observation/{observation_id}") - return DeviceProxy(f"tango://{self.host}/{self.station}/Observation/{self._id}") - - def is_running(self): - """ - Returns whether this observation is currently running or not - """ - # return whether the observation is still running ( - # ObservationControl.is_observation_running command) - return self._observation_control_dev.is_observation_running(self._id) diff --git a/lofar_station_client/observation/station_observation.py b/lofar_station_client/observation/station_observation.py new file mode 100644 index 0000000000000000000000000000000000000000..08cbb1a5adbdabb2b8f131b1d5043f0d404a2d9a --- /dev/null +++ b/lofar_station_client/observation/station_observation.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR2.0 StationFutures Control project. +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" +Contains class for pythonic interface with observations +""" + +import logging + +from lofar_station_client.observation.stationfutures import StationFutures + +logger = logging.getLogger() + + +class StationObservation: + """ + This class provides a pythonic interface to the + ObservationControl and Observation devices on a single station. + """ + + TIMEOUT = 10 + + def __init__( + self, + specification: dict, + host: str = "databaseds.tangonet:10000", + ): + self.station = StationFutures(specification, host) + + if not self.station.connected: + raise Exception(f"Was not able to connect with {host}") + + def start(self): + """ + Starts this observation + """ + self.station.start().result(timeout=self.TIMEOUT) + + def stop(self): + """ + stops this observation + """ + self.station.stop().result(timeout=self.TIMEOUT) + + @property + def observation(self): + """ + Returns the DeviceProxy of this observation + """ + return self.station + + @property + def is_running(self): + """ + Returns whether this observation is currently running or not + """ + return self.station.is_running.result(timeout=self.TIMEOUT) diff --git a/lofar_station_client/observation/stationfutures.py b/lofar_station_client/observation/stationfutures.py new file mode 100644 index 0000000000000000000000000000000000000000..96bb8a5e9be019fc41dae685614c2d5271ef7bb1 --- /dev/null +++ b/lofar_station_client/observation/stationfutures.py @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR2.0 StationFutures Control project. +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" +Internal class for station states with regards to observations +""" + +import concurrent.futures +from json import dumps +import logging + +from tango import DeviceProxy, GreenMode, DevFailed + + +logger = logging.getLogger() + + +class StationFutures: + """ + Container class station and observation data and acccess + """ + + def __init__(self, specification: dict, host: str): + # pylint: disable=C0103 + + # store general data + self._id = specification["observation_id"] + self._host = host + self._json_specification = dumps(specification) + + try: + # connects to the tangoDb and get the proxies + self._control_proxy = DeviceProxy( + f"tango://{host}/STAT/ObservationControl/1" + ) + self._observation_proxy = DeviceProxy( + f"tango://{host}/STAT/observation/{self._id}" + ) + # gives an exception when it fails to ping the proxies + _ = self._control_proxy.ping() + _ = self._observation_proxy.ping() + + # set station to green mode + self._control_proxy.set_green_mode(GreenMode.Futures) + self._observation_proxy.set_green_mode(GreenMode.Futures) + + except DevFailed as e: + self._control_proxy = None + self._observation_proxy = None + + logger.warning( + "Failed to connect to device on host %s: %s: %s", + host, + e.__class__.__name__, + e, + ) + + def _failed_future(self): + future = concurrent.futures.Future() + future.set_exception(Exception("Station not connected")) + return future + + def start(self) -> concurrent.futures: + """Start the observation with the given specification on this station""" + if not self.connected: + return self._failed_future() + + return self._control_proxy.start_observation( + self._json_specification, wait=False + ) + + def stop(self) -> concurrent.futures: + """Stop the observation with the given ID on this station""" + if not self.connected: + return self._failed_future() + + return self._control_proxy.stop_observation(self._id, wait=False) + + @property + def is_running(self) -> concurrent.futures: + """get whether the observation with the given ID is running on this station""" + if not self.connected: + return self._failed_future() + + return self._control_proxy.is_observation_running(self._id, wait=False) + + @property + def connected(self) -> bool: + """get the connection status of this station""" + return self._control_proxy is not None + + @property + def host(self) -> str: + """get the host name of this station""" + return self._host + + @property + def control_proxy(self) -> DeviceProxy: + """get the control proxy of this station""" + return self._control_proxy + + @property + def observation_proxy(self) -> DeviceProxy: + """get the observation proxy of this station""" + return self._observation_proxy diff --git a/tests/observation/__init__.py b/tests/observation/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/observation/test_multi_station_observation.py b/tests/observation/test_multi_station_observation.py new file mode 100644 index 0000000000000000000000000000000000000000..64b80bfee4c4467c7c3c61634a8b2a6678f4a0ea --- /dev/null +++ b/tests/observation/test_multi_station_observation.py @@ -0,0 +1,131 @@ +from unittest import mock + + +from tests import base +from json import loads + +from lofar_station_client.observation.multi_station_observation import ( + MultiStationObservation, +) + + +import concurrent.futures + +SPEC_DICT = loads( + """ + { + "observation_id": 12345, + "stop_time": "2106-02-07T00:00:00", + "antenna_mask": [0,1,2,9], + "filter": "HBA_110_190", + "SAPs": [{ + "subbands": [10, 20, 30], + "pointing": { + "angle1": 1.5, "angle2": 0, "direction_type": "J2000" + } + }], + "tile_beam": + { "angle1": 1.5, "angle2": 0, "direction_type": "J2000" }, + "first_beamlet": 0 + } + """ +) + +HOST_LIST = ["host1", "host2", "host3"] + + +class FakeFuture: + def __init__(self, **kwargs): + return + + def done(self): + return True + + def result(self): + return True + + +class TestMultiStationObservation(base.TestCase): + @mock.patch( + "lofar_station_client.observation.multi_station_observation.StationFutures", + autospec=True, + ) + def setUp(self, M_station): + + self.station_mocks = [mock.Mock(), mock.Mock(), mock.Mock()] + + future = concurrent.futures.Future() + future.set_result(True) + + for i in self.station_mocks: + i.start.return_value = future + i.stop.return_value = future + type(i).is_running = mock.PropertyMock(return_value=future) + type(i).connected = mock.PropertyMock(return_value=True) + type(i).get_control_poxy = mock.PropertyMock( + return_value="A_real_control_proxy" + ) + + type(self.station_mocks[0]).observations = mock.PropertyMock( + return_value="A_real_observation_proxy1" + ) + type(self.station_mocks[1]).observations = mock.PropertyMock( + return_value="A_real_observation_proxy2" + ) + type(self.station_mocks[2]).observations = mock.PropertyMock( + return_value="A_real_observation_proxy3" + ) + + type(self.station_mocks[0]).host = mock.PropertyMock(return_value="host1") + type(self.station_mocks[1]).host = mock.PropertyMock(return_value="host2") + type(self.station_mocks[2]).host = mock.PropertyMock(return_value="host3") + + # apply the mock + M_station.side_effect = self.station_mocks + self.M_station = M_station + self.observation = MultiStationObservation( + specification=SPEC_DICT, hosts=HOST_LIST + ) + + def test_start(self): + self.observation.start() + self.assertEqual(self.M_station.call_count, 3) + + def test_stop(self): + self.observation.stop() + self.assertEqual(self.M_station.call_count, 3) + + def test_proxies(self): + expected = { + "host1": self.station_mocks[0], + "host2": self.station_mocks[1], + "host3": self.station_mocks[2], + } + results = self.observation.observations + self.assertEqual(expected, results) + + def test_is_running(self): + expected = {"host1": True, "host2": True, "host3": True} + results = self.observation.is_running + self.assertEqual(expected, results) + + def test_is_connected(self): + expected = {"host1": True, "host2": True, "host3": True} + results = self.observation.is_connected + self.assertEqual(expected, results) + + def test_no_connection(self): + """ + Tests the multiStationObservation classes ability to handle disconnected + stations by providing exceptions instead of values + """ + + future = concurrent.futures.Future() + future.set_exception(Exception("oh no")) + + for i in self.station_mocks: + type(i).is_running = mock.PropertyMock(return_value=future) + + expected = {"host1": False, "host2": False, "host3": False} + results = self.observation.is_running + self.assertEqual(expected, results)