Skip to content
Snippets Groups Projects
Commit 1be0203e authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-375-archiving-load' into 'master'

Resolve L2SS-375 "Archiving load"

Closes L2SS-375

See merge request !153
parents 326439fa cee9c3c8
No related branches found
No related tags found
1 merge request!153Resolve L2SS-375 "Archiving load"
...@@ -16,6 +16,29 @@ from .archiver_base import * ...@@ -16,6 +16,29 @@ from .archiver_base import *
logger = logging.getLogger() logger = logging.getLogger()
def parse_attribute_name(attribute_name:str):
"""
Parse the attribute names since most of the archiving operations require only
Tango full-qualified names.
"""
chunk_num = len(attribute_name.split('/'))
if (chunk_num!=4):
raise AttributeFormatException
def reduce_attribute_name(attribute_name:str):
"""
For some operations Tango attribute must be transformed from the form 'tango://db:port/domain/family/name/attribute'
to canonical 'domain/family/name/attribute'
"""
chunk_num = len(attribute_name.split('/'))
if (chunk_num==7 and attribute_name.split('/')[0]=='tango:'):
return '/'.join(attribute_name.split('/')[3:])
else:
if (chunk_num!=4):
raise AttributeFormatException
else:
return attribute_name
class Archiver(): class Archiver():
""" """
The Archiver class implements the basic operations to perform attributes archiving The Archiver class implements the basic operations to perform attributes archiving
...@@ -50,8 +73,8 @@ class Archiver(): ...@@ -50,8 +73,8 @@ class Archiver():
config_dict = self.selector.get_dict() config_dict = self.selector.get_dict()
# Set global development env variables # Set global development env variables
var_dict = config_dict.get('global_variables') var_dict = config_dict.get('global_variables')
self.dev_polling_time = int(var_dict.get('development_polling_time')) or 10000 self.dev_polling_time = int(var_dict.get('development_polling_time'))
self.dev_archive_time = int(var_dict.get('development_archive_time')) or 60000 self.dev_archive_time = int(var_dict.get('development_archive_time'))
# Set devices archiving # Set devices archiving
env_dict = config_dict.get('devices') env_dict = config_dict.get('devices')
for device in env_dict: for device in env_dict:
...@@ -91,8 +114,7 @@ class Archiver(): ...@@ -91,8 +114,7 @@ class Archiver():
The ConfigurationManager and EventSubscriber devices must be already up and running. The ConfigurationManager and EventSubscriber devices must be already up and running.
The archiving-DBMS must be already properly configured. The archiving-DBMS must be already properly configured.
""" """
if (len(attribute_name.split('/'))!=4): parse_attribute_name(attribute_name)
raise AttributeFormatException
try: try:
self.cm.write_attribute('SetAttributeName', attribute_name) self.cm.write_attribute('SetAttributeName', attribute_name)
self.cm.write_attribute('SetArchiver', self.es_name) self.cm.write_attribute('SetArchiver', self.es_name)
...@@ -137,8 +159,7 @@ class Archiver(): ...@@ -137,8 +159,7 @@ class Archiver():
""" """
Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list. Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list.
""" """
if (len(attribute_name.split('/'))!=4): parse_attribute_name(attribute_name)
raise AttributeFormatException
try: try:
self.cm.AttributeStop(attribute_name) self.cm.AttributeStop(attribute_name)
self.cm.AttributeRemove(attribute_name) self.cm.AttributeRemove(attribute_name)
...@@ -164,14 +185,28 @@ class Archiver(): ...@@ -164,14 +185,28 @@ class Archiver():
self.remove_attribute_from_archiver(attr_fullname) self.remove_attribute_from_archiver(attr_fullname)
except Exception as e: except Exception as e:
raise Exception from e raise Exception from e
def remove_attributes_in_error(self,exclude:list=[],es_name:str=None):
"""
Remove from the subscriber's list all the attributes currently in error (not being archived)
"""
if es_name is not None:
es = DeviceProxy(es_name)
else:
es = self.es
attributes_nok = es.AttributeNokList or []
exclude_list = [a.lower() for a in exclude]
attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list]
for a in attrs_list:
attr_fullname = reduce_attribute_name(a)
self.remove_attribute_from_archiver(attr_fullname)
def start_archiving_attribute(self, attribute_name:str): def start_archiving_attribute(self, attribute_name:str):
""" """
Starts the archiving of the attribute passed as input. Starts the archiving of the attribute passed as input.
The attribute must be already present in the subscriber's list The attribute must be already present in the subscriber's list
""" """
if (len(attribute_name.split('/'))!=4): parse_attribute_name(attribute_name)
raise AttributeFormatException
try: try:
self.cm.AttributeStart(attribute_name) self.cm.AttributeStart(attribute_name)
except Exception as e: except Exception as e:
...@@ -185,8 +220,7 @@ class Archiver(): ...@@ -185,8 +220,7 @@ class Archiver():
Stops the archiving of the attribute passed as input. Stops the archiving of the attribute passed as input.
The attribute must be already present in the subscriber's list The attribute must be already present in the subscriber's list
""" """
if (len(attribute_name.split('/'))!=4): parse_attribute_name(attribute_name)
raise AttributeFormatException
try: try:
self.cm.AttributeStop(attribute_name) self.cm.AttributeStop(attribute_name)
except Exception as e: except Exception as e:
...@@ -199,29 +233,29 @@ class Archiver(): ...@@ -199,29 +233,29 @@ class Archiver():
""" """
Check if an attribute is in the archiving list Check if an attribute is in the archiving list
""" """
if (len(attribute_name.split('/'))!=4): parse_attribute_name(attribute_name)
raise AttributeFormatException attributes = self.cm.AttributeSearch(attribute_name.lower())
try: if len(attributes)>1:
attributes = self.cm.AttributeSearch(attribute_name) # Handle case same attribute_name r/rw
a = [a for a in attributes if a.lower().endswith(attribute_name.lower())] # handle cases differences if len(attributes)==2 and (attributes[0].endswith(attributes[1]+'w') or attributes[1].endswith(attributes[0]+'w')):
if len(attributes)>1:
raise Exception("MultipleAttributesMatched!")
if len(attributes)==1:
return True return True
else: else:
return False raise Exception(f"Multiple Attributes Matched! {attributes}")
except Exception as e: elif len(attributes)==1:
raise Exception from e return True
else:
return False
def update_archiving_attribute(self, attribute_name: str, polling_period: int, event_period: int, strategy: str = 'RUN'): def update_archiving_attribute(self, attribute_name: str, polling_period: int, event_period: int, strategy: str = 'RUN'):
""" """
Update the archiving properties of an attribute already in a subscriber list Update the archiving properties of an attribute already in a subscriber list
""" """
self.remove_attribute_from_archiver(attribute_name) self.remove_attribute_from_archiver(attribute_name)
time.sleep(1) time.sleep(3.)
self.add_attribute_to_archiver(attribute_name,polling_period,event_period,strategy) self.add_attribute_to_archiver(attribute_name,polling_period,event_period,strategy)
time.sleep(1) time.sleep(3.)
self.start_archiving_attribute(attribute_name) self.start_archiving_attribute(attribute_name)
logger.info(f"Attribute {attribute_name} successfully updated!")
def get_subscriber_attributes(self,es_name:str = None): def get_subscriber_attributes(self,es_name:str = None):
""" """
...@@ -242,20 +276,15 @@ class Archiver(): ...@@ -242,20 +276,15 @@ class Archiver():
es = DeviceProxy(es_name) es = DeviceProxy(es_name)
else: else:
es = self.es es = self.es
try: attrs = es.AttributeList or []
attrs = es.AttributeList or [] errs = es.AttributeErrorList or []
errs = es.AttributeErrorList or [] return dict((a,e) for a,e in zip(attrs,errs) if e) or {}
return dict((a,e) for a,e in zip(attrs,errs) if e)
except:
print('No attribute errors in the subscriber')
return {}
def get_attribute_errors(self,attribute_name:str): def get_attribute_errors(self,attribute_name:str):
""" """
Return the error related to the attribute Return the error related to the attribute
""" """
if (len(attribute_name.split('/'))!=4): parse_attribute_name(attribute_name)
raise AttributeFormatException
errs_dict = self.get_subscriber_errors() errs_dict = self.get_subscriber_errors()
for e in errs_dict: for e in errs_dict:
if attribute_name in e: if attribute_name in e:
...@@ -276,6 +305,32 @@ class Archiver(): ...@@ -276,6 +305,32 @@ class Archiver():
else: else:
return len(es.AttributeList or []) return len(es.AttributeList or [])
def get_attribute_freq(self,attribute_name:str):
"""
Return the attribute archiving frequency in events/minute
"""
parse_attribute_name(attribute_name)
if self.is_attribute_archived(attribute_name):
freq_dict = dict((a,r) for a,r in zip(self.es.AttributeList,self.es.AttributeRecordFreqList))
for f in freq_dict:
if attribute_name.lower() in f:
return freq_dict.get(f,0.)
else:
logger.warning(f"Attribute {attribute_name} not found!")
def get_attribute_failures(self,attribute_name:str):
"""
Return the attribute failure archiving frequency in events/minute
"""
parse_attribute_name(attribute_name)
if self.is_attribute_archived(attribute_name):
fail_dict = dict((a,r) for a,r in zip(self.es.AttributeList,self.es.AttributeFailureFreqList))
for f in fail_dict:
if attribute_name.lower() in f:
return fail_dict.get(f,0.)
else:
logger.warning(f"Attribute {attribute_name} not found!")
class AttributeFormatException(Exception): class AttributeFormatException(Exception):
""" """
Exception that handles wrong attribute naming Exception that handles wrong attribute naming
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment