Skip to content
Snippets Groups Projects
Commit 29dd72ae authored by Thomas Juerges's avatar Thomas Juerges
Browse files

Merge branch 'L2SS-183-Observation_device' into 'master'

L2SS-183: Observation device

Closes L2SS-183

See merge request !40
parents 624fc1a6 96422862
No related branches found
No related tags found
1 merge request!40L2SS-183: Observation device
...@@ -7,6 +7,13 @@ ...@@ -7,6 +7,13 @@
} }
} }
}, },
"observation_control": {
"LTS": {
"ObservationControl": {
"LTS/ObservationControl/1": {}
}
}
},
"PCC": { "PCC": {
"LTS": { "LTS": {
"PCC": { "PCC": {
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
# TODO(Corne): Remove sys.path.append hack once packaging is in place!
import os, sys
currentdir = os.path.dirname(os.path.realpath(__file__))
parentdir = os.path.dirname(currentdir)
sys.path.append(parentdir)
# PyTango imports
from tango import server, Except, DevState, AttrWriteType, DevString, DebugIt
from tango.server import Device, run, command, attribute
import numpy
from time import time
from devices.device_decorators import *
from common.lofar_logging import device_logging_to_python, log_exceptions
from common.lofar_git import get_version
from json import loads
__all__ = ["Observation", "main"]
@device_logging_to_python()
class Observation(Device):
""" Observation Device for LOFAR2.0
This Tango device is responsible for the set-up of hardware for a specific observation. It will, if necessary keep tabs on HW MPs to signal issues that are not caught by MPs being outside their nominal range.
The lifecycle of instances of this device is controlled by ObservationControl
"""
# Attributes
version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version())
observation_running_R = attribute(dtype = numpy.float, access = AttrWriteType.READ, polling_period = 1000, period = 1000, rel_change = "1.0")
observation_id_R = attribute(dtype = numpy.int64, access = AttrWriteType.READ)
stop_time_R = attribute(dtype = numpy.float, access = AttrWriteType.READ)
# Core functions
@log_exceptions()
def init_device(self):
Device.init_device(self)
self.set_state(DevState.OFF)
self._observation_id = -1
self._stop_time = 0.0
@log_exceptions()
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources
allocated in the init_device method to be released.
This method is called by the device destructor and by
the device Init command (a Tango built-in).
"""
self.debug_stream("Shutting down...")
if self.get_state() != DevState.OFF:
self.Off()
self.debug_stream("Shut down. Good bye.")
# Lifecycle functions
@command(dtype_in = DevString)
@only_in_states([DevState.FAULT, DevState.OFF])
@log_exceptions()
def Initialise(self, parameters: DevString = None):
self.set_state(DevState.INIT)
# ObservationControl takes already good care of checking that the
# parameters are in order and sufficient. It is therefore unnecessary
# at the moment to check the parameters here again.
# This could change when the parameter check becomes depending on
# certain aspects that only an Observation device can know.
self.observation_parameters = loads(parameters)
self._observation_id = int(self.observation_parameters.get("id"))
self._stop_time = float(self.observation_parameters.get("stop_time"))
self.set_state(DevState.STANDBY)
self.info_stream("The observation with ID={} is configured. It will begin as soon as On() is called and it is supposed to stop at {}.".format(self._observation_id, self._stop_time))
@command()
@only_in_states([DevState.STANDBY])
@log_exceptions()
def On(self):
self.set_state(DevState.ON)
self.info_stream("Started the observation with ID={}.".format(self._observation_id))
@command()
@log_exceptions()
def Off(self):
self.stop_polling(True)
self.set_state(DevState.OFF)
self.info_stream("Stopped the observation with ID={}.".format(self._observation_id))
@only_when_on()
@fault_on_error()
@log_exceptions()
def read_observation_id_R(self):
"""Return the observation_id_R attribute."""
return self._observation_id
@only_when_on()
@fault_on_error()
@log_exceptions()
def read_stop_time_R(self):
"""Return the stop_time_R attribute."""
return self._stop_time
@only_when_on()
@fault_on_error()
@log_exceptions()
def read_observation_running_R(self):
"""Return the observation_running_R attribute."""
return time()
# ----------
# Run server
# ----------
def main(args = None, **kwargs):
"""Main function of the ObservationControl module."""
return run((Observation,), args = args, **kwargs)
if __name__ == '__main__':
main()
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
# TODO(Corne): Remove sys.path.append hack once packaging is in place!
import os, sys
currentdir = os.path.dirname(os.path.realpath(__file__))
parentdir = os.path.dirname(currentdir)
sys.path.append(parentdir)
# PyTango imports
from tango import Except, DevFailed, DevState, AttrWriteType, DebugIt, DeviceProxy, Util, DevBoolean, DevString
from tango.server import Device, run, command, device_property, attribute
from tango import EventType
import numpy
import time
from json import loads
from devices.device_decorators import *
from common.lofar_logging import device_logging_to_python, log_exceptions
from common.lofar_git import get_version
from observation import Observation
__all__ = ["ObservationControl", "main"]
@device_logging_to_python()
class ObservationControl(Device):
""" Observation Control Device Server for LOFAR2.0
The ObservationControl Tango device controls the instantiation of a Tango Dynamic Device from the Observation class. ObservationControl then keeps a record of the Observation devices and if they are still alive.
At the end of an observation ObservationControl checks if the respective Observation device has stopped its execution and releases it. If the Observation device has not stopped its execution yet, it is attempted to forcefully stop the execution of the Observation device. Then the Observation device is removed from the list of running observations.
The Observation devices are responsible for the "real" execution of an observation. They get references to the hardware devices that are needed to set values in the relevant Control Points. The Observation device performs only a check if enough parameters are available to perform the set-up.
Essentially this is what happens:
Somebody calls ObservationControl.start_observation(parameters). Then ObservationControl will perform:
- Creates a new instance of an Observation device in the Tango DB
- Call Initialise(parameters)
- Wait for initialise to return
- Check status()
- If status() is NOT STANDBY, abort with an exception
- Call On()
- Subscribe to the Observation.running MP's periodic event
- Register the observation in the dict self.running_observations[ID]
- The Observation updates the MP every second with the current time
- The callback gets called periodically. It checks if MP value > stop (stored in the dict under the obs IDS. By this it can determine if the observation is done.
- if MP value > observation end
- Remove observation ID from running obs dict
- Unsubscribe from the MP's event
- Call off()
- Remove the device from the Tango DB which will make the device disappear
This should in broad strokes pretty much cover any type of observation.
ObservationControl can expose this interface:
Functions
- Normal lifecycle funcs: initialise, on, off
- start_observation(parameters)
- stop_observation(ID)
- stop_all_observations()
- running_observations() -> dict
- is_observation_running(obs_id) -> bool
MPs
- array[int] running_observations
- string version
"""
# Attributes
version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version())
running_observations_R = attribute(dtype = (numpy.int64, ), access = AttrWriteType.READ)
# Core functions
@log_exceptions()
@DebugIt()
def init_device(self):
""" Keep all the observation objects for currently running
observations in this dict. The key is the observation idea
and the value is a dict of the parameters.
The value dict contains at least these key/value pairs:
["proxy"]: tango.DeviceProxy - the DeviceProxy for the Observation object
["event_id"]: int
["parameters"]: {parameters as passed to the start_observation call}
["stop_time"]: timestamp when the observation is supposed to stop.
["device_name"]: name of the device instance in Tango DB
["class_name"]: name of the device's class, needed to create an instance
on the fly.
"""
Device.init_device(self)
self.set_state(DevState.OFF)
self.running_observations = {}
# The pyTango.Util class is a singleton and every DS can only
# have one instance of it.
self.tango_util = Util.instance()
# Increase the number of polling threads for this device server.
self.tango_util.set_polling_threads_pool_size(10)
# The top level tango domain is the left-most part of a
# device's name.
self.myTangoDomain = self.get_name().split('/')[0]
@log_exceptions()
@DebugIt()
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources
allocated in the init_device method to be released.
This method is called by the device destructor and by
the device Init command (a Tango built-in).
"""
if self.get_state != DevState.OFF:
self.Off()
# Lifecycle functions
@command()
@only_in_states([DevState.FAULT, DevState.OFF])
@log_exceptions()
@DebugIt()
def Initialise(self):
self.set_state(DevState.INIT)
self.running_observations.clear()
self.set_state(DevState.STANDBY)
@command()
@only_in_states([DevState.STANDBY])
@log_exceptions()
@DebugIt()
def On(self):
self.set_state(DevState.ON)
@command()
@log_exceptions()
@DebugIt()
def Off(self):
if self.get_state() != DevState.OFF:
self.stop_all_observations()
self.set_state(DevState.OFF)
@command()
@log_exceptions()
@DebugIt()
def Fault(self):
stop_all_observations()
self.set_state(DevState.FAULT)
@only_when_on()
@fault_on_error()
@log_exceptions()
def read_running_observations_R(self):
obs = [ key for key in self.running_observations ]
self.debug_stream("{}".format(obs))
return obs
@log_exceptions()
def observation_running_callback(self, event):
"""
This callback checks if a running observation is still
supposed to run. If this function finds out that the
observation is not supposed to run any more, then
self.stop_observation(obs_id) is called which takes care of the
clean up.
"""
if event.err:
# Something is fishy with this event.
self.warn_stream("The Observation device {} sent an event but the event signals an error. It is advised to check the logs for any indication that something went wrong in that device. Event data={} ".format(event.device, event))
return
# Get the Observation ID from the sending device.
obs_id = event.device.observation_id_R
# Check if the observation is still supposed to run.
running_obs = self.running_observations.copy()
if not running_obs:
# No obs is running???
self.warn_stream("Received an observation_running event for the observation with ID={}. According to the records in ObservationControl, this observation is not supposed to run. Please check previous logs, especially around the time an observation with this ID was started. Will continue and ignore this event.".format(obs_id))
return
if id in running_obs:
# Get the Observation's stop_time from the Observation device.
obs_stop_time = event.device.stop_time_R
current_obs_time = event.attr_value.value
# I expect that current_obs_time is always
# smaller than the stop time that I read from my
# records.
delta_t = obs_stop_time - current_obs_time
if delta_t < 0.0:
# The observation has not finished yet and is
# more than 1.0 seconds past its scheduled stop
# time. Tell the observation to finish and clean up.
obs = running_obs[id]
self.stop_observation(obs_id)
@only_when_on()
@log_exceptions()
def check_and_convert_parameters(self, parameters: DevString = None) -> dict:
"""
The parameters are passed as JSON in a char[].
Conveniently json.loads() creates a dict from the input.
:param parameters: The parameters as JSON
:type parameters: DevVarUCharArray
:return: None
"""
# Convert the input parameter to a dict.
parameter_dict = loads(parameters)
self.debug_stream("incoming parameter_array = {}, parameter_dict = {}".format(parameters, parameter_dict))
# Parameter check, do not execute an observation in case
# the parameters are not sufficiently defined.
obs_id = int(parameter_dict.get("obs_id"))
stop_time = float(parameter_dict.get("stop_time"))
# TODO: Once ticket https://support.astron.nl/jira/browse/L2SS-254 is
# done, this needs to be replaced by a proper JSON verification
# against a schema.
if obs_id is None or obs_id < 1:
# Do not execute
error = "Cannot start an observation with ID={} because the observation ID is invalid. The ID must be any integer >= 1.".format(obs_id)
Except.throw_exception("IllegalCommand", error, __name__)
elif stop_time is None or stop_time <= time.time():
error = "Cannot start an observation with ID={} because the parameter stop_time parameter value=\"{}\" is invalid. It needs to be expressed as the number of seconds since the Unix epoch.".format(obs_id, stop_time)
Except.throw_exception("IllegalCommand", error, __name__)
elif len(parameters) == 0:
error = "Cannot start an observation with ID={} because the parameter set is empty.".format(obs_id)
Except.throw_exception("IllegalCommand", error, __name__)
return parameter_dict
def delete_dynamic_device(self, class_name: str = None, device_name: str = None):
"""
Remove a Tango device from the Tango DB. This calls delete_device().
"""
if class_name is not None and device_name is not None:
try:
# Remove the device from the Tango DB.
self.tango_util.delete_device(class_name, device_name)
except DevFailed as ex:
# It is OK if this fails. This likely means that the device did
# never exist in the Tango DB. Still add a warning to the logs.
self.warn_stream("Something went wrong when it was attempted to remove the device {} from the Tango DB. You should better go and check the logs. Exception: {}".format(device_name, ex))
pass
else:
self.error_stream("Cannot delete a device from the Tango DB if the device's class name or the device name are not provided: class_name={}, device_name={}".format(class_name, device_name))
def create_dynamic_device(self, class_name: str = None, device_name: str = None):
"""
Create a Tango device instance for a Device class in the Tango DB.
This will automatically instantiate the device and also call
init_device.
"""
try:
self.tango_util.create_device(class_name, device_name)
except DevFailed as ex:
self.delete_dynamic_device(class_name, device_name)
error_string = "Cannot start the device {} for the device class {}. Exception: {}".format(device_name, class_name, ex)
self.error_stream("{}, {}".format(error_string, ex))
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
# API
@command(dtype_in = DevString)
@only_when_on()
@log_exceptions()
def start_observation(self, parameters: DevString = None):
# Store everything about the observation in this dict. I store this
# dict at the end in self.running_observations.
observation = {"parameters": self.check_and_convert_parameters(parameters)}
obs_id = int(observation["parameters"].get("obs_id"))
# The class name of the Observation class is needed to create and
# delete the device.
class_name = Observation.__name__
observation["class_name"] = class_name
# Generate the Tango DB name for the Observation device.
device_name = "{}/{}/{}".format(self.myTangoDomain, class_name, obs_id)
observation["device_name"] = device_name
try:
# Create the Observation device and instantiate it.
self.create_dynamic_device(class_name, device_name)
except DevFailed as ex:
error_string = "Cannot create the Observation device instance {} for ID={}. This means that the observation did not start.".format(device_name, obs_id)
self.error_stream(error_string)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
try:
# Instantiate a dynamic Tango Device "Observation".
device_proxy = DeviceProxy(device_name)
observation["device_proxy"] = device_proxy
# Take the Observation device through the motions. Pass the
# entire JSON set of parameters so that it can pull from it what it
# needs.
device_proxy.Initialise(parameters)
# The call to On will actually tell the Observation device to
# become fully active.
device_proxy.On()
except DevFailed as ex:
# Remove the device again.
self.delete_dynamic_device(class_name, device_name)
error_string = "Cannot access the Observation device instance for observation ID={} with device class name={} and device instance name={}. This means that the observation likely did not start but certainly cannot be controlled and/or forcefully be stopped.".format(obs_id, class_name, device_name)
self.error_stream("{}, {}".format(error_string, ex))
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
try:
# Subscribe to the obs.observation_running MP
#
# Generate the name for the Observation.observation_running
# MP.
attribute_name = "{}/observation_running_R".format(device_name)
observation["attribute_name"] = attribute_name
# Turn on the polling for the attribute.
# Note that this is not automatically done despite the attribute
# having the right polling values set in the ctor.
device_proxy.poll_attribute(attribute_name.split('/')[-1], 1000)
# Note: I update the running_observations dict already here because
# the addition of an event listener immediately triggers that
# event. And since the call back checks if the obs_id is in the dict
# this triggers an error message if the ID is not already known.
# There is no harm in copying the dict twice.
self.running_observations[obs_id] = observation
# Right. Now subscribe to periodic events.
event_id = device_proxy.subscribe_event(attribute_name.split('/')[-1], EventType.PERIODIC_EVENT, self.observation_running_callback)
observation["event_id"] = event_id
# Finally update the self.running_observation dict's entry of this
# observation with the complete set of info.
self.running_observations[obs_id] = observation
self.info_stream("Successfully started an observation with ID={}.".format(obs_id))
except DevFailed as ex:
self.delete_dynamic_device(class_name, device_name)
error_string = "Cannot access the Observation device instance for observation ID={} with device class name={} and device instance name={}. This means that the observation cannot be controlled and/or forcefully be stopped.".format(obs_id, Observation.__name__, device_name)
self.error_stream("{}, {}".format(error_string, ex))
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
@command(dtype_in = numpy.int64)
@only_when_on()
@log_exceptions()
def stop_observation(self, obs_id: numpy.int64 = 0):
# Parameter check, do not execute an observation in case
# the parameters are not sufficient.
if obs_id < 1:
# Do not execute
error = "Cannot stop an observation with ID={}, because the observation ID is invalid.".format(obs_id)
Except.throw_exception("IllegalCommand", error, __name__)
elif self.is_observation_running(obs_id) is False:
error = "Cannot stop an observation with ID={}, because the observation is not running.".format(obs_id)
Except.throw_exception("IllegalCommand", error, __name__)
self.info_stream("Stopping the observation with ID={}.".format(obs_id))
# Fetch the obs data and remove it from the dict of
# currently running observations.
observation = self.running_observations.pop(obs_id)
device_proxy = observation.pop("device_proxy")
# Check if the device has not terminated itself in the meanwhile.
try:
device_proxy.ping()
except DevFailed:
self.warn_stream("The device for the Observation with ID={} has unexpectedly already disappeared. It is advised to check the logs up to 10s prior to this message to see what happened.".format(obs_id))
else:
# Unsubscribe from the subscribed event.
event_id = observation.pop("event_id")
device_proxy.unsubscribe_event(event_id)
# Tell the Observation device to stop the running
# observation. This is a synchronous call and the clean-up
# does not take long.
device_proxy.Off()
# Wait for 1s for the Observation device to go to
# DevState.OFF. Force shutdown if observation.state() is
# not OFF.
remaining_wait_time = 1.0
sleep_time = 0.1
stopped = False
while remaining_wait_time > 0.0:
if device_proxy.state() is DevState.OFF:
stopped = True
break
time.sleep(sleep_time)
remaining_wait_time = remaining_wait_time - sleep_time
# Check if the observation object is really in OFF state.
if stopped:
self.info_stream("Successfully stopped the observation with ID={}.".format(obs_id))
else:
self.warn_stream("Could not shut down the Observation device (\"{}\") for observation ID={}. This means that there is a chance for a memory leak. Will continue anyway and forcefully delete the Observation object.".format(observation["device_name"], obs_id))
# Finally remove the device object from the Tango DB.
try:
self.delete_dynamic_device(observation["class_name"], observation["device_name"])
except DevFailed:
self.warn_stream("Something went wrong when the device {} was removed from the Tango DB. There is nothing that can be done about this here at this moment but you should check the Tango DB yourself.".format(observation["device_name"]))
@command()
@only_when_on()
@log_exceptions()
def stop_all_observations(self):
# Make a copy of the running_observations dict. This
# should prevent race conditions.
if self.is_any_observation_running():
# Make certain that the dict does not get modified
# while I am iterating over it.
active_obs = self.running_observations.copy()
for obs_id in active_obs.keys():
self.stop_observation(obs_id)
@command(dtype_in = numpy.int64, dtype_out = DevBoolean)
@only_when_on()
@log_exceptions()
def is_observation_running(self, obs_id: numpy.int64 = -1) -> DevBoolean:
# Parameter check, do not execute if obs_id is invalid
if obs_id < 1:
# Do not execute
error = "Cannot check if an observation with ID={} is running, because the observation ID is invalid".format(obs_id)
Except.throw_exception("IllegalCommand", error, __name__)
observation = self.running_observations.get(obs_id)
info = "An observation with ID={} is".format(obs_id)
if observation is not None:
self.debug_stream("{} running.".format(info))
return True
self.debug_stream("{} not running.".format(info))
return False
@command(dtype_out = DevBoolean)
@only_when_on()
@log_exceptions()
def is_any_observation_running(self) -> DevBoolean:
return len(self.running_observations) > 0
# ----------
# Run server
# ----------
def main(args = None, **kwargs):
"""Main function of the ObservationControl module."""
return run((ObservationControl, Observation), verbose = True, args = args, **kwargs)
if __name__ == '__main__':
main()
#
# Docker compose file that launches a LOFAR2.0 station's
# ObservationControl device. It also runs the dynamically
# created Observation devices.
#
# Defines:
# - device-observation_control: LOFAR2.0 station ObvservationControl
#
# Requires:
# - lofar-device-base.yml
#
version: '2'
services:
device-observation_control:
image: device-observation_control
# build explicitly, as docker-compose does not understand a local image
# being shared among services.
build:
context: lofar-device-base
args:
SOURCE_IMAGE: ${DOCKER_REGISTRY_HOST}/${DOCKER_REGISTRY_USER}-tango-itango:${TANGO_ITANGO_VERSION}
container_name: ${CONTAINER_NAME_PREFIX}device-observation_control
networks:
- control
ports:
- "5703:5703" # unique port for this DS
volumes:
- ${TANGO_LOFAR_CONTAINER_MOUNT}
environment:
- TANGO_HOST=${TANGO_HOST}
entrypoint:
- /usr/local/bin/wait-for-it.sh
- ${TANGO_HOST}
- --timeout=30
- --strict
- --
# configure CORBA to _listen_ on 0:port, but tell others we're _reachable_ through ${HOSTNAME}:port, since CORBA
# can't know about our Docker port forwarding
- python3 -u ${TANGO_LOFAR_CONTAINER_DIR}/devices/devices/observation_control.py LTS -v -ORBendPoint giop:tcp:0:5703 -ORBendPointPublish giop:tcp:${HOSTNAME}:5703
restart: on-failure
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment