Skip to content
Snippets Groups Projects
Commit eaf164c7 authored by Hannes Feldt's avatar Hannes Feldt
Browse files

L2SS-1019: Create and use obeservation control JSON schema

parent f3d1efdc
No related branches found
No related tags found
1 merge request!471L2SS-1019: Create and use obeservation control JSON schema
Showing
with 792 additions and 376 deletions
......@@ -23,6 +23,7 @@ tangostationcontrol/docs/build
**/coverage.xml
**/.coverage
**/.coverage*
**/.ipynb_checkpoints
**/pending_log_messages.db
**/.eggs
......
......@@ -56,3 +56,5 @@ services:
restart: unless-stopped
stop_signal: SIGINT # request a graceful shutdown of Tango
stop_grace_period: 2s
depends_on:
- schemas
#
# Docker compose file that launches a LOFAR2.0 station's
# ObservationControl device. It also runs the dynamically
# created Observation devices.
#
# Defines:
# - schemas: LOFAR2.0 station
#
version: '2.1'
services:
schemas:
build:
context: schemas
container_name: ${CONTAINER_NAME_PREFIX}schemas
networks:
- control
# set the hostname, otherwise duplicate device registrations result every
# time the hostname changes as the container is restarted.
hostname: schemas
environment:
- NGINX_HOST=schemas
- NGINX_PORT=80
ports:
- 9999:80
logging:
driver: "json-file"
options:
max-size: "100m"
max-file: "10"
restart: unless-stopped
\ No newline at end of file
FROM nginx
COPY definitions /usr/share/nginx/html
\ No newline at end of file
{
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"required": [
"observation_id",
"stop_time",
"antenna_mask",
"filter",
"SAPs"
],
"properties": {
"observation_id": {
"type": "number",
"minimum": 1
},
"stop_time": {
"type": "string",
"format": "date-time"
},
"antenna_mask": {
"type": "array",
"uniqueItems": true,
"minItems": 1,
"items": {
"type": "number"
}
},
"filter": {
"type": "string",
"enum": [
"LBA_10_90",
"LBA_10_70",
"LBA_30_90",
"LBA_30_70",
"HBA_170_230",
"HBA_110_190",
"HBA_210_250"
]
},
"SAPs": {
"type": "array",
"minItems": 1,
"items": {
"$ref": "sap.json"
}
},
"tile_beam": {
"$ref": "pointing.json"
},
"first_beamlet": {
"type": "number",
"default": 0,
"minimum": 0
}
}
}
\ No newline at end of file
{
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"required": [
"angle1",
"angle2",
"direction_type"
],
"properties": {
"angle1": {
"default": 0.6624317181687094,
"description": "First angle (e.g. RA)",
"title": "Angle 1",
"type": "number"
},
"angle2": {
"default": 1.5579526427549426,
"description": "Second angle (e.g. DEC)",
"title": "Angle 2",
"type": "number"
},
"direction_type": {
"default": "J2000",
"description": "",
"enum": [
"J2000",
"AZELGEO",
"LMN",
"SUN",
"MOON",
"MERCURY",
"VENUS",
"MARS",
"JUPITER",
"SATURN",
"URANUS",
"NEPTUNE",
"PLUTO"
],
"title": "Reference frame",
"type": "string"
}
}
}
\ No newline at end of file
{
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"required": [
"subbands",
"pointing"
],
"properties": {
"subbands": {
"type": "array",
"minItems": 1,
"items": {
"type": "number"
}
},
"pointing": {
"$ref": "pointing.json"
}
}
}
\ No newline at end of file
......@@ -27,7 +27,7 @@ function integration_test {
echo "Updating config ${config} ..."
bash "${LOFAR20_DIR}"/sbin/update_ConfigDb.sh "${config}"
done
if [ ! -z "${2+x}" ]; then
if [ -n "${2+x}" ]; then
# shellcheck disable=SC2145
echo "make restart ${restarts[@]} ..."
make restart "${restarts[@]}"
......@@ -76,28 +76,27 @@ sleep 1 # dsconfig container must be up and running...
# shellcheck disable=SC2016
echo '/usr/local/bin/wait-for-it.sh ${TANGO_HOST} --strict --timeout=300 -- true' | make run dsconfig bash -
# Devices list is used to explitly word split when supplied to commands, must
# disable shellcheck SC2086 for each case.
DEVICES="device-boot device-apsct device-ccd device-apspu device-sdp device-recv device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-psoc device-pcon device-antennafield device-temperature-manager device-observation device-observation-control"
DEVICES=(device-boot device-apsct device-ccd device-apspu device-sdp device-recv device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-psoc device-pcon device-antennafield device-temperature-manager device-observation device-observation-control)
SIMULATORS="sdptr-sim recv-sim unb2-sim apsct-sim apspu-sim ccd-sim"
SIMULATORS=(sdptr-sim recv-sim unb2-sim apsct-sim apspu-sim ccd-sim)
# Build only the required images, please do not build everything that makes CI
# take really long to finish, especially grafana / jupyter / prometheus.
# jupyter is physically large > 2.5gb and overlayfs is really slow.
# shellcheck disable=SC2086
make build $DEVICES $SIMULATORS
make build "${DEVICES[@]}" "${SIMULATORS[@]}"
# make build elk integration-test # L2SS-970: elk temporarily disabled
make build logstash integration-test
make build logstash integration-test schemas
make build archiver-timescale hdbppts-cm hdbppts-es
# Start and stop sequence
# shellcheck disable=SC2086
make stop $DEVICES $SIMULATORS hdbppts-es hdbppts-cm archiver-timescale
make stop schemas
make stop "${DEVICES[@]}" "${SIMULATORS[@]}" hdbppts-es hdbppts-cm archiver-timescale
make stop device-docker # this one does not test well in docker-in-docker
# make stop elk # L2SS-970: elk temporarily disabled
make stop logstash
make stop logstash schemas
# Run dummy integration test to install pytango in tox virtualenv without
# the memory pressure of the ELK stack.
......@@ -108,7 +107,7 @@ make stop logstash
integration_test dummy
# make start elk # L2SS-970: elk temporarily disabled
make start logstash
make start logstash schemas
# Update the dsconfig
# Do not remove `bash`, otherwise statement ignored by gitlab ci shell!
......@@ -120,19 +119,18 @@ bash "${LOFAR20_DIR}"/sbin/update_ConfigDb.sh "${LOFAR20_DIR}"/CDB/stations/dumm
cd "$LOFAR20_DIR/docker-compose" || exit 1
# shellcheck disable=SC2086
make start $SIMULATORS
make start "${SIMULATORS[@]}"
# Give the simulators time to start
sleep 5
# shellcheck disable=SC2086
make start $DEVICES
make start "${DEVICES[@]}"
# Archive devices: archive-timescale first
make start archiver-timescale
# Wait for archiver and devices to restart
# shellcheck disable=SC2086
make await archiver-timescale $DEVICES
make await archiver-timescale "${DEVICES[@]}"
# Give archiver-timescale time to start
# shellcheck disable=SC2016
......
......@@ -12,7 +12,7 @@ psycopg2-binary >= 2.9.2 # LGPL
sqlalchemy >= 1.4.26 # MIT
pysnmp >= 0.1.7 # BSD
h5py >= 3.1.0 # BSD
jsonschema >= 3.2.0 # MIT
jsonschema >= 4.0.0 # MIT
psutil >= 5.8.0 # BSD
docker >= 5.0.3 # Apache 2
python-logstash-async >= 2.3.0 # MIT
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from .observation_controller import ObservationController
__all__ = ['ObservationController', ]
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import logging
import time
from datetime import datetime
from tango import DevFailed, DevState, Except, Util, EventType, DeviceProxy
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.configuration import ObservationSettings
logger = logging.getLogger()
class RunningObservation(object):
@property
def observation_id(self) -> int:
return self._parameters.observation_id
@property
def class_name(self) -> str:
from tangostationcontrol.devices.observation import Observation
return Observation.__name__
@property
def device_name(self) -> str:
return f"{self._tango_domain}/{self.class_name}/{self.observation_id}"
# Name for the Observation.observation_running subscription
@property
def attribute_name(self) -> str:
return f"{self.device_name}/observation_running_R"
def __init__(self, tango_domain, parameters: ObservationSettings):
self._device_proxy: DeviceProxy | None = None
self._event_id: int | None = None
self._parameters: ObservationSettings = parameters
self._tango_domain: str = tango_domain
# The pyTango.Util class is a singleton and every DS can only
# have one instance of it.
self._tango_util: Util = Util.instance()
def create_tango_device(self):
logger.info(f"Create device: {self.device_name}")
try:
# Create the Observation device and instantiate it.
self._tango_util.create_device(self.class_name, f"{self.device_name}")
except DevFailed as ex:
logger.exception(ex)
if ex.args[0].desc == f"The device {self.device_name.lower()} is already defined in the database":
# and self.is_observation_running(self.observation_id) is False:
self._tango_util.delete_device(self.class_name, self.device_name)
error_string = f"Cannot create the Observation device {self.device_name} because it is already present in the Database but it is not running. Try to re-run the start_observation command"
logger.exception(error_string)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
else:
error_string = f"Cannot create the Observation device instance {self.device_name} for ID={self.observation_id}. This means that the observation did not start."
logger.exception(error_string)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
def create_device_proxy(self):
# Instantiate a dynamic Tango Device "Observation".
self._device_proxy = DeviceProxy(self.device_name)
# Configure the dynamic device its attribute for the observation
# parameters.
self._device_proxy.observation_settings_RW = self._parameters.to_json()
# Take the Observation device through the motions. Pass the
# entire JSON set of parameters so that it can pull from it what it
# needs.
self._device_proxy.Initialise()
# The call to On will actually tell the Observation device to
# become fully active.
self._device_proxy.On()
def subscribe(self, cb):
# Turn on the polling for the attribute.
# Note that this is not automatically done despite the attribute
# having the right polling values set in the ctor.
self._device_proxy.poll_attribute(self.attribute_name.split('/')[-1], 1000)
# Right. Now subscribe to periodic events.
self._event_id = self._device_proxy.subscribe_event(self.attribute_name.split('/')[-1],
EventType.PERIODIC_EVENT,
cb)
logger.info(f"Successfully started an observation with ID={self.observation_id}.")
def shutdown(self):
# Check if the device has not terminated itself in the meanwhile.
try:
self._device_proxy.ping()
except DevFailed:
logger.warning(
f"The device for the Observation with ID={self.observation_id} has unexpectedly already disappeared. It is advised to check the logs up to 10s prior to this message to see what happened.")
else:
# Unsubscribe from the subscribed event.
event_id = self._event_id
self._device_proxy.unsubscribe_event(event_id)
# Tell the Observation device to stop the running
# observation. This is a synchronous call and the clean-up
# does not take long.
self._device_proxy.Off()
# Wait for 1s for the Observation device to go to
# DevState.OFF. Force shutdown if observation.state() is
# not OFF.
remaining_wait_time = 1.0
sleep_time = 0.1
stopped = False
while remaining_wait_time > 0.0:
if self._device_proxy.state() is DevState.OFF:
stopped = True
break
time.sleep(sleep_time)
remaining_wait_time = remaining_wait_time - sleep_time
# Check if the observation object is really in OFF state.
if stopped:
logger.info(f"Successfully stopped the observation with ID={self.observation_id}")
else:
logger.warning(
f"Could not shut down the Observation device ( {self.device_name} ) for observation ID={self.observation_id}. This means that there is a chance for a memory leak. Will continue anyway and forcefully delete the Observation object.")
# Finally remove the device object from the Tango DB.
try:
self._tango_util.delete_device(self.class_name, self.device_name)
except DevFailed:
logger.warning(
f"Something went wrong when the device {self.device_name} was removed from the Tango DB. There is nothing that can be done about this here at this moment but you should check the Tango DB yourself.")
class ObservationController(object):
@property
def running_observations(self) -> [int]:
return list(self._running_observations.keys())
def __init__(self, tango_domain: str):
self._tango_util = Util.instance()
self._tango_domain = tango_domain
self._running_observations: dict[int, RunningObservation] = {}
def is_any_observation_running(self):
return len(self._running_observations) > 0
def is_observation_running(self, obs_id):
observation = self._running_observations.get(obs_id)
return observation is not None
@log_exceptions()
def observation_running_callback(self, event):
"""
This callback checks if a running observation is still
supposed to run. If this function finds out that the
observation is not supposed to run any more, then
self.stop_observation(obs_id) is called which takes care of the
clean up.
"""
if event.err:
# Something is fishy with this event.
logger.warning(
f"The Observation device {event.device} sent an event but the event signals an error. It is advised to check the logs for any indication that something went wrong in that device. Event data={event}")
return
# Get the Observation ID from the sending device.
obs_id = event.device.observation_id_R
# Check if the observation is still supposed to run.
running_obs = self._running_observations.copy()
if not running_obs:
# No obs is running???
logger.warning(
f"Received an observation_running event for the observation with ID={obs_id}. According to the records in ObservationControl, this observation is not supposed to run. Please check previous logs, especially around the time an observation with this ID was started. Will continue and ignore this event.")
return
if obs_id in running_obs:
# Get the Observation's stop_time from the Observation device.
obs_stop_time = event.device.stop_time_R
current_obs_time = event.attr_value.value
# I expect that current_obs_time is always
# smaller than the stop time that I read from my
# records.
delta_t = obs_stop_time - current_obs_time
if delta_t < 0.0:
# The observation has not finished yet and is
# more than 1.0 seconds past its scheduled stop
# time. Tell the observation to finish and clean up.
obs = running_obs[obs_id]
self.stop_observation(obs_id)
else:
# The observation that we are trying to process is not part of the running_obs dictionary
logger.warning(
f"Received an observation_running event for the observation with ID={obs_id}. According to the records in ObservationControl, this observation is not supposed to run. Please check previous logs, especially around the time an observation with this ID was started. Will continue and ignore this event.")
return
def start_observation(self, settings: ObservationSettings):
# Check further properties that cannot be validated through a JSON schema
if settings.stop_time <= datetime.now():
error = f"Cannot start an observation with ID={settings.observation_id} because the parameter stop_time parameter value=\"{settings.stop_time}\" is invalid. Set a stop_time parameter later in time than the start time."
Except.throw_exception("IllegalCommand", error, __name__)
obs = RunningObservation(self._tango_domain, settings)
obs.create_tango_device()
try:
obs.create_device_proxy()
except DevFailed as ex:
# Remove the device again.
self._tango_util.delete_device(obs.class_name, obs.device_name)
error_string = f"Cannot access the Observation device instance for observation ID={obs.observation_id} with device class name={obs.class_name} and device instance name={obs.device_name}. This means that the observation likely did not start but certainly cannot be controlled and/or forcefully be stopped."
logger.exception(error_string)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
# Finally update the self.running_observation dict's entry of this
# observation with the complete set of info.
self._running_observations[obs.observation_id] = obs
try:
obs.subscribe(self.observation_running_callback)
except DevFailed as ex:
self._tango_util.delete_device(obs.class_name, obs.device_name)
error_string = "Cannot access the Observation device instance for observation ID=%s with device class name=%s and device instance name=%s. This means that the observation cannot be controlled and/or forcefully be stopped."
logger.exception(error_string, obs.observation_id, obs.class_name, obs.device_name)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
def stop_observation(self, obs_id):
if self.is_observation_running(obs_id) is False:
error = f"Cannot stop an observation with ID={obs_id}, because the observation is not running."
Except.throw_exception("IllegalCommand", error, __name__)
# Fetch the obs data and remove it from the dict of
# currently running observations.
observation = self._running_observations.pop(obs_id)
observation.shutdown()
def stop_all_observations(self):
for obs_id in self._running_observations.copy().keys():
self.stop_observation(obs_id)
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from .observation_settings import ObservationSettings
from .pointing import Pointing
from .sap import Sap
__all__ = ['ObservationSettings', 'Pointing', 'Sap', ]
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from typing import Type
from jsonschema.exceptions import ValidationError
def _from_json_hook_t(primary: Type):
from tangostationcontrol.configuration import Pointing, Sap, ObservationSettings
def actual_hook(json_dct):
primary_ex = None
for t in [Pointing, Sap, ObservationSettings]:
try:
t.get_validator().validate(json_dct)
except ValidationError as ex:
if t is primary:
primary_ex = ex
pass
else:
return t.to_object(json_dct)
if primary_ex:
raise primary_ex
return None
return actual_hook
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import json
import re
import time
from abc import ABC, abstractmethod
from typing import TypeVar, Type
import jsonschema
import requests
from jsonschema import Draft7Validator, FormatChecker, ValidationError
from jsonschema.validators import RefResolver
from tangostationcontrol.configuration._json_parser import _from_json_hook_t
T = TypeVar('T')
def _fetch_url(url):
attempt_nr = 1
while True:
try:
response = requests.get(url)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
time.sleep(2) # retry after a little sleep
if attempt_nr >= 5:
raise e
else:
attempt_nr += 1
class RetryHttpRefResolver(RefResolver):
def resolve_remote(self, uri):
result = _fetch_url(uri)
if self.cache_remote:
self.store[uri] = result
return result
def _is_object(_, instance):
return isinstance(instance, dict) or issubclass(type(instance), _ConfigurationBase)
jsonschema.validators.Draft7Validator.TYPE_CHECKER = Draft7Validator.TYPE_CHECKER.redefine(
"object", _is_object,
)
class _ConfigurationBase(ABC):
BASE_URL = "http://schemas/"
@staticmethod
def _class_to_url(cls_name):
return re.sub(r'(?<!^)(?=[A-Z])', '-', cls_name).lower()
@classmethod
def get_validator(cls):
name = cls.__name__
url = f"{_ConfigurationBase.BASE_URL}{_ConfigurationBase._class_to_url(name)}.json"
resolver = RetryHttpRefResolver(base_uri=_ConfigurationBase.BASE_URL, referrer=url)
_, resolved = resolver.resolve(url)
return Draft7Validator(resolved, format_checker=FormatChecker(), resolver=resolver)
@abstractmethod
def __iter__(self):
pass
def __str__(self):
return json.dumps(dict(self), ensure_ascii=False)
def __repr__(self):
return self.__str__()
# required for jsonschema validation
def __getitem__(self, item):
return getattr(self, item)
# required for jsonschema validation
def __contains__(self, item):
return hasattr(self, item) and getattr(self, item) is not None
def to_json(self):
return self.__str__()
@staticmethod
@abstractmethod
def to_object(json_dct) -> T:
pass
@classmethod
def from_json(cls: Type[T], data: str) -> T:
s = json.loads(data, object_hook=_from_json_hook_t(cls))
if not isinstance(s, cls):
raise ValidationError(f"Unexpected type: expected <{cls.__class__.__name__}>, got <{type(s).__name__}>")
return s
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from datetime import datetime
from typing import Sequence
from tangostationcontrol.configuration.configuration_base import _ConfigurationBase
from tangostationcontrol.configuration.pointing import Pointing
from tangostationcontrol.configuration.sap import Sap
class ObservationSettings(_ConfigurationBase):
def __init__(self, observation_id: int, stop_time: datetime, antenna_mask: Sequence[int], filter: str,
SAPs: Sequence[Sap],
tile_beam: Pointing = None, first_beamlet: int = 0):
self.observation_id = observation_id
self.stop_time = stop_time
self.antenna_mask = antenna_mask
self.filter = filter
self.SAPs = SAPs
self.tile_beam = tile_beam
self.first_beamlet = first_beamlet
def __iter__(self):
yield from {
"observation_id": self.observation_id,
"stop_time" : self.stop_time.isoformat(),
"antenna_mask" : self.antenna_mask,
"filter" : self.filter,
"SAPs" : [dict(s) for s in self.SAPs]
}.items()
if self.tile_beam:
yield "tile_beam", dict(self.tile_beam)
yield "first_beamlet", self.first_beamlet
@staticmethod
def to_object(json_dct) -> 'ObservationSettings':
return ObservationSettings(json_dct['observation_id'], datetime.fromisoformat(json_dct['stop_time']),
json_dct['antenna_mask'],
json_dct['filter'], json_dct['SAPs'],
json_dct['tile_beam'] if 'tile_beam' in json_dct else None,
json_dct['first_beamlet'] if 'first_beamlet' in json_dct else 0)
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tangostationcontrol.configuration.configuration_base import _ConfigurationBase
class Pointing(_ConfigurationBase):
VALIDATOR = None
def __init__(self, angle1=0.6624317181687094, angle2=1.5579526427549426, direction_type="J2000"):
self.angle1 = angle1
self.angle2 = angle2
self.direction_type = direction_type
def __iter__(self):
yield from {
"angle1" : self.angle1,
"angle2" : self.angle2,
"direction_type": self.direction_type
}.items()
@staticmethod
def to_object(json_dct) -> 'Pointing':
return Pointing(json_dct['angle1'], json_dct['angle2'], json_dct['direction_type'])
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tangostationcontrol.configuration.configuration_base import _ConfigurationBase
from tangostationcontrol.configuration.pointing import Pointing
class Sap(_ConfigurationBase):
def __init__(self, subbands: [int], pointing: Pointing):
self.subbands = subbands
self.pointing = pointing
def __iter__(self):
yield from {
"subbands": self.subbands,
"pointing": dict(self.pointing)
}.items()
@staticmethod
def to_object(json_dct) -> 'Sap':
return Sap(json_dct['subbands'], json_dct['pointing'])
......@@ -4,27 +4,26 @@
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import logging
from time import time
from typing import Optional
import numpy
from jsonschema.exceptions import ValidationError
# PyTango imports
from tango import AttrWriteType, DeviceProxy, DevState, DevSource, Util
from tango import AttrWriteType, DeviceProxy, DevState, DevSource, Util, Except
from tango.server import attribute
import numpy
from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD, MAX_ANTENNA, N_beamlets_ctrl, N_point_prop
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.lofar_logging import device_logging_to_python
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD, MAX_ANTENNA, N_beamlets_ctrl, N_point_prop
from tangostationcontrol.configuration import ObservationSettings
from tangostationcontrol.devices.device_decorators import fault_on_error
from tangostationcontrol.devices.device_decorators import only_when_on
from tangostationcontrol.devices.device_decorators import only_in_states
from tangostationcontrol.devices.device_decorators import only_when_on
from tangostationcontrol.devices.lofar_device import lofar_device
from datetime import datetime
from json import loads
from time import time
import logging
logger = logging.getLogger()
__all__ = ["Observation", "main"]
......@@ -40,60 +39,58 @@ class Observation(lofar_device):
The lifecycle of instances of this device is controlled by ObservationControl
"""
# Attributes
observation_running_R = attribute(dtype=numpy.float64, access=AttrWriteType.READ, polling_period=DEFAULT_POLLING_PERIOD, period=DEFAULT_POLLING_PERIOD,
observation_running_R = attribute(dtype=numpy.float64, access=AttrWriteType.READ,
polling_period=DEFAULT_POLLING_PERIOD, period=DEFAULT_POLLING_PERIOD,
rel_change="1.0")
observation_id_R = attribute(dtype=numpy.int64, access=AttrWriteType.READ)
stop_time_R = attribute(dtype=numpy.float64, access=AttrWriteType.READ)
antenna_mask_R = attribute(dtype=(numpy.int64,), max_dim_x=MAX_ANTENNA, access=AttrWriteType.READ)
filter_R = attribute(dtype=numpy.str, access=AttrWriteType.READ)
saps_subband_R = attribute(dtype=((numpy.uint32,),), max_dim_x=N_beamlets_ctrl, max_dim_y=N_beamlets_ctrl, access=AttrWriteType.READ)
saps_pointing_R = attribute(dtype=((numpy.str,),), max_dim_x=N_point_prop, max_dim_y=N_beamlets_ctrl, access=AttrWriteType.READ)
saps_subband_R = attribute(dtype=((numpy.uint32,),), max_dim_x=N_beamlets_ctrl, max_dim_y=N_beamlets_ctrl,
access=AttrWriteType.READ)
saps_pointing_R = attribute(dtype=((numpy.str,),), max_dim_x=N_point_prop, max_dim_y=N_beamlets_ctrl,
access=AttrWriteType.READ)
tile_beam_R = attribute(dtype=(numpy.str,), max_dim_x=N_point_prop, access=AttrWriteType.READ)
first_beamlet_R = attribute(dtype=numpy.int64, access=AttrWriteType.READ)
observation_settings_RW = attribute(dtype=str, access=AttrWriteType.READ_WRITE)
def __init__(self, cl, name):
super().__init__(cl, name)
self.recv_proxy: Optional[DeviceProxy] = None
self.antennafield_proxy: Optional[DeviceProxy] = None
self.beamlet_proxy: Optional[DeviceProxy] = None
self.digitalbeam_proxy: Optional[DeviceProxy] = None
self.tilebeam_proxy: Optional[DeviceProxy] = None
self._observation_settings: Optional[ObservationSettings] = None
self._num_inputs: int = 0
def init_device(self):
"""Setup some class member variables for observation state"""
super().init_device()
self._observation_settings = loads("{}")
self._observation_id = -1
self._stop_time = datetime.now()
self._num_inputs = 0
def configure_for_initialise(self):
"""Load the JSON from the attribute and configure member variables"""
super().configure_for_initialise()
if self._observation_settings is None:
Except.throw_exception("IllegalCommand", "Device can not be initialized without configuration", __name__)
# ObservationControl takes already good care of checking that the
# parameters are in order and sufficient. It is therefore unnecessary
# at the moment to check the parameters here again.
# This could change when the parameter check becomes depending on
# certain aspects that only an Observation device can know.
parameters = loads(self._observation_settings)
self._observation_id = parameters["observation_id"]
self._stop_time = datetime.fromisoformat(parameters["stop_time"])
self._antenna_mask = parameters["antenna_mask"]
self._filter = parameters["filter"]
self._num_saps = len(parameters["SAPs"])
self._saps_subband = [ parameters["SAPs"][i]['subbands'] for i in range(0, self._num_saps)]
self._saps_pointing = self._build_saps_pointing(parameters)
self._tile_beam = self._build_tilebeam_pointing(parameters)
self._first_beamlet = parameters["first_beamlet"]
self._num_saps = len(self._observation_settings.SAPs)
self._saps_pointing = self._build_saps_pointing(self._observation_settings)
# Set a reference of AntennaField device that is correlated to this device
util = Util.instance()
# TODO(Stefano): set a proper policy for the devices instance number
# It cannot be inherited from the Observation instance number (i.e. Observation_id)
self.antennafield_proxy = DeviceProxy(
f"{util.get_ds_inst_name()}/AntennaField/1")
self.antennafield_proxy = DeviceProxy(f"{util.get_ds_inst_name()}/AntennaField/1")
self.antennafield_proxy.set_source(DevSource.DEV)
# Set a reference of RECV device that is correlated to this device
......@@ -116,16 +113,16 @@ class Observation(lofar_device):
self._num_inputs = self.digitalbeam_proxy.antenna_select_RW.shape[0]
logger.info(
f"The observation with ID={self._observation_id} is "
f"The observation with ID={self._observation_settings.observation_id} is "
"configured. It will begin as soon as On() is called and it is"
f"supposed to stop at {self._stop_time}")
f"supposed to stop at {self._observation_settings.stop_time}")
def configure_for_off(self):
"""Indicate the observation has stopped"""
super().configure_for_off()
logger.info(f"Stopped the observation with ID={self._observation_id}.")
logger.info(f"Stopped the observation with ID={self._observation_settings.observation_id}.")
def configure_for_on(self):
"""Indicate the observation has started"""
......@@ -133,9 +130,9 @@ class Observation(lofar_device):
super().configure_for_on()
# Apply Antenna Mask and Filter
ANT_mask, RCU_band_select = self._apply_antennafield_settings(self.read_antenna_mask_R(), self.read_filter_R())
self.antennafield_proxy.ANT_mask_RW = ANT_mask
self.antennafield_proxy.RCU_band_select_RW = RCU_band_select
ant_mask, rcu_band_select = self._apply_antennafield_settings(self.read_antenna_mask_R(), self.read_filter_R())
self.antennafield_proxy.ANT_mask_RW = ant_mask
self.antennafield_proxy.RCU_band_select_RW = rcu_band_select
# Apply Beamlet configuration
self.beamlet_proxy.subband_select_RW = self._apply_saps_subbands(self.read_saps_subband_R())
......@@ -143,44 +140,46 @@ class Observation(lofar_device):
self.digitalbeam_proxy.antenna_select_RW = self._apply_saps_antenna_select(self.read_antenna_mask_R())
# Apply Tile Beam pointing direction
self.tilebeam_proxy.Pointing_direction_RW = [tuple(self.read_tile_beam_R())] * self.antennafield_proxy.nr_antennas_R
tile_beam = self.read_tile_beam_R()
if tile_beam is not None:
self.tilebeam_proxy.Pointing_direction_RW = [tuple(tile_beam)] * self.antennafield_proxy.nr_antennas_R
logger.info(f"Started the observation with ID={self._observation_id}.")
logger.info(f"Started the observation with ID={self._observation_settings.observation_id}.")
@only_when_on()
@fault_on_error()
@log_exceptions()
def read_observation_id_R(self):
"""Return the observation_id_R attribute."""
return self._observation_id
return self._observation_settings.observation_id
@only_when_on()
@fault_on_error()
@log_exceptions()
def read_stop_time_R(self):
"""Return the stop_time_R attribute."""
return self._stop_time.timestamp()
return self._observation_settings.stop_time.timestamp()
@only_in_states([DevState.STANDBY, DevState.ON])
@fault_on_error()
@log_exceptions()
def read_antenna_mask_R(self):
"""Return the antenna_mask_R attribute."""
return self._antenna_mask
return self._observation_settings.antenna_mask
@only_in_states([DevState.STANDBY, DevState.ON])
@fault_on_error()
@log_exceptions()
def read_filter_R(self):
"""Return the filter_R attribute."""
return self._filter
return self._observation_settings.filter
@only_in_states([DevState.STANDBY, DevState.ON])
@fault_on_error()
@log_exceptions()
def read_saps_subband_R(self):
"""Return the saps_subband_R attribute."""
return self._saps_subband
return [sap.subbands for sap in self._observation_settings.SAPs]
@only_in_states([DevState.STANDBY, DevState.ON])
@fault_on_error()
......@@ -194,27 +193,35 @@ class Observation(lofar_device):
@log_exceptions()
def read_tile_beam_R(self):
"""Return the tile_beam_R attribute."""
return self._tile_beam
if self._observation_settings.tile_beam is None:
return None
pointing_direction = self._observation_settings.tile_beam
return [str(pointing_direction.direction_type), f"{pointing_direction.angle1}deg",
f"{pointing_direction.angle2}deg"]
@only_in_states([DevState.STANDBY, DevState.ON])
@fault_on_error()
@log_exceptions()
def read_first_beamlet_R(self):
"""Return the first_beamlet_R attribute."""
return self._first_beamlet
return self._observation_settings.first_beamlet
@fault_on_error()
@log_exceptions()
def read_observation_settings_RW(self):
"""Return current observation_parameters string"""
return self._observation_settings
return None if self._observation_settings is None else self._observation_settings.to_json()
@only_in_states([DevState.OFF])
@fault_on_error()
@log_exceptions()
def write_observation_settings_RW(self, parameters: str):
"""No validation on configuring parameters as task of control device"""
self._observation_settings = parameters
try:
self._observation_settings = ObservationSettings.from_json(parameters)
except ValidationError as e:
self._observation_settings = None
#Except.throw_exception("IllegalCommand", e.message, __name__)
@only_when_on()
@fault_on_error()
......@@ -225,19 +232,16 @@ class Observation(lofar_device):
# value
return time()
def _build_saps_pointing(self, parameters:dict):
def _build_saps_pointing(self, parameters: ObservationSettings):
""" Build the sap pointing array preserving the correct order from JSON """
saps_pointing = []
for i in range(0, self._num_saps):
pointing_direction = parameters["SAPs"][i]['pointing']
saps_pointing.insert(i,(pointing_direction['direction_type'], f"{pointing_direction['angle1']}deg", f"{pointing_direction['angle2']}deg"))
pointing_direction = parameters.SAPs[i].pointing
saps_pointing.insert(i, (
pointing_direction.direction_type, f"{pointing_direction.angle1}deg",
f"{pointing_direction.angle2}deg"))
return saps_pointing
def _build_tilebeam_pointing(self, parameters:dict):
""" Build the sap pointing array preserving the correct order from JSON """
pointing_direction = parameters["tile_beam"]
return [str(pointing_direction['direction_type']), f"{pointing_direction['angle1']}deg", f"{pointing_direction['angle2']}deg"]
def _apply_antennafield_settings(self, antenna_mask: list, filter_name: str):
""" Convert an array of antenna indexes into a boolean mask array and
retrieve the RCU band from filter name, returning the correct format for
......@@ -262,7 +266,8 @@ class Observation(lofar_device):
def _apply_saps_pointing(self, sap_pointing: list):
""" Convert an array of string directions into the correct format for DigitalBeam device"""
pointing_direction = list(self.digitalbeam_proxy.Pointing_direction_RW) # convert to list to allows item assignment
pointing_direction = list(
self.digitalbeam_proxy.Pointing_direction_RW) # convert to list to allows item assignment
first_beamlet = numpy.array(self.read_first_beamlet_R(), dtype=numpy.int64)
# Insert pointing values starting from the first beamlet
pointing_direction[first_beamlet:len(sap_pointing)] = sap_pointing
......@@ -277,6 +282,7 @@ class Observation(lofar_device):
antenna_select[a, i] = True
return antenna_select
# ----------
# Run server
# ----------
......
......@@ -5,20 +5,16 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from json import loads
import jsonschema
from jsonschema import Draft7Validator, FormatChecker
import logging
import time
from datetime import datetime
import numpy
from tango import Except, DevFailed, DevState, AttrWriteType, DebugIt, DeviceProxy, Util, DevBoolean, DevString
from tango import Except, DevState, AttrWriteType, DebugIt, Util, DevBoolean, DevString
from tango.server import Device, command, attribute
from tango import EventType
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions
from tangostationcontrol.common import ObservationController
from tangostationcontrol.configuration import ObservationSettings
from tangostationcontrol.devices.device_decorators import only_when_on, fault_on_error
from tangostationcontrol.devices.lofar_device import lofar_device
from tangostationcontrol.devices.observation import Observation
......@@ -31,14 +27,19 @@ __all__ = ["ObservationControl", "main"]
@device_logging_to_python()
class ObservationControl(lofar_device):
""" Observation Control Device Server for LOFAR2.0
The ObservationControl Tango device controls the instantiation of a Tango Dynamic Device from the Observation class. ObservationControl then keeps a record of the Observation devices and if they are still alive.
The ObservationControl Tango device controls the instantiation of a Tango Dynamic Device from the Observation class.
ObservationControl then keeps a record of the Observation devices and if they are still alive.
At the end of an observation ObservationControl checks if the respective Observation device has stopped its execution and releases it. If the Observation device has not stopped its execution yet, it is attempted to forcefully stop the execution of the Observation device. Then the Observation device is removed from the list of running observations.
At the end of an observation ObservationControl checks if the respective Observation device has stopped its
execution and releases it. If the Observation device has not stopped its execution yet, it is attempted to
forcefully stop the execution of the Observation device.
The Observation devices are responsible for the "real" execution of an observation. They get references to the hardware devices that are needed to set values in the relevant Control Points. The Observation device performs only a check if enough parameters are available to perform the set-up.
The Observation devices are responsible for the "real" execution of an observation. They get references to the
hardware devices that are needed to set values in the relevant Control Points. The Observation device performs only
a check if enough parameters are available to perform the set-up.
Essentially this is what happens:
Somebody calls ObservationControl.start_observation(parameters). Then ObservationControl will perform:
Somebody calls ObservationControl.start_observation(parameters). Then ObservationControl will perform:
- Creates a new instance of an Observation device in the Tango DB
- Call Initialise(parameters)
- Wait for initialise to return
......@@ -48,11 +49,12 @@ class ObservationControl(lofar_device):
- Subscribe to the Observation.running MP's periodic event
- Register the observation in the dict self.running_observations[ID]
- The Observation updates the MP every second with the current time
- The callback gets called periodically. It checks if MP value > stop (stored in the dict under the obs IDS. By this it can determine if the observation is done.
- The callback gets called periodically. It checks if MP value > stop (stored in the dict under the obs IDS.
By this it can determine if the observation is done.
- if MP value > observation end
- Remove observation ID from running obs dict
- Unsubscribe from the MP's event
- Call off()
- Call off()
- Remove the device from the Tango DB which will make the device disappear
This should in broad strokes pretty much cover any type of observation.
......@@ -68,85 +70,34 @@ class ObservationControl(lofar_device):
- is_observation_running(obs_id) -> bool
MPs
- array[int] running_observations
- string version
"""
# JSON Schema
OBSERVATION_SETTINGS_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"required": [],
"properties": {
"observation_id": {"type": "number", "minimum": 1},
"stop_time": {"type": "string", "format": "date-time"},
"antenna_mask": {"type": "array"},
"filter": {"type": "string"},
"SAPs": {"type": "array",
"minItems": 1,
"items": {
"type": "object",
"properties": {
"subbands": {"type": "array"},
"pointing": {"type": "object",
"properties": {
"angle1" : {"type": "number"},
"angle2" : {"type": "number"},
"direction_type": {"type": "string"}
}
}
}
}
},
"tile_beam": {"type": "object",
"properties": {
"angle1" : {"type": "number"},
"angle2" : {"type": "number"},
"direction_type": {"type": "string"}
}
},
"first_beamlet": {"type": "number", "minimum": 0}
},
}
VALIDATOR = Draft7Validator(OBSERVATION_SETTINGS_SCHEMA, format_checker=FormatChecker())
# Attributes
running_observations_R = attribute(dtype=(numpy.int64,), access=AttrWriteType.READ)
def __init__(self, cl, name):
super().__init__(cl, name)
# The top level tango domain is the left-most part of a
# device's name.
self.myTangoDomain: str = self.get_name().split('/')[0]
self._observation_controller: ObservationController = ObservationController(self.myTangoDomain)
# Core functions
@log_exceptions()
@DebugIt()
def init_device(self):
""" Keep all the observation objects for currently running
observations in this dict. The key is the observation idea
and the value is a dict of the parameters.
The value dict contains at least these key/value pairs:
["proxy"]: tango.DeviceProxy - the DeviceProxy for the Observation object
["event_id"]: int
["parameters"]: {parameters as passed to the start_observation call}
["stop_time"]: timestamp when the observation is supposed to stop.
["device_name"]: name of the device instance in Tango DB
["class_name"]: name of the device's class, needed to create an instance
on the fly.
"""
Device.init_device(self)
self.set_state(DevState.OFF)
self.running_observations = {}
# The pyTango.Util class is a singleton and every DS can only
# have one instance of it.
self.tango_util = Util.instance()
# Increase the number of polling threads for this device server.
self.tango_util.set_polling_threads_pool_size(10)
# The top level tango domain is the left-most part of a
# device's name.
self.myTangoDomain = self.get_name().split('/')[0]
Util.instance().set_polling_threads_pool_size(10)
# Lifecycle functions
def configure_for_initialise(self):
self.running_observations.clear()
self._observation_controller = ObservationController(self.myTangoDomain)
def configure_for_off(self):
self.stop_all_observations()
......@@ -155,171 +106,14 @@ class ObservationControl(lofar_device):
@fault_on_error()
@log_exceptions()
def read_running_observations_R(self):
obs = [ key for key in self.running_observations ]
return obs
@log_exceptions()
def observation_running_callback(self, event):
"""
This callback checks if a running observation is still
supposed to run. If this function finds out that the
observation is not supposed to run any more, then
self.stop_observation(obs_id) is called which takes care of the
clean up.
"""
if event.err:
# Something is fishy with this event.
logger.warning(f"The Observation device {event.device} sent an event but the event signals an error. It is advised to check the logs for any indication that something went wrong in that device. Event data={event}")
return
# Get the Observation ID from the sending device.
obs_id = event.device.observation_id_R
# Check if the observation is still supposed to run.
running_obs = self.running_observations.copy()
if not running_obs:
# No obs is running???
logger.warning(f"Received an observation_running event for the observation with ID={obs_id}. According to the records in ObservationControl, this observation is not supposed to run. Please check previous logs, especially around the time an observation with this ID was started. Will continue and ignore this event.")
return
if obs_id in running_obs:
# Get the Observation's stop_time from the Observation device.
obs_stop_time = event.device.stop_time_R
current_obs_time = event.attr_value.value
# I expect that current_obs_time is always
# smaller than the stop time that I read from my
# records.
delta_t = obs_stop_time - current_obs_time
if delta_t < 0.0:
# The observation has not finished yet and is
# more than 1.0 seconds past its scheduled stop
# time. Tell the observation to finish and clean up.
obs = running_obs[obs_id]
self.stop_observation(obs_id)
else:
# The observation that we are trying to process is not part of the running_obs dictionary
logger.warning(f"Received an observation_running event for the observation with ID={obs_id}. According to the records in ObservationControl, this observation is not supposed to run. Please check previous logs, especially around the time an observation with this ID was started. Will continue and ignore this event.")
return
@only_when_on()
@log_exceptions()
def check_and_convert_parameters(self, parameters: DevString = None) -> dict:
"""
The parameters are passed as JSON in a char[].
Conveniently json.loads() creates a dict from the input.
:param parameters: The parameters as JSON
:type parameters: DevVarUCharArray
:return: None
"""
# Convert the input parameter to a dict.
parameter_dict = loads(parameters)
logger.debug("incoming parameter_array = %s, parameter_dict = %s", parameters, parameter_dict)
# Parameter check, do not execute an observation in case
# the parameters are not sufficiently defined.
obs_id = int(parameter_dict["observation_id"])
stop_datetime = datetime.fromisoformat(parameter_dict["stop_time"])
try:
self.VALIDATOR.validate(parameter_dict)
except jsonschema.exceptions.ValidationError as error:
Except.throw_exception("IllegalCommand", error, __name__)
# Check further properties that cannot be validated through a JSON schema
if stop_datetime <= datetime.now():
error = f"Cannot start an observation with ID={obs_id} because the parameter stop_time parameter value=\"{stop_datetime}\" is invalid. Set a stop_time parameter later in time than the start time."
Except.throw_exception("IllegalCommand", error, __name__)
return parameter_dict
return self._observation_controller.running_observations
# API
@command(dtype_in=DevString)
@only_when_on()
@log_exceptions()
def start_observation(self, parameters: DevString = None):
# Store everything about the observation in this dict. I store this
# dict at the end in self.running_observations.
observation = {"parameters": self.check_and_convert_parameters(parameters)}
observation_id = observation['parameters']['observation_id']
# The class name of the Observation class is needed to create and
# delete the device.
class_name = Observation.__name__
observation["class_name"] = class_name
# Generate the Tango DB name for the Observation device.
device_name = f"{self.myTangoDomain}/{class_name}/{observation_id}"
observation["device_name"] = device_name
try:
# Create the Observation device and instantiate it.
self.tango_util.create_device(class_name, device_name)
except DevFailed as ex:
if ex.args[0].desc == f"The device {device_name.lower()} is already defined in the database" and self.is_observation_running(observation_id) is False :
self.tango_util.delete_device(class_name, device_name)
error_string = f"Cannot create the Observation device {device_name} because it is already present in the Database but it is not running. Try to re-run the start_observation command"
logger.exception(error_string)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
else:
error_string = f"Cannot create the Observation device instance {device_name} for ID={observation_id}. This means that the observation did not start."
logger.exception(error_string)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
try:
# Instantiate a dynamic Tango Device "Observation".
device_proxy = DeviceProxy(device_name)
observation["device_proxy"] = device_proxy
# Configure the dynamic device its attribute for the observation
# parameters.
device_proxy.observation_settings_RW = parameters
# Take the Observation device through the motions. Pass the
# entire JSON set of parameters so that it can pull from it what it
# needs.
device_proxy.Initialise()
# The call to On will actually tell the Observation device to
# become fully active.
device_proxy.On()
except DevFailed as ex:
# Remove the device again.
self.tango_util.delete_device(class_name, device_name)
error_string = f"Cannot access the Observation device instance for observation ID={observation_id} with device class name={class_name} and device instance name={device_name}. This means that the observation likely did not start but certainly cannot be controlled and/or forcefully be stopped."
logger.exception(error_string)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
try:
# Subscribe to the obs.observation_running MP
#
# Generate the name for the Observation.observation_running
# MP.
attribute_name = f"{device_name}/observation_running_R"
observation["attribute_name"] = attribute_name
# Turn on the polling for the attribute.
# Note that this is not automatically done despite the attribute
# having the right polling values set in the ctor.
device_proxy.poll_attribute(attribute_name.split('/')[-1], 1000)
# Note: I update the running_observations dict already here because
# the addition of an event listener immediately triggers that
# event. And since the call back checks if the obs_id is in the dict
# this triggers an error message if the ID is not already known.
# There is no harm in copying the dict twice.
self.running_observations[observation_id] = observation
# Right. Now subscribe to periodic events.
event_id = device_proxy.subscribe_event(attribute_name.split('/')[-1], EventType.PERIODIC_EVENT, self.observation_running_callback)
observation["event_id"] = event_id
# Finally update the self.running_observation dict's entry of this
# observation with the complete set of info.
self.running_observations[observation_id] = observation
logger.info(f"Successfully started an observation with ID={observation_id}.")
except DevFailed as ex:
self.tango_util.delete_device(class_name, device_name)
error_string = "Cannot access the Observation device instance for observation ID=%s with device class name=%s and device instance name=%s. This means that the observation cannot be controlled and/or forcefully be stopped."
logger.exception(error_string, observation_id, Observation.__name__, device_name)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
self._observation_controller.start_observation(ObservationSettings.from_json(parameters))
@command(dtype_in=numpy.int64)
@only_when_on()
......@@ -331,54 +125,13 @@ class ObservationControl(lofar_device):
# Do not execute
error = f"Cannot stop an observation with ID={obs_id}, because the observation ID is invalid."
Except.throw_exception("IllegalCommand", error, __name__)
elif self.is_observation_running(obs_id) is False:
elif self._observation_controller.is_observation_running(obs_id) is False:
error = f"Cannot stop an observation with ID={obs_id}, because the observation is not running."
Except.throw_exception("IllegalCommand", error, __name__)
logger.info(f"Stopping the observation with ID={obs_id}.")
# Fetch the obs data and remove it from the dict of
# currently running observations.
observation = self.running_observations.pop(obs_id)
device_proxy = observation.pop("device_proxy")
# Check if the device has not terminated itself in the meanwhile.
try:
device_proxy.ping()
except DevFailed:
logger.warning(f"The device for the Observation with ID={obs_id} has unexpectedly already disappeared. It is advised to check the logs up to 10s prior to this message to see what happened.")
else:
# Unsubscribe from the subscribed event.
event_id = observation.pop("event_id")
device_proxy.unsubscribe_event(event_id)
# Tell the Observation device to stop the running
# observation. This is a synchronous call and the clean-up
# does not take long.
device_proxy.Off()
# Wait for 1s for the Observation device to go to
# DevState.OFF. Force shutdown if observation.state() is
# not OFF.
remaining_wait_time = 1.0
sleep_time = 0.1
stopped = False
while remaining_wait_time > 0.0:
if device_proxy.state() is DevState.OFF:
stopped = True
break
time.sleep(sleep_time)
remaining_wait_time = remaining_wait_time - sleep_time
# Check if the observation object is really in OFF state.
if stopped:
logger.info(f"Successfully stopped the observation with ID={obs_id}")
else:
logger.warning(f"Could not shut down the Observation device ( {observation['device_name']} ) for observation ID={obs_id}. This means that there is a chance for a memory leak. Will continue anyway and forcefully delete the Observation object.")
# Finally remove the device object from the Tango DB.
try:
self.tango_util.delete_device(observation["class_name"], observation["device_name"])
except DevFailed:
logger.warning(f"Something went wrong when the device {observation['device_name']} was removed from the Tango DB. There is nothing that can be done about this here at this moment but you should check the Tango DB yourself.")
self._observation_controller.stop_observation(obs_id)
@command()
@only_when_on()
......@@ -386,12 +139,11 @@ class ObservationControl(lofar_device):
def stop_all_observations(self):
# Make a copy of the running_observations dict. This
# should prevent race conditions.
if self.is_any_observation_running():
if not self.is_any_observation_running():
return
# Make certain that the dict does not get modified
# while I am iterating over it.
active_obs = self.running_observations.copy()
for obs_id in active_obs.keys():
self.stop_observation(obs_id)
self._observation_controller.stop_all_observations()
@command(dtype_in=numpy.int64, dtype_out=DevBoolean)
@only_when_on()
......@@ -402,14 +154,13 @@ class ObservationControl(lofar_device):
# Do not execute
error = f"Cannot check if an observation with ID={obs_id} is running, because the observation ID is invalid"
Except.throw_exception("IllegalCommand", error, __name__)
observation = self.running_observations.get(obs_id)
return observation is not None
return self._observation_controller.is_observation_running(obs_id)
@command(dtype_out=DevBoolean)
@only_when_on()
@log_exceptions()
def is_any_observation_running(self) -> DevBoolean:
return len(self.running_observations) > 0
return self._observation_controller.is_any_observation_running()
# ----------
......
......@@ -106,8 +106,8 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase):
"""Initialize an observation with _invalid_ JSON"""
self.proxy.off()
self.proxy.observation_settings_RW = "{}"
with self.assertRaises(DevFailed):
self.proxy.observation_settings_RW = "{}"
self.proxy.Initialise()
self.assertEqual(DevState.FAULT, self.proxy.state())
......@@ -125,6 +125,7 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase):
def test_attribute_match(self):
"""Test that JSON data is exposed to attributes"""
#failing
data = loads(self.VALID_JSON)
stop_timestamp = datetime.fromisoformat(data["stop_time"]).timestamp()
observation_id = data["observation_id"]
......@@ -209,6 +210,7 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase):
self.assertListEqual(digitalbeam_proxy.antenna_select_RW.tolist()[10], [False] * N_beamlets_ctrl)
def test_apply_tilebeam(self):
#failing
"""Test that attribute tilebeam is correctly applied"""
tilebeam_proxy = self.setup_tilebeam_proxy()
pointing_direction = [("J2000","0deg","0deg")] * DEFAULT_N_HBA_TILES
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment