diff --git a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py index 8988f2c3fc31e5e870eb9749ab84510004544220..740d45626ae02dcb62b1ff2a00d5b30b31be91ab 100644 --- a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py @@ -13,7 +13,7 @@ # PyTango imports from tango.server import attribute, command, Device, DeviceMeta -from tango import AttrWriteType, DevState, DebugIt, Attribute, DeviceProxy, AttrDataFormat, DevSource +from tango import AttrWriteType, DevState, DebugIt, Attribute, DeviceProxy, AttrDataFormat, DevSource, DevDouble import time import math import numpy @@ -23,6 +23,7 @@ from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.common.lofar_logging import log_exceptions from tangostationcontrol.common.lofar_version import get_version from tangostationcontrol.devices.device_decorators import only_in_states, fault_on_error +from tangostationcontrol.toolkit.archiver import Archiver __all__ = ["lofar_device"] @@ -133,6 +134,7 @@ class lofar_device(Device, metaclass=DeviceMeta): :return:None """ + self.set_state(DevState.INIT) self.set_status("Device is in the INIT state.") @@ -322,6 +324,13 @@ class lofar_device(Device, metaclass=DeviceMeta): # This is just the command version of _initialise_hardware(). self._initialise_hardware() + + @only_in_states([DevState.STANDBY, DevState.ON]) + @command(dtype_out = DevDouble) + def max_archiving_load(self): + """ Return the maximum archiving load for the device attributes """ + archiver = Archiver() + return archiver.get_maximum_device_load(self.get_name()) def _boot(self, initialise_hardware=True): # setup connections @@ -434,4 +443,3 @@ class lofar_device(Device, metaclass=DeviceMeta): return alarm_state.item() else: return alarm_state - diff --git a/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py b/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py index 0ffa589ed12e03fd5410edf296048d658d47385a..a6c361435cd953616bbd5cdf9514b2550a9e824a 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py @@ -122,3 +122,23 @@ class TestArchiver(BaseIntegrationTestCase): # Test if the attribute has been correctly removed self.assertFalse(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname))) sdp_proxy.off() + + def test_get_maximum_device_load(self): + """ Test if the maximum device load is correctly computed """ + # Start RECV Device + device_name = "STAT/RECV/1" + recv_proxy = TestDeviceProxy(device_name) + recv_proxy.off() + time.sleep(1) + recv_proxy.initialise() + time.sleep(1) + self.assertEqual(DevState.STANDBY, recv_proxy.state()) + recv_proxy.set_defaults() + recv_proxy.on() + self.assertEqual(DevState.ON, recv_proxy.state()) + + config_dict = self.archiver.get_configuration() + self.archiver.apply_configuration(config_dict) + time.sleep(3) + max_load = self.archiver.get_maximum_device_load(device_name) + self.assertGreater(max_load,0) diff --git a/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_util.py b/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_util.py index c0322593510b580202d8e26211ce2df01e5ac5f7..4265c277554adb9d4e92c8491f392bffaddb3084 100644 --- a/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_util.py +++ b/tangostationcontrol/tangostationcontrol/test/toolkit/test_archiver_util.py @@ -8,13 +8,18 @@ # See LICENSE.txt for more info. from tangostationcontrol.test import base -from tangostationcontrol.toolkit.archiver_util import device_fqdn, attribute_fqdn +from tangostationcontrol.toolkit.archiver_util import get_attribute_from_fqdn, split_tango_name, device_fqdn, attribute_fqdn, get_size_from_datatype class TestArchiverUtil(base.TestCase): device_name = 'STAT/RECV/1' attribute_name = 'ant_mask_rw' + def test_get_attribute_from_fqdn(self): + """Test if a Tango attribute name is correctly retrieved from a Tango FQDN""" + fqdn = f"tango://databaseds:10000/{self.device_name}/{self.attribute_name}" + self.assertEqual('STAT/RECV/1/ant_mask_rw', get_attribute_from_fqdn(fqdn)) + def test_device_fqdn(self): """Test if a device name is correctly converted in a Tango FQDN""" self.assertEqual(f"tango://databaseds:10000/{self.device_name}".lower(), device_fqdn(self.device_name)) @@ -24,4 +29,16 @@ class TestArchiverUtil(base.TestCase): self.assertEqual(f"tango://databaseds:10000/{self.device_name}/{self.attribute_name}".lower(), attribute_fqdn(f"{self.device_name}/{self.attribute_name}")) self.assertRaises(ValueError, lambda: attribute_fqdn(self.attribute_name)) + + def test_split_tango_name(self): + """Test if the Tango full qualified domain names are correctly splitted""" + self.assertEqual(('STAT','RECV','1'), split_tango_name(self.device_name, 'device')) + self.assertEqual(('STAT','RECV','1', 'ant_mask_rw'), split_tango_name(f"{self.device_name}/{self.attribute_name}", 'attribute')) + + def test_get_size_from_datatype(self): + """Test if the bytesize of a certain datatype is correctly retrieved""" + datatype_boolean = 1 # 1 byte + self.assertEqual(1, get_size_from_datatype(datatype_boolean)) + datatype_double = 5 # 8 bytes + self.assertEqual(8, get_size_from_datatype(datatype_double)) diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py index dab6e15c6209dba7d662c11b4f594f30d8c36ebf..2c002f1cf866c98738c13203fbf8e0cd864edabd 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py @@ -3,10 +3,11 @@ import logging from tango import DeviceProxy, AttributeProxy, DevState, DevFailed -from tangostationcontrol.toolkit.archiver_util import get_db_config, device_fqdn, attribute_fqdn +from tangostationcontrol.toolkit.archiver_util import get_db_config, device_fqdn, attribute_fqdn, get_size_from_datatype from tangostationcontrol.toolkit.archiver_configurator import get_parameters_from_attribute, get_include_attribute_list, get_exclude_attribute_list, get_global_env_parameters import time +import re import json import pkg_resources from functools import wraps @@ -238,7 +239,7 @@ class Archiver(): else: raise - def add_attributes_by_device(self, device_name, global_archive_period:int = None, global_abs_change:int = None, + def add_attributes_by_device(self, device_name, global_archive_period:int = None, global_abs_change:int = 1, global_rel_change:int = None, es_name:str=None, exclude:list = None): """ Add sequentially all the attributes of the selected device in the event subscriber list, if not already present @@ -251,18 +252,19 @@ class Archiver(): exclude = [] d = DeviceProxy(device_fqdn(device_name)) - dev_attrs_list = d.get_attribute_list() + device_attrs_list = d.get_attribute_list() + # Filter out the attributes in exclude-list exclude_list = [a.lower() for a in exclude] - attrs_list = [a.lower() for a in list(dev_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison + attrs_list = [a.lower() for a in list(device_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison for a in attrs_list: attr_fullname = attribute_fqdn(f"{device_name}/{a}") attr_proxy = AttributeProxy(attr_fullname) - if attr_proxy.is_polled() and not self.is_attribute_archived(attr_fullname): # if not polled attribute is also not archived + if attr_proxy.is_polled() and not self.is_attribute_archived(attr_fullname): # if not polled, attribute is also not archived try: es = DeviceProxy(es_name or self.get_next_subscriber()) # choose an e.s. or get the first one available - polling_period = attr_proxy.get_poll_period() or self.dev_polling_time - archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0]) or self.dev_archive_period - abs_change = global_abs_change or 1 + polling_period = attr_proxy.get_poll_period() or self.prod_polling_time + archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0]) + abs_change = global_abs_change rel_change = global_rel_change self.add_attribute_to_archiver(attr_fullname,polling_period=polling_period, archive_event_period = archive_period, abs_change=abs_change, rel_change=rel_change, es_name = es.name()) @@ -348,7 +350,7 @@ class Archiver(): """ self.cm.AttributeStop(attribute_name) - def is_attribute_archived(self,attribute_name:str): + def is_attribute_archived(self, attribute_name:str): """ Check if an attribute is in the archiving list """ @@ -371,7 +373,7 @@ class Archiver(): self.start_archiving_attribute(attribute_name) logger.info(f"Attribute {attribute_name} successfully updated!") - def get_subscriber_attributes(self,es_name:str = None): + def get_subscriber_attributes(self, es_name:str = None): """ Return the list of attributes managed by the event subscribers """ @@ -385,7 +387,7 @@ class Archiver(): attrs.extend(list(es.AttributeList or [])) return attrs - def get_subscriber_errors(self,es_name:str = None): + def get_subscriber_errors(self, es_name:str = None): """ Return a dictionary of the attributes currently in error, defined as AttributeName -> AttributeError """ @@ -410,7 +412,7 @@ class Archiver(): return errs_dict[e] return None - def get_subscriber_load(self,use_freq:bool=True,es_name:str = None): + def get_subscriber_load(self,use_freq:bool=True, es_name:str = None): """ Return the estimated load of an archiver, in frequency of records or number of attributes @@ -419,9 +421,18 @@ class Archiver(): if use_freq: return str(es.AttributeRecordFreq)+(' events/period' ) else: - return len(es.AttributeList or []) - - def get_attribute_subscriber(self,attribute_name:str): + return len(es.AttributeList or []) + + def get_started_attributes(self, regex:str = '/*', es_name:str = None): + """ + Return a list of the attributes that are being currently archived + """ + es = DeviceProxy(es_name or self.get_next_subscriber()) + attribute_list = es.AttributeStartedList or [] + pattern = re.compile(regex) + return [a for a in attribute_list if pattern.search(a)] + + def get_attribute_subscriber(self, attribute_name:str): """ Given an attribute name, return the event subscriber associated with it """ @@ -440,7 +451,7 @@ class Archiver(): else: logger.warning(f"Attribute {attribute_name} not found!") - def get_attribute_freq(self,attribute_name:str): + def get_attribute_freq(self, attribute_name:str): """ Return the attribute archiving frequency in events/minute """ @@ -454,7 +465,7 @@ class Archiver(): else: logger.warning(f"Attribute {attribute_name} not found!") - def get_attribute_failures(self,attribute_name:str): + def get_attribute_failures(self, attribute_name:str): """ Return the attribute failure archiving frequency in events/minute """ @@ -467,6 +478,32 @@ class Archiver(): return fail_dict[f] else: logger.warning(f"Attribute {attribute_name} not found!") + + def get_maximum_device_load(self, device_name:str): + """ Compute maximum archiving load (bytes/second) based on device configuration """ + load_list = [] + # Get the list of started attributes (truncated in order to match AttributeInfo names) + attributes_started = [str(a).split('/')[-1] for a in self.get_started_attributes(regex=device_name.lower())] + # Get the list of attributes info + attributes_info = DeviceProxy(device_name).attribute_list_query() + # Filter the archived attributes + for attribute_info in attributes_info: + if attribute_info.name.lower() in attributes_started: + attr_dict = {'attribute': attribute_info.name.lower(), + 'polling_period': AttributeProxy(device_name+'/'+attribute_info.name).get_poll_period(), + 'data_type': attribute_info.data_type, + 'dim_x': attribute_info.max_dim_x, + 'dim_y': attribute_info.max_dim_y} + load_list.append(attr_dict) + # Compute the total load + polling_load = 0 + for a in load_list: + polling_period = a['polling_period']/1000 # in seconds + n_bytes = get_size_from_datatype(a['data_type']) + x = int(a['dim_x']) or 1 + y = int(a['dim_y']) or 1 + polling_load = polling_load + ( (n_bytes * (x * y) ) / polling_period ) + return polling_load class AttributeFormatException(Exception): """ diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py index bc6e2b37ad37c9f3f9638f54c4f5b9c25c8f5a3f..0d5e8f4d992170016bae7405c204b785c8cb8573 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py @@ -4,9 +4,23 @@ Utility functions for the Archiver functionality. """ -from tango import DeviceProxy +from tango import DeviceProxy, CmdArgType import re +""" +A dictionary whose keys are the Tango datatypes mapping, and the values are the relative byte size +See reference https://tango-controls.readthedocs.io/en/latest/development/advanced/reference.html#tango-data-type +and https://www.tutorialspoint.com/cplusplus/cpp_data_types.htm +TODO: manage String attributes +""" +DATATYPES_SIZE_DICT = {CmdArgType.DevBoolean:1, CmdArgType.DevShort:2, CmdArgType.DevLong:8, CmdArgType.DevFloat:4, CmdArgType.DevDouble:8, + CmdArgType.DevUShort:2, CmdArgType.DevULong:8, CmdArgType.DevString:20, CmdArgType.DevVarCharArray:None, CmdArgType.DevVarShortArray:None, + CmdArgType.DevVarLongArray: None,CmdArgType.DevVarFloatArray:None, CmdArgType.DevVarDoubleArray:None, CmdArgType.DevVarUShortArray:None, + CmdArgType.DevVarULongArray: None, CmdArgType.DevVarStringArray: None, CmdArgType.DevVarLongStringArray: None, CmdArgType.DevVarDoubleStringArray:None, + CmdArgType.DevState:3, CmdArgType.ConstDevString:None, CmdArgType.DevVarBooleanArray:None, CmdArgType.DevUChar:1, CmdArgType.DevLong64:8, + CmdArgType.DevULong64:8,CmdArgType.DevVarLong64Array:None,CmdArgType.DevVarULong64Array:None, CmdArgType.DevInt:4,CmdArgType.DevEncoded:None, + CmdArgType.DevEnum:None, CmdArgType.DevPipeBlob:None} + def get_db_config(device_name:str) -> dict: """ Retrieve the DB credentials from the Tango properties of Configuration Manager or EventSubscribers @@ -104,3 +118,12 @@ def retrieve_attributes_from_wildcards(device_name: str, matching_list: list): if pattern.search(a): matched_list.append(a) return matched_list + +def get_size_from_datatype(datatype:int) -> int: + """ + Return the number of bytes for a given Tango datatype + """ + try : + return DATATYPES_SIZE_DICT[datatype] + except IndexError: + return 1 diff --git a/tangostationcontrol/tox.ini b/tangostationcontrol/tox.ini index 0dd2a01453d6e5d8a10cbf155df9408eae3a78d5..9a7799463c22c24e85236c4e780a57e951502964 100644 --- a/tangostationcontrol/tox.ini +++ b/tangostationcontrol/tox.ini @@ -66,7 +66,7 @@ commands = [testenv:xenon]; commands = - {envpython} -m xenon tangostationcontrol -b B -m A -a A + {envpython} -m xenon tangostationcontrol -b B -m A -a A -i libhdbpp-python [testenv:docs] deps =