Skip to content
Snippets Groups Projects
Commit bbb96f1b authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-659-compute-archiving-load' into 'master'

Resolve L2SS-659 "Compute archiving load"

Closes L2SS-659

See merge request !273
parents 4c3e7a36 fe47c355
No related branches found
No related tags found
1 merge request!273Resolve L2SS-659 "Compute archiving load"
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
# PyTango imports # PyTango imports
from tango.server import attribute, command, Device, DeviceMeta from tango.server import attribute, command, Device, DeviceMeta
from tango import AttrWriteType, DevState, DebugIt, Attribute, DeviceProxy, AttrDataFormat, DevSource from tango import AttrWriteType, DevState, DebugIt, Attribute, DeviceProxy, AttrDataFormat, DevSource, DevDouble
import time import time
import math import math
import numpy import numpy
...@@ -23,6 +23,7 @@ from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper ...@@ -23,6 +23,7 @@ from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.common.lofar_logging import log_exceptions from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.common.lofar_version import get_version from tangostationcontrol.common.lofar_version import get_version
from tangostationcontrol.devices.device_decorators import only_in_states, fault_on_error from tangostationcontrol.devices.device_decorators import only_in_states, fault_on_error
from tangostationcontrol.toolkit.archiver import Archiver
__all__ = ["lofar_device"] __all__ = ["lofar_device"]
...@@ -133,6 +134,7 @@ class lofar_device(Device, metaclass=DeviceMeta): ...@@ -133,6 +134,7 @@ class lofar_device(Device, metaclass=DeviceMeta):
:return:None :return:None
""" """
self.set_state(DevState.INIT) self.set_state(DevState.INIT)
self.set_status("Device is in the INIT state.") self.set_status("Device is in the INIT state.")
...@@ -323,6 +325,13 @@ class lofar_device(Device, metaclass=DeviceMeta): ...@@ -323,6 +325,13 @@ class lofar_device(Device, metaclass=DeviceMeta):
# This is just the command version of _initialise_hardware(). # This is just the command version of _initialise_hardware().
self._initialise_hardware() self._initialise_hardware()
@only_in_states([DevState.STANDBY, DevState.ON])
@command(dtype_out = DevDouble)
def max_archiving_load(self):
""" Return the maximum archiving load for the device attributes """
archiver = Archiver()
return archiver.get_maximum_device_load(self.get_name())
def _boot(self, initialise_hardware=True): def _boot(self, initialise_hardware=True):
# setup connections # setup connections
self.Initialise() self.Initialise()
...@@ -434,4 +443,3 @@ class lofar_device(Device, metaclass=DeviceMeta): ...@@ -434,4 +443,3 @@ class lofar_device(Device, metaclass=DeviceMeta):
return alarm_state.item() return alarm_state.item()
else: else:
return alarm_state return alarm_state
...@@ -122,3 +122,23 @@ class TestArchiver(BaseIntegrationTestCase): ...@@ -122,3 +122,23 @@ class TestArchiver(BaseIntegrationTestCase):
# Test if the attribute has been correctly removed # Test if the attribute has been correctly removed
self.assertFalse(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname))) self.assertFalse(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname)))
sdp_proxy.off() sdp_proxy.off()
def test_get_maximum_device_load(self):
""" Test if the maximum device load is correctly computed """
# Start RECV Device
device_name = "STAT/RECV/1"
recv_proxy = TestDeviceProxy(device_name)
recv_proxy.off()
time.sleep(1)
recv_proxy.initialise()
time.sleep(1)
self.assertEqual(DevState.STANDBY, recv_proxy.state())
recv_proxy.set_defaults()
recv_proxy.on()
self.assertEqual(DevState.ON, recv_proxy.state())
config_dict = self.archiver.get_configuration()
self.archiver.apply_configuration(config_dict)
time.sleep(3)
max_load = self.archiver.get_maximum_device_load(device_name)
self.assertGreater(max_load,0)
...@@ -8,13 +8,18 @@ ...@@ -8,13 +8,18 @@
# See LICENSE.txt for more info. # See LICENSE.txt for more info.
from tangostationcontrol.test import base from tangostationcontrol.test import base
from tangostationcontrol.toolkit.archiver_util import device_fqdn, attribute_fqdn from tangostationcontrol.toolkit.archiver_util import get_attribute_from_fqdn, split_tango_name, device_fqdn, attribute_fqdn, get_size_from_datatype
class TestArchiverUtil(base.TestCase): class TestArchiverUtil(base.TestCase):
device_name = 'STAT/RECV/1' device_name = 'STAT/RECV/1'
attribute_name = 'ant_mask_rw' attribute_name = 'ant_mask_rw'
def test_get_attribute_from_fqdn(self):
"""Test if a Tango attribute name is correctly retrieved from a Tango FQDN"""
fqdn = f"tango://databaseds:10000/{self.device_name}/{self.attribute_name}"
self.assertEqual('STAT/RECV/1/ant_mask_rw', get_attribute_from_fqdn(fqdn))
def test_device_fqdn(self): def test_device_fqdn(self):
"""Test if a device name is correctly converted in a Tango FQDN""" """Test if a device name is correctly converted in a Tango FQDN"""
self.assertEqual(f"tango://databaseds:10000/{self.device_name}".lower(), device_fqdn(self.device_name)) self.assertEqual(f"tango://databaseds:10000/{self.device_name}".lower(), device_fqdn(self.device_name))
...@@ -25,3 +30,15 @@ class TestArchiverUtil(base.TestCase): ...@@ -25,3 +30,15 @@ class TestArchiverUtil(base.TestCase):
attribute_fqdn(f"{self.device_name}/{self.attribute_name}")) attribute_fqdn(f"{self.device_name}/{self.attribute_name}"))
self.assertRaises(ValueError, lambda: attribute_fqdn(self.attribute_name)) self.assertRaises(ValueError, lambda: attribute_fqdn(self.attribute_name))
def test_split_tango_name(self):
"""Test if the Tango full qualified domain names are correctly splitted"""
self.assertEqual(('STAT','RECV','1'), split_tango_name(self.device_name, 'device'))
self.assertEqual(('STAT','RECV','1', 'ant_mask_rw'), split_tango_name(f"{self.device_name}/{self.attribute_name}", 'attribute'))
def test_get_size_from_datatype(self):
"""Test if the bytesize of a certain datatype is correctly retrieved"""
datatype_boolean = 1 # 1 byte
self.assertEqual(1, get_size_from_datatype(datatype_boolean))
datatype_double = 5 # 8 bytes
self.assertEqual(8, get_size_from_datatype(datatype_double))
...@@ -3,10 +3,11 @@ ...@@ -3,10 +3,11 @@
import logging import logging
from tango import DeviceProxy, AttributeProxy, DevState, DevFailed from tango import DeviceProxy, AttributeProxy, DevState, DevFailed
from tangostationcontrol.toolkit.archiver_util import get_db_config, device_fqdn, attribute_fqdn from tangostationcontrol.toolkit.archiver_util import get_db_config, device_fqdn, attribute_fqdn, get_size_from_datatype
from tangostationcontrol.toolkit.archiver_configurator import get_parameters_from_attribute, get_include_attribute_list, get_exclude_attribute_list, get_global_env_parameters from tangostationcontrol.toolkit.archiver_configurator import get_parameters_from_attribute, get_include_attribute_list, get_exclude_attribute_list, get_global_env_parameters
import time import time
import re
import json import json
import pkg_resources import pkg_resources
from functools import wraps from functools import wraps
...@@ -238,7 +239,7 @@ class Archiver(): ...@@ -238,7 +239,7 @@ class Archiver():
else: else:
raise raise
def add_attributes_by_device(self, device_name, global_archive_period:int = None, global_abs_change:int = None, def add_attributes_by_device(self, device_name, global_archive_period:int = None, global_abs_change:int = 1,
global_rel_change:int = None, es_name:str=None, exclude:list = None): global_rel_change:int = None, es_name:str=None, exclude:list = None):
""" """
Add sequentially all the attributes of the selected device in the event subscriber list, if not already present Add sequentially all the attributes of the selected device in the event subscriber list, if not already present
...@@ -251,18 +252,19 @@ class Archiver(): ...@@ -251,18 +252,19 @@ class Archiver():
exclude = [] exclude = []
d = DeviceProxy(device_fqdn(device_name)) d = DeviceProxy(device_fqdn(device_name))
dev_attrs_list = d.get_attribute_list() device_attrs_list = d.get_attribute_list()
# Filter out the attributes in exclude-list
exclude_list = [a.lower() for a in exclude] exclude_list = [a.lower() for a in exclude]
attrs_list = [a.lower() for a in list(dev_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison attrs_list = [a.lower() for a in list(device_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison
for a in attrs_list: for a in attrs_list:
attr_fullname = attribute_fqdn(f"{device_name}/{a}") attr_fullname = attribute_fqdn(f"{device_name}/{a}")
attr_proxy = AttributeProxy(attr_fullname) attr_proxy = AttributeProxy(attr_fullname)
if attr_proxy.is_polled() and not self.is_attribute_archived(attr_fullname): # if not polled attribute is also not archived if attr_proxy.is_polled() and not self.is_attribute_archived(attr_fullname): # if not polled, attribute is also not archived
try: try:
es = DeviceProxy(es_name or self.get_next_subscriber()) # choose an e.s. or get the first one available es = DeviceProxy(es_name or self.get_next_subscriber()) # choose an e.s. or get the first one available
polling_period = attr_proxy.get_poll_period() or self.dev_polling_time polling_period = attr_proxy.get_poll_period() or self.prod_polling_time
archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0]) or self.dev_archive_period archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0])
abs_change = global_abs_change or 1 abs_change = global_abs_change
rel_change = global_rel_change rel_change = global_rel_change
self.add_attribute_to_archiver(attr_fullname,polling_period=polling_period, self.add_attribute_to_archiver(attr_fullname,polling_period=polling_period,
archive_event_period = archive_period, abs_change=abs_change, rel_change=rel_change, es_name = es.name()) archive_event_period = archive_period, abs_change=abs_change, rel_change=rel_change, es_name = es.name())
...@@ -421,6 +423,15 @@ class Archiver(): ...@@ -421,6 +423,15 @@ class Archiver():
else: else:
return len(es.AttributeList or []) return len(es.AttributeList or [])
def get_started_attributes(self, regex:str = '/*', es_name:str = None):
"""
Return a list of the attributes that are being currently archived
"""
es = DeviceProxy(es_name or self.get_next_subscriber())
attribute_list = es.AttributeStartedList or []
pattern = re.compile(regex)
return [a for a in attribute_list if pattern.search(a)]
def get_attribute_subscriber(self, attribute_name:str): def get_attribute_subscriber(self, attribute_name:str):
""" """
Given an attribute name, return the event subscriber associated with it Given an attribute name, return the event subscriber associated with it
...@@ -468,6 +479,32 @@ class Archiver(): ...@@ -468,6 +479,32 @@ class Archiver():
else: else:
logger.warning(f"Attribute {attribute_name} not found!") logger.warning(f"Attribute {attribute_name} not found!")
def get_maximum_device_load(self, device_name:str):
""" Compute maximum archiving load (bytes/second) based on device configuration """
load_list = []
# Get the list of started attributes (truncated in order to match AttributeInfo names)
attributes_started = [str(a).split('/')[-1] for a in self.get_started_attributes(regex=device_name.lower())]
# Get the list of attributes info
attributes_info = DeviceProxy(device_name).attribute_list_query()
# Filter the archived attributes
for attribute_info in attributes_info:
if attribute_info.name.lower() in attributes_started:
attr_dict = {'attribute': attribute_info.name.lower(),
'polling_period': AttributeProxy(device_name+'/'+attribute_info.name).get_poll_period(),
'data_type': attribute_info.data_type,
'dim_x': attribute_info.max_dim_x,
'dim_y': attribute_info.max_dim_y}
load_list.append(attr_dict)
# Compute the total load
polling_load = 0
for a in load_list:
polling_period = a['polling_period']/1000 # in seconds
n_bytes = get_size_from_datatype(a['data_type'])
x = int(a['dim_x']) or 1
y = int(a['dim_y']) or 1
polling_load = polling_load + ( (n_bytes * (x * y) ) / polling_period )
return polling_load
class AttributeFormatException(Exception): class AttributeFormatException(Exception):
""" """
Exception that handles wrong attribute naming Exception that handles wrong attribute naming
......
...@@ -4,9 +4,23 @@ ...@@ -4,9 +4,23 @@
Utility functions for the Archiver functionality. Utility functions for the Archiver functionality.
""" """
from tango import DeviceProxy from tango import DeviceProxy, CmdArgType
import re import re
"""
A dictionary whose keys are the Tango datatypes mapping, and the values are the relative byte size
See reference https://tango-controls.readthedocs.io/en/latest/development/advanced/reference.html#tango-data-type
and https://www.tutorialspoint.com/cplusplus/cpp_data_types.htm
TODO: manage String attributes
"""
DATATYPES_SIZE_DICT = {CmdArgType.DevBoolean:1, CmdArgType.DevShort:2, CmdArgType.DevLong:8, CmdArgType.DevFloat:4, CmdArgType.DevDouble:8,
CmdArgType.DevUShort:2, CmdArgType.DevULong:8, CmdArgType.DevString:20, CmdArgType.DevVarCharArray:None, CmdArgType.DevVarShortArray:None,
CmdArgType.DevVarLongArray: None,CmdArgType.DevVarFloatArray:None, CmdArgType.DevVarDoubleArray:None, CmdArgType.DevVarUShortArray:None,
CmdArgType.DevVarULongArray: None, CmdArgType.DevVarStringArray: None, CmdArgType.DevVarLongStringArray: None, CmdArgType.DevVarDoubleStringArray:None,
CmdArgType.DevState:3, CmdArgType.ConstDevString:None, CmdArgType.DevVarBooleanArray:None, CmdArgType.DevUChar:1, CmdArgType.DevLong64:8,
CmdArgType.DevULong64:8,CmdArgType.DevVarLong64Array:None,CmdArgType.DevVarULong64Array:None, CmdArgType.DevInt:4,CmdArgType.DevEncoded:None,
CmdArgType.DevEnum:None, CmdArgType.DevPipeBlob:None}
def get_db_config(device_name:str) -> dict: def get_db_config(device_name:str) -> dict:
""" """
Retrieve the DB credentials from the Tango properties of Configuration Manager or EventSubscribers Retrieve the DB credentials from the Tango properties of Configuration Manager or EventSubscribers
...@@ -104,3 +118,12 @@ def retrieve_attributes_from_wildcards(device_name: str, matching_list: list): ...@@ -104,3 +118,12 @@ def retrieve_attributes_from_wildcards(device_name: str, matching_list: list):
if pattern.search(a): if pattern.search(a):
matched_list.append(a) matched_list.append(a)
return matched_list return matched_list
def get_size_from_datatype(datatype:int) -> int:
"""
Return the number of bytes for a given Tango datatype
"""
try :
return DATATYPES_SIZE_DICT[datatype]
except IndexError:
return 1
...@@ -66,7 +66,7 @@ commands = ...@@ -66,7 +66,7 @@ commands =
[testenv:xenon]; [testenv:xenon];
commands = commands =
{envpython} -m xenon tangostationcontrol -b B -m A -a A {envpython} -m xenon tangostationcontrol -b B -m A -a A -i libhdbpp-python
[testenv:docs] [testenv:docs]
deps = deps =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment