diff --git a/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py b/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py index a1286d28be21b7e2e41eb01de5e59cfb66d5484a..1e6015c7437014904492f82038fb8b251fcd103a 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/toolkit/test_archiver.py @@ -10,6 +10,7 @@ from tangostationcontrol.integration_test.base import BaseIntegrationTestCase from tangostationcontrol.toolkit.archiver import * from tangostationcontrol.toolkit.retriever import RetrieverTimescale +from tangostationcontrol.toolkit.archiver_util import attribute_fqdn from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy import time @@ -61,7 +62,7 @@ class TestArchiver(BaseIntegrationTestCase): self.archiver.add_attribute_to_archiver(attr_fullname, polling_period=1000, event_period=3000) time.sleep(3) # Test if the attribute has been correctly added to event subscriber - self.assertTrue(self.archiver.is_attribute_archived(attr_fullname)) + self.assertTrue(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname))) # Retrieve data from DB views self.retriever = RetrieverTimescale() @@ -78,7 +79,7 @@ class TestArchiver(BaseIntegrationTestCase): self.archiver.remove_attribute_from_archiver(attr_fullname) time.sleep(3) # Test if the attribute has been correctly removed - self.assertFalse(self.archiver.is_attribute_archived(attr_fullname)) + self.assertFalse(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname))) recv_proxy.off() def test_archive_array_attribute(self): @@ -101,7 +102,7 @@ class TestArchiver(BaseIntegrationTestCase): self.archiver.add_attribute_to_archiver(attr_fullname, polling_period=1000, event_period=3000) time.sleep(3) # Test if the attribute has been correctly added to event subscriber - self.assertTrue(self.archiver.is_attribute_archived(attr_fullname)) + self.assertTrue(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname))) # Retrieve data from DB views self.retriever = RetrieverTimescale() @@ -119,5 +120,5 @@ class TestArchiver(BaseIntegrationTestCase): self.archiver.remove_attribute_from_archiver(attr_fullname) time.sleep(3) # Test if the attribute has been correctly removed - self.assertFalse(self.archiver.is_attribute_archived(attr_fullname)) + self.assertFalse(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname))) sdp_proxy.off() diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py index 4f5e43530af284d6fd51e1a99538b5bbeba2b3fa..eae77f0abee704eaf406d4456fd0c0e4e4297d2d 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py @@ -3,7 +3,7 @@ import logging from tango import DeviceProxy, AttributeProxy, DevState, DevFailed -from tangostationcontrol.toolkit.archiver_util import get_db_config, attribute_name_from_url, device_name_url +from tangostationcontrol.toolkit.archiver_util import get_db_config, device_fqdn, attribute_fqdn import time import json @@ -79,7 +79,7 @@ class Archiver(): """ es_list = self.get_subscribers() # Only one subscriber in ConfManager list - if len(es_list)==1: + if len(es_list) == 1: return es_list[0] else : # Choose the best subscriber analysing their load @@ -114,7 +114,7 @@ class Archiver(): self.remove_attributes_by_device(device, exclude=include_att_list) # Include attributes by custom configuration for att in include_att_list: - att_fqname = f"{device}/{att}".lower() + att_fqname = attribute_fqdn(f"{device}/{att}") self.add_attribute_to_archiver(att_fqname,self.dev_polling_time,self.dev_archive_time) elif dev_env == 'production': # PROD environment -> all attributes are included by default exclude_att_list = env_dict[device].get('exclude', []) @@ -123,7 +123,7 @@ class Archiver(): # The following cycle is a security check in the special case that an attribute is in the # included list in DEV mode, and in the excluded list in PROD mode for att in exclude_att_list: - att_fqname = f"{device}/{att}".lower() + att_fqname = attribute_fqdn(f"{device}/{att}") self.remove_attribute_from_archiver(att_fqname) except Exception as e: if 'API_DeviceNotExported' in str(e): # ignore if device is offline @@ -148,7 +148,7 @@ class Archiver(): es = DeviceProxy(es_name) if es.state() == DevState.FAULT: raise Exception(f"Event Subscriber {es_name} is in FAULT state") - self.cm.ArchiverAdd(device_name_url(es_name)) + self.cm.ArchiverAdd(device_fqdn(es_name)) except DevFailed as e: if e.args[0].reason == "Archiver already present": logger.warning(f"Event Subscriber {es_name} already present in Configuration Manager") @@ -162,7 +162,7 @@ class Archiver(): The ConfigurationManager and EventSubscriber devices must be already up and running. The archiving-DBMS must be already properly configured. """ - attribute_name = attribute_name_from_url(attribute_name) + attribute_name = attribute_fqdn(attribute_name) try: self.cm.write_attribute('SetAttributeName', attribute_name) self.cm.write_attribute('SetArchiver', es_name or self.get_next_subscriber()) @@ -177,16 +177,16 @@ class Archiver(): else: raise - def add_attributes_by_device(self,device_name,global_archive_period:int = None, es_name:str=None, exclude:list = []): + def add_attributes_by_device(self, device_name, global_archive_period:int = None, es_name:str=None, exclude:list = []): """ Add sequentially all the attributes of the selected device in the event subscriber list, if not already present """ - d = DeviceProxy(device_name) + d = DeviceProxy(device_fqdn(device_name)) dev_attrs_list = d.get_attribute_list() exclude_list = [a.lower() for a in exclude] attrs_list = [a.lower() for a in list(dev_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison for a in attrs_list: - attr_fullname = f"{device_name}/{a}".lower() + attr_fullname = attribute_fqdn(f"{device_name}/{a}") attr_proxy = AttributeProxy(attr_fullname) if attr_proxy.is_polled() and not self.is_attribute_archived(attr_fullname): # if not polled attribute is also not archived try: @@ -195,7 +195,6 @@ class Archiver(): archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0]) or self.dev_archive_time self.add_attribute_to_archiver(attr_fullname,polling_period=polling_period, event_period=archive_period, es_name = es.name()) - #time.sleep(0.5) except IndexError as e: logger.warning(f"Attribute {attr_fullname} will not be archived because archive event period is not defined!") except Exception as e: @@ -208,30 +207,29 @@ class Archiver(): """ Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list. """ - attribute_name = attribute_name_from_url(attribute_name) - + attribute_name = attribute_fqdn(attribute_name) self.cm.AttributeStop(attribute_name) self.cm.AttributeRemove(attribute_name) logger.warning(f"Attribute {attribute_name} removed!") - def remove_attributes_by_device(self,device_name:str,exclude:list=[]): + def remove_attributes_by_device(self, device_name:str, exclude:list=[]): """ Stops the data archiving of all the attributes of the selected device, and remove them from the subscriber's list """ - d = DeviceProxy(device_name) - dev_attrs_list = d.get_attribute_list() + d = DeviceProxy(device_fqdn(device_name)) + dev_attrs_list = d.get_attribute_list() # this list contains only the attribute names (not fqdn) exclude_list = [a.lower() for a in exclude] attrs_list = [a.lower() for a in list(dev_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison for a in attrs_list: try: - attr_fullname = f"{device_name}/{a}".lower() + attr_fullname = attribute_fqdn(f"{device_name}/{a}") if self.is_attribute_archived(attr_fullname): self.remove_attribute_from_archiver(attr_fullname) except Exception as e: raise Exception from e - def remove_attributes_in_error(self,exclude:list=[],es_name:str=None): + def remove_attributes_in_error(self, exclude:list=[], es_name:str=None): """ Remove from the subscribers list all the attributes currently in error (not being archived) """ @@ -242,11 +240,10 @@ class Archiver(): for es_name in es_list: es = DeviceProxy(es_name) attributes_nok = es.AttributeNokList or [] - exclude_list = [a.lower() for a in exclude] + exclude_list = [attribute_fqdn(a.lower()) for a in exclude] attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list] for a in attrs_list: - attr_fullname = attribute_name_from_url(a) - self.remove_attribute_from_archiver(attr_fullname) + self.remove_attribute_from_archiver(a) @warn_if_attribute_not_found() def start_archiving_attribute(self, attribute_name:str): @@ -254,8 +251,6 @@ class Archiver(): Starts the archiving of the attribute passed as input. The attribute must be already present in the subscriber's list """ - attribute_name = attribute_name_from_url(attribute_name) - self.cm.AttributeStart(attribute_name) @warn_if_attribute_not_found() @@ -264,25 +259,24 @@ class Archiver(): Stops the archiving of the attribute passed as input. The attribute must be already present in the subscriber's list """ - attribute_name = attribute_name_from_url(attribute_name) - self.cm.AttributeStop(attribute_name) def is_attribute_archived(self,attribute_name:str): """ Check if an attribute is in the archiving list """ - attribute_name = attribute_name_from_url(attribute_name) - attributes = self.cm.AttributeSearch(attribute_name.lower()) + attribute_name = attribute_fqdn(attribute_name) + attributes = self.cm.AttributeSearch(attribute_name) # search returns all matches in which attribute_name is part of the name, # so check whether an exact match is included. - return any(attribute_name.lower() in a for a in attributes) + return any(attribute_name == a for a in attributes) def update_archiving_attribute(self, attribute_name: str, polling_period: int, event_period: int, strategy: str = 'RUN'): """ Update the archiving properties of an attribute already in a subscriber list """ + attribute_name = attribute_fqdn(attribute_name) self.remove_attribute_from_archiver(attribute_name) time.sleep(3.) self.add_attribute_to_archiver(attribute_name,polling_period,event_period,strategy) @@ -322,10 +316,10 @@ class Archiver(): """ Return the error related to the attribute """ - attribute_name = attribute_name_from_url(attribute_name) + attribute_name = attribute_fqdn(attribute_name) errs_dict = self.get_subscriber_errors() for e in errs_dict: - if attribute_name in e: + if attribute_name == e: return errs_dict[e] return None @@ -344,27 +338,31 @@ class Archiver(): """ Given an attribute name, return the event subscriber associated with it """ - attribute_name = attribute_name_from_url(attribute_name) - # If the ConfManager manages more than one subscriber - if len(self.get_subscribers())>1: - for es_name in self.get_subscribers(): - # Search the attribute in the subscriber list (search substring because of the Tango naming conventions) - for a in list(DeviceProxy(es_name).AttributeList or []): - if attribute_name.lower() in a: - return es_name + attribute_name = attribute_fqdn(attribute_name) + # Check if attribute is archived + if self.is_attribute_archived(attribute_name): + # If the ConfManager manages more than one subscriber + if len(self.get_subscribers())>1: + for es_name in self.get_subscribers(): + # Search the attribute in the subscriber list + for a in list(DeviceProxy(es_name).AttributeList or []): + if attribute_name.lower() == a: + return es_name + else: + return self.get_next_subscriber() else: - return self.get_next_subscriber() + logger.warning(f"Attribute {attribute_name} not found!") def get_attribute_freq(self,attribute_name:str): """ Return the attribute archiving frequency in events/minute """ - attribute_name = attribute_name_from_url(attribute_name) + attribute_name = attribute_fqdn(attribute_name) if self.is_attribute_archived(attribute_name): es = DeviceProxy(self.get_attribute_subscriber(attribute_name)) freq_dict = {a: r for a,r in zip(es.AttributeList,es.AttributeRecordFreqList)} for f in freq_dict: - if attribute_name.lower() in f: + if attribute_name.lower() == f: return freq_dict[f] else: logger.warning(f"Attribute {attribute_name} not found!") @@ -373,12 +371,12 @@ class Archiver(): """ Return the attribute failure archiving frequency in events/minute """ - attribute_name = attribute_name_from_url(attribute_name) + attribute_name = attribute_fqdn(attribute_name) if self.is_attribute_archived(attribute_name): es = DeviceProxy(self.get_attribute_subscriber(attribute_name)) fail_dict = {a: r for a,r in zip(es.AttributeList,es.AttributeFailureFreqList)} for f in fail_dict: - if attribute_name.lower() in f: + if attribute_name.lower() == f: return fail_dict[f] else: logger.warning(f"Attribute {attribute_name} not found!") diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py index 7d808edc3e0fbe030b9bb4728a81abf93e56d73c..211fbd33286148b4615764f42b76d7fee1b2f010 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py @@ -18,7 +18,7 @@ def get_db_config(device_name:str) -> dict: config = dict(config_str.split("=",1) for config_str in config_strs) return config -def attribute_name_from_url(attribute_name:str): +def get_attribute_from_fqdn(attribute_name:str): """ For some operations Tango attribute must be transformed from the form 'tango://db:port/domain/family/name/attribute' to canonical 'domain/family/name/attribute' @@ -31,35 +31,35 @@ def attribute_name_from_url(attribute_name:str): return attribute_name -def device_name_url(device_name:str, tango_host:str = 'databaseds:10000'): +def device_fqdn(device_name:str, tango_host:str = 'databaseds:10000'): """ For some operations Tango devices must be transformed from the form 'domain/family/name' to 'tango://db:port/domain/family/name' """ if device_name.startswith('tango://'): - return device_name + return device_name.lower() if len(device_name.split('/')) != 3: raise ValueError(f"Expected device name of format 'domain/family/name', got {device_name}") - return f"tango://{tango_host}/{device_name}" + return f"tango://{tango_host}/{device_name}".lower() -def attribute_name_url(attribute_name:str, tango_host:str = 'databaseds:10000'): +def attribute_fqdn(attribute_name:str, tango_host:str = 'databaseds:10000'): """ For some operations Tango devices must be transformed from the form 'domain/family/name/attribute' to 'tango://db:port/domain/family/name/attribute' """ if attribute_name.startswith('tango://'): - return attribute_name + return attribute_name.lower() if len(attribute_name.split('/')) != 4: raise ValueError(f"Expected attribute name of format 'domain/family/name/attribute', got {attribute_name}") - return f"tango://{tango_host}/{attribute_name}" + return f"tango://{tango_host}/{attribute_name}".lower() def split_tango_name(tango_fqname:str, tango_type:str): """ - Helper function to split device or attribute Tango full qualified names + Helper function to split device or attribute Tango full qualified domain names into its components """ if tango_type.lower() == 'device':