diff --git a/CDB/LOFAR_ConfigDb.json b/CDB/LOFAR_ConfigDb.json index b8dcb0bb74baf78e66e88374b4319dcd7b5ff448..bbfb3c47ef31db0c647bbc2c5a62b908915601fa 100644 --- a/CDB/LOFAR_ConfigDb.json +++ b/CDB/LOFAR_ConfigDb.json @@ -7,6 +7,13 @@ } } }, + "observation_control": { + "LTS": { + "ObservationControl": { + "LTS/ObservationControl/1": {} + } + } + }, "PCC": { "LTS": { "PCC": { diff --git a/devices/common/lofar_logging.py b/devices/common/lofar_logging.py index b0e1c2ac4d23d67522cbc984280015fa18a042b6..aed0353461d75ae6ad46b4b10ad51289fb08b553 100644 --- a/devices/common/lofar_logging.py +++ b/devices/common/lofar_logging.py @@ -35,13 +35,13 @@ class TangoLoggingHandler(logging.Handler): class LogAnnotator(logging.Formatter): """ Annotates log records with: - + record.tango_device: the Tango Device that is executing. """ @staticmethod def get_current_tango_device() -> Device: - """ Return the tango Device we're currently executing for, or None if it can't be detected. - + """ Return the tango Device we're currently executing for, or None if it can't be detected. + This is derived by traversing the stack and find a Device as 'self'. In some cases, this fails, for example if a separate Thread is started for a certain Device. """ @@ -130,7 +130,7 @@ def device_logging_to_python(): """ Decorator. Call this on a Tango Device instance or class to have your Tango Device log to python instead. """ def inner(cls): - # we'll be doing very weird things if this class isnt + # we'll be doing very weird things if this class isnt if not issubclass(cls, Device): raise ValueError("device_logging_to_python decorator is to be used on Tango Device classes only.") diff --git a/devices/devices/observation.py b/devices/devices/observation.py new file mode 100644 index 0000000000000000000000000000000000000000..0ac9cbc1837fdd8e7ded14bb6c8459226c223866 --- /dev/null +++ b/devices/devices/observation.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR2.0 Station Control project. +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + + +# TODO(Corne): Remove sys.path.append hack once packaging is in place! +import os, sys +currentdir = os.path.dirname(os.path.realpath(__file__)) +parentdir = os.path.dirname(currentdir) +sys.path.append(parentdir) + +# PyTango imports +from tango import server, Except, DevState, AttrWriteType, DevString, DebugIt +from tango.server import Device, run, command, attribute +import numpy +from time import time + +from devices.device_decorators import * +from common.lofar_logging import device_logging_to_python, log_exceptions +from common.lofar_git import get_version + +from json import loads + +__all__ = ["Observation", "main"] + +@device_logging_to_python() +class Observation(Device): + """ Observation Device for LOFAR2.0 + This Tango device is responsible for the set-up of hardware for a specific observation. It will, if necessary keep tabs on HW MPs to signal issues that are not caught by MPs being outside their nominal range. + + The lifecycle of instances of this device is controlled by ObservationControl + """ + # Attributes + version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version()) + observation_running_R = attribute(dtype = numpy.float, access = AttrWriteType.READ, polling_period = 1000, period = 1000, rel_change = "1.0") + observation_id_R = attribute(dtype = numpy.int64, access = AttrWriteType.READ) + stop_time_R = attribute(dtype = numpy.float, access = AttrWriteType.READ) + + # Core functions + @log_exceptions() + def init_device(self): + Device.init_device(self) + self.set_state(DevState.OFF) + self._observation_id = -1 + self._stop_time = 0.0 + + @log_exceptions() + def delete_device(self): + """Hook to delete resources allocated in init_device. + This method allows for any memory or other resources + allocated in the init_device method to be released. + This method is called by the device destructor and by + the device Init command (a Tango built-in). + """ + self.debug_stream("Shutting down...") + if self.get_state() != DevState.OFF: + self.Off() + self.debug_stream("Shut down. Good bye.") + + # Lifecycle functions + @command(dtype_in = DevString) + @only_in_states([DevState.FAULT, DevState.OFF]) + @log_exceptions() + def Initialise(self, parameters: DevString = None): + self.set_state(DevState.INIT) + # ObservationControl takes already good care of checking that the + # parameters are in order and sufficient. It is therefore unnecessary + # at the moment to check the parameters here again. + # This could change when the parameter check becomes depending on + # certain aspects that only an Observation device can know. + self.observation_parameters = loads(parameters) + + self._observation_id = int(self.observation_parameters.get("id")) + self._stop_time = float(self.observation_parameters.get("stop_time")) + self.set_state(DevState.STANDBY) + self.info_stream("The observation with ID={} is configured. It will begin as soon as On() is called and it is supposed to stop at {}.".format(self._observation_id, self._stop_time)) + + @command() + @only_in_states([DevState.STANDBY]) + @log_exceptions() + def On(self): + self.set_state(DevState.ON) + self.info_stream("Started the observation with ID={}.".format(self._observation_id)) + + @command() + @log_exceptions() + def Off(self): + self.stop_polling(True) + self.set_state(DevState.OFF) + self.info_stream("Stopped the observation with ID={}.".format(self._observation_id)) + + @only_when_on() + @fault_on_error() + @log_exceptions() + def read_observation_id_R(self): + """Return the observation_id_R attribute.""" + return self._observation_id + + @only_when_on() + @fault_on_error() + @log_exceptions() + def read_stop_time_R(self): + """Return the stop_time_R attribute.""" + return self._stop_time + + @only_when_on() + @fault_on_error() + @log_exceptions() + def read_observation_running_R(self): + """Return the observation_running_R attribute.""" + return time() + + +# ---------- +# Run server +# ---------- +def main(args = None, **kwargs): + """Main function of the ObservationControl module.""" + return run((Observation,), args = args, **kwargs) + + +if __name__ == '__main__': + main() diff --git a/devices/devices/observation_control.py b/devices/devices/observation_control.py new file mode 100644 index 0000000000000000000000000000000000000000..9b60f86bb983057d023483ebaa61164bdfba5bee --- /dev/null +++ b/devices/devices/observation_control.py @@ -0,0 +1,454 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR2.0 Station Control project. +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + + +# TODO(Corne): Remove sys.path.append hack once packaging is in place! +import os, sys +currentdir = os.path.dirname(os.path.realpath(__file__)) +parentdir = os.path.dirname(currentdir) +sys.path.append(parentdir) + +# PyTango imports +from tango import Except, DevFailed, DevState, AttrWriteType, DebugIt, DeviceProxy, Util, DevBoolean, DevString +from tango.server import Device, run, command, device_property, attribute +from tango import EventType + +import numpy +import time +from json import loads + +from devices.device_decorators import * +from common.lofar_logging import device_logging_to_python, log_exceptions +from common.lofar_git import get_version + +from observation import Observation + + +__all__ = ["ObservationControl", "main"] + +@device_logging_to_python() +class ObservationControl(Device): + """ Observation Control Device Server for LOFAR2.0 + The ObservationControl Tango device controls the instantiation of a Tango Dynamic Device from the Observation class. ObservationControl then keeps a record of the Observation devices and if they are still alive. + + At the end of an observation ObservationControl checks if the respective Observation device has stopped its execution and releases it. If the Observation device has not stopped its execution yet, it is attempted to forcefully stop the execution of the Observation device. Then the Observation device is removed from the list of running observations. + + The Observation devices are responsible for the "real" execution of an observation. They get references to the hardware devices that are needed to set values in the relevant Control Points. The Observation device performs only a check if enough parameters are available to perform the set-up. + + Essentially this is what happens: + Somebody calls ObservationControl.start_observation(parameters). Then ObservationControl will perform: + - Creates a new instance of an Observation device in the Tango DB + - Call Initialise(parameters) + - Wait for initialise to return + - Check status() + - If status() is NOT STANDBY, abort with an exception + - Call On() + - Subscribe to the Observation.running MP's periodic event + - Register the observation in the dict self.running_observations[ID] + - The Observation updates the MP every second with the current time + - The callback gets called periodically. It checks if MP value > stop (stored in the dict under the obs IDS. By this it can determine if the observation is done. + - if MP value > observation end + - Remove observation ID from running obs dict + - Unsubscribe from the MP's event + - Call off() + - Remove the device from the Tango DB which will make the device disappear + + This should in broad strokes pretty much cover any type of observation. + + ObservationControl can expose this interface: + + Functions + - Normal lifecycle funcs: initialise, on, off + - start_observation(parameters) + - stop_observation(ID) + - stop_all_observations() + - running_observations() -> dict + - is_observation_running(obs_id) -> bool + + MPs + - array[int] running_observations + - string version + """ + # Attributes + version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version()) + running_observations_R = attribute(dtype = (numpy.int64, ), access = AttrWriteType.READ) + + # Core functions + @log_exceptions() + @DebugIt() + def init_device(self): + """ Keep all the observation objects for currently running + observations in this dict. The key is the observation idea + and the value is a dict of the parameters. + The value dict contains at least these key/value pairs: + ["proxy"]: tango.DeviceProxy - the DeviceProxy for the Observation object + ["event_id"]: int + ["parameters"]: {parameters as passed to the start_observation call} + ["stop_time"]: timestamp when the observation is supposed to stop. + ["device_name"]: name of the device instance in Tango DB + ["class_name"]: name of the device's class, needed to create an instance + on the fly. + """ + Device.init_device(self) + self.set_state(DevState.OFF) + + self.running_observations = {} + + # The pyTango.Util class is a singleton and every DS can only + # have one instance of it. + self.tango_util = Util.instance() + + # Increase the number of polling threads for this device server. + self.tango_util.set_polling_threads_pool_size(10) + + # The top level tango domain is the left-most part of a + # device's name. + self.myTangoDomain = self.get_name().split('/')[0] + + @log_exceptions() + @DebugIt() + def delete_device(self): + """Hook to delete resources allocated in init_device. + This method allows for any memory or other resources + allocated in the init_device method to be released. + This method is called by the device destructor and by + the device Init command (a Tango built-in). + """ + if self.get_state != DevState.OFF: + self.Off() + + # Lifecycle functions + @command() + @only_in_states([DevState.FAULT, DevState.OFF]) + @log_exceptions() + @DebugIt() + def Initialise(self): + self.set_state(DevState.INIT) + self.running_observations.clear() + self.set_state(DevState.STANDBY) + + @command() + @only_in_states([DevState.STANDBY]) + @log_exceptions() + @DebugIt() + def On(self): + self.set_state(DevState.ON) + + @command() + @log_exceptions() + @DebugIt() + def Off(self): + if self.get_state() != DevState.OFF: + self.stop_all_observations() + self.set_state(DevState.OFF) + + @command() + @log_exceptions() + @DebugIt() + def Fault(self): + stop_all_observations() + self.set_state(DevState.FAULT) + + @only_when_on() + @fault_on_error() + @log_exceptions() + def read_running_observations_R(self): + obs = [ key for key in self.running_observations ] + self.debug_stream("{}".format(obs)) + return obs + + @log_exceptions() + def observation_running_callback(self, event): + """ + This callback checks if a running observation is still + supposed to run. If this function finds out that the + observation is not supposed to run any more, then + self.stop_observation(obs_id) is called which takes care of the + clean up. + """ + if event.err: + # Something is fishy with this event. + self.warn_stream("The Observation device {} sent an event but the event signals an error. It is advised to check the logs for any indication that something went wrong in that device. Event data={} ".format(event.device, event)) + return + + # Get the Observation ID from the sending device. + obs_id = event.device.observation_id_R + + # Check if the observation is still supposed to run. + running_obs = self.running_observations.copy() + if not running_obs: + # No obs is running??? + self.warn_stream("Received an observation_running event for the observation with ID={}. According to the records in ObservationControl, this observation is not supposed to run. Please check previous logs, especially around the time an observation with this ID was started. Will continue and ignore this event.".format(obs_id)) + return + + if id in running_obs: + # Get the Observation's stop_time from the Observation device. + obs_stop_time = event.device.stop_time_R + current_obs_time = event.attr_value.value + # I expect that current_obs_time is always + # smaller than the stop time that I read from my + # records. + delta_t = obs_stop_time - current_obs_time + if delta_t < 0.0: + # The observation has not finished yet and is + # more than 1.0 seconds past its scheduled stop + # time. Tell the observation to finish and clean up. + obs = running_obs[id] + self.stop_observation(obs_id) + + @only_when_on() + @log_exceptions() + def check_and_convert_parameters(self, parameters: DevString = None) -> dict: + """ + The parameters are passed as JSON in a char[]. + Conveniently json.loads() creates a dict from the input. + :param parameters: The parameters as JSON + :type parameters: DevVarUCharArray + :return: None + """ + # Convert the input parameter to a dict. + parameter_dict = loads(parameters) + + self.debug_stream("incoming parameter_array = {}, parameter_dict = {}".format(parameters, parameter_dict)) + + # Parameter check, do not execute an observation in case + # the parameters are not sufficiently defined. + obs_id = int(parameter_dict.get("obs_id")) + stop_time = float(parameter_dict.get("stop_time")) + # TODO: Once ticket https://support.astron.nl/jira/browse/L2SS-254 is + # done, this needs to be replaced by a proper JSON verification + # against a schema. + if obs_id is None or obs_id < 1: + # Do not execute + error = "Cannot start an observation with ID={} because the observation ID is invalid. The ID must be any integer >= 1.".format(obs_id) + Except.throw_exception("IllegalCommand", error, __name__) + elif stop_time is None or stop_time <= time.time(): + error = "Cannot start an observation with ID={} because the parameter stop_time parameter value=\"{}\" is invalid. It needs to be expressed as the number of seconds since the Unix epoch.".format(obs_id, stop_time) + Except.throw_exception("IllegalCommand", error, __name__) + elif len(parameters) == 0: + error = "Cannot start an observation with ID={} because the parameter set is empty.".format(obs_id) + Except.throw_exception("IllegalCommand", error, __name__) + return parameter_dict + + def delete_dynamic_device(self, class_name: str = None, device_name: str = None): + """ + Remove a Tango device from the Tango DB. This calls delete_device(). + """ + if class_name is not None and device_name is not None: + try: + # Remove the device from the Tango DB. + self.tango_util.delete_device(class_name, device_name) + except DevFailed as ex: + # It is OK if this fails. This likely means that the device did + # never exist in the Tango DB. Still add a warning to the logs. + self.warn_stream("Something went wrong when it was attempted to remove the device {} from the Tango DB. You should better go and check the logs. Exception: {}".format(device_name, ex)) + pass + else: + self.error_stream("Cannot delete a device from the Tango DB if the device's class name or the device name are not provided: class_name={}, device_name={}".format(class_name, device_name)) + + def create_dynamic_device(self, class_name: str = None, device_name: str = None): + """ + Create a Tango device instance for a Device class in the Tango DB. + This will automatically instantiate the device and also call + init_device. + """ + try: + self.tango_util.create_device(class_name, device_name) + except DevFailed as ex: + self.delete_dynamic_device(class_name, device_name) + error_string = "Cannot start the device {} for the device class {}. Exception: {}".format(device_name, class_name, ex) + self.error_stream("{}, {}".format(error_string, ex)) + Except.re_throw_exception(ex, "DevFailed", error_string, __name__) + + # API + @command(dtype_in = DevString) + @only_when_on() + @log_exceptions() + def start_observation(self, parameters: DevString = None): + # Store everything about the observation in this dict. I store this + # dict at the end in self.running_observations. + observation = {"parameters": self.check_and_convert_parameters(parameters)} + obs_id = int(observation["parameters"].get("obs_id")) + + # The class name of the Observation class is needed to create and + # delete the device. + class_name = Observation.__name__ + observation["class_name"] = class_name + + # Generate the Tango DB name for the Observation device. + device_name = "{}/{}/{}".format(self.myTangoDomain, class_name, obs_id) + observation["device_name"] = device_name + + try: + # Create the Observation device and instantiate it. + self.create_dynamic_device(class_name, device_name) + except DevFailed as ex: + error_string = "Cannot create the Observation device instance {} for ID={}. This means that the observation did not start.".format(device_name, obs_id) + self.error_stream(error_string) + Except.re_throw_exception(ex, "DevFailed", error_string, __name__) + + try: + # Instantiate a dynamic Tango Device "Observation". + device_proxy = DeviceProxy(device_name) + observation["device_proxy"] = device_proxy + + # Take the Observation device through the motions. Pass the + # entire JSON set of parameters so that it can pull from it what it + # needs. + device_proxy.Initialise(parameters) + + # The call to On will actually tell the Observation device to + # become fully active. + device_proxy.On() + except DevFailed as ex: + # Remove the device again. + self.delete_dynamic_device(class_name, device_name) + error_string = "Cannot access the Observation device instance for observation ID={} with device class name={} and device instance name={}. This means that the observation likely did not start but certainly cannot be controlled and/or forcefully be stopped.".format(obs_id, class_name, device_name) + self.error_stream("{}, {}".format(error_string, ex)) + Except.re_throw_exception(ex, "DevFailed", error_string, __name__) + + try: + # Subscribe to the obs.observation_running MP + # + # Generate the name for the Observation.observation_running + # MP. + attribute_name = "{}/observation_running_R".format(device_name) + observation["attribute_name"] = attribute_name + + # Turn on the polling for the attribute. + # Note that this is not automatically done despite the attribute + # having the right polling values set in the ctor. + device_proxy.poll_attribute(attribute_name.split('/')[-1], 1000) + + # Note: I update the running_observations dict already here because + # the addition of an event listener immediately triggers that + # event. And since the call back checks if the obs_id is in the dict + # this triggers an error message if the ID is not already known. + # There is no harm in copying the dict twice. + self.running_observations[obs_id] = observation + + # Right. Now subscribe to periodic events. + event_id = device_proxy.subscribe_event(attribute_name.split('/')[-1], EventType.PERIODIC_EVENT, self.observation_running_callback) + observation["event_id"] = event_id + + # Finally update the self.running_observation dict's entry of this + # observation with the complete set of info. + self.running_observations[obs_id] = observation + self.info_stream("Successfully started an observation with ID={}.".format(obs_id)) + except DevFailed as ex: + self.delete_dynamic_device(class_name, device_name) + error_string = "Cannot access the Observation device instance for observation ID={} with device class name={} and device instance name={}. This means that the observation cannot be controlled and/or forcefully be stopped.".format(obs_id, Observation.__name__, device_name) + self.error_stream("{}, {}".format(error_string, ex)) + Except.re_throw_exception(ex, "DevFailed", error_string, __name__) + + @command(dtype_in = numpy.int64) + @only_when_on() + @log_exceptions() + def stop_observation(self, obs_id: numpy.int64 = 0): + # Parameter check, do not execute an observation in case + # the parameters are not sufficient. + if obs_id < 1: + # Do not execute + error = "Cannot stop an observation with ID={}, because the observation ID is invalid.".format(obs_id) + Except.throw_exception("IllegalCommand", error, __name__) + elif self.is_observation_running(obs_id) is False: + error = "Cannot stop an observation with ID={}, because the observation is not running.".format(obs_id) + Except.throw_exception("IllegalCommand", error, __name__) + + self.info_stream("Stopping the observation with ID={}.".format(obs_id)) + # Fetch the obs data and remove it from the dict of + # currently running observations. + observation = self.running_observations.pop(obs_id) + device_proxy = observation.pop("device_proxy") + + # Check if the device has not terminated itself in the meanwhile. + try: + device_proxy.ping() + except DevFailed: + self.warn_stream("The device for the Observation with ID={} has unexpectedly already disappeared. It is advised to check the logs up to 10s prior to this message to see what happened.".format(obs_id)) + else: + # Unsubscribe from the subscribed event. + event_id = observation.pop("event_id") + device_proxy.unsubscribe_event(event_id) + + # Tell the Observation device to stop the running + # observation. This is a synchronous call and the clean-up + # does not take long. + device_proxy.Off() + + # Wait for 1s for the Observation device to go to + # DevState.OFF. Force shutdown if observation.state() is + # not OFF. + remaining_wait_time = 1.0 + sleep_time = 0.1 + stopped = False + while remaining_wait_time > 0.0: + if device_proxy.state() is DevState.OFF: + stopped = True + break + time.sleep(sleep_time) + remaining_wait_time = remaining_wait_time - sleep_time + # Check if the observation object is really in OFF state. + if stopped: + self.info_stream("Successfully stopped the observation with ID={}.".format(obs_id)) + else: + self.warn_stream("Could not shut down the Observation device (\"{}\") for observation ID={}. This means that there is a chance for a memory leak. Will continue anyway and forcefully delete the Observation object.".format(observation["device_name"], obs_id)) + + # Finally remove the device object from the Tango DB. + try: + self.delete_dynamic_device(observation["class_name"], observation["device_name"]) + except DevFailed: + self.warn_stream("Something went wrong when the device {} was removed from the Tango DB. There is nothing that can be done about this here at this moment but you should check the Tango DB yourself.".format(observation["device_name"])) + + @command() + @only_when_on() + @log_exceptions() + def stop_all_observations(self): + # Make a copy of the running_observations dict. This + # should prevent race conditions. + if self.is_any_observation_running(): + # Make certain that the dict does not get modified + # while I am iterating over it. + active_obs = self.running_observations.copy() + for obs_id in active_obs.keys(): + self.stop_observation(obs_id) + + @command(dtype_in = numpy.int64, dtype_out = DevBoolean) + @only_when_on() + @log_exceptions() + def is_observation_running(self, obs_id: numpy.int64 = -1) -> DevBoolean: + # Parameter check, do not execute if obs_id is invalid + if obs_id < 1: + # Do not execute + error = "Cannot check if an observation with ID={} is running, because the observation ID is invalid".format(obs_id) + Except.throw_exception("IllegalCommand", error, __name__) + + observation = self.running_observations.get(obs_id) + info = "An observation with ID={} is".format(obs_id) + if observation is not None: + self.debug_stream("{} running.".format(info)) + return True + self.debug_stream("{} not running.".format(info)) + return False + + @command(dtype_out = DevBoolean) + @only_when_on() + @log_exceptions() + def is_any_observation_running(self) -> DevBoolean: + return len(self.running_observations) > 0 + + +# ---------- +# Run server +# ---------- +def main(args = None, **kwargs): + """Main function of the ObservationControl module.""" + return run((ObservationControl, Observation), verbose = True, args = args, **kwargs) + + +if __name__ == '__main__': + main() diff --git a/docker-compose/device-observation_control.yml b/docker-compose/device-observation_control.yml new file mode 100644 index 0000000000000000000000000000000000000000..c3cbb19d6bcf331b9ce96fccb74c9d0d6f76b758 --- /dev/null +++ b/docker-compose/device-observation_control.yml @@ -0,0 +1,41 @@ +# +# Docker compose file that launches a LOFAR2.0 station's +# ObservationControl device. It also runs the dynamically +# created Observation devices. +# +# Defines: +# - device-observation_control: LOFAR2.0 station ObvservationControl +# +# Requires: +# - lofar-device-base.yml +# +version: '2' + +services: + device-observation_control: + image: device-observation_control + # build explicitly, as docker-compose does not understand a local image + # being shared among services. + build: + context: lofar-device-base + args: + SOURCE_IMAGE: ${DOCKER_REGISTRY_HOST}/${DOCKER_REGISTRY_USER}-tango-itango:${TANGO_ITANGO_VERSION} + container_name: ${CONTAINER_NAME_PREFIX}device-observation_control + networks: + - control + ports: + - "5703:5703" # unique port for this DS + volumes: + - ${TANGO_LOFAR_CONTAINER_MOUNT} + environment: + - TANGO_HOST=${TANGO_HOST} + entrypoint: + - /usr/local/bin/wait-for-it.sh + - ${TANGO_HOST} + - --timeout=30 + - --strict + - -- + # configure CORBA to _listen_ on 0:port, but tell others we're _reachable_ through ${HOSTNAME}:port, since CORBA + # can't know about our Docker port forwarding + - python3 -u ${TANGO_LOFAR_CONTAINER_DIR}/devices/devices/observation_control.py LTS -v -ORBendPoint giop:tcp:0:5703 -ORBendPointPublish giop:tcp:${HOSTNAME}:5703 + restart: on-failure