Skip to content
Snippets Groups Projects
Commit 2a46a483 authored by Corné Lukken's avatar Corné Lukken Committed by Hannes Feldt
Browse files

Resolve L2SS-1740: Statistics metadata

parent 83fa2763
No related branches found
No related tags found
1 merge request!903Resolve L2SS-1740: Statistics metadata
Showing
with 420 additions and 56 deletions
...@@ -146,8 +146,27 @@ ...@@ -146,8 +146,27 @@
"STAT/ObservationControl/1": { "STAT/ObservationControl/1": {
"properties": { "properties": {
"Power_Children": [ "Power_Children": [
"STAT/Metadata/1"
], ],
"Control_Children": [ "Control_Children": [
"STAT/Metadata/1"
]
}
}
}
}
},
"Metadata": {
"STAT": {
"Metadata": {
"STAT/Metadata/1": {
"properties": {
"Power_Children": [
],
"Control_Children": [
],
"Metadata_Port": [
"6001"
] ]
} }
} }
......
...@@ -161,6 +161,7 @@ Next change the version in the following places: ...@@ -161,6 +161,7 @@ Next change the version in the following places:
# Release Notes # Release Notes
* 0.38.0 Add metadata device publishing zmq events
* 0.37.2 Improved event-subscription interface, avoid overlap between polling loops. * 0.37.2 Improved event-subscription interface, avoid overlap between polling loops.
* 0.37.1 Improved asyncio resource teardown when devices go Off. * 0.37.1 Improved asyncio resource teardown when devices go Off.
* 0.37.0-1 Fix for deploying on DTS Lab * 0.37.0-1 Fix for deploying on DTS Lab
......
...@@ -78,11 +78,11 @@ ccd = OptionalDeviceProxy("STAT/CCD/1") ...@@ -78,11 +78,11 @@ ccd = OptionalDeviceProxy("STAT/CCD/1")
ec = OptionalDeviceProxy("STAT/EC/1") ec = OptionalDeviceProxy("STAT/EC/1")
pcon = OptionalDeviceProxy("STAT/PCON/1") pcon = OptionalDeviceProxy("STAT/PCON/1")
psoc = OptionalDeviceProxy("STAT/PSOC/1") psoc = OptionalDeviceProxy("STAT/PSOC/1")
docker = OptionalDeviceProxy("STAT/Docker/1")
temperaturemanager = OptionalDeviceProxy("STAT/TemperatureManager/1") temperaturemanager = OptionalDeviceProxy("STAT/TemperatureManager/1")
configuration = OptionalDeviceProxy("STAT/Configuration/1") configuration = OptionalDeviceProxy("STAT/Configuration/1")
calibration = OptionalDeviceProxy("STAT/Calibration/1") calibration = OptionalDeviceProxy("STAT/Calibration/1")
observationcontrol = OptionalDeviceProxy("STAT/ObservationControl/1") observationcontrol = OptionalDeviceProxy("STAT/ObservationControl/1")
metadata = OptionalDeviceProxy("STAT/Metadata/1")
# Put them in a list in case one wants to iterate # Put them in a list in case one wants to iterate
devices = ( devices = (
...@@ -94,7 +94,6 @@ devices = ( ...@@ -94,7 +94,6 @@ devices = (
ec, ec,
pcon, pcon,
psoc, psoc,
docker,
temperaturemanager, temperaturemanager,
configuration, configuration,
sdpfirmware_l, sdpfirmware_l,
......
...@@ -78,6 +78,7 @@ devices: ...@@ -78,6 +78,7 @@ devices:
- DigitalBeam - DigitalBeam
- EC - EC
- ObservationControl - ObservationControl
- Metadata
- PCON - PCON
- PSOC - PSOC
- RECVH - RECVH
......
...@@ -5,7 +5,6 @@ if [ ! -f "setup.sh" ]; then ...@@ -5,7 +5,6 @@ if [ ! -f "setup.sh" ]; then
exit 1 exit 1
fi fi
pip install pre-commit
pre-commit install --hook-type pre-push pre-commit install --hook-type pre-push
# --allow-missing-config: Allows the installation to proceed # --allow-missing-config: Allows the installation to proceed
......
...@@ -282,7 +282,7 @@ echo "Using tango host $TANGO_HOST" ...@@ -282,7 +282,7 @@ echo "Using tango host $TANGO_HOST"
# Devices list is used to explitly word split when supplied to commands, must # Devices list is used to explitly word split when supplied to commands, must
# disable shellcheck SC2086 for each case. # disable shellcheck SC2086 for each case.
DEVICES=(device-stationmanager device-aps device-apsct device-ccd device-ec device-apspu device-sdpfirmware device-sdp device-recvh device-recvl device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-psoc device-pcon device-afh device-afl device-temperaturemanager device-observationcontrol device-configuration device-calibration) DEVICES=(device-stationmanager device-aps device-apsct device-ccd device-ec device-apspu device-sdpfirmware device-sdp device-recvh device-recvl device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-psoc device-pcon device-afh device-afl device-temperaturemanager device-observationcontrol device-configuration device-calibration device-metadata)
# Wait for devices to restart # Wait for devices to restart
......
...@@ -22,6 +22,7 @@ fi ...@@ -22,6 +22,7 @@ fi
# Activate the virtual environment # Activate the virtual environment
source "$VENV_DIR/bin/activate" source "$VENV_DIR/bin/activate"
pip install pre-commit
# Install git hooks # Install git hooks
if [ ! -f "${LOFAR20_DIR}/.git/hooks/post-checkout" ]; then if [ ! -f "${LOFAR20_DIR}/.git/hooks/post-checkout" ]; then
......
0.37.2 0.38.0
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import logging
from integration_test.default.devices.base import AbstractTestBases
logger = logging.getLogger()
class TestMetadataDevice(AbstractTestBases.TestDeviceBase):
def setUp(self):
super().setUp("STAT/Metadata/1")
self.psoc_proxy = self.setup_proxy("STAT/PSOC/1")
def test_send_metadata(self):
"""Turn on the device and emit metadata"""
self.proxy.boot()
self.proxy.send_metadata()
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
import json import json
import logging import logging
import time
from datetime import datetime from datetime import datetime
from datetime import timedelta from datetime import timedelta
...@@ -14,7 +15,7 @@ from tangostationcontrol.common.constants import CS001_TILES ...@@ -14,7 +15,7 @@ from tangostationcontrol.common.constants import CS001_TILES
from tangostationcontrol.test.dummy_observation_settings import ( from tangostationcontrol.test.dummy_observation_settings import (
get_observation_settings_hba_immediate, get_observation_settings_hba_immediate,
) )
from timeout_decorator import timeout_decorator
logger = logging.getLogger() logger = logging.getLogger()
...@@ -83,6 +84,7 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase): ...@@ -83,6 +84,7 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase):
self.recv_proxy = self.setup_proxy("STAT/RECVH/H0", defaults=True) self.recv_proxy = self.setup_proxy("STAT/RECVH/H0", defaults=True)
self.sdpfirmware_proxy = self.setup_proxy("STAT/SDPFirmware/HBA0") self.sdpfirmware_proxy = self.setup_proxy("STAT/SDPFirmware/HBA0")
self.sdp_proxy = self.setup_proxy("STAT/SDP/HBA0") self.sdp_proxy = self.setup_proxy("STAT/SDP/HBA0")
self.metadata_proxy = self.setup_proxy("STAT/Metadata/1")
control_mapping = [[1, i] for i in range(CS001_TILES)] control_mapping = [[1, i] for i in range(CS001_TILES)]
self.antennafield_proxy = self.setup_proxy( self.antennafield_proxy = self.setup_proxy(
...@@ -284,3 +286,39 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase): ...@@ -284,3 +286,39 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase):
self.on_device_assert(self.proxy) self.on_device_assert(self.proxy)
self.assertRaises(DevFailed, self.proxy.add_observation, json.dumps(parameters)) self.assertRaises(DevFailed, self.proxy.add_observation, json.dumps(parameters))
@timeout_decorator.timeout(60)
def test_add_observation_callbacks(self):
"""Test adding an observation and checking start / stop callbacks work"""
self.on_device_assert(self.proxy)
parameters = json.loads(self.VALID_JSON)
for antenna_field in parameters["antenna_fields"]:
antenna_field["start_time"] = (
datetime.now() + timedelta(seconds=5)
).isoformat()
antenna_field["stop_time"] = (
datetime.now() + timedelta(seconds=20)
).isoformat()
self.proxy.add_observation(json.dumps(parameters))
self.assertFalse(self.proxy.is_observation_running(self.EXPECTED_OBS_ID))
time.sleep(5)
while not self.proxy.is_observation_running(self.EXPECTED_OBS_ID):
logging.info("Waiting for observation to start...")
time.sleep(0.1)
self.assertTrue(self.proxy.is_observation_running(self.EXPECTED_OBS_ID))
self.assertGreater(self.metadata_proxy.messages_published_R, 0)
time.sleep(20)
while self.proxy.is_observation_running(self.EXPECTED_OBS_ID):
logging.info("Waiting for observation to stop...")
time.sleep(0.1)
self.assertFalse(self.proxy.is_observation_running(self.EXPECTED_OBS_ID))
...@@ -3,7 +3,8 @@ ...@@ -3,7 +3,8 @@
# integration process, which may cause wedges in the gate later. # integration process, which may cause wedges in the gate later.
lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client # Apache 2 lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client # Apache 2
PyTango>=9.5.1rc1 # LGPL v3 PyTango>=9.5.1 # LGPL v3
pyzmq>=24 # LGPL + BSD
numpy>=1.21.6 # BSD3 numpy>=1.21.6 # BSD3
asyncua >= 0.9.90 # LGPLv3 asyncua >= 0.9.90 # LGPLv3
psycopg2-binary >= 2.9.2 # LGPL psycopg2-binary >= 2.9.2 # LGPL
......
...@@ -6,7 +6,7 @@ import time ...@@ -6,7 +6,7 @@ import time
from queue import Queue from queue import Queue
from threading import Thread from threading import Thread
from lofar_station_client.statistics.collector import StatisticsCollector from lofar_station_client.statistics.collectors import StatisticsCollector
from lofar_station_client.statistics.packets import StatisticsPacket from lofar_station_client.statistics.packets import StatisticsPacket
from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread
......
...@@ -106,6 +106,9 @@ MAX_ETH_FRAME_SIZE = 9000 ...@@ -106,6 +106,9 @@ MAX_ETH_FRAME_SIZE = 9000
# The default polling period for polled attributes (in milliseconds) # The default polling period for polled attributes (in milliseconds)
DEFAULT_POLLING_PERIOD_MS = 1000 DEFAULT_POLLING_PERIOD_MS = 1000
# The metadata polling period for polled attributes (in milliseconds)
DEFAULT_POLLING_PERIOD_METADATA_MS = 60000
# The default polling period for attributes polled to update metrics in Prometheus (in milliseconds) # The default polling period for attributes polled to update metrics in Prometheus (in milliseconds)
DEFAULT_METRICS_POLLING_PERIOD_MS = 2500 DEFAULT_METRICS_POLLING_PERIOD_MS = 2500
......
...@@ -4,13 +4,16 @@ ...@@ -4,13 +4,16 @@
from dataclasses import dataclass from dataclasses import dataclass
from functools import partial from functools import partial
import logging import logging
from typing import List, Dict, Callable from typing import List, Dict, Callable, Optional
from tango import DeviceProxy, EventType, DevFailed from tango import DeviceProxy, EventType, DevFailed
from prometheus_client import Counter from prometheus_client import Counter
from tangostationcontrol.common.lofar_logging import exception_to_str from tangostationcontrol.common.lofar_logging import exception_to_str
from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD_MS from tangostationcontrol.common.constants import (
DEFAULT_POLLING_PERIOD_MS,
DEFAULT_METRICS_POLLING_PERIOD_MS,
)
from tangostationcontrol.common.case_insensitive_string import CaseInsensitiveString from tangostationcontrol.common.case_insensitive_string import CaseInsensitiveString
from tangostationcontrol.metrics import AttributeMetric from tangostationcontrol.metrics import AttributeMetric
...@@ -43,7 +46,9 @@ class EventSubscriptions: ...@@ -43,7 +46,9 @@ class EventSubscriptions:
dynamic_labels=["event_device", "event_attribute"], dynamic_labels=["event_device", "event_attribute"],
) )
def _get_subscription(self, dev_name: str, attr_name: str) -> bool: def _get_subscription(
self, dev_name: str, attr_name: str
) -> Optional[Subscription]:
for sub in self.subscriptions: for sub in self.subscriptions:
if ( if (
CaseInsensitiveString(sub.proxy.name()) == dev_name CaseInsensitiveString(sub.proxy.name()) == dev_name
...@@ -100,11 +105,31 @@ class EventSubscriptions: ...@@ -100,11 +105,31 @@ class EventSubscriptions:
f"Callback {callback} for device {device} attribute {attribute_name} threw an exception: {exception_to_str(ex)}" f"Callback {callback} for device {device} attribute {attribute_name} threw an exception: {exception_to_str(ex)}"
) )
def poll_period(self, proxy: DeviceProxy, attr_name: str) -> int | None:
"""Return the polling period for the given attribute, in milliseconds,
or None if the attribute is not polled at all."""
# LofarDevice polls a set of attributes not covered by Tango's poll loop
try:
if proxy.is_attribute_polled_by_lofardevice(attr_name):
return DEFAULT_METRICS_POLLING_PERIOD_MS
except AttributeError:
# Not a LofarDevice
pass
# Check if polled by Tango
if proxy.is_attribute_polled(attr_name):
return proxy.get_attribute_poll_period(attr_name)
# No polling detected
return None
def subscribe_change_event( def subscribe_change_event(
self, self,
proxy: DeviceProxy, proxy: DeviceProxy,
attr_name: str, attr_name: str,
callback: EventCallbackType, callback: EventCallbackType,
period: int = DEFAULT_POLLING_PERIOD_MS,
): ):
"""Subscribe to changes to an attribute of another device. """Subscribe to changes to an attribute of another device.
Immediately and on change, the provided callback will be called as Immediately and on change, the provided callback will be called as
...@@ -117,6 +142,24 @@ class EventSubscriptions: ...@@ -117,6 +142,24 @@ class EventSubscriptions:
the callback. the callback.
""" """
# make sure th attribute is polled often enough
current_period = self.poll_period(proxy, attr_name)
if current_period is None or current_period > period:
if current_period is None:
logger.info(f"Enabling polling for {proxy.name()}/{attr_name}.")
else:
logger.info(
"Adjusting tango controlled polling period from %d to %d",
current_period,
period,
)
# NB: This requires either abs_change or rel_change to be set
# for numerical attributes.
proxy.poll_attribute(attr_name, period)
# add or extend subscription to CHANGE_EVENT
if sub := self._get_subscription(proxy.name(), attr_name): if sub := self._get_subscription(proxy.name(), attr_name):
# extend existing subscription # extend existing subscription
logger.debug( logger.debug(
...@@ -143,13 +186,6 @@ class EventSubscriptions: ...@@ -143,13 +186,6 @@ class EventSubscriptions:
callbacks=[callback], callbacks=[callback],
) )
# make sure the attribute is polled, otherwise we wont receive event
if not proxy.is_attribute_polled_by_lofardevice(attr_name):
# LOFARDevice does not poll this attribute, but maybe Tango does
if not proxy.is_attribute_polled(attr_name):
logger.info(f"Enabling polling for {proxy.name()}/{attr_name}.")
proxy.poll_attribute(attr_name, DEFAULT_POLLING_PERIOD_MS)
# subscribe # subscribe
logger.debug( logger.debug(
f"Subscribing callback {callback} to CHANGE_EVENT on device {proxy} attribute {attr_name}" f"Subscribing callback {callback} to CHANGE_EVENT on device {proxy} attribute {attr_name}"
......
...@@ -6,17 +6,12 @@ This directory contains the sources for our custom Tango devices. ...@@ -6,17 +6,12 @@ This directory contains the sources for our custom Tango devices.
If a new device is added, it will (likely) need to be referenced in several places. Adjust or add the following files (referenced from the repository root), following the pattern shown by the devices already there: If a new device is added, it will (likely) need to be referenced in several places. Adjust or add the following files (referenced from the repository root), following the pattern shown by the devices already there:
- Adjust `CDB/LOFAR_ConfigDb.json` to create the device in the Tango device database, - Adjust `CDB/stations/common.json` to create the device in the Tango device database,
- Add the device hierarchies in `CDB/hierarchies` to define the power, control and clock configuration, - Add the device hierarchies to define the power and control configuration in `common.json`
- Adjust `docker/jupyter-lab/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py` to make an alias for it available in Jupyter-Lab, - Adjust `docker/jupyter-lab/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py` to make an alias for it available in Jupyter-Lab,
- Adjust `tangostationcontrol/tangostationcontrol/devices/boot.py` to add the device to the station initialisation sequence, - Add to `infra/env.yaml` the device class.
- Add to `docker-compose/` to create a YaML file to start the device in a docker container. NOTE: it needs a unique 57xx port assigned (current _unused_ port value: 5731), a unique 58xx port for ZMQ events, and a unique 59xx port for ZMQ heartbeat - Adjust `tangostationcontrol/devices/__init__.py` to include the new device class and add to `__all__`
- Adjust `tangostationcontrol/setup.cfg` to add an entry point for the device in the package installation, - Add the device class to `/sbin/run_integration_test.sh` list of devices
- Add to `tangostationcontrol/tangostationcontrol/integration_test/default/devices/` to add an integration test, - Create `tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_xxx.py` to add an integration test,
- Adjust `sbin/run_integration_test.sh` to have the device started when running the integration tests,
- Adjust `.gitlab-ci.yml` to add the device to the `docker_build_image_all` step and to create a `docker_build_image_device_XXX` step (N.B. not necessary if device is based on `lofar-device-base`),
- Add to `sbin/tag_and_push_docker_image.sh` the LOCAL_IMAGES device name, imagine name and build for integration boolean triple (N.B. not necessary if device is based on `lofar-device-base`),
- Add to `tangostationcontrol/docs/source/devices/` to mention the device in the end-user documentation. - Add to `tangostationcontrol/docs/source/devices/` to mention the device in the end-user documentation.
- Adjust `tangostationcontrol/docs/source/index.rst` to include the newly created file in `docs/source/devices/`. - Adjust `tangostationcontrol/docs/source/index.rst` to include the newly created file in `docs/source/devices/`.
- Adjust `docker-compose/tango-prometheus-exporter/lofar2-policy.json` (in all lowercase) to include this device in the prometheus exporter
- Adjust `docker-compose/tango-prometheus-exporter/lofar2-fast-policy.json` (in all lowercase) to include this device in the prometheus exporter.
...@@ -12,6 +12,7 @@ from .configuration import Configuration ...@@ -12,6 +12,7 @@ from .configuration import Configuration
from .ec import EC from .ec import EC
from .observation_field import ObservationField from .observation_field import ObservationField
from .observation_control import ObservationControl from .observation_control import ObservationControl
from .metadata import Metadata
from .pcon import PCON from .pcon import PCON
from .psoc import PSOC from .psoc import PSOC
from .recv.recvh import RECVH from .recv.recvh import RECVH
...@@ -41,6 +42,7 @@ __all__ = [ ...@@ -41,6 +42,7 @@ __all__ = [
"EC", "EC",
"ObservationField", "ObservationField",
"ObservationControl", "ObservationControl",
"Metadata",
"PCON", "PCON",
"PSOC", "PSOC",
"RECVL", "RECVL",
......
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Metadata device to emit various data"""
# Additional import
import logging
import time
from typing import Any
import numpy
from tango import DebugIt, DeviceProxy
from tango.server import device_property, attribute, command
# PyTango imports
from tangostationcontrol.common.case_insensitive_dict import CaseInsensitiveDict
from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD_METADATA_MS
from tangostationcontrol.common.device_decorators import only_in_states
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice
from tangostationcontrol.metadata.metadata_organizer import MetadataOrganizer
from tangostationcontrol.metadata.metadata_types import metadata_config_type
from tangostationcontrol.metadata.metadata_types import metadata_proxy_type
from tangostationcontrol.metrics import device_metrics
from tangostationcontrol.zeromq.publisher import ZeroMQPublisher
logger = logging.getLogger()
__all__ = ["Metadata"]
@device_metrics()
class Metadata(LOFARDevice):
"""Metadata device to emit data about relevant to statistics and observations"""
# -----------------
# Device Properties
# -----------------
metadata_protocol = device_property(dtype="DevString", default_value="tcp")
metadata_bind = device_property(dtype="DevString", default_value="*")
metadata_port = device_property(dtype="DevUShort", mandatory=True)
metadata_topic = device_property(dtype="DevString", default_value="metadata")
# ----------
# Attributes
# ----------
@attribute(
doc="Is the metadata publisher running",
dtype=bool,
)
def is_running_R(self):
if self._publisher:
return self._publisher.is_running
return False
@attribute(
doc="Is the metadata publisher stopping",
dtype=bool,
)
def is_stopping_R(self):
if self._publisher:
return self._publisher.is_stopping
return False
@attribute(
doc="Queue fill percentage",
dtype=numpy.float64,
)
def queue_fill_percentage_R(self):
if self._publisher:
return self._publisher.queue_fill / self._publisher.queue_size
return 0.0
@attribute(
doc="Number of messages published since device was started", dtype=numpy.int64
)
def messages_published_R(self):
return self._num_published
# --------
# Overloaded functions
# --------
STOP_TIME_LIMIT = 10
METADATA_CONFIG: metadata_config_type = CaseInsensitiveDict(
{
"AFH": [
"Antenna_to_SDP_Mapping_R",
"Antenna_Names_R",
"RCU_PCB_ID_R",
"RCU_PCB_version_R",
"Antenna_Usage_Mask_R",
"Antenna_Reference_ITRF_R",
"Frequency_Band_RW",
"RCU_attenuator_dB_R",
"RCU_DTH_on_R",
"RCU_DTH_freq_R",
"HBAT_PWR_on_R",
],
"AFL": [
"Antenna_to_SDP_Mapping_R",
"Antenna_Names_R",
"RCU_PCB_ID_R",
"RCU_PCB_version_R",
"Antenna_Usage_Mask_R",
"Antenna_Reference_ITRF_R",
"Frequency_Band_RW",
"RCU_attenuator_dB_R",
"RCU_DTH_on_R",
"RCU_DTH_freq_R",
],
"SDP": ["subband_frequency_R"],
"TileBeam": ["Pointing_direction_str_R", "Tracking_enabled_R"],
"DigitalBeam": [
"Pointing_direction_str_R",
"Tracking_enabled_R",
"subband_select_RW",
],
"SDPFirmware": [
"FPGA_firmware_version_R",
"FPGA_hardware_version_R",
"nr_signal_inputs_R",
"first_signal_input_index_R",
],
}
)
def __init__(self, cl, name):
self._num_published = 0
self._publisher = None
self._organizer = None
# Super must be called after variable assignment due to executing init_device!
super().__init__(cl, name)
def _wait_for_done(self):
"""Spin until publisher is done"""
accumulate = 0
while not self._publisher.is_done and accumulate < self.STOP_TIME_LIMIT:
logger.debug("Waiting for publisher thread to stop..")
time.sleep(0.1)
accumulate += 0.1
if accumulate > self.STOP_TIME_LIMIT:
raise TimeoutError("Failed to stop publisher thread!")
def configure_for_off(self):
"""user code here. is called when the state is set to OFF"""
super().configure_for_off()
# Delete so that event subscriptions are recreated
if self._organizer:
del self._organizer
self._organizer = None
if self._publisher:
self._publisher.shutdown()
try:
self._wait_for_done()
finally:
self._publisher = None
@log_exceptions()
def configure_for_initialise(self):
"""Initialises the attributes and properties of the statistics device."""
super().configure_for_initialise()
if not self._publisher:
self._publisher = ZeroMQPublisher(
ZeroMQPublisher.contstruct_bind_uri(
self.metadata_protocol, self.metadata_bind, self.metadata_port
),
[self.metadata_topic],
)
if not self._organizer:
self._organizer = MetadataOrganizer(config=self.METADATA_CONFIG)
proxies = self._organizer.create_proxies()
self.register_change_event_subscriptions(proxies)
def register_change_event_subscriptions(self, proxies: metadata_proxy_type):
"""Register change events for working device proxies"""
for device_name, proxy in proxies.items():
if not isinstance(proxy, DeviceProxy):
logger.exception(
"Failed to subscribe to change events for device %s due to %s",
device_name,
proxy,
)
continue
dev_info = proxy.info()
attributes = self.METADATA_CONFIG.get(dev_info.dev_class, None)
if not attributes:
logger.warning(
"register_change_event_subscriptions called for device (%s) not "
"present in METADATA_CONFIG, aborting subscription",
device_name,
)
continue
for attribute_name in attributes:
try:
self.events.subscribe_change_event(
proxy,
attribute_name,
self.change_event_handler,
DEFAULT_POLLING_PERIOD_METADATA_MS,
)
except Exception as e:
logger.exception(
"Subscription for attribute %s on device %s failed due to %s",
attribute_name,
device_name,
e,
)
def change_event_handler(
self, device: DeviceProxy, attribute_name: str, value: Any
):
logger.debug(
"Attribute %s for device %s changed",
device,
attribute_name,
)
self._organizer.partial_update(device, attribute_name, value)
self._send_metadata()
def _send_metadata(self):
""""""
self._publisher.send(self._organizer.get_json())
self._num_published += 1
@command()
@DebugIt()
@only_in_states(DEFAULT_COMMAND_STATES)
def send_metadata(self):
"""Gather and publish metadata attempt to connect failed proxies"""
new_proxies = self._organizer.create_proxies()
self.register_change_event_subscriptions(new_proxies)
self._organizer.gather_metadata()
self._send_metadata()
...@@ -19,6 +19,7 @@ from tangostationcontrol.common.lofar_logging import ( ...@@ -19,6 +19,7 @@ from tangostationcontrol.common.lofar_logging import (
from tangostationcontrol.configuration import ObservationSettings from tangostationcontrol.configuration import ObservationSettings
from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice
from tangostationcontrol.devices.observation_field import ObservationField from tangostationcontrol.devices.observation_field import ObservationField
from tangostationcontrol.devices.types import DeviceTypes
from tangostationcontrol.common.device_decorators import only_when_on, debugit from tangostationcontrol.common.device_decorators import only_when_on, debugit
from tangostationcontrol.metrics import device_metrics from tangostationcontrol.metrics import device_metrics
...@@ -108,14 +109,27 @@ class ObservationControl(LOFARDevice): ...@@ -108,14 +109,27 @@ class ObservationControl(LOFARDevice):
# The top level tango domain is the left-most part of a # The top level tango domain is the left-most part of a
# device's name. # device's name.
self.metadata = None
self.myTangoDomain: str = self.get_name().split("/")[0] self.myTangoDomain: str = self.get_name().split("/")[0]
self._observation_controller: ObservationController = ObservationController( self._observation_controller: ObservationController = ObservationController(
self.myTangoDomain tango_domain=self.myTangoDomain,
start_callback=self._start_observation_callback,
) )
self._stop_all_observations_now() self._stop_all_observations_now()
def _start_observation_callback(self, observation_id: int):
"""Callback for when an observation is started, use to emit metadata"""
try:
self.metadata.send_metadata()
except Exception as e:
logger.exception(
f"Observation start callback for observation {observation_id} "
f"failed due to {e}"
)
# Core functions # Core functions
@log_exceptions() @log_exceptions()
@DebugIt() @DebugIt()
...@@ -128,6 +142,12 @@ class ObservationControl(LOFARDevice): ...@@ -128,6 +142,12 @@ class ObservationControl(LOFARDevice):
# devices. # devices.
Util.instance().set_polling_threads_pool_size(10) Util.instance().set_polling_threads_pool_size(10)
@log_exceptions()
def configure_for_initialise(self):
super().configure_for_initialise()
self.metadata = self.control.child(DeviceTypes.Metadata)
# Lifecycle functions # Lifecycle functions
@log_exceptions() @log_exceptions()
def configure_for_off(self): def configure_for_off(self):
...@@ -135,30 +155,6 @@ class ObservationControl(LOFARDevice): ...@@ -135,30 +155,6 @@ class ObservationControl(LOFARDevice):
self._stop_all_observations_now() self._stop_all_observations_now()
# API
@command(dtype_in=DevString)
@only_when_on()
@log_exceptions()
def start_observation(self, parameters: DevString = None):
"""Deprecated. For backward compatibility with old lofar-station-clients."""
logger.warning(
"Deprecated start_observation command used, please use add_observation "
"or start_observation_now instead"
)
self.add_observation(parameters)
@command(dtype_in=numpy.int64)
@only_when_on()
@log_exceptions()
def stop_observation(self, obs_id: numpy.int64):
"""Deprecated. For backward compatibility with old lofar-station-clients."""
logger.warning(
"Deprecated stop_observation command used, please use stop_observation_now "
"instead"
)
self.stop_observation_now(obs_id)
@command(dtype_in=DevString) @command(dtype_in=DevString)
@only_when_on() @only_when_on()
@debugit() @debugit()
......
...@@ -9,7 +9,7 @@ import numpy ...@@ -9,7 +9,7 @@ import numpy
from typing import Dict, List, Tuple from typing import Dict, List, Tuple
from attribute_wrapper.attribute_wrapper import AttributeWrapper from attribute_wrapper.attribute_wrapper import AttributeWrapper
from lofar_station_client.statistics.collector import BSTCollector from lofar_station_client.statistics.collectors import BSTCollector
from tango import AttrWriteType from tango import AttrWriteType
from tango.server import device_property, attribute from tango.server import device_property, attribute
from tangostationcontrol.clients.opcua_client import OPCUAConnection from tangostationcontrol.clients.opcua_client import OPCUAConnection
......
...@@ -9,7 +9,7 @@ import numpy ...@@ -9,7 +9,7 @@ import numpy
from typing import Dict, List, Tuple from typing import Dict, List, Tuple
from attribute_wrapper.attribute_wrapper import AttributeWrapper from attribute_wrapper.attribute_wrapper import AttributeWrapper
from lofar_station_client.statistics.collector import SSTCollector from lofar_station_client.statistics.collectors import SSTCollector
# PyTango imports # PyTango imports
from tango import AttrWriteType from tango import AttrWriteType
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment