diff --git a/CDB/LOFAR_ConfigDb.json b/CDB/LOFAR_ConfigDb.json index f82fcb5d1e8c67db00fb73df63ffa25713635d74..97de683bb7ebd6bc26b9e357812aaddd11b3d344 100644 --- a/CDB/LOFAR_ConfigDb.json +++ b/CDB/LOFAR_ConfigDb.json @@ -96,24 +96,14 @@ } } }, - "Observation": { - "STAT": { - "Observation": { - "STAT/Observation/0": { - } - } - } - }, "ObservationControl": { "STAT": { "ObservationControl": { "STAT/ObservationControl/1": { "properties": { "Control_Children": [ - "STAT/Observation/0" ], "Power_Children": [ - "STAT/Observation/0" ] } } diff --git a/CDB/integrations/multiobs_ConfigDb.json b/CDB/integrations/multiobs_ConfigDb.json index e3cf4855da6ae785af99f39941768abbdb170766..ed52f2202654d17367fb9b379b148a7d4aeb6cec 100644 --- a/CDB/integrations/multiobs_ConfigDb.json +++ b/CDB/integrations/multiobs_ConfigDb.json @@ -1,6 +1,6 @@ { "servers": { - "Observation": { + "ObservationControl": { "STAT": { "Observation": { "STAT/Observation/1": {}, diff --git a/CDB/stations/common.json b/CDB/stations/common.json index 9d77c0392f10cc858047f641b14cf5244e5c75ad..45768cf244203c86e94ca786550ae669aaf30e96 100644 --- a/CDB/stations/common.json +++ b/CDB/stations/common.json @@ -85,23 +85,14 @@ } } }, - "Observation": { - "STAT": { - "Observation": { - "STAT/Observation/0": {} - } - } - }, "ObservationControl": { "STAT": { "ObservationControl": { "STAT/ObservationControl/1": { "properties": { "Power_Children": [ - "STAT/Observation/0" ], "Control_Children": [ - "STAT/Observation/0" ] } } diff --git a/CDB/stations/testenv_cs001.json b/CDB/stations/testenv_cs001.json index cb0c1bc74fe03a7c4e6f13ea5c8d0ad2a37e9a03..0cfb260ea9a5df7aecf8f7bf9611556586e19f46 100644 --- a/CDB/stations/testenv_cs001.json +++ b/CDB/stations/testenv_cs001.json @@ -1120,24 +1120,19 @@ } } }, - "Observation": { - "STAT": { - "Observation": { - "STAT/Observation/1": {} - } - } - }, "ObservationControl": { "STAT": { + "Observation": { + "STAT/Observation/1": { + } + }, "ObservationControl": { "STAT/ObservationControl/1": { "properties": { "Power_Children": [ - "STAT/Observation/0", "STAT/Observation/1" ], "Control_Children": [ - "STAT/Observation/0", "STAT/Observation/1" ] } diff --git a/CDB/test_environment_ConfigDb.json b/CDB/test_environment_ConfigDb.json deleted file mode 100644 index 49216e5634c8fce200872549058b41bf2c968aac..0000000000000000000000000000000000000000 --- a/CDB/test_environment_ConfigDb.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "servers": { - "TileBeam": { - "STAT": { - "TileBeam": { - "STAT/TileBeam/HBA": { - "properties": { - "Tracking_enabled_RW_default": [ "True" ] - } - } - } - } - }, - "Observation": { - "STAT": { - "Observation": { - "STAT/Observation/1": {} - } - } - } - } -} diff --git a/docker-compose/device-observation.yml b/docker-compose/device-observation.yml deleted file mode 100644 index d016f2071957c5ba189cd298537e16a19ee6b8a2..0000000000000000000000000000000000000000 --- a/docker-compose/device-observation.yml +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy) -# SPDX-License-Identifier: Apache-2.0 -# -# Docker compose file that launches a LOFAR2.0 station's -# Observation device. -# -# Defines: -# - device-observation: LOFAR2.0 station Observation -# -# Requires: -# - lofar-device-base.yml -# -version: '2.1' - -services: - device-observation: - image: ${LOCAL_DOCKER_REGISTRY_HOST}/${LOCAL_DOCKER_REGISTRY_USER}/lofar-device-base:${TAG} - hostname: device-observation - container_name: device-observation - logging: - driver: "json-file" - options: - max-size: "100m" - max-file: "10" - networks: - - control - ports: - - "5718:5718" # unique port for this DS - - "5818:5818" # ZeroMQ event port - - "5918:5918" # ZeroMQ heartbeat port - extra_hosts: - - "host.docker.internal:host-gateway" - volumes: - - ..:/opt/lofar/tango:rw - environment: - - TANGO_HOST=${TANGO_HOST} - - TANGO_ZMQ_EVENT_PORT=5818 - - TANGO_ZMQ_HEARTBEAT_PORT=5918 - healthcheck: - test: nc -z -v device-observation 5718 - interval: 1m - timeout: 30s - retries: 3 - start_period: 30s - working_dir: /opt/lofar/tango - entrypoint: - - bin/start-ds.sh - # configure CORBA to _listen_ on 0:port, but tell others we're _reachable_ through ${HOSTNAME}:port, since CORBA - # can't know about our Docker port forwarding - - l2ss-observation Observation STAT -v -ORBendPoint giop:tcp:0:5718 -ORBendPointPublish giop:tcp:${HOSTNAME}:5718 - restart: on-failure - stop_signal: SIGINT # request a graceful shutdown of Tango - stop_grace_period: 2s diff --git a/docker-compose/http-json-schemas/definitions/observation-settings.json b/docker-compose/http-json-schemas/definitions/observation-settings.json index 2162b60072b138d1d2175f580d0dfad5f260aa09..50a6590f3d0fc671121ffa867349585b56d4d4ec 100644 --- a/docker-compose/http-json-schemas/definitions/observation-settings.json +++ b/docker-compose/http-json-schemas/definitions/observation-settings.json @@ -14,10 +14,21 @@ "type": "number", "minimum": 1 }, + "start_time": { + "type": "string", + "format": "date-time", + "default": "1970-01-01T00:00:00" + }, "stop_time": { "type": "string", "format": "date-time" }, + "lead_time": { + "type": "number", + "description": "Number of seconds to start before the provided start time, to account for initialising the on-line signal chain, and for possibly negative geometrical delay compensation.", + "default": 2.0, + "minimum": 0 + }, "antenna_field": { "default": "HBA", "description": "Antenna field to use", diff --git a/sbin/run_integration_test.sh b/sbin/run_integration_test.sh index 62a878b304944c6b17472d7c7b17f844902f1e5d..2a908c1636097cc23b65fccf09430f0bd1371de8 100755 --- a/sbin/run_integration_test.sh +++ b/sbin/run_integration_test.sh @@ -89,7 +89,7 @@ echo '/usr/local/bin/wait-for-it.sh ${TANGO_HOST} --strict --timeout=300 -- true # Devices list is used to explitly word split when supplied to commands, must # disable shellcheck SC2086 for each case. -DEVICES=(device-station-manager device-boot device-aps device-apsct device-ccd device-ec device-apspu device-sdpfirmware device-sdp device-recvh device-recvl device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-psoc device-pcon device-antennafield device-temperature-manager device-observation device-observation-control device-configuration device-calibration) +DEVICES=(device-station-manager device-boot device-aps device-apsct device-ccd device-ec device-apspu device-sdpfirmware device-sdp device-recvh device-recvl device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-psoc device-pcon device-antennafield device-temperature-manager device-observation-control device-configuration device-calibration) SIMULATORS=(sdptr-sim recvh-sim recvl-sim unb2-sim apsct-sim apspu-sim ccd-sim ec-sim) diff --git a/tangostationcontrol/docs/source/observing.rst b/tangostationcontrol/docs/source/observing.rst index 7a871f1ab53f62a101eb49669dae1d6d0dce5d20..a207accf044f52ff808feac09e2912611d1535b1 100644 --- a/tangostationcontrol/docs/source/observing.rst +++ b/tangostationcontrol/docs/source/observing.rst @@ -10,7 +10,9 @@ To observe with a station, you must construct the observation's specifications, observation_spec = { "observation_id": 12345, - "stop_time": "2106-02-07T00:00:00", + "start_time": "2106-02-07T00:00:00", + "stop_time": "2106-02-07T01:00:00", + "antenna_field": "HBA", "antenna_set": "ALL", "filter": "HBA_210_250", "SAPs": [{ @@ -22,7 +24,7 @@ To observe with a station, you must construct the observation's specifications, import json obs_control = DeviceProxy("STAT/ObservationControl/1") - obs_control.start_observation(json.dumps(observation_spec)) + obs_control.add_observation(json.dumps(observation_spec)) The above specification contains the following parameters: @@ -31,7 +33,11 @@ The above specification contains the following parameters: +====================+=========================================================================================+ | ``observation_id`` | User-specified unique reference to this observation. | +--------------------+-----------------------------------------------------------------------------------------+ -| ``stop_time`` | Automatically stop observing when this timestamp is reached. | +| ``start_time`` | automatically start observing when this timestamp is reached. | ++--------------------+-----------------------------------------------------------------------------------------+ +| ``stop_time`` | automatically stop observing when this timestamp is reached. | ++--------------------+-----------------------------------------------------------------------------------------+ +| ``antenna_field`` | Which antenna field to use (LBA, HBA, HBA0, HBA1). | +--------------------+-----------------------------------------------------------------------------------------+ | ``antenna_set`` | Which subset of antennas to use (ALL, INNER, OUTER, EVEN, ODD). | +--------------------+-----------------------------------------------------------------------------------------+ @@ -42,7 +48,7 @@ The above specification contains the following parameters: | ``tile_beam`` | Pointing to track with the HBA tiles (optional). | +--------------------+-----------------------------------------------------------------------------------------+ -This will configure the antenna field ``HBA`` as follows: +This will configure the specified antenna field (f.e. ``HBA``) as follows: * ``STAT/DigitalBeam/HBA`` is configured to beam form the antennas in the specified ``antenna_set``, track all pointings given in ``SAPs[x].pointing``, and produce beamlets for all subbands in ``SAPs[x].subbands``. The beamlets mirror the subbands in the order in which they are specified, * The ``observation_id`` is used to annotate the beamlet data produced by this observation, @@ -61,30 +67,39 @@ The effect of the observations can be observed through the following means, all Life cycle ```````````````````````` -The observation will start the moment it enters the system, and stop at the specified time. Each antenna field can have one observation running at a time. +The ObservationControl device will start each Observation when its start time is reached or past, and will stop it at the specified stop time. You can also force this to happen:: + + obs_control = DeviceProxy("STAT/ObservationControl/1") + obs_control.start_observation_now(12345) # starts observation 12345 now, regardless of its specified start time + obs_control.stop_observation_now(12345) # stops observation 12345 now, regardless of its specified stop time Managing observation(s) ------------------------- To manage running observations, we can interact with ObservationControl:: + + >>> # Check which observations are known (running or yet to run) + >>> obs_control.observations_R + array([12345]) + >>> # Check which observations are running >>> obs_control.running_observations_R array([12345]) >>> # Stop a running observation - >>> obs_control.stop_observation(12345) + >>> obs_control.stop_observation_now(12345) >>> # Stop all running observations - >>> obs_control.stop_all_observations() + >>> obs_control.stop_all_observations_now() Alternatively, we can inspect a running observation more closely. Each observation is represented by its own device: ``STAT/Observation/$id``, so if observation 12345 has been started, we can do the following:: observation = DeviceProxy("STAT/Observation/12345") -This device exposes several attributes: +This device exposes its settings as individual attributes, as well as: -:observation_running_R: Ever-increasing value as long as the observation is running. Allows one to check whether monitoring has become stale. +:alive_R: Ever-increasing value as long as the observation is running. Allows one to check whether monitoring has become stale. :type: ``int`` diff --git a/tangostationcontrol/integration_test/configuration/configDB/test_environment_ConfigDb.json b/tangostationcontrol/integration_test/configuration/configDB/test_environment_ConfigDb.json index 49216e5634c8fce200872549058b41bf2c968aac..17ce2a7f58083db9dc31b5b39b34c81bda50cd56 100644 --- a/tangostationcontrol/integration_test/configuration/configDB/test_environment_ConfigDb.json +++ b/tangostationcontrol/integration_test/configuration/configDB/test_environment_ConfigDb.json @@ -11,7 +11,7 @@ } } }, - "Observation": { + "ObservationControl": { "STAT": { "Observation": { "STAT/Observation/1": {} diff --git a/tangostationcontrol/integration_test/configuration/test_device_configuration.py b/tangostationcontrol/integration_test/configuration/test_device_configuration.py index ef6bdc38cd060af159cceef3caf58227b4375628..78c3044b9f3f14f7034089a6e500eafca302f87f 100644 --- a/tangostationcontrol/integration_test/configuration/test_device_configuration.py +++ b/tangostationcontrol/integration_test/configuration/test_device_configuration.py @@ -33,7 +33,7 @@ class TestDeviceConfiguration(AbstractTestBases.TestDeviceBase): } } }, - "Observation": { + "ObservationControl": { "STAT": { "Observation": { "STAT/Observation/11": {} @@ -149,7 +149,7 @@ class TestDeviceConfiguration(AbstractTestBases.TestDeviceBase): ) # configuration device self.assertFalse( "stat/observation/11" - in dbdata["servers"]["observation"]["stat"]["observation"], + in dbdata["servers"]["observationcontrol"]["stat"]["observation"], msg=f"{dbdata}", ) # observation device self.assertTrue( @@ -184,7 +184,9 @@ class TestDeviceConfiguration(AbstractTestBases.TestDeviceBase): # Test whether new device has been added self.assertTrue( "stat/observation/11" - in updated_dbdata["servers"]["observation"]["stat"]["observation"], + in updated_dbdata["servers"]["observationcontrol"]["stat"][ + "observation" + ], msg=f"{updated_dbdata}", ) # observation device # Test whether old device has been deleted @@ -240,7 +242,7 @@ class TestDeviceConfiguration(AbstractTestBases.TestDeviceBase): # Test whether new device has been added self.assertTrue( "stat/observation/11" - in updated_dbdata["servers"]["observation"]["stat"]["observation"], + in updated_dbdata["servers"]["observationcontrol"]["stat"]["observation"], msg=f"{updated_dbdata}", ) # observation device # Test whether old device has NOT been deleted diff --git a/tangostationcontrol/integration_test/default/devices/test_device_observation.py b/tangostationcontrol/integration_test/default/devices/test_device_observation.py index 634b0f2229597674d423f0d0c1c1f8bc35beee0c..b3635044730ea7c36953a46a4563a6ee769cecac 100644 --- a/tangostationcontrol/integration_test/default/devices/test_device_observation.py +++ b/tangostationcontrol/integration_test/default/devices/test_device_observation.py @@ -178,15 +178,29 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): self.proxy.Initialise() self.assertEqual(DevState.STANDBY, self.proxy.state()) - def test_init_invalid(self): + def test_init_no_settings(self): """Initialize an observation with _invalid_ JSON""" self.proxy.off() + + # Cannot start without valid settings with self.assertRaises(DevFailed): - self.proxy.observation_settings_RW = "{}" self.proxy.Initialise() + + # Since initialisation did not succeed, we're still in OFF self.assertEqual(DevState.FAULT, self.proxy.state()) + def test_init_invalid(self): + """Initialize an observation with _invalid_ JSON""" + + self.proxy.off() + + # Cannot write invalid settings + with self.assertRaises(DevFailed): + self.proxy.observation_settings_RW = "{}" + + self.assertEqual(DevState.OFF, self.proxy.state()) + def test_prohibit_rewriting_settings(self): """Test that changing observation settings is disallowed once init""" @@ -205,9 +219,8 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): stop_timestamp = datetime.fromisoformat(data["stop_time"]).timestamp() observation_id = data["observation_id"] antenna_set = data["antenna_set"] - filter = data["filter"] - num_saps = len(data["SAPs"]) - saps_subband = [data["SAPs"][i]["subbands"] for i in range(0, num_saps)] + filter_ = data["filter"] + saps_subband = data["SAPs"][0]["subbands"] pointing_direction = data["SAPs"][0]["pointing"] saps_pointing = [ ( @@ -232,7 +245,7 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): self.assertEqual(stop_timestamp, self.proxy.stop_time_R) self.assertEqual(observation_id, self.proxy.observation_id_R) self.assertEqual(antenna_set, self.proxy.antenna_set_R) - self.assertEqual(filter, self.proxy.filter_R) + self.assertEqual(filter_, self.proxy.filter_R) self.assertListEqual(saps_subband, self.proxy.saps_subband_R.tolist()) self.assertListEqual(saps_pointing, list(self.proxy.saps_pointing_R)) self.assertListEqual(tile_beam, list(self.proxy.tile_beam_R)) diff --git a/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py b/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py index 1e53d46474867f4ada12caf4746fd0ed0c80c875..b97cdf03f41235f29cbccd1a88fc5f4c7cefa062 100644 --- a/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py +++ b/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py @@ -169,9 +169,7 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase): parameters["observation_id"] = -1 self.on_device_assert(self.proxy) - self.assertRaises( - DevFailed, self.proxy.start_observation, json.dumps(parameters) - ) + self.assertRaises(DevFailed, self.proxy.add_observation, json.dumps(parameters)) def test_check_and_convert_parameters_invalid_time(self): """Test invalid parameter detection""" @@ -180,29 +178,44 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase): parameters["stop_time"] = (datetime.now() - timedelta(seconds=1)).isoformat() self.on_device_assert(self.proxy) - self.assertRaises( - DevFailed, self.proxy.start_observation, json.dumps(parameters) - ) + self.assertRaises(DevFailed, self.proxy.add_observation, json.dumps(parameters)) def test_check_and_convert_parameters_invalid_empty(self): """Test empty parameter detection""" self.on_device_assert(self.proxy) - self.assertRaises(DevFailed, self.proxy.start_observation, "{}") + self.assertRaises(DevFailed, self.proxy.add_observation, "{}") - def test_start_observation(self): - """Test starting an observation""" + def test_add_observation_now(self): + """Test starting an observation now""" self.on_device_assert(self.proxy) - self.proxy.start_observation(self.VALID_JSON) + self.proxy.add_observation(self.VALID_JSON) self.assertTrue(self.proxy.is_any_observation_running()) self.assertTrue(self.proxy.is_observation_running(12345)) - self.proxy.stop_observation(12345) + self.proxy.stop_observation_now(12345) + + def test_add_observation_future(self): + """Test starting an observation in the future""" + + self.on_device_assert(self.proxy) + + parameters = json.loads(self.VALID_JSON) + parameters["start_time"] = (datetime.now() + timedelta(days=1)).isoformat() + parameters["stop_time"] = (datetime.now() + timedelta(days=2)).isoformat() + self.proxy.add_observation(json.dumps(parameters)) + + self.assertIn(12345, self.proxy.observations_R) + self.assertNotIn(12345, self.proxy.running_observations_R) + self.assertFalse(self.proxy.is_any_observation_running()) + self.assertFalse(self.proxy.is_observation_running(12345)) + + self.proxy.stop_observation_now(12345) - def test_start_observation_multiple(self): + def test_add_observation_multiple(self): """Test starting multiple observations""" second_observation_json = json.loads(self.VALID_JSON) @@ -210,49 +223,49 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase): self.on_device_assert(self.proxy) - self.proxy.start_observation(self.VALID_JSON) - self.proxy.start_observation(json.dumps(second_observation_json)) + self.proxy.add_observation(self.VALID_JSON) + self.proxy.add_observation(json.dumps(second_observation_json)) self.assertTrue(self.proxy.is_any_observation_running()) self.assertTrue(self.proxy.is_observation_running(12345)) self.assertTrue(self.proxy.is_observation_running(54321)) - self.proxy.stop_observation(12345) - self.proxy.stop_observation(54321) + self.proxy.stop_observation_now(12345) + self.proxy.stop_observation_now(54321) def test_stop_observation_invalid_id(self): """Test stop_observation exceptions for invalid ids""" self.on_device_assert(self.proxy) - self.assertRaises(DevFailed, self.proxy.stop_observation, -1) + self.assertRaises(DevFailed, self.proxy.stop_observation_now, -1) def test_stop_observation_invalid_running(self): """Test stop_observation exceptions for not running""" self.on_device_assert(self.proxy) - self.assertRaises(DevFailed, self.proxy.stop_observation, 2) + self.assertRaises(DevFailed, self.proxy.stop_observation_now, 2) - def test_is_any_observation_running_after_stop_all_observations(self): + def test_is_any_observation_running_after_stop_all_observations_now(self): """Test whether is_any_observation_running conforms when we start & stop an observation""" self.on_device_assert(self.proxy) - self.proxy.start_observation(self.VALID_JSON) - self.proxy.stop_all_observations() + self.proxy.add_observation(self.VALID_JSON) + self.proxy.stop_all_observations_now() # Test false self.assertFalse(self.proxy.is_any_observation_running()) - def test_start_stop_observation(self): + def test_start_stop_observation_now(self): """Test starting and stopping an observation""" self.on_device_assert(self.proxy) # uses ID 12345 - self.proxy.start_observation(self.VALID_JSON) - self.proxy.stop_observation(12345) + self.proxy.add_observation(self.VALID_JSON) + self.proxy.stop_observation_now(12345) # Test false self.assertFalse(self.proxy.is_observation_running(12345)) @@ -266,9 +279,9 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase): self.on_device_assert(self.proxy) # uses ID 12345 - self.proxy.start_observation(self.VALID_JSON) - self.proxy.start_observation(json.dumps(second_observation_json)) - self.proxy.stop_all_observations() + self.proxy.add_observation(self.VALID_JSON) + self.proxy.add_observation(json.dumps(second_observation_json)) + self.proxy.stop_all_observations_now() # Test false self.assertFalse(self.proxy.is_observation_running(12345)) @@ -279,6 +292,4 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase): parameters = json.loads(self.VALID_JSON) parameters["antenna_set"] = "ZZZ" self.on_device_assert(self.proxy) - self.assertRaises( - DevFailed, self.proxy.start_observation, json.dumps(parameters) - ) + self.assertRaises(DevFailed, self.proxy.add_observation, json.dumps(parameters)) diff --git a/tangostationcontrol/tangostationcontrol/common/observation_controller.py b/tangostationcontrol/tangostationcontrol/common/observation_controller.py index 9843316f2f0a278f4be8d026f947d74dbd71574d..360e353515b25fa4b1f88cbeb97da696f7ce52c5 100644 --- a/tangostationcontrol/tangostationcontrol/common/observation_controller.py +++ b/tangostationcontrol/tangostationcontrol/common/observation_controller.py @@ -4,7 +4,6 @@ import logging import time from datetime import datetime -from typing import List from tango import DevFailed, DevState, Except, Util, EventType, DeviceProxy from tangostationcontrol.common.lofar_logging import log_exceptions @@ -14,7 +13,11 @@ from tangostationcontrol.configuration import ObservationSettings logger = logging.getLogger() -class RunningObservation(object): +class Observation(object): + @property + def proxy(self) -> DeviceProxy: + return self._device_proxy + @property def observation_id(self) -> int: return self._parameters.observation_id @@ -32,7 +35,7 @@ class RunningObservation(object): # Name for the Observation.observation_running subscription @property def attribute_name(self) -> str: - return f"{self.device_name}/observation_running_R" + return f"{self.device_name}/alive_R" def __init__(self, tango_domain, parameters: ObservationSettings): self._device_proxy: DeviceProxy | None = None @@ -44,7 +47,7 @@ class RunningObservation(object): # have one instance of it. self._tango_util: Util = Util.instance() - def create_tango_device(self): + def create_observation_device(self): """Instatiate an Observation Device""" logger.info("Create device: %s", self.device_name) try: @@ -70,28 +73,41 @@ class RunningObservation(object): logger.exception(error_string) Except.re_throw_exception(ex, "DevFailed", error_string, __name__) - def create_device_proxy(self): + def destroy_observation_device(self): + try: + self._tango_util.delete_device(self.class_name, self.device_name) + except DevFailed: + logger.exception( + f"Could not delete device {self.device_name} of class {self.class_name} from Tango DB." + ) + + def initialise_observation(self): # Instantiate a dynamic Tango Device "Observation". self._device_proxy = create_device_proxy(self.device_name) + # Initialise generic properties + self.proxy.put_property({"Control_Children": [], "Power_Children": []}) + # Configure the dynamic device its attribute for the observation # parameters. - self._device_proxy.observation_settings_RW = self._parameters.to_json() + self.proxy.observation_settings_RW = self._parameters.to_json() # Take the Observation device through the motions. Pass the # entire JSON set of parameters so that it can pull from it what it # needs. - self._device_proxy.Initialise() + self.proxy.Initialise() - # The call to On will actually tell the Observation device to - # become fully active. - self._device_proxy.On() + def start(self): + self.proxy.On() + + def is_running(self): + return self.proxy and self.proxy.state() == DevState.ON def subscribe(self, cb): # Turn on the polling for the attribute. # Note that this is not automatically done despite the attribute # having the right polling values set in the ctor. - self._device_proxy.poll_attribute(self.attribute_name.split("/")[-1], 1000) + self.proxy.poll_attribute(self.attribute_name.split("/")[-1], 1000) # Right. Now subscribe to periodic events. self._event_id = self._device_proxy.subscribe_event( @@ -101,92 +117,47 @@ class RunningObservation(object): "Successfully started an observation with ID=%s.", self.observation_id ) - def shutdown(self): + def stop(self): # Check if the device has not terminated itself in the meanwhile. try: - self._device_proxy.ping() + self.proxy.ping() except DevFailed: - logger.warning( - "The device for the Observation with ID=%s \ - has unexpectedly already disappeared. It is advised to check \ - the logs up to 10s prior to this message to see what happened.", - {self.observation_id}, + logger.error( + f"Observation device for ID={self.observation_id} unexpectedly disappeared." ) else: # Unsubscribe from the subscribed event. - event_id = self._event_id - self._device_proxy.unsubscribe_event(event_id) + self.proxy.unsubscribe_event(self._event_id) # Tell the Observation device to stop the running # observation. This is a synchronous call and the clean-up # does not take long. - self._device_proxy.Off() - - # Wait for 1s for the Observation device to go to - # DevState.OFF. Force shutdown if observation.state() is - # not OFF. - remaining_wait_time = 1.0 - sleep_time = 0.1 - stopped = False - while remaining_wait_time > 0.0: - if self._device_proxy.state() is DevState.OFF: - stopped = True - break - time.sleep(sleep_time) - remaining_wait_time = remaining_wait_time - sleep_time - # Check if the observation object is really in OFF state. - if stopped: - logger.info( - "Successfully stopped the observation with ID=%s", - {self.observation_id}, - ) - else: - logger.warning( - "Could not shut down the Observation device ( %s ) \ - for observation ID=%s. This means that there is a \ - chance for a memory leak. Will continue anyway and forcefully delete \ - the Observation object.", - {self.device_name}, - {self.observation_id}, - ) + self.proxy.Off() # Finally remove the device object from the Tango DB. - try: - self._tango_util.delete_device(self.class_name, self.device_name) - except DevFailed: - logger.warning( - "Something went wrong when the device %s \ - was removed from the Tango DB. There is nothing that can be done \ - about this here at this moment but you should check the Tango DB yourself.", - {self.device_name}, - ) + self.destroy_observation_device() -class ObservationController(object): - @property - def running_observations(self) -> List[int]: - return list(self._running_observations.keys()) +class ObservationController(dict[str, Observation]): + """A dictionary of observations. Actively manages the observation state transtions + (start, stop).""" def __init__(self, tango_domain: str): self._tango_util = Util.instance() self._tango_domain = tango_domain - self._running_observations: dict[int, RunningObservation] = {} - def is_any_observation_running(self): - return len(self._running_observations) > 0 - - def is_observation_running(self, obs_id): - observation = self._running_observations.get(obs_id) - return observation is not None + @property + def running_observations(self) -> list[str]: + return [obs_id for obs_id, obs in self.items() if obs.is_running()] @log_exceptions() - def observation_running_callback(self, event): + def observation_callback(self, event): """ - This callback checks if a running observation is still - supposed to run. If this function finds out that the - observation is not supposed to run any more, then - self.stop_observation(obs_id) is called which takes care of the - clean up. + This callback checks and manages the state transitions + for each observation. + + It starts observations at their specified start_time, + and stops & removes them at their specified stop_time. """ if event.err: # Something is fishy with this event. @@ -194,108 +165,98 @@ class ObservationController(object): "The Observation device %s sent an event but the event \ signals an error. It is advised to check the logs for any indication \ that something went wrong in that device. Event data=%s", - {event.device}, - {event}, + event.device, + event, ) return + # update the state of this observation, if needed + self._update_observation_state(event.device) + + def _update_observation_state(self, device: DeviceProxy): + """Start or stop the observation managed by the given Observation + device.""" + # Get the Observation ID from the sending device. - obs_id = event.device.observation_id_R + obs_id = device.observation_id_R - # Check if the observation is still supposed to run. - running_obs = self._running_observations.copy() - if not running_obs: - # No obs is running??? - logger.warning( - "Received an observation_running event for the observation with ID=%s. \ - According to the records in ObservationControl, this observation is \ - not supposed to run. Please check previous logs, especially around the time \ - an observation with this ID was started. Will continue and ignore this event.", - {obs_id}, - ) - return + # Get the start/stop times from the sending device + obs_start_time = device.start_time_R + obs_stop_time = device.stop_time_R - if obs_id in running_obs: - # Get the Observation's stop_time from the Observation device. - obs_stop_time = event.device.stop_time_R - current_obs_time = event.attr_value.value - # I expect that current_obs_time is always - # smaller than the stop time that I read from my - # records. - delta_t = obs_stop_time - current_obs_time - if delta_t < 0.0: - # The observation has not finished yet and is - # more than 1.0 seconds past its scheduled stop - # time. Tell the observation to finish and clean up. - obs = running_obs[obs_id] - self.stop_observation(obs_id) - else: - # The observation that we are trying to process is not part - # of the running_obs dictionary - logger.warning( - "Received an observation_running event for the observation with ID=%s. \ - According to the records in ObservationControl, this observation is \ - not supposed to run. Please check previous logs, especially around \ - the time an observation with this ID was started. \ - Will continue and ignore this event.", - {obs_id}, - ) - return + # Get how much earlier we have to start + obs_lead_time = device.lead_time_R + + # Obtain the current time ONCE to avoid race conditions + now = time.time() + + # Manage state transitions + if now > obs_stop_time: + # Stop observation + self.stop_observation_now(obs_id) + elif ( + now >= obs_start_time - obs_lead_time and device.state() == DevState.STANDBY + ): + # Start observation + self.start_observation(obs_id) - def start_observation(self, settings: ObservationSettings): + def add_observation(self, settings: ObservationSettings): """Create a new Observation Device and start an observation""" # Check further properties that cannot be validated through a JSON schema if settings.stop_time <= datetime.now(): - error = f'Cannot start an observation with ID={settings.observation_id} \ - because the parameter stop_time parameter value="{settings.stop_time}" \ - is invalid. Set a stop_time parameter later in time than the start time.' - Except.throw_exception("IllegalCommand", error, __name__) + raise ValueError( + f"Cannot start observation {settings.observation_id} because it is already past its stop time {settings.stop_time}" + ) - obs = RunningObservation(self._tango_domain, settings) - obs.create_tango_device() + obs = Observation(self._tango_domain, settings) + obs.create_observation_device() try: - obs.create_device_proxy() + obs.initialise_observation() except DevFailed as ex: # Remove the device again. - self._tango_util.delete_device(obs.class_name, obs.device_name) - error_string = f"Cannot access the Observation device instance for observation \ - ID={obs.observation_id} with device class name={obs.class_name} and \ - device instance name={obs.device_name}. \ - This means that the observation likely did not start \ - but certainly cannot be controlled and/or forcefully be stopped." - logger.exception(error_string) - Except.re_throw_exception(ex, "DevFailed", error_string, __name__) - - # Finally update the self.running_observation dict's entry of this - # observation with the complete set of info. - self._running_observations[obs.observation_id] = obs + obs.destroy_observation_device() + + raise Exception( + "Failed to initialise observation {settings.observation_id}" + ) from ex + + # Register this observation now that is has been succesfully created + self[obs.observation_id] = obs + # Keep pinging it to manage its state transitions try: - obs.subscribe(self.observation_running_callback) + obs.subscribe(self.observation_callback) except DevFailed as ex: - self._tango_util.delete_device(obs.class_name, obs.device_name) - error_string = "Cannot access the Observation device instance for observation ID=%s \ - with device class name=%s and device instance name=%s. This means that the observation \ - cannot be controlled and/or forcefully be stopped." - logger.exception( - error_string, obs.observation_id, obs.class_name, obs.device_name - ) - Except.re_throw_exception(ex, "DevFailed", error_string, __name__) + # Remove the device again. + obs.destroy_observation_device() - def stop_observation(self, obs_id): + raise Exception( + "Cannot subscribe to attribute. Cancelled observation {settings.observation_id}." + ) from ex + + # Make sure the current state is accurate + self._update_observation_state(obs.proxy) + + def start_observation(self, obs_id): + """Start the observation with the given ID""" + try: + observation = self[obs_id] + except KeyError as _exc: + raise Exception(f"Unknown observation: {obs_id}") + + observation.start() + + def stop_observation_now(self, obs_id): """Stop the observation with the given ID""" - if self.is_observation_running(obs_id) is False: - error = f"Cannot stop an observation with ID={obs_id}, \ - because the observation is not running." - Except.throw_exception("IllegalCommand", error, __name__) - - # Fetch the obs data and remove it from the dict of - # currently running observations. - observation = self._running_observations.pop(obs_id) - observation.shutdown() - - def stop_all_observations(self): - """Stop all running observations""" - for obs_id in self._running_observations.copy().keys(): - self.stop_observation(obs_id) + try: + observation = self.pop(obs_id) + except KeyError as _exc: + raise Exception(f"Unknown observation: {obs_id}") + + observation.stop() + + def stop_all_observations_now(self): + """Stop all observations (running or to be run)""" + for obs_id in list(self): # draw a copy as we modify the list + self.stop_observation_now(obs_id) diff --git a/tangostationcontrol/tangostationcontrol/configuration/observation_settings.py b/tangostationcontrol/tangostationcontrol/configuration/observation_settings.py index 4e8c4e5df1e6e77927acd97d4c54c5f0599070f5..b8f49369f062899c95c2b95369ab9a506acaf4dd 100644 --- a/tangostationcontrol/tangostationcontrol/configuration/observation_settings.py +++ b/tangostationcontrol/tangostationcontrol/configuration/observation_settings.py @@ -13,15 +13,18 @@ class ObservationSettings(_ConfigurationBase): def __init__( self, observation_id: int, + start_time: datetime | None, stop_time: datetime, antenna_field: str, antenna_set: str, filter: str, SAPs: Sequence[Sap], - tile_beam: Pointing = None, + tile_beam: Pointing | None = None, first_beamlet: int = 0, + lead_time: float | None = None, ): self.observation_id = observation_id + self.start_time = start_time self.stop_time = stop_time self.antenna_field = antenna_field self.antenna_set = antenna_set @@ -29,10 +32,13 @@ class ObservationSettings(_ConfigurationBase): self.SAPs = SAPs self.tile_beam = tile_beam self.first_beamlet = first_beamlet + self.lead_time = lead_time def __iter__(self): + yield "observation_id", self.observation_id + if self.start_time: + yield "start_time", self.start_time.isoformat() yield from { - "observation_id": self.observation_id, "stop_time": self.stop_time.isoformat(), "antenna_field": self.antenna_field, "antenna_set": self.antenna_set, @@ -42,11 +48,16 @@ class ObservationSettings(_ConfigurationBase): if self.tile_beam: yield "tile_beam", dict(self.tile_beam) yield "first_beamlet", self.first_beamlet + if self.lead_time is not None: + yield "lead_time", self.lead_time @staticmethod def to_object(json_dct) -> "ObservationSettings": return ObservationSettings( json_dct["observation_id"], + datetime.fromisoformat(json_dct["start_time"]) + if "start_time" in json_dct + else None, datetime.fromisoformat(json_dct["stop_time"]), json_dct["antenna_field"], json_dct["antenna_set"], @@ -54,4 +65,5 @@ class ObservationSettings(_ConfigurationBase): json_dct["SAPs"], json_dct["tile_beam"] if "tile_beam" in json_dct else None, json_dct["first_beamlet"] if "first_beamlet" in json_dct else 0, + json_dct["lead_time"] if "lead_time" in json_dct else None, ) diff --git a/tangostationcontrol/tangostationcontrol/devices/observation.py b/tangostationcontrol/tangostationcontrol/devices/observation.py index af767e7cc22ea86ffba395de34707211d828a23f..471f0a1fc8b026cdb28343564f8ccd97e9f14be2 100644 --- a/tangostationcontrol/tangostationcontrol/devices/observation.py +++ b/tangostationcontrol/tangostationcontrol/devices/observation.py @@ -1,6 +1,7 @@ # Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 +from itertools import chain import logging from time import time from typing import Optional @@ -9,7 +10,7 @@ import numpy from jsonschema.exceptions import ValidationError # PyTango imports -from tango import AttrWriteType, DeviceProxy, DevState, Util, Except +from tango import AttrWriteType, DeviceProxy, DevState, Util from tango.server import attribute from tangostationcontrol.common.constants import ( DEFAULT_POLLING_PERIOD, @@ -25,7 +26,6 @@ from tangostationcontrol.common.proxy import create_device_proxy from tangostationcontrol.configuration import ObservationSettings from tangostationcontrol.devices.device_decorators import fault_on_error from tangostationcontrol.devices.device_decorators import only_in_states -from tangostationcontrol.devices.device_decorators import only_when_on from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice logger = logging.getLogger() @@ -40,38 +40,136 @@ class Observation(LOFARDevice): specific observation. It will, if necessary keep tabs on HW MPs to signal issues that are not caught by MPs being outside their nominal range. - The lifecycle of instances of this device is controlled by ObservationControl + The lifecycle of instances of this device is controlled by ObservationControl. + + Settings are written to observation_settings_RW, which can only be done + in the OFF state. """ # Attributes - observation_running_R = attribute( + @attribute( + doc="Return a value that changes over time, to check if the device is alive.", dtype=numpy.float64, - access=AttrWriteType.READ, polling_period=DEFAULT_POLLING_PERIOD, period=DEFAULT_POLLING_PERIOD, rel_change="1.0", ) - observation_id_R = attribute(dtype=numpy.int64, access=AttrWriteType.READ) - stop_time_R = attribute(dtype=numpy.float64, access=AttrWriteType.READ) - antenna_field_R = attribute(dtype=str, access=AttrWriteType.READ) - antenna_set_R = attribute(dtype=str, access=AttrWriteType.READ) - filter_R = attribute(dtype=str, access=AttrWriteType.READ) - saps_subband_R = attribute( - dtype=((numpy.uint32,),), + def alive_R(self): + return time() + + @attribute( + doc="Observation ID", + dtype=numpy.int64, + ) + def observation_id_R(self): + return self._observation_settings.observation_id + + @attribute( + doc="Observation start time (seconds since 1970), or 0 if immediate.", + unit="s", + dtype=numpy.float64, + ) + def start_time_R(self): + return ( + self._observation_settings.start_time.timestamp() + if self._observation_settings.start_time + else 0.0 + ) + + @attribute( + doc="Observation stop time (seconds since 1970).", + unit="s", + dtype=numpy.float64, + ) + def stop_time_R(self): + return self._observation_settings.stop_time.timestamp() + + @attribute( + doc="Seconds to be on sky before the observation start time, to fill buffers and allow for negative geometric delay compensation downstream.", + unit="s", + dtype=numpy.float64, + ) + def lead_time_R(self): + return self._observation_settings.lead_time or 0.0 + + @attribute( + doc="Which antenna field this Observation configures", + dtype=str, + fisallowed="is_attribute_access_allowed", + ) + def antenna_field_R(self): + return self._observation_settings.antenna_field + + @attribute( + doc="Which antenna set this Observation configures", + dtype=str, + fisallowed="is_attribute_access_allowed", + ) + def antenna_set_R(self): + return self._observation_settings.antenna_set + + @attribute( + doc="Which band filter to use for all antennas", + dtype=str, + fisallowed="is_attribute_access_allowed", + ) + def filter_R(self): + return self._observation_settings.filter + + @attribute( + doc="Which subbands to beamform for each beamlet.", + dtype=(numpy.uint32,), max_dim_x=N_beamlets_ctrl, - max_dim_y=N_beamlets_ctrl, - access=AttrWriteType.READ, + fisallowed="is_attribute_access_allowed", ) - saps_pointing_R = attribute( + def saps_subband_R(self): + return numpy.array( + list(chain(*[sap.subbands for sap in self._observation_settings.SAPs])), + dtype=numpy.uint32, + ) + + @attribute( + doc="Which pointing to beamform towards for each beamlet.", dtype=((str,),), max_dim_x=N_point_prop, max_dim_y=N_beamlets_ctrl, - access=AttrWriteType.READ, ) - tile_beam_R = attribute( - dtype=(str,), max_dim_x=N_point_prop, access=AttrWriteType.READ + def saps_pointing_R(self): + saps_pointing = [] + for sap in self._observation_settings.SAPs: + for _ in sap.subbands: + saps_pointing.append( + ( + sap.pointing.direction_type, + f"{sap.pointing.angle1}deg", + f"{sap.pointing.angle2}deg", + ), + ) + return saps_pointing + + @attribute( + doc="Which pointing to beamform all HBA tiles to (if any).", + dtype=(str,), + max_dim_x=N_point_prop, ) - first_beamlet_R = attribute(dtype=numpy.int64, access=AttrWriteType.READ) + def tile_beam_R(self): + if self._observation_settings.tile_beam is None: + return None + + pointing_direction = self._observation_settings.tile_beam + return [ + str(pointing_direction.direction_type), + f"{pointing_direction.angle1}deg", + f"{pointing_direction.angle2}deg", + ] + + @attribute( + doc="Beamlet index of the FPGA output, at which to start mapping the beamlets of this observation.", + dtype=numpy.uint64, + fisallowed="is_attribute_access_allowed", + ) + def first_beamlet_R(self): + return self._observation_settings.first_beamlet observation_settings_RW = attribute(dtype=str, access=AttrWriteType.READ_WRITE) @@ -92,30 +190,67 @@ class Observation(LOFARDevice): def configure_for_initialise(self): """Load the JSON from the attribute and configure member variables""" + if self._observation_settings is None: + raise RuntimeError("Device can not be initialized without configuration") + super().configure_for_initialise() - if self._observation_settings is None: - Except.throw_exception( - "IllegalCommand", - "Device can not be initialized without configuration", - __name__, - ) + logger.info( + f"Initialising observation with ID={self._observation_settings.observation_id} " + f"with settings: {self._observation_settings.to_json()}" + ) + + self._prepare_observation() + + logger.info( + f"The observation with ID={self._observation_settings.observation_id} " + f"is initialised to run between {self._observation_settings.start_time} " + f"and {self._observation_settings.stop_time}." + ) + + def configure_for_on(self): + """Indicate the observation has started""" + + super().configure_for_on() + + self._start_observation() + + logger.info( + f"Started the observation with ID={self._observation_settings.observation_id}." + ) + + def configure_for_off(self): + """Indicate the observation has stopped""" + + super().configure_for_off() + + self._stop_observation() + + logger.info( + "Stopped the observation with ID=%s.", + { + self._observation_settings.observation_id + if self._observation_settings + else None + }, + ) + + @log_exceptions() + def _prepare_observation(self): + """Setup proxies and other preparations that can be done ahead of + the observation start time without changing any station + settings.""" # ObservationControl takes already good care of checking that the # parameters are in order and sufficient. It is therefore unnecessary # at the moment to check the parameters here again. # This could change when the parameter check becomes depending on # certain aspects that only an Observation device can know. - self._num_saps = len(self._observation_settings.SAPs) - self._saps_pointing = self._build_saps_pointing(self._observation_settings) antennafield = self._observation_settings.antenna_field - # Set a reference of AntennaField device that is correlated to this device util = Util.instance() - # TODO(Stefano): set a proper policy for the devices instance number - # It cannot be inherited from the Observation instance number - # (i.e. Observation_id) + self.antennafield_proxy = create_device_proxy( f"{util.get_ds_inst_name()}/AntennaField/{antennafield}" ) @@ -130,140 +265,52 @@ class Observation(LOFARDevice): f"{util.get_ds_inst_name()}/DigitalBeam/{antennafield}" ) - if self._observation_settings.tile_beam: + if self._has_tilebeam(): # Set a reference of TileBeam device that is correlated to this device self.tilebeam_proxy = create_device_proxy( f"{util.get_ds_inst_name()}/TileBeam/{antennafield}" ) - logger.info( - "The observation with ID=%s is configured." - "It will begin as soon as On() is called and it is supposed to stop at %s.", - self._observation_settings.observation_id, - self._observation_settings.stop_time, - ) - - def configure_for_off(self): - """Indicate the observation has stopped""" - - super().configure_for_off() - - logger.info( - "Stopped the observation with ID=%s.", - { - self._observation_settings.observation_id - if self._observation_settings - else None - }, - ) - - def configure_for_on(self): - """Indicate the observation has started""" - - super().configure_for_on() + @log_exceptions() + def _start_observation(self): + """Configure the station for this observation.""" # Apply ObservationID self.antennafield_proxy.FPGA_sdp_info_observation_id_RW = ( - self._apply_observation_id(self._observation_settings.observation_id) + self._apply_observation_id(self.read_attribute("observation_id_R")) ) # Apply Antenna Set - self.digitalbeam_proxy.Antenna_Set_RW = self._observation_settings.antenna_set + self.digitalbeam_proxy.Antenna_Set_RW = self.read_attribute("antenna_set_R") - (frequency_band,) = self._apply_antennafield_settings(self.read_filter_R()) + (frequency_band,) = self._apply_antennafield_settings( + self.read_attribute("filter_R") + ) self.antennafield_proxy.Frequency_Band_RW = frequency_band # Apply Beamlet configuration self.beamlet_proxy.subband_select_RW = self._apply_saps_subbands( - self.read_saps_subband_R() + self.read_attribute("saps_subband_R") ) self.digitalbeam_proxy.Pointing_direction_RW = self._apply_saps_pointing( - self.read_saps_pointing_R() + self.read_attribute("saps_pointing_R") ) # Apply Tile Beam pointing direction # NB: This is applied to all tiles, not just the ones in the selected # antenna set. - tile_beam = self.read_tile_beam_R() - if tile_beam is not None: + if self._has_tilebeam(): + tile_beam = self.read_attribute("tile_beam_R") + self.tilebeam_proxy.Pointing_direction_RW = [ tuple(tile_beam) ] * self.antennafield_proxy.nr_antennas_R - logger.info( - "Started the observation with ID=%s.", - {self._observation_settings.observation_id}, - ) - - @only_when_on() - @fault_on_error() - @log_exceptions() - def read_observation_id_R(self): - """Return the observation_id_R attribute.""" - return self._observation_settings.observation_id - - @only_when_on() - @fault_on_error() @log_exceptions() - def read_stop_time_R(self): - """Return the stop_time_R attribute.""" - return self._observation_settings.stop_time.timestamp() + def _stop_observation(self): + """Tear down station resources we used.""" - @only_in_states([DevState.STANDBY, DevState.ON]) - @fault_on_error() - @log_exceptions() - def read_antenna_field_R(self): - """Return the antenna_field_R attribute.""" - return self._observation_settings.antenna_field - - @only_in_states([DevState.STANDBY, DevState.ON]) - @fault_on_error() - @log_exceptions() - def read_antenna_set_R(self): - """Return the antenna_set_R attribute.""" - return self._observation_settings.antenna_set - - @only_in_states([DevState.STANDBY, DevState.ON]) - @fault_on_error() - @log_exceptions() - def read_filter_R(self): - """Return the filter_R attribute.""" - return self._observation_settings.filter - - @only_in_states([DevState.STANDBY, DevState.ON]) - @fault_on_error() - @log_exceptions() - def read_saps_subband_R(self): - """Return the saps_subband_R attribute.""" - return [sap.subbands for sap in self._observation_settings.SAPs] - - @only_in_states([DevState.STANDBY, DevState.ON]) - @fault_on_error() - @log_exceptions() - def read_saps_pointing_R(self): - """Return the saps_pointing_R attribute.""" - return self._saps_pointing - - @only_in_states([DevState.STANDBY, DevState.ON]) - @fault_on_error() - @log_exceptions() - def read_tile_beam_R(self): - """Return the tile_beam_R attribute.""" - if self._observation_settings.tile_beam is None: - return None - pointing_direction = self._observation_settings.tile_beam - return [ - str(pointing_direction.direction_type), - f"{pointing_direction.angle1}deg", - f"{pointing_direction.angle2}deg", - ] - - @only_in_states([DevState.STANDBY, DevState.ON]) - @fault_on_error() - @log_exceptions() - def read_first_beamlet_R(self): - """Return the first_beamlet_R attribute.""" - return self._observation_settings.first_beamlet + pass @fault_on_error() @log_exceptions() @@ -284,30 +331,11 @@ class Observation(LOFARDevice): self._observation_settings = ObservationSettings.from_json(parameters) except ValidationError: self._observation_settings = None - # Except.throw_exception("IllegalCommand", e.message, __name__) + raise - @only_when_on() - @fault_on_error() - @log_exceptions() - def read_observation_running_R(self): - """Return the observation_running_R attribute.""" - # TODO(Corne): Actually keep track of the running time and perform proper - # value - return time() - - def _build_saps_pointing(self, parameters: ObservationSettings): - """Build the sap pointing array preserving the correct order from JSON""" - saps_pointing = [] - for sap in parameters.SAPs: - for _ in sap.subbands: - saps_pointing.append( - ( - sap.pointing.direction_type, - f"{sap.pointing.angle1}deg", - f"{sap.pointing.angle2}deg", - ), - ) - return saps_pointing + def _has_tilebeam(self): + """Return whether this observation should control a TileBeam device.""" + return self._observation_settings.antenna_field.startswith("HBA") def _apply_antennafield_settings(self, filter_name: str): """Retrieve the RCU band from filter name, returning the correct format for @@ -320,20 +348,23 @@ class Observation(LOFARDevice): def _apply_saps_subbands(self, sap_subbands: list): """Convert an array of subbands into the correct format for Beamlet device""" subband_select = self.beamlet_proxy.subband_select_RW - first_beamlet = numpy.array(self.read_first_beamlet_R(), dtype=numpy.int64) + first_beamlet = self.read_attribute("first_beamlet_R") + # Insert subband values starting from the first beamlet - sap_subbands = numpy.array(sap_subbands).flatten() - subband_select[first_beamlet : len(sap_subbands)] = sap_subbands + subband_select[first_beamlet : first_beamlet + len(sap_subbands)] = sap_subbands + return subband_select def _apply_saps_pointing(self, sap_pointing: list): """Convert an array of string directions into the correct format for DigitalBeam device""" - pointing_direction = list( - self.digitalbeam_proxy.Pointing_direction_RW - ) # convert to list to allows item assignment - first_beamlet = numpy.array(self.read_first_beamlet_R(), dtype=numpy.int64) + pointing_direction = list(self.digitalbeam_proxy.Pointing_direction_RW) + first_beamlet = self.read_attribute("first_beamlet_R") + # Insert pointing values starting from the first beamlet - pointing_direction[first_beamlet : len(sap_pointing)] = sap_pointing + pointing_direction[ + first_beamlet : first_beamlet + len(sap_pointing) + ] = sap_pointing + return tuple(pointing_direction) def _apply_observation_id(self, observation_id: numpy.int64): diff --git a/tangostationcontrol/tangostationcontrol/devices/observation_control.py b/tangostationcontrol/tangostationcontrol/devices/observation_control.py index 3cb53961a88b3e6d8e19f84d287335484a864406..acf35a664d82d2523b3fc14b7e947586f1715f48 100644 --- a/tangostationcontrol/tangostationcontrol/devices/observation_control.py +++ b/tangostationcontrol/tangostationcontrol/devices/observation_control.py @@ -5,9 +5,7 @@ import logging import numpy from tango import ( - Except, DevState, - AttrWriteType, DebugIt, Util, DevBoolean, @@ -21,7 +19,7 @@ from tangostationcontrol.common.lofar_logging import ( log_exceptions, ) from tangostationcontrol.configuration import ObservationSettings -from tangostationcontrol.devices.device_decorators import only_when_on, fault_on_error +from tangostationcontrol.devices.device_decorators import only_when_on from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice from tangostationcontrol.devices.observation import Observation @@ -48,13 +46,14 @@ class ObservationControl(LOFARDevice): are available to perform the set-up. Essentially this is what happens: - Somebody calls ObservationControl.start_observation(parameters). + Somebody calls ObservationControl.add_observation(parameters). Then ObservationControl will perform: - Creates a new instance of an Observation device in the Tango DB - Call Initialise(parameters) - Wait for initialise to return - Check status() - If status() is NOT STANDBY, abort with an exception + - Wait for start_time - lead_time - Call On() - Subscribe to the Observation.running MP's periodic event - Register the observation in the dict self.running_observations[ID] @@ -74,9 +73,10 @@ class ObservationControl(LOFARDevice): Functions - Normal lifecycle funcs: initialise, on, off - - start_observation(parameters) - - stop_observation(ID) - - stop_all_observations() + - add_observation(parameters) + - start_observation_now(ID) + - stop_observation_now(ID) + - stop_all_observations_now() - running_observations() -> dict - is_observation_running(obs_id) -> bool @@ -85,9 +85,23 @@ class ObservationControl(LOFARDevice): """ # Attributes - running_observations_R = attribute( - dtype=(numpy.int64,), max_dim_x=1000, access=AttrWriteType.READ + @attribute( + doc="List of registered observations.", + dtype=(numpy.int64,), + max_dim_x=1000, + fisallowed="is_attribute_access_allowed", ) + def observations_R(self): + return list(self._observation_controller) + + @attribute( + doc="List of registered observations that are running.", + dtype=(numpy.int64,), + max_dim_x=1000, + fisallowed="is_attribute_access_allowed", + ) + def running_observations_R(self): + return self._observation_controller.running_observations def __init__(self, cl, name): # Super must be called after variable assignment due to executing init_device! @@ -109,79 +123,79 @@ class ObservationControl(LOFARDevice): self.set_state(DevState.OFF) # Increase the number of polling threads for this device server. + # NB: This server hosts both ObservationControl and Observation + # devices. Util.instance().set_polling_threads_pool_size(10) # Lifecycle functions - def configure_for_initialise(self): - self._observation_controller = ObservationController(self.myTangoDomain) - + @log_exceptions() def configure_for_off(self): - self.stop_all_observations() + super().configure_for_off() - @only_when_on() - @fault_on_error() - @log_exceptions() - def read_running_observations_R(self): - return self._observation_controller.running_observations + self.stop_all_observations_now() # API @command(dtype_in=DevString) @only_when_on() @log_exceptions() def start_observation(self, parameters: DevString = None): - self._observation_controller.start_observation( + """Deprecated. For backward compatibility with old lofar-station-clients.""" + + self.add_observation(parameters) + + @command(dtype_in=numpy.int64) + @only_when_on() + @log_exceptions() + def stop_observation(self, obs_id: numpy.int64): + """Deprecated. For backward compatibility with old lofar-station-clients.""" + + self.stop_observation_now(obs_id) + + @command(dtype_in=DevString) + @only_when_on() + @log_exceptions() + def add_observation(self, parameters: DevString = None): + """Add an observation. Start/stop it at the specified start/stop times.""" + self._observation_controller.add_observation( ObservationSettings.from_json(parameters) ) + @command(dtype_in=DevString) + @only_when_on() + @log_exceptions() + def start_observation_now(self, obs_id: numpy.int64 = 0): + """Force an observation to start now.""" + logger.info("Force starting the observation with ID=%s.", obs_id) + + self._observation_controller.start_observation(obs_id) + @command(dtype_in=numpy.int64) @only_when_on() @log_exceptions() - def stop_observation(self, obs_id: numpy.int64 = 0): - # Parameter check, do not execute an observation in case - # the parameters are not sufficient. - if obs_id < 1: - # Do not execute - error = f"Cannot stop an observation with ID={obs_id}, \ - because the observation ID is invalid." - Except.throw_exception("IllegalCommand", error, __name__) - elif self._observation_controller.is_observation_running(obs_id) is False: - error = f"Cannot stop an observation with ID={obs_id}, \ - because the observation is not running." - Except.throw_exception("IllegalCommand", error, __name__) - - logger.info("Stopping the observation with ID=%s.", obs_id) - - self._observation_controller.stop_observation(obs_id) + def stop_observation_now(self, obs_id: numpy.int64): + """Force an observation to stop now.""" + logger.info("Force stopping the observation with ID=%s.", obs_id) + + self._observation_controller.stop_observation_now(obs_id) @command() @only_when_on() @log_exceptions() - def stop_all_observations(self): - # Make a copy of the running_observations dict. This - # should prevent race conditions. - if not self.is_any_observation_running(): - return - # Make certain that the dict does not get modified - # while I am iterating over it. - self._observation_controller.stop_all_observations() + def stop_all_observations_now(self): + """Force all observations to stop now.""" + self._observation_controller.stop_all_observations_now() @command(dtype_in=numpy.int64, dtype_out=DevBoolean) @only_when_on() @log_exceptions() - def is_observation_running(self, obs_id: numpy.int64 = -1) -> DevBoolean: - # Parameter check, do not execute if obs_id is invalid - if obs_id < 1: - # Do not execute - error = f"Cannot check if an observation with ID={obs_id} is running, \ - because the observation ID is invalid" - Except.throw_exception("IllegalCommand", error, __name__) - return self._observation_controller.is_observation_running(obs_id) + def is_observation_running(self, obs_id: numpy.int64) -> DevBoolean: + return obs_id in self._observation_controller.running_observations @command(dtype_out=DevBoolean) @only_when_on() @log_exceptions() def is_any_observation_running(self) -> DevBoolean: - return self._observation_controller.is_any_observation_running() + return len(self._observation_controller.running_observations) > 0 # ---------- @@ -189,4 +203,7 @@ class ObservationControl(LOFARDevice): # ---------- def main(**kwargs): """Main function of the ObservationControl module.""" + + # The ObservationControl device spawns Observation devices, so we manage + # both in this DeviceServer. return entry((ObservationControl, Observation), verbose=True, **kwargs) diff --git a/tangostationcontrol/test/common/test_observation_controller.py b/tangostationcontrol/test/common/test_observation_controller.py index 3aa1091ab8212dee9233c7e0b8bec47570775171..a9d31c01c77e80dd712f843bdd436918e12fa090 100644 --- a/tangostationcontrol/test/common/test_observation_controller.py +++ b/tangostationcontrol/test/common/test_observation_controller.py @@ -10,39 +10,48 @@ from unittest.mock import Mock from tango import DevState from tangostationcontrol.common import ObservationController -from tangostationcontrol.common.observation_controller import RunningObservation +from tangostationcontrol.common.observation_controller import Observation from tangostationcontrol.common.proxy import create_device_proxy from tangostationcontrol.configuration import ObservationSettings, Pointing, Sap from test import base +class MockObservation(object): + def __init__(self, running): + self._running = running + + def is_running(self): + return self._running + + @mock.patch("tango.Util.instance") class TestObservationController(base.TestCase): """Test Observation Controller main operations""" - def test_is_any_observation_running(self, _): + def test_observations_running(self, _): sut = ObservationController("DMR") - self.assertFalse(sut.is_any_observation_running()) - sut._running_observations[1] = {} - self.assertTrue(sut.is_any_observation_running()) + self.assertListEqual([], sut.running_observations) + sut[1] = MockObservation(True) + self.assertListEqual([1], sut.running_observations) - def test_is_observation_running(self, _): + def test_observations_not_running(self, _): sut = ObservationController("DMR") - self.assertFalse(sut.is_observation_running(2)) - sut._running_observations[2] = {} - self.assertTrue(sut.is_observation_running(2)) + self.assertListEqual([], sut.running_observations) + sut[2] = MockObservation(False) + self.assertListEqual([], sut.running_observations) - def test_stop_all_observations_no_running(self, _): + def test_stop_all_observations_now_no_running(self, _): sut = ObservationController("DMR") - sut.stop_all_observations() + sut.stop_all_observations_now() @mock.patch("tango.Util.instance") -class TestRunningObservation(base.TestCase): +class TestObservation(base.TestCase): SETTINGS = ObservationSettings( 5, datetime.fromisoformat("2022-10-26T11:35:54.704150"), + datetime.fromisoformat("2022-10-27T11:35:54.704150"), "HBA", "ALL", "filter settings", @@ -50,34 +59,42 @@ class TestRunningObservation(base.TestCase): ) def test_properties(self, _): - sut = RunningObservation("DMR", TestRunningObservation.SETTINGS) + sut = Observation("DMR", TestObservation.SETTINGS) self.assertEqual(5, sut.observation_id) self.assertEqual("Observation", sut.class_name) self.assertEqual("DMR/Observation/5", sut.device_name) - self.assertEqual("DMR/Observation/5/observation_running_R", sut.attribute_name) + self.assertEqual("DMR/Observation/5/alive_R", sut.attribute_name) - def test_create_tango_device(self, tu_mock): - sut = RunningObservation("DMR", TestRunningObservation.SETTINGS) - sut.create_tango_device() + def test_create_observation_device(self, tu_mock): + sut = Observation("DMR", TestObservation.SETTINGS) + sut.create_observation_device() @mock.patch("tango.DeviceProxy") - def test_create_device_proxy(self, dp_mock, tu_mock): + def test_initialise_observation(self, dp_mock, tu_mock): importlib.reload(sys.modules[create_device_proxy.__module__]) - sut = RunningObservation("DMR", TestRunningObservation.SETTINGS) - sut.create_device_proxy() + sut = Observation("DMR", TestObservation.SETTINGS) + sut.initialise_observation() self.assertEqual( dp_mock.return_value.observation_settings_RW, - TestRunningObservation.SETTINGS.to_json(), + TestObservation.SETTINGS.to_json(), ) dp_mock.return_value.Initialise.assert_called() + + @mock.patch("tango.DeviceProxy") + def test_start(self, dp_mock, tu_mock): + importlib.reload(sys.modules[create_device_proxy.__module__]) + sut = Observation("DMR", TestObservation.SETTINGS) + sut.initialise_observation() + sut.start() + dp_mock.return_value.On.assert_called() def test_subscribe(self, _): def dummy(): pass - sut = RunningObservation("DMR", TestRunningObservation.SETTINGS) + sut = Observation("DMR", TestObservation.SETTINGS) dp_mock = Mock() sut._device_proxy = dp_mock sut.subscribe(dummy) @@ -85,20 +102,19 @@ class TestRunningObservation(base.TestCase): dp_mock.poll_attribute.assert_called() dp_mock.subscribe_event.assert_called() - def test_shutdown(self, tu_mock): - importlib.reload(sys.modules[RunningObservation.__module__]) - sut = RunningObservation("DMR", TestRunningObservation.SETTINGS) + def test_stop(self, tu_mock): + importlib.reload(sys.modules[Observation.__module__]) + sut = Observation("DMR", TestObservation.SETTINGS) dp_mock = Mock() dp_mock.state.return_value = DevState.OFF sut._device_proxy = dp_mock - sut.shutdown() + sut.stop() dp_mock.ping.assert_called() dp_mock.unsubscribe_event.assert_called() dp_mock.Off.assert_called() - dp_mock.state.assert_called() tu_mock.return_value.delete_device.assert_called() diff --git a/tangostationcontrol/test/configuration/test_observation_settings.py b/tangostationcontrol/test/configuration/test_observation_settings.py index 7f67d02e141da7e663b8dd480f0015c11ed23013..049ec2416b806181ff9149542d4edff1d2668140 100644 --- a/tangostationcontrol/test/configuration/test_observation_settings.py +++ b/tangostationcontrol/test/configuration/test_observation_settings.py @@ -95,6 +95,7 @@ class TestObservationSettings(base.TestCase): sut = ObservationSettings( 5, datetime.fromisoformat("2022-10-26T11:35:54.704150"), + datetime.fromisoformat("2022-10-27T11:35:54.704150"), "HBA", "ALL", "filter settings", @@ -105,7 +106,8 @@ class TestObservationSettings(base.TestCase): ) self.assertEqual( sut.to_json(), - '{"observation_id": 5, "stop_time": "2022-10-26T11:35:54.704150", ' + '{"observation_id": 5, "start_time": "2022-10-26T11:35:54.704150", ' + '"stop_time": "2022-10-27T11:35:54.704150", ' '"antenna_field": "HBA", ' '"antenna_set": "ALL", "filter": "filter settings", "SAPs": ' '[{"subbands": [3, 2], "pointing": {"angle1": 1.2, "angle2": 2.1, '