Skip to content
Snippets Groups Projects
Commit 0b10fec7 authored by Hannes Feldt's avatar Hannes Feldt
Browse files

Merge branch 'L2SS-1019_observation_control_json_schema' into 'master'

L2SS-1019: Create and use obeservation control JSON schema

Closes L2SS-1019

See merge request !471
parents f3d1efdc eaf164c7
No related branches found
No related tags found
1 merge request!471L2SS-1019: Create and use obeservation control JSON schema
Showing
with 792 additions and 376 deletions
...@@ -23,6 +23,7 @@ tangostationcontrol/docs/build ...@@ -23,6 +23,7 @@ tangostationcontrol/docs/build
**/coverage.xml **/coverage.xml
**/.coverage **/.coverage
**/.coverage*
**/.ipynb_checkpoints **/.ipynb_checkpoints
**/pending_log_messages.db **/pending_log_messages.db
**/.eggs **/.eggs
......
...@@ -56,3 +56,5 @@ services: ...@@ -56,3 +56,5 @@ services:
restart: unless-stopped restart: unless-stopped
stop_signal: SIGINT # request a graceful shutdown of Tango stop_signal: SIGINT # request a graceful shutdown of Tango
stop_grace_period: 2s stop_grace_period: 2s
depends_on:
- schemas
#
# Docker compose file that launches a LOFAR2.0 station's
# ObservationControl device. It also runs the dynamically
# created Observation devices.
#
# Defines:
# - schemas: LOFAR2.0 station
#
version: '2.1'
services:
schemas:
build:
context: schemas
container_name: ${CONTAINER_NAME_PREFIX}schemas
networks:
- control
# set the hostname, otherwise duplicate device registrations result every
# time the hostname changes as the container is restarted.
hostname: schemas
environment:
- NGINX_HOST=schemas
- NGINX_PORT=80
ports:
- 9999:80
logging:
driver: "json-file"
options:
max-size: "100m"
max-file: "10"
restart: unless-stopped
\ No newline at end of file
FROM nginx
COPY definitions /usr/share/nginx/html
\ No newline at end of file
{
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"required": [
"observation_id",
"stop_time",
"antenna_mask",
"filter",
"SAPs"
],
"properties": {
"observation_id": {
"type": "number",
"minimum": 1
},
"stop_time": {
"type": "string",
"format": "date-time"
},
"antenna_mask": {
"type": "array",
"uniqueItems": true,
"minItems": 1,
"items": {
"type": "number"
}
},
"filter": {
"type": "string",
"enum": [
"LBA_10_90",
"LBA_10_70",
"LBA_30_90",
"LBA_30_70",
"HBA_170_230",
"HBA_110_190",
"HBA_210_250"
]
},
"SAPs": {
"type": "array",
"minItems": 1,
"items": {
"$ref": "sap.json"
}
},
"tile_beam": {
"$ref": "pointing.json"
},
"first_beamlet": {
"type": "number",
"default": 0,
"minimum": 0
}
}
}
\ No newline at end of file
{
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"required": [
"angle1",
"angle2",
"direction_type"
],
"properties": {
"angle1": {
"default": 0.6624317181687094,
"description": "First angle (e.g. RA)",
"title": "Angle 1",
"type": "number"
},
"angle2": {
"default": 1.5579526427549426,
"description": "Second angle (e.g. DEC)",
"title": "Angle 2",
"type": "number"
},
"direction_type": {
"default": "J2000",
"description": "",
"enum": [
"J2000",
"AZELGEO",
"LMN",
"SUN",
"MOON",
"MERCURY",
"VENUS",
"MARS",
"JUPITER",
"SATURN",
"URANUS",
"NEPTUNE",
"PLUTO"
],
"title": "Reference frame",
"type": "string"
}
}
}
\ No newline at end of file
{
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"required": [
"subbands",
"pointing"
],
"properties": {
"subbands": {
"type": "array",
"minItems": 1,
"items": {
"type": "number"
}
},
"pointing": {
"$ref": "pointing.json"
}
}
}
\ No newline at end of file
...@@ -27,7 +27,7 @@ function integration_test { ...@@ -27,7 +27,7 @@ function integration_test {
echo "Updating config ${config} ..." echo "Updating config ${config} ..."
bash "${LOFAR20_DIR}"/sbin/update_ConfigDb.sh "${config}" bash "${LOFAR20_DIR}"/sbin/update_ConfigDb.sh "${config}"
done done
if [ ! -z "${2+x}" ]; then if [ -n "${2+x}" ]; then
# shellcheck disable=SC2145 # shellcheck disable=SC2145
echo "make restart ${restarts[@]} ..." echo "make restart ${restarts[@]} ..."
make restart "${restarts[@]}" make restart "${restarts[@]}"
...@@ -76,28 +76,27 @@ sleep 1 # dsconfig container must be up and running... ...@@ -76,28 +76,27 @@ sleep 1 # dsconfig container must be up and running...
# shellcheck disable=SC2016 # shellcheck disable=SC2016
echo '/usr/local/bin/wait-for-it.sh ${TANGO_HOST} --strict --timeout=300 -- true' | make run dsconfig bash - echo '/usr/local/bin/wait-for-it.sh ${TANGO_HOST} --strict --timeout=300 -- true' | make run dsconfig bash -
# Devices list is used to explitly word split when supplied to commands, must DEVICES=(device-boot device-apsct device-ccd device-apspu device-sdp device-recv device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-psoc device-pcon device-antennafield device-temperature-manager device-observation device-observation-control)
# disable shellcheck SC2086 for each case.
DEVICES="device-boot device-apsct device-ccd device-apspu device-sdp device-recv device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-psoc device-pcon device-antennafield device-temperature-manager device-observation device-observation-control"
SIMULATORS="sdptr-sim recv-sim unb2-sim apsct-sim apspu-sim ccd-sim" SIMULATORS=(sdptr-sim recv-sim unb2-sim apsct-sim apspu-sim ccd-sim)
# Build only the required images, please do not build everything that makes CI # Build only the required images, please do not build everything that makes CI
# take really long to finish, especially grafana / jupyter / prometheus. # take really long to finish, especially grafana / jupyter / prometheus.
# jupyter is physically large > 2.5gb and overlayfs is really slow. # jupyter is physically large > 2.5gb and overlayfs is really slow.
# shellcheck disable=SC2086 # shellcheck disable=SC2086
make build $DEVICES $SIMULATORS make build "${DEVICES[@]}" "${SIMULATORS[@]}"
# make build elk integration-test # L2SS-970: elk temporarily disabled # make build elk integration-test # L2SS-970: elk temporarily disabled
make build logstash integration-test make build logstash integration-test schemas
make build archiver-timescale hdbppts-cm hdbppts-es make build archiver-timescale hdbppts-cm hdbppts-es
# Start and stop sequence # Start and stop sequence
# shellcheck disable=SC2086 # shellcheck disable=SC2086
make stop $DEVICES $SIMULATORS hdbppts-es hdbppts-cm archiver-timescale make stop schemas
make stop "${DEVICES[@]}" "${SIMULATORS[@]}" hdbppts-es hdbppts-cm archiver-timescale
make stop device-docker # this one does not test well in docker-in-docker make stop device-docker # this one does not test well in docker-in-docker
# make stop elk # L2SS-970: elk temporarily disabled # make stop elk # L2SS-970: elk temporarily disabled
make stop logstash make stop logstash schemas
# Run dummy integration test to install pytango in tox virtualenv without # Run dummy integration test to install pytango in tox virtualenv without
# the memory pressure of the ELK stack. # the memory pressure of the ELK stack.
...@@ -108,7 +107,7 @@ make stop logstash ...@@ -108,7 +107,7 @@ make stop logstash
integration_test dummy integration_test dummy
# make start elk # L2SS-970: elk temporarily disabled # make start elk # L2SS-970: elk temporarily disabled
make start logstash make start logstash schemas
# Update the dsconfig # Update the dsconfig
# Do not remove `bash`, otherwise statement ignored by gitlab ci shell! # Do not remove `bash`, otherwise statement ignored by gitlab ci shell!
...@@ -120,19 +119,18 @@ bash "${LOFAR20_DIR}"/sbin/update_ConfigDb.sh "${LOFAR20_DIR}"/CDB/stations/dumm ...@@ -120,19 +119,18 @@ bash "${LOFAR20_DIR}"/sbin/update_ConfigDb.sh "${LOFAR20_DIR}"/CDB/stations/dumm
cd "$LOFAR20_DIR/docker-compose" || exit 1 cd "$LOFAR20_DIR/docker-compose" || exit 1
# shellcheck disable=SC2086 # shellcheck disable=SC2086
make start $SIMULATORS make start "${SIMULATORS[@]}"
# Give the simulators time to start # Give the simulators time to start
sleep 5 sleep 5
# shellcheck disable=SC2086 # shellcheck disable=SC2086
make start $DEVICES make start "${DEVICES[@]}"
# Archive devices: archive-timescale first # Archive devices: archive-timescale first
make start archiver-timescale make start archiver-timescale
# Wait for archiver and devices to restart # Wait for archiver and devices to restart
# shellcheck disable=SC2086 make await archiver-timescale "${DEVICES[@]}"
make await archiver-timescale $DEVICES
# Give archiver-timescale time to start # Give archiver-timescale time to start
# shellcheck disable=SC2016 # shellcheck disable=SC2016
......
...@@ -12,7 +12,7 @@ psycopg2-binary >= 2.9.2 # LGPL ...@@ -12,7 +12,7 @@ psycopg2-binary >= 2.9.2 # LGPL
sqlalchemy >= 1.4.26 # MIT sqlalchemy >= 1.4.26 # MIT
pysnmp >= 0.1.7 # BSD pysnmp >= 0.1.7 # BSD
h5py >= 3.1.0 # BSD h5py >= 3.1.0 # BSD
jsonschema >= 3.2.0 # MIT jsonschema >= 4.0.0 # MIT
psutil >= 5.8.0 # BSD psutil >= 5.8.0 # BSD
docker >= 5.0.3 # Apache 2 docker >= 5.0.3 # Apache 2
python-logstash-async >= 2.3.0 # MIT python-logstash-async >= 2.3.0 # MIT
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from .observation_controller import ObservationController
__all__ = ['ObservationController', ]
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import logging
import time
from datetime import datetime
from tango import DevFailed, DevState, Except, Util, EventType, DeviceProxy
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.configuration import ObservationSettings
logger = logging.getLogger()
class RunningObservation(object):
@property
def observation_id(self) -> int:
return self._parameters.observation_id
@property
def class_name(self) -> str:
from tangostationcontrol.devices.observation import Observation
return Observation.__name__
@property
def device_name(self) -> str:
return f"{self._tango_domain}/{self.class_name}/{self.observation_id}"
# Name for the Observation.observation_running subscription
@property
def attribute_name(self) -> str:
return f"{self.device_name}/observation_running_R"
def __init__(self, tango_domain, parameters: ObservationSettings):
self._device_proxy: DeviceProxy | None = None
self._event_id: int | None = None
self._parameters: ObservationSettings = parameters
self._tango_domain: str = tango_domain
# The pyTango.Util class is a singleton and every DS can only
# have one instance of it.
self._tango_util: Util = Util.instance()
def create_tango_device(self):
logger.info(f"Create device: {self.device_name}")
try:
# Create the Observation device and instantiate it.
self._tango_util.create_device(self.class_name, f"{self.device_name}")
except DevFailed as ex:
logger.exception(ex)
if ex.args[0].desc == f"The device {self.device_name.lower()} is already defined in the database":
# and self.is_observation_running(self.observation_id) is False:
self._tango_util.delete_device(self.class_name, self.device_name)
error_string = f"Cannot create the Observation device {self.device_name} because it is already present in the Database but it is not running. Try to re-run the start_observation command"
logger.exception(error_string)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
else:
error_string = f"Cannot create the Observation device instance {self.device_name} for ID={self.observation_id}. This means that the observation did not start."
logger.exception(error_string)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
def create_device_proxy(self):
# Instantiate a dynamic Tango Device "Observation".
self._device_proxy = DeviceProxy(self.device_name)
# Configure the dynamic device its attribute for the observation
# parameters.
self._device_proxy.observation_settings_RW = self._parameters.to_json()
# Take the Observation device through the motions. Pass the
# entire JSON set of parameters so that it can pull from it what it
# needs.
self._device_proxy.Initialise()
# The call to On will actually tell the Observation device to
# become fully active.
self._device_proxy.On()
def subscribe(self, cb):
# Turn on the polling for the attribute.
# Note that this is not automatically done despite the attribute
# having the right polling values set in the ctor.
self._device_proxy.poll_attribute(self.attribute_name.split('/')[-1], 1000)
# Right. Now subscribe to periodic events.
self._event_id = self._device_proxy.subscribe_event(self.attribute_name.split('/')[-1],
EventType.PERIODIC_EVENT,
cb)
logger.info(f"Successfully started an observation with ID={self.observation_id}.")
def shutdown(self):
# Check if the device has not terminated itself in the meanwhile.
try:
self._device_proxy.ping()
except DevFailed:
logger.warning(
f"The device for the Observation with ID={self.observation_id} has unexpectedly already disappeared. It is advised to check the logs up to 10s prior to this message to see what happened.")
else:
# Unsubscribe from the subscribed event.
event_id = self._event_id
self._device_proxy.unsubscribe_event(event_id)
# Tell the Observation device to stop the running
# observation. This is a synchronous call and the clean-up
# does not take long.
self._device_proxy.Off()
# Wait for 1s for the Observation device to go to
# DevState.OFF. Force shutdown if observation.state() is
# not OFF.
remaining_wait_time = 1.0
sleep_time = 0.1
stopped = False
while remaining_wait_time > 0.0:
if self._device_proxy.state() is DevState.OFF:
stopped = True
break
time.sleep(sleep_time)
remaining_wait_time = remaining_wait_time - sleep_time
# Check if the observation object is really in OFF state.
if stopped:
logger.info(f"Successfully stopped the observation with ID={self.observation_id}")
else:
logger.warning(
f"Could not shut down the Observation device ( {self.device_name} ) for observation ID={self.observation_id}. This means that there is a chance for a memory leak. Will continue anyway and forcefully delete the Observation object.")
# Finally remove the device object from the Tango DB.
try:
self._tango_util.delete_device(self.class_name, self.device_name)
except DevFailed:
logger.warning(
f"Something went wrong when the device {self.device_name} was removed from the Tango DB. There is nothing that can be done about this here at this moment but you should check the Tango DB yourself.")
class ObservationController(object):
@property
def running_observations(self) -> [int]:
return list(self._running_observations.keys())
def __init__(self, tango_domain: str):
self._tango_util = Util.instance()
self._tango_domain = tango_domain
self._running_observations: dict[int, RunningObservation] = {}
def is_any_observation_running(self):
return len(self._running_observations) > 0
def is_observation_running(self, obs_id):
observation = self._running_observations.get(obs_id)
return observation is not None
@log_exceptions()
def observation_running_callback(self, event):
"""
This callback checks if a running observation is still
supposed to run. If this function finds out that the
observation is not supposed to run any more, then
self.stop_observation(obs_id) is called which takes care of the
clean up.
"""
if event.err:
# Something is fishy with this event.
logger.warning(
f"The Observation device {event.device} sent an event but the event signals an error. It is advised to check the logs for any indication that something went wrong in that device. Event data={event}")
return
# Get the Observation ID from the sending device.
obs_id = event.device.observation_id_R
# Check if the observation is still supposed to run.
running_obs = self._running_observations.copy()
if not running_obs:
# No obs is running???
logger.warning(
f"Received an observation_running event for the observation with ID={obs_id}. According to the records in ObservationControl, this observation is not supposed to run. Please check previous logs, especially around the time an observation with this ID was started. Will continue and ignore this event.")
return
if obs_id in running_obs:
# Get the Observation's stop_time from the Observation device.
obs_stop_time = event.device.stop_time_R
current_obs_time = event.attr_value.value
# I expect that current_obs_time is always
# smaller than the stop time that I read from my
# records.
delta_t = obs_stop_time - current_obs_time
if delta_t < 0.0:
# The observation has not finished yet and is
# more than 1.0 seconds past its scheduled stop
# time. Tell the observation to finish and clean up.
obs = running_obs[obs_id]
self.stop_observation(obs_id)
else:
# The observation that we are trying to process is not part of the running_obs dictionary
logger.warning(
f"Received an observation_running event for the observation with ID={obs_id}. According to the records in ObservationControl, this observation is not supposed to run. Please check previous logs, especially around the time an observation with this ID was started. Will continue and ignore this event.")
return
def start_observation(self, settings: ObservationSettings):
# Check further properties that cannot be validated through a JSON schema
if settings.stop_time <= datetime.now():
error = f"Cannot start an observation with ID={settings.observation_id} because the parameter stop_time parameter value=\"{settings.stop_time}\" is invalid. Set a stop_time parameter later in time than the start time."
Except.throw_exception("IllegalCommand", error, __name__)
obs = RunningObservation(self._tango_domain, settings)
obs.create_tango_device()
try:
obs.create_device_proxy()
except DevFailed as ex:
# Remove the device again.
self._tango_util.delete_device(obs.class_name, obs.device_name)
error_string = f"Cannot access the Observation device instance for observation ID={obs.observation_id} with device class name={obs.class_name} and device instance name={obs.device_name}. This means that the observation likely did not start but certainly cannot be controlled and/or forcefully be stopped."
logger.exception(error_string)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
# Finally update the self.running_observation dict's entry of this
# observation with the complete set of info.
self._running_observations[obs.observation_id] = obs
try:
obs.subscribe(self.observation_running_callback)
except DevFailed as ex:
self._tango_util.delete_device(obs.class_name, obs.device_name)
error_string = "Cannot access the Observation device instance for observation ID=%s with device class name=%s and device instance name=%s. This means that the observation cannot be controlled and/or forcefully be stopped."
logger.exception(error_string, obs.observation_id, obs.class_name, obs.device_name)
Except.re_throw_exception(ex, "DevFailed", error_string, __name__)
def stop_observation(self, obs_id):
if self.is_observation_running(obs_id) is False:
error = f"Cannot stop an observation with ID={obs_id}, because the observation is not running."
Except.throw_exception("IllegalCommand", error, __name__)
# Fetch the obs data and remove it from the dict of
# currently running observations.
observation = self._running_observations.pop(obs_id)
observation.shutdown()
def stop_all_observations(self):
for obs_id in self._running_observations.copy().keys():
self.stop_observation(obs_id)
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from .observation_settings import ObservationSettings
from .pointing import Pointing
from .sap import Sap
__all__ = ['ObservationSettings', 'Pointing', 'Sap', ]
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from typing import Type
from jsonschema.exceptions import ValidationError
def _from_json_hook_t(primary: Type):
from tangostationcontrol.configuration import Pointing, Sap, ObservationSettings
def actual_hook(json_dct):
primary_ex = None
for t in [Pointing, Sap, ObservationSettings]:
try:
t.get_validator().validate(json_dct)
except ValidationError as ex:
if t is primary:
primary_ex = ex
pass
else:
return t.to_object(json_dct)
if primary_ex:
raise primary_ex
return None
return actual_hook
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import json
import re
import time
from abc import ABC, abstractmethod
from typing import TypeVar, Type
import jsonschema
import requests
from jsonschema import Draft7Validator, FormatChecker, ValidationError
from jsonschema.validators import RefResolver
from tangostationcontrol.configuration._json_parser import _from_json_hook_t
T = TypeVar('T')
def _fetch_url(url):
attempt_nr = 1
while True:
try:
response = requests.get(url)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
time.sleep(2) # retry after a little sleep
if attempt_nr >= 5:
raise e
else:
attempt_nr += 1
class RetryHttpRefResolver(RefResolver):
def resolve_remote(self, uri):
result = _fetch_url(uri)
if self.cache_remote:
self.store[uri] = result
return result
def _is_object(_, instance):
return isinstance(instance, dict) or issubclass(type(instance), _ConfigurationBase)
jsonschema.validators.Draft7Validator.TYPE_CHECKER = Draft7Validator.TYPE_CHECKER.redefine(
"object", _is_object,
)
class _ConfigurationBase(ABC):
BASE_URL = "http://schemas/"
@staticmethod
def _class_to_url(cls_name):
return re.sub(r'(?<!^)(?=[A-Z])', '-', cls_name).lower()
@classmethod
def get_validator(cls):
name = cls.__name__
url = f"{_ConfigurationBase.BASE_URL}{_ConfigurationBase._class_to_url(name)}.json"
resolver = RetryHttpRefResolver(base_uri=_ConfigurationBase.BASE_URL, referrer=url)
_, resolved = resolver.resolve(url)
return Draft7Validator(resolved, format_checker=FormatChecker(), resolver=resolver)
@abstractmethod
def __iter__(self):
pass
def __str__(self):
return json.dumps(dict(self), ensure_ascii=False)
def __repr__(self):
return self.__str__()
# required for jsonschema validation
def __getitem__(self, item):
return getattr(self, item)
# required for jsonschema validation
def __contains__(self, item):
return hasattr(self, item) and getattr(self, item) is not None
def to_json(self):
return self.__str__()
@staticmethod
@abstractmethod
def to_object(json_dct) -> T:
pass
@classmethod
def from_json(cls: Type[T], data: str) -> T:
s = json.loads(data, object_hook=_from_json_hook_t(cls))
if not isinstance(s, cls):
raise ValidationError(f"Unexpected type: expected <{cls.__class__.__name__}>, got <{type(s).__name__}>")
return s
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from datetime import datetime
from typing import Sequence
from tangostationcontrol.configuration.configuration_base import _ConfigurationBase
from tangostationcontrol.configuration.pointing import Pointing
from tangostationcontrol.configuration.sap import Sap
class ObservationSettings(_ConfigurationBase):
def __init__(self, observation_id: int, stop_time: datetime, antenna_mask: Sequence[int], filter: str,
SAPs: Sequence[Sap],
tile_beam: Pointing = None, first_beamlet: int = 0):
self.observation_id = observation_id
self.stop_time = stop_time
self.antenna_mask = antenna_mask
self.filter = filter
self.SAPs = SAPs
self.tile_beam = tile_beam
self.first_beamlet = first_beamlet
def __iter__(self):
yield from {
"observation_id": self.observation_id,
"stop_time" : self.stop_time.isoformat(),
"antenna_mask" : self.antenna_mask,
"filter" : self.filter,
"SAPs" : [dict(s) for s in self.SAPs]
}.items()
if self.tile_beam:
yield "tile_beam", dict(self.tile_beam)
yield "first_beamlet", self.first_beamlet
@staticmethod
def to_object(json_dct) -> 'ObservationSettings':
return ObservationSettings(json_dct['observation_id'], datetime.fromisoformat(json_dct['stop_time']),
json_dct['antenna_mask'],
json_dct['filter'], json_dct['SAPs'],
json_dct['tile_beam'] if 'tile_beam' in json_dct else None,
json_dct['first_beamlet'] if 'first_beamlet' in json_dct else 0)
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tangostationcontrol.configuration.configuration_base import _ConfigurationBase
class Pointing(_ConfigurationBase):
VALIDATOR = None
def __init__(self, angle1=0.6624317181687094, angle2=1.5579526427549426, direction_type="J2000"):
self.angle1 = angle1
self.angle2 = angle2
self.direction_type = direction_type
def __iter__(self):
yield from {
"angle1" : self.angle1,
"angle2" : self.angle2,
"direction_type": self.direction_type
}.items()
@staticmethod
def to_object(json_dct) -> 'Pointing':
return Pointing(json_dct['angle1'], json_dct['angle2'], json_dct['direction_type'])
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR2.0 Station Control project.
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tangostationcontrol.configuration.configuration_base import _ConfigurationBase
from tangostationcontrol.configuration.pointing import Pointing
class Sap(_ConfigurationBase):
def __init__(self, subbands: [int], pointing: Pointing):
self.subbands = subbands
self.pointing = pointing
def __iter__(self):
yield from {
"subbands": self.subbands,
"pointing": dict(self.pointing)
}.items()
@staticmethod
def to_object(json_dct) -> 'Sap':
return Sap(json_dct['subbands'], json_dct['pointing'])
...@@ -4,27 +4,26 @@ ...@@ -4,27 +4,26 @@
# #
# Distributed under the terms of the APACHE license. # Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info. # See LICENSE.txt for more info.
import logging
from time import time
from typing import Optional
import numpy
from jsonschema.exceptions import ValidationError
# PyTango imports # PyTango imports
from tango import AttrWriteType, DeviceProxy, DevState, DevSource, Util from tango import AttrWriteType, DeviceProxy, DevState, DevSource, Util, Except
from tango.server import attribute from tango.server import attribute
import numpy from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD, MAX_ANTENNA, N_beamlets_ctrl, N_point_prop
from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.lofar_logging import device_logging_to_python from tangostationcontrol.common.lofar_logging import device_logging_to_python
from tangostationcontrol.common.lofar_logging import log_exceptions from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD, MAX_ANTENNA, N_beamlets_ctrl, N_point_prop from tangostationcontrol.configuration import ObservationSettings
from tangostationcontrol.devices.device_decorators import fault_on_error from tangostationcontrol.devices.device_decorators import fault_on_error
from tangostationcontrol.devices.device_decorators import only_when_on
from tangostationcontrol.devices.device_decorators import only_in_states from tangostationcontrol.devices.device_decorators import only_in_states
from tangostationcontrol.devices.device_decorators import only_when_on
from tangostationcontrol.devices.lofar_device import lofar_device from tangostationcontrol.devices.lofar_device import lofar_device
from datetime import datetime
from json import loads
from time import time
import logging
logger = logging.getLogger() logger = logging.getLogger()
__all__ = ["Observation", "main"] __all__ = ["Observation", "main"]
...@@ -40,60 +39,58 @@ class Observation(lofar_device): ...@@ -40,60 +39,58 @@ class Observation(lofar_device):
The lifecycle of instances of this device is controlled by ObservationControl The lifecycle of instances of this device is controlled by ObservationControl
""" """
# Attributes # Attributes
observation_running_R = attribute(dtype=numpy.float64, access=AttrWriteType.READ, polling_period=DEFAULT_POLLING_PERIOD, period=DEFAULT_POLLING_PERIOD, observation_running_R = attribute(dtype=numpy.float64, access=AttrWriteType.READ,
polling_period=DEFAULT_POLLING_PERIOD, period=DEFAULT_POLLING_PERIOD,
rel_change="1.0") rel_change="1.0")
observation_id_R = attribute(dtype=numpy.int64, access=AttrWriteType.READ) observation_id_R = attribute(dtype=numpy.int64, access=AttrWriteType.READ)
stop_time_R = attribute(dtype=numpy.float64, access=AttrWriteType.READ) stop_time_R = attribute(dtype=numpy.float64, access=AttrWriteType.READ)
antenna_mask_R = attribute(dtype=(numpy.int64,), max_dim_x=MAX_ANTENNA, access=AttrWriteType.READ) antenna_mask_R = attribute(dtype=(numpy.int64,), max_dim_x=MAX_ANTENNA, access=AttrWriteType.READ)
filter_R = attribute(dtype=numpy.str, access=AttrWriteType.READ) filter_R = attribute(dtype=numpy.str, access=AttrWriteType.READ)
saps_subband_R = attribute(dtype=((numpy.uint32,),), max_dim_x=N_beamlets_ctrl, max_dim_y=N_beamlets_ctrl, access=AttrWriteType.READ) saps_subband_R = attribute(dtype=((numpy.uint32,),), max_dim_x=N_beamlets_ctrl, max_dim_y=N_beamlets_ctrl,
saps_pointing_R = attribute(dtype=((numpy.str,),), max_dim_x=N_point_prop, max_dim_y=N_beamlets_ctrl, access=AttrWriteType.READ) access=AttrWriteType.READ)
saps_pointing_R = attribute(dtype=((numpy.str,),), max_dim_x=N_point_prop, max_dim_y=N_beamlets_ctrl,
access=AttrWriteType.READ)
tile_beam_R = attribute(dtype=(numpy.str,), max_dim_x=N_point_prop, access=AttrWriteType.READ) tile_beam_R = attribute(dtype=(numpy.str,), max_dim_x=N_point_prop, access=AttrWriteType.READ)
first_beamlet_R = attribute(dtype=numpy.int64, access=AttrWriteType.READ) first_beamlet_R = attribute(dtype=numpy.int64, access=AttrWriteType.READ)
observation_settings_RW = attribute(dtype=str, access=AttrWriteType.READ_WRITE) observation_settings_RW = attribute(dtype=str, access=AttrWriteType.READ_WRITE)
def __init__(self, cl, name):
super().__init__(cl, name)
self.recv_proxy: Optional[DeviceProxy] = None
self.antennafield_proxy: Optional[DeviceProxy] = None
self.beamlet_proxy: Optional[DeviceProxy] = None
self.digitalbeam_proxy: Optional[DeviceProxy] = None
self.tilebeam_proxy: Optional[DeviceProxy] = None
self._observation_settings: Optional[ObservationSettings] = None
self._num_inputs: int = 0
def init_device(self): def init_device(self):
"""Setup some class member variables for observation state""" """Setup some class member variables for observation state"""
super().init_device() super().init_device()
self._observation_settings = loads("{}")
self._observation_id = -1
self._stop_time = datetime.now()
self._num_inputs = 0
def configure_for_initialise(self): def configure_for_initialise(self):
"""Load the JSON from the attribute and configure member variables""" """Load the JSON from the attribute and configure member variables"""
super().configure_for_initialise() super().configure_for_initialise()
if self._observation_settings is None:
Except.throw_exception("IllegalCommand", "Device can not be initialized without configuration", __name__)
# ObservationControl takes already good care of checking that the # ObservationControl takes already good care of checking that the
# parameters are in order and sufficient. It is therefore unnecessary # parameters are in order and sufficient. It is therefore unnecessary
# at the moment to check the parameters here again. # at the moment to check the parameters here again.
# This could change when the parameter check becomes depending on # This could change when the parameter check becomes depending on
# certain aspects that only an Observation device can know. # certain aspects that only an Observation device can know.
parameters = loads(self._observation_settings) self._num_saps = len(self._observation_settings.SAPs)
self._saps_pointing = self._build_saps_pointing(self._observation_settings)
self._observation_id = parameters["observation_id"]
self._stop_time = datetime.fromisoformat(parameters["stop_time"])
self._antenna_mask = parameters["antenna_mask"]
self._filter = parameters["filter"]
self._num_saps = len(parameters["SAPs"])
self._saps_subband = [ parameters["SAPs"][i]['subbands'] for i in range(0, self._num_saps)]
self._saps_pointing = self._build_saps_pointing(parameters)
self._tile_beam = self._build_tilebeam_pointing(parameters)
self._first_beamlet = parameters["first_beamlet"]
# Set a reference of AntennaField device that is correlated to this device # Set a reference of AntennaField device that is correlated to this device
util = Util.instance() util = Util.instance()
# TODO(Stefano): set a proper policy for the devices instance number # TODO(Stefano): set a proper policy for the devices instance number
# It cannot be inherited from the Observation instance number (i.e. Observation_id) # It cannot be inherited from the Observation instance number (i.e. Observation_id)
self.antennafield_proxy = DeviceProxy( self.antennafield_proxy = DeviceProxy(f"{util.get_ds_inst_name()}/AntennaField/1")
f"{util.get_ds_inst_name()}/AntennaField/1")
self.antennafield_proxy.set_source(DevSource.DEV) self.antennafield_proxy.set_source(DevSource.DEV)
# Set a reference of RECV device that is correlated to this device # Set a reference of RECV device that is correlated to this device
...@@ -116,16 +113,16 @@ class Observation(lofar_device): ...@@ -116,16 +113,16 @@ class Observation(lofar_device):
self._num_inputs = self.digitalbeam_proxy.antenna_select_RW.shape[0] self._num_inputs = self.digitalbeam_proxy.antenna_select_RW.shape[0]
logger.info( logger.info(
f"The observation with ID={self._observation_id} is " f"The observation with ID={self._observation_settings.observation_id} is "
"configured. It will begin as soon as On() is called and it is" "configured. It will begin as soon as On() is called and it is"
f"supposed to stop at {self._stop_time}") f"supposed to stop at {self._observation_settings.stop_time}")
def configure_for_off(self): def configure_for_off(self):
"""Indicate the observation has stopped""" """Indicate the observation has stopped"""
super().configure_for_off() super().configure_for_off()
logger.info(f"Stopped the observation with ID={self._observation_id}.") logger.info(f"Stopped the observation with ID={self._observation_settings.observation_id}.")
def configure_for_on(self): def configure_for_on(self):
"""Indicate the observation has started""" """Indicate the observation has started"""
...@@ -133,9 +130,9 @@ class Observation(lofar_device): ...@@ -133,9 +130,9 @@ class Observation(lofar_device):
super().configure_for_on() super().configure_for_on()
# Apply Antenna Mask and Filter # Apply Antenna Mask and Filter
ANT_mask, RCU_band_select = self._apply_antennafield_settings(self.read_antenna_mask_R(), self.read_filter_R()) ant_mask, rcu_band_select = self._apply_antennafield_settings(self.read_antenna_mask_R(), self.read_filter_R())
self.antennafield_proxy.ANT_mask_RW = ANT_mask self.antennafield_proxy.ANT_mask_RW = ant_mask
self.antennafield_proxy.RCU_band_select_RW = RCU_band_select self.antennafield_proxy.RCU_band_select_RW = rcu_band_select
# Apply Beamlet configuration # Apply Beamlet configuration
self.beamlet_proxy.subband_select_RW = self._apply_saps_subbands(self.read_saps_subband_R()) self.beamlet_proxy.subband_select_RW = self._apply_saps_subbands(self.read_saps_subband_R())
...@@ -143,44 +140,46 @@ class Observation(lofar_device): ...@@ -143,44 +140,46 @@ class Observation(lofar_device):
self.digitalbeam_proxy.antenna_select_RW = self._apply_saps_antenna_select(self.read_antenna_mask_R()) self.digitalbeam_proxy.antenna_select_RW = self._apply_saps_antenna_select(self.read_antenna_mask_R())
# Apply Tile Beam pointing direction # Apply Tile Beam pointing direction
self.tilebeam_proxy.Pointing_direction_RW = [tuple(self.read_tile_beam_R())] * self.antennafield_proxy.nr_antennas_R tile_beam = self.read_tile_beam_R()
if tile_beam is not None:
self.tilebeam_proxy.Pointing_direction_RW = [tuple(tile_beam)] * self.antennafield_proxy.nr_antennas_R
logger.info(f"Started the observation with ID={self._observation_id}.") logger.info(f"Started the observation with ID={self._observation_settings.observation_id}.")
@only_when_on() @only_when_on()
@fault_on_error() @fault_on_error()
@log_exceptions() @log_exceptions()
def read_observation_id_R(self): def read_observation_id_R(self):
"""Return the observation_id_R attribute.""" """Return the observation_id_R attribute."""
return self._observation_id return self._observation_settings.observation_id
@only_when_on() @only_when_on()
@fault_on_error() @fault_on_error()
@log_exceptions() @log_exceptions()
def read_stop_time_R(self): def read_stop_time_R(self):
"""Return the stop_time_R attribute.""" """Return the stop_time_R attribute."""
return self._stop_time.timestamp() return self._observation_settings.stop_time.timestamp()
@only_in_states([DevState.STANDBY, DevState.ON]) @only_in_states([DevState.STANDBY, DevState.ON])
@fault_on_error() @fault_on_error()
@log_exceptions() @log_exceptions()
def read_antenna_mask_R(self): def read_antenna_mask_R(self):
"""Return the antenna_mask_R attribute.""" """Return the antenna_mask_R attribute."""
return self._antenna_mask return self._observation_settings.antenna_mask
@only_in_states([DevState.STANDBY, DevState.ON]) @only_in_states([DevState.STANDBY, DevState.ON])
@fault_on_error() @fault_on_error()
@log_exceptions() @log_exceptions()
def read_filter_R(self): def read_filter_R(self):
"""Return the filter_R attribute.""" """Return the filter_R attribute."""
return self._filter return self._observation_settings.filter
@only_in_states([DevState.STANDBY, DevState.ON]) @only_in_states([DevState.STANDBY, DevState.ON])
@fault_on_error() @fault_on_error()
@log_exceptions() @log_exceptions()
def read_saps_subband_R(self): def read_saps_subband_R(self):
"""Return the saps_subband_R attribute.""" """Return the saps_subband_R attribute."""
return self._saps_subband return [sap.subbands for sap in self._observation_settings.SAPs]
@only_in_states([DevState.STANDBY, DevState.ON]) @only_in_states([DevState.STANDBY, DevState.ON])
@fault_on_error() @fault_on_error()
...@@ -194,27 +193,35 @@ class Observation(lofar_device): ...@@ -194,27 +193,35 @@ class Observation(lofar_device):
@log_exceptions() @log_exceptions()
def read_tile_beam_R(self): def read_tile_beam_R(self):
"""Return the tile_beam_R attribute.""" """Return the tile_beam_R attribute."""
return self._tile_beam if self._observation_settings.tile_beam is None:
return None
pointing_direction = self._observation_settings.tile_beam
return [str(pointing_direction.direction_type), f"{pointing_direction.angle1}deg",
f"{pointing_direction.angle2}deg"]
@only_in_states([DevState.STANDBY, DevState.ON]) @only_in_states([DevState.STANDBY, DevState.ON])
@fault_on_error() @fault_on_error()
@log_exceptions() @log_exceptions()
def read_first_beamlet_R(self): def read_first_beamlet_R(self):
"""Return the first_beamlet_R attribute.""" """Return the first_beamlet_R attribute."""
return self._first_beamlet return self._observation_settings.first_beamlet
@fault_on_error() @fault_on_error()
@log_exceptions() @log_exceptions()
def read_observation_settings_RW(self): def read_observation_settings_RW(self):
"""Return current observation_parameters string""" """Return current observation_parameters string"""
return self._observation_settings return None if self._observation_settings is None else self._observation_settings.to_json()
@only_in_states([DevState.OFF]) @only_in_states([DevState.OFF])
@fault_on_error() @fault_on_error()
@log_exceptions() @log_exceptions()
def write_observation_settings_RW(self, parameters: str): def write_observation_settings_RW(self, parameters: str):
"""No validation on configuring parameters as task of control device""" """No validation on configuring parameters as task of control device"""
self._observation_settings = parameters try:
self._observation_settings = ObservationSettings.from_json(parameters)
except ValidationError as e:
self._observation_settings = None
#Except.throw_exception("IllegalCommand", e.message, __name__)
@only_when_on() @only_when_on()
@fault_on_error() @fault_on_error()
...@@ -225,19 +232,16 @@ class Observation(lofar_device): ...@@ -225,19 +232,16 @@ class Observation(lofar_device):
# value # value
return time() return time()
def _build_saps_pointing(self, parameters:dict): def _build_saps_pointing(self, parameters: ObservationSettings):
""" Build the sap pointing array preserving the correct order from JSON """ """ Build the sap pointing array preserving the correct order from JSON """
saps_pointing = [] saps_pointing = []
for i in range(0, self._num_saps): for i in range(0, self._num_saps):
pointing_direction = parameters["SAPs"][i]['pointing'] pointing_direction = parameters.SAPs[i].pointing
saps_pointing.insert(i,(pointing_direction['direction_type'], f"{pointing_direction['angle1']}deg", f"{pointing_direction['angle2']}deg")) saps_pointing.insert(i, (
pointing_direction.direction_type, f"{pointing_direction.angle1}deg",
f"{pointing_direction.angle2}deg"))
return saps_pointing return saps_pointing
def _build_tilebeam_pointing(self, parameters:dict):
""" Build the sap pointing array preserving the correct order from JSON """
pointing_direction = parameters["tile_beam"]
return [str(pointing_direction['direction_type']), f"{pointing_direction['angle1']}deg", f"{pointing_direction['angle2']}deg"]
def _apply_antennafield_settings(self, antenna_mask: list, filter_name: str): def _apply_antennafield_settings(self, antenna_mask: list, filter_name: str):
""" Convert an array of antenna indexes into a boolean mask array and """ Convert an array of antenna indexes into a boolean mask array and
retrieve the RCU band from filter name, returning the correct format for retrieve the RCU band from filter name, returning the correct format for
...@@ -262,7 +266,8 @@ class Observation(lofar_device): ...@@ -262,7 +266,8 @@ class Observation(lofar_device):
def _apply_saps_pointing(self, sap_pointing: list): def _apply_saps_pointing(self, sap_pointing: list):
""" Convert an array of string directions into the correct format for DigitalBeam device""" """ Convert an array of string directions into the correct format for DigitalBeam device"""
pointing_direction = list(self.digitalbeam_proxy.Pointing_direction_RW) # convert to list to allows item assignment pointing_direction = list(
self.digitalbeam_proxy.Pointing_direction_RW) # convert to list to allows item assignment
first_beamlet = numpy.array(self.read_first_beamlet_R(), dtype=numpy.int64) first_beamlet = numpy.array(self.read_first_beamlet_R(), dtype=numpy.int64)
# Insert pointing values starting from the first beamlet # Insert pointing values starting from the first beamlet
pointing_direction[first_beamlet:len(sap_pointing)] = sap_pointing pointing_direction[first_beamlet:len(sap_pointing)] = sap_pointing
...@@ -277,6 +282,7 @@ class Observation(lofar_device): ...@@ -277,6 +282,7 @@ class Observation(lofar_device):
antenna_select[a, i] = True antenna_select[a, i] = True
return antenna_select return antenna_select
# ---------- # ----------
# Run server # Run server
# ---------- # ----------
......
...@@ -106,8 +106,8 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): ...@@ -106,8 +106,8 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase):
"""Initialize an observation with _invalid_ JSON""" """Initialize an observation with _invalid_ JSON"""
self.proxy.off() self.proxy.off()
self.proxy.observation_settings_RW = "{}"
with self.assertRaises(DevFailed): with self.assertRaises(DevFailed):
self.proxy.observation_settings_RW = "{}"
self.proxy.Initialise() self.proxy.Initialise()
self.assertEqual(DevState.FAULT, self.proxy.state()) self.assertEqual(DevState.FAULT, self.proxy.state())
...@@ -125,6 +125,7 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): ...@@ -125,6 +125,7 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase):
def test_attribute_match(self): def test_attribute_match(self):
"""Test that JSON data is exposed to attributes""" """Test that JSON data is exposed to attributes"""
#failing
data = loads(self.VALID_JSON) data = loads(self.VALID_JSON)
stop_timestamp = datetime.fromisoformat(data["stop_time"]).timestamp() stop_timestamp = datetime.fromisoformat(data["stop_time"]).timestamp()
observation_id = data["observation_id"] observation_id = data["observation_id"]
...@@ -209,6 +210,7 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): ...@@ -209,6 +210,7 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase):
self.assertListEqual(digitalbeam_proxy.antenna_select_RW.tolist()[10], [False] * N_beamlets_ctrl) self.assertListEqual(digitalbeam_proxy.antenna_select_RW.tolist()[10], [False] * N_beamlets_ctrl)
def test_apply_tilebeam(self): def test_apply_tilebeam(self):
#failing
"""Test that attribute tilebeam is correctly applied""" """Test that attribute tilebeam is correctly applied"""
tilebeam_proxy = self.setup_tilebeam_proxy() tilebeam_proxy = self.setup_tilebeam_proxy()
pointing_direction = [("J2000","0deg","0deg")] * DEFAULT_N_HBA_TILES pointing_direction = [("J2000","0deg","0deg")] * DEFAULT_N_HBA_TILES
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment