Skip to content
Snippets Groups Projects

L2SS-1906: Periodically publish all metadata

Merged Jan David Mol requested to merge enhance-metadata-device into master

Files

@@ -14,7 +14,11 @@ from tango.server import device_property, attribute, command
@@ -14,7 +14,11 @@ from tango.server import device_property, attribute, command
# PyTango imports
# PyTango imports
from lofar_station_client.common import CaseInsensitiveDict
from lofar_station_client.common import CaseInsensitiveDict
from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD_METADATA_MS
from tangostationcontrol.common.asyncio import PeriodicTask
 
from tangostationcontrol.common.constants import (
 
DEFAULT_POLLING_PERIOD_METADATA_MS,
 
DEFAULT_POLLING_PERIOD_MS,
 
)
from tangostationcontrol.common.device_decorators import only_in_states
from tangostationcontrol.common.device_decorators import only_in_states
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
@@ -47,39 +51,60 @@ class Metadata(LOFARDevice):
@@ -47,39 +51,60 @@ class Metadata(LOFARDevice):
metadata_topic = device_property(dtype="DevString", default_value="metadata")
metadata_topic = device_property(dtype="DevString", default_value="metadata")
 
metadata_periodic_publish_interval = device_property(
 
dtype="DevFloat",
 
doc="Interval with which to force publication of all metadata.",
 
mandatory=False,
 
default_value=600.0,
 
)
 
# ----------
# ----------
# Attributes
# Attributes
# ----------
# ----------
 
@attribute(
 
doc="Whether the metadata is periodically published",
 
dtype=bool,
 
# Tango needs to poll this, as otherwise this attribute will never
 
# be exposed as "False" as the event thread must run to do so.
 
polling_period=DEFAULT_POLLING_PERIOD_MS,
 
)
 
def metadata_periodic_publish_thread_running_R(self):
 
return (
 
self.event_loop_thread
 
and self.event_loop_thread.is_running()
 
and self.metadata_periodic_publish_task
 
and self.metadata_periodic_publish_task.is_running()
 
)
 
@attribute(
@attribute(
doc="Is the metadata publisher running",
doc="Is the metadata publisher running",
dtype=bool,
dtype=bool,
)
)
def is_running_R(self):
def is_publisher_running_R(self):
if self._publisher:
return self._publisher.is_running if self._publisher else False
return self._publisher.is_running
return False
@attribute(
@attribute(
doc="Is the metadata publisher stopping",
doc="Is the metadata publisher stopping",
dtype=bool,
dtype=bool,
)
)
def is_stopping_R(self):
def is_publisher_stopping_R(self):
if self._publisher:
return self._publisher.is_stopping if self._publisher else False
return self._publisher.is_stopping
return False
@attribute(
@attribute(
doc="Queue fill percentage",
doc="Fraction of queue filled between metadata organizer and metadata publisher",
dtype=numpy.float64,
dtype=numpy.float64,
)
)
def queue_fill_percentage_R(self):
def publisher_queue_fill_fraction_R(self):
if self._publisher:
return (
return self._publisher.queue_fill / self._publisher.queue_size
self._publisher.queue_fill / self._publisher.queue_size
return 0.0
if self._publisher
 
else 0.0
 
)
@attribute(
@attribute(
doc="Number of messages published since device was started", dtype=numpy.int64
doc="Number of messages sent to the publisher since device was started",
 
dtype=numpy.int64,
)
)
def messages_published_R(self):
def messages_published_R(self):
return self._num_published
return self._num_published
@@ -139,6 +164,7 @@ class Metadata(LOFARDevice):
@@ -139,6 +164,7 @@ class Metadata(LOFARDevice):
self._num_published = 0
self._num_published = 0
self._publisher = None
self._publisher = None
self._organizer = None
self._organizer = None
 
self.metadata_periodic_publish_task = None
# Super must be called after variable assignment due to executing init_device!
# Super must be called after variable assignment due to executing init_device!
super().__init__(cl, name)
super().__init__(cl, name)
@@ -186,9 +212,20 @@ class Metadata(LOFARDevice):
@@ -186,9 +212,20 @@ class Metadata(LOFARDevice):
if not self._organizer:
if not self._organizer:
self._organizer = MetadataOrganizer(config=self.METADATA_CONFIG)
self._organizer = MetadataOrganizer(config=self.METADATA_CONFIG)
 
@log_exceptions()
 
def configure_for_on(self):
 
"""Initialises the attributes and properties of the statistics device."""
 
super().configure_for_on()
 
proxies = self._organizer.create_proxies()
proxies = self._organizer.create_proxies()
self.register_change_event_subscriptions(proxies)
self.register_change_event_subscriptions(proxies)
 
self.metadata_periodic_publish_task = PeriodicTask(
 
self.event_loop_thread.event_loop,
 
self._send_metadata_async,
 
self.metadata_periodic_publish_interval,
 
)
 
def register_change_event_subscriptions(self, proxies: metadata_proxy_type):
def register_change_event_subscriptions(self, proxies: metadata_proxy_type):
"""Register change events for working device proxies"""
"""Register change events for working device proxies"""
@@ -243,6 +280,10 @@ class Metadata(LOFARDevice):
@@ -243,6 +280,10 @@ class Metadata(LOFARDevice):
self._publisher.send(self._organizer.get_json())
self._publisher.send(self._organizer.get_json())
self._num_published += 1
self._num_published += 1
 
async def _send_metadata_async(self):
 
""""""
 
self._send_metadata()
 
@command()
@command()
@DebugIt()
@DebugIt()
@only_in_states(DEFAULT_COMMAND_STATES)
@only_in_states(DEFAULT_COMMAND_STATES)
Loading