Skip to content
Snippets Groups Projects
Commit 0585be44 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-546-fqdns-in-archiving-functions' into 'master'

Resolve L2SS-546 "Fqdns in archiving functions"

Closes L2SS-546

See merge request !250
parents 480cbcdb 0b09d07a
No related branches found
No related tags found
1 merge request!250Resolve L2SS-546 "Fqdns in archiving functions"
......@@ -10,6 +10,7 @@
from tangostationcontrol.integration_test.base import BaseIntegrationTestCase
from tangostationcontrol.toolkit.archiver import *
from tangostationcontrol.toolkit.retriever import RetrieverTimescale
from tangostationcontrol.toolkit.archiver_util import attribute_fqdn
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
import time
......@@ -61,7 +62,7 @@ class TestArchiver(BaseIntegrationTestCase):
self.archiver.add_attribute_to_archiver(attr_fullname, polling_period=1000, event_period=3000)
time.sleep(3)
# Test if the attribute has been correctly added to event subscriber
self.assertTrue(self.archiver.is_attribute_archived(attr_fullname))
self.assertTrue(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname)))
# Retrieve data from DB views
self.retriever = RetrieverTimescale()
......@@ -78,7 +79,7 @@ class TestArchiver(BaseIntegrationTestCase):
self.archiver.remove_attribute_from_archiver(attr_fullname)
time.sleep(3)
# Test if the attribute has been correctly removed
self.assertFalse(self.archiver.is_attribute_archived(attr_fullname))
self.assertFalse(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname)))
recv_proxy.off()
def test_archive_array_attribute(self):
......@@ -101,7 +102,7 @@ class TestArchiver(BaseIntegrationTestCase):
self.archiver.add_attribute_to_archiver(attr_fullname, polling_period=1000, event_period=3000)
time.sleep(3)
# Test if the attribute has been correctly added to event subscriber
self.assertTrue(self.archiver.is_attribute_archived(attr_fullname))
self.assertTrue(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname)))
# Retrieve data from DB views
self.retriever = RetrieverTimescale()
......@@ -119,5 +120,5 @@ class TestArchiver(BaseIntegrationTestCase):
self.archiver.remove_attribute_from_archiver(attr_fullname)
time.sleep(3)
# Test if the attribute has been correctly removed
self.assertFalse(self.archiver.is_attribute_archived(attr_fullname))
self.assertFalse(self.archiver.is_attribute_archived(attribute_fqdn(attr_fullname)))
sdp_proxy.off()
......@@ -3,7 +3,7 @@
import logging
from tango import DeviceProxy, AttributeProxy, DevState, DevFailed
from tangostationcontrol.toolkit.archiver_util import get_db_config, attribute_name_from_url, device_name_url
from tangostationcontrol.toolkit.archiver_util import get_db_config, device_fqdn, attribute_fqdn
import time
import json
......@@ -114,7 +114,7 @@ class Archiver():
self.remove_attributes_by_device(device, exclude=include_att_list)
# Include attributes by custom configuration
for att in include_att_list:
att_fqname = f"{device}/{att}".lower()
att_fqname = attribute_fqdn(f"{device}/{att}")
self.add_attribute_to_archiver(att_fqname,self.dev_polling_time,self.dev_archive_time)
elif dev_env == 'production': # PROD environment -> all attributes are included by default
exclude_att_list = env_dict[device].get('exclude', [])
......@@ -123,7 +123,7 @@ class Archiver():
# The following cycle is a security check in the special case that an attribute is in the
# included list in DEV mode, and in the excluded list in PROD mode
for att in exclude_att_list:
att_fqname = f"{device}/{att}".lower()
att_fqname = attribute_fqdn(f"{device}/{att}")
self.remove_attribute_from_archiver(att_fqname)
except Exception as e:
if 'API_DeviceNotExported' in str(e): # ignore if device is offline
......@@ -148,7 +148,7 @@ class Archiver():
es = DeviceProxy(es_name)
if es.state() == DevState.FAULT:
raise Exception(f"Event Subscriber {es_name} is in FAULT state")
self.cm.ArchiverAdd(device_name_url(es_name))
self.cm.ArchiverAdd(device_fqdn(es_name))
except DevFailed as e:
if e.args[0].reason == "Archiver already present":
logger.warning(f"Event Subscriber {es_name} already present in Configuration Manager")
......@@ -162,7 +162,7 @@ class Archiver():
The ConfigurationManager and EventSubscriber devices must be already up and running.
The archiving-DBMS must be already properly configured.
"""
attribute_name = attribute_name_from_url(attribute_name)
attribute_name = attribute_fqdn(attribute_name)
try:
self.cm.write_attribute('SetAttributeName', attribute_name)
self.cm.write_attribute('SetArchiver', es_name or self.get_next_subscriber())
......@@ -181,12 +181,12 @@ class Archiver():
"""
Add sequentially all the attributes of the selected device in the event subscriber list, if not already present
"""
d = DeviceProxy(device_name)
d = DeviceProxy(device_fqdn(device_name))
dev_attrs_list = d.get_attribute_list()
exclude_list = [a.lower() for a in exclude]
attrs_list = [a.lower() for a in list(dev_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison
for a in attrs_list:
attr_fullname = f"{device_name}/{a}".lower()
attr_fullname = attribute_fqdn(f"{device_name}/{a}")
attr_proxy = AttributeProxy(attr_fullname)
if attr_proxy.is_polled() and not self.is_attribute_archived(attr_fullname): # if not polled attribute is also not archived
try:
......@@ -195,7 +195,6 @@ class Archiver():
archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0]) or self.dev_archive_time
self.add_attribute_to_archiver(attr_fullname,polling_period=polling_period,
event_period=archive_period, es_name = es.name())
#time.sleep(0.5)
except IndexError as e:
logger.warning(f"Attribute {attr_fullname} will not be archived because archive event period is not defined!")
except Exception as e:
......@@ -208,8 +207,7 @@ class Archiver():
"""
Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list.
"""
attribute_name = attribute_name_from_url(attribute_name)
attribute_name = attribute_fqdn(attribute_name)
self.cm.AttributeStop(attribute_name)
self.cm.AttributeRemove(attribute_name)
logger.warning(f"Attribute {attribute_name} removed!")
......@@ -219,13 +217,13 @@ class Archiver():
Stops the data archiving of all the attributes of the selected device, and remove them from the
subscriber's list
"""
d = DeviceProxy(device_name)
dev_attrs_list = d.get_attribute_list()
d = DeviceProxy(device_fqdn(device_name))
dev_attrs_list = d.get_attribute_list() # this list contains only the attribute names (not fqdn)
exclude_list = [a.lower() for a in exclude]
attrs_list = [a.lower() for a in list(dev_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison
for a in attrs_list:
try:
attr_fullname = f"{device_name}/{a}".lower()
attr_fullname = attribute_fqdn(f"{device_name}/{a}")
if self.is_attribute_archived(attr_fullname):
self.remove_attribute_from_archiver(attr_fullname)
except Exception as e:
......@@ -242,11 +240,10 @@ class Archiver():
for es_name in es_list:
es = DeviceProxy(es_name)
attributes_nok = es.AttributeNokList or []
exclude_list = [a.lower() for a in exclude]
exclude_list = [attribute_fqdn(a.lower()) for a in exclude]
attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list]
for a in attrs_list:
attr_fullname = attribute_name_from_url(a)
self.remove_attribute_from_archiver(attr_fullname)
self.remove_attribute_from_archiver(a)
@warn_if_attribute_not_found()
def start_archiving_attribute(self, attribute_name:str):
......@@ -254,8 +251,6 @@ class Archiver():
Starts the archiving of the attribute passed as input.
The attribute must be already present in the subscriber's list
"""
attribute_name = attribute_name_from_url(attribute_name)
self.cm.AttributeStart(attribute_name)
@warn_if_attribute_not_found()
......@@ -264,25 +259,24 @@ class Archiver():
Stops the archiving of the attribute passed as input.
The attribute must be already present in the subscriber's list
"""
attribute_name = attribute_name_from_url(attribute_name)
self.cm.AttributeStop(attribute_name)
def is_attribute_archived(self,attribute_name:str):
"""
Check if an attribute is in the archiving list
"""
attribute_name = attribute_name_from_url(attribute_name)
attributes = self.cm.AttributeSearch(attribute_name.lower())
attribute_name = attribute_fqdn(attribute_name)
attributes = self.cm.AttributeSearch(attribute_name)
# search returns all matches in which attribute_name is part of the name,
# so check whether an exact match is included.
return any(attribute_name.lower() in a for a in attributes)
return any(attribute_name == a for a in attributes)
def update_archiving_attribute(self, attribute_name: str, polling_period: int, event_period: int, strategy: str = 'RUN'):
"""
Update the archiving properties of an attribute already in a subscriber list
"""
attribute_name = attribute_fqdn(attribute_name)
self.remove_attribute_from_archiver(attribute_name)
time.sleep(3.)
self.add_attribute_to_archiver(attribute_name,polling_period,event_period,strategy)
......@@ -322,10 +316,10 @@ class Archiver():
"""
Return the error related to the attribute
"""
attribute_name = attribute_name_from_url(attribute_name)
attribute_name = attribute_fqdn(attribute_name)
errs_dict = self.get_subscriber_errors()
for e in errs_dict:
if attribute_name in e:
if attribute_name == e:
return errs_dict[e]
return None
......@@ -344,27 +338,31 @@ class Archiver():
"""
Given an attribute name, return the event subscriber associated with it
"""
attribute_name = attribute_name_from_url(attribute_name)
attribute_name = attribute_fqdn(attribute_name)
# Check if attribute is archived
if self.is_attribute_archived(attribute_name):
# If the ConfManager manages more than one subscriber
if len(self.get_subscribers())>1:
for es_name in self.get_subscribers():
# Search the attribute in the subscriber list (search substring because of the Tango naming conventions)
# Search the attribute in the subscriber list
for a in list(DeviceProxy(es_name).AttributeList or []):
if attribute_name.lower() in a:
if attribute_name.lower() == a:
return es_name
else:
return self.get_next_subscriber()
else:
logger.warning(f"Attribute {attribute_name} not found!")
def get_attribute_freq(self,attribute_name:str):
"""
Return the attribute archiving frequency in events/minute
"""
attribute_name = attribute_name_from_url(attribute_name)
attribute_name = attribute_fqdn(attribute_name)
if self.is_attribute_archived(attribute_name):
es = DeviceProxy(self.get_attribute_subscriber(attribute_name))
freq_dict = {a: r for a,r in zip(es.AttributeList,es.AttributeRecordFreqList)}
for f in freq_dict:
if attribute_name.lower() in f:
if attribute_name.lower() == f:
return freq_dict[f]
else:
logger.warning(f"Attribute {attribute_name} not found!")
......@@ -373,12 +371,12 @@ class Archiver():
"""
Return the attribute failure archiving frequency in events/minute
"""
attribute_name = attribute_name_from_url(attribute_name)
attribute_name = attribute_fqdn(attribute_name)
if self.is_attribute_archived(attribute_name):
es = DeviceProxy(self.get_attribute_subscriber(attribute_name))
fail_dict = {a: r for a,r in zip(es.AttributeList,es.AttributeFailureFreqList)}
for f in fail_dict:
if attribute_name.lower() in f:
if attribute_name.lower() == f:
return fail_dict[f]
else:
logger.warning(f"Attribute {attribute_name} not found!")
......
......@@ -18,7 +18,7 @@ def get_db_config(device_name:str) -> dict:
config = dict(config_str.split("=",1) for config_str in config_strs)
return config
def attribute_name_from_url(attribute_name:str):
def get_attribute_from_fqdn(attribute_name:str):
"""
For some operations Tango attribute must be transformed from the form 'tango://db:port/domain/family/name/attribute'
to canonical 'domain/family/name/attribute'
......@@ -31,35 +31,35 @@ def attribute_name_from_url(attribute_name:str):
return attribute_name
def device_name_url(device_name:str, tango_host:str = 'databaseds:10000'):
def device_fqdn(device_name:str, tango_host:str = 'databaseds:10000'):
"""
For some operations Tango devices must be transformed from the form 'domain/family/name'
to 'tango://db:port/domain/family/name'
"""
if device_name.startswith('tango://'):
return device_name
return device_name.lower()
if len(device_name.split('/')) != 3:
raise ValueError(f"Expected device name of format 'domain/family/name', got {device_name}")
return f"tango://{tango_host}/{device_name}"
return f"tango://{tango_host}/{device_name}".lower()
def attribute_name_url(attribute_name:str, tango_host:str = 'databaseds:10000'):
def attribute_fqdn(attribute_name:str, tango_host:str = 'databaseds:10000'):
"""
For some operations Tango devices must be transformed from the form 'domain/family/name/attribute'
to 'tango://db:port/domain/family/name/attribute'
"""
if attribute_name.startswith('tango://'):
return attribute_name
return attribute_name.lower()
if len(attribute_name.split('/')) != 4:
raise ValueError(f"Expected attribute name of format 'domain/family/name/attribute', got {attribute_name}")
return f"tango://{tango_host}/{attribute_name}"
return f"tango://{tango_host}/{attribute_name}".lower()
def split_tango_name(tango_fqname:str, tango_type:str):
"""
Helper function to split device or attribute Tango full qualified names
Helper function to split device or attribute Tango full qualified domain names
into its components
"""
if tango_type.lower() == 'device':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment