Skip to content
Snippets Groups Projects
Commit d4e3fd8b authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

L2SS-398: redesign archiver to support multiple subscribers

parent 4ef981a1
Branches
Tags
1 merge request!161Resolve L2SS-398 "Archiver multi es"
...@@ -48,7 +48,7 @@ class Archiver(): ...@@ -48,7 +48,7 @@ class Archiver():
dev_polling_time = None dev_polling_time = None
dev_archive_time = None dev_archive_time = None
def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbpp/confmanager01', es_name: str = 'archiving/hdbpp/eventsubscriber01', context: str = 'RUN'): def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbpp/confmanager01', context: str = 'RUN'):
self.cm_name = cm_name self.cm_name = cm_name
self.cm = DeviceProxy(cm_name) self.cm = DeviceProxy(cm_name)
try: try:
...@@ -57,8 +57,7 @@ class Archiver(): ...@@ -57,8 +57,7 @@ class Archiver():
raise Exception("Configuration Manager is in FAULT state") raise Exception("Configuration Manager is in FAULT state")
except Exception as e: except Exception as e:
raise Exception("Connection failed with Configuration Manager device") from e raise Exception("Connection failed with Configuration Manager device") from e
self.es_name = es_name self.es_list = [DeviceProxy(es_name) for es_name in self.get_subscribers(from_db=True)] or []
self.es = DeviceProxy(es_name)
self.cm.write_attribute('Context',context) # Set default Context Archiving for all the subscribers self.cm.write_attribute('Context',context) # Set default Context Archiving for all the subscribers
self.selector = Selector() if selector_filename is None else Selector(selector_filename) # Create selector for customized strategies self.selector = Selector() if selector_filename is None else Selector(selector_filename) # Create selector for customized strategies
try: try:
...@@ -88,6 +87,14 @@ class Archiver(): ...@@ -88,6 +87,14 @@ class Archiver():
es_list = self.cm.ArchiverList or [] es_list = self.cm.ArchiverList or []
return es_list return es_list
def get_next_subscriber(self):
"""
Return the first available Event Subscriber for archiving operations
TODO: the choice among subscribers should be done analysing their load
"""
es_list = self.get_subscribers()
return es_list[0]
def apply_selector(self): def apply_selector(self):
""" """
Apply the customized strategy defined by the selector Apply the customized strategy defined by the selector
...@@ -139,7 +146,7 @@ class Archiver(): ...@@ -139,7 +146,7 @@ class Archiver():
parse_attribute_name(attribute_name) parse_attribute_name(attribute_name)
try: try:
self.cm.write_attribute('SetAttributeName', attribute_name) self.cm.write_attribute('SetAttributeName', attribute_name)
self.cm.write_attribute('SetArchiver', self.es_name) self.cm.write_attribute('SetArchiver', self.get_next_subscriber())
self.cm.write_attribute('SetStrategy', strategy) self.cm.write_attribute('SetStrategy', strategy)
self.cm.write_attribute('SetPollingPeriod', polling_period) self.cm.write_attribute('SetPollingPeriod', polling_period)
self.cm.write_attribute('SetPeriodEvent', event_period) self.cm.write_attribute('SetPeriodEvent', event_period)
...@@ -164,7 +171,8 @@ class Archiver(): ...@@ -164,7 +171,8 @@ class Archiver():
attr_proxy = AttributeProxy(attr_fullname) attr_proxy = AttributeProxy(attr_fullname)
if attr_proxy.is_polled() is True: # if not polled attribute is also not archived if attr_proxy.is_polled() is True: # if not polled attribute is also not archived
try: try:
if self.es.AttributeList is None or not(self.cm.AttributeSearch(a)): es = DeviceProxy(self.get_next_subscriber())
if es.AttributeList is None or not(self.cm.AttributeSearch(a)):
polling_period = attr_proxy.get_poll_period() or self.dev_polling_time polling_period = attr_proxy.get_poll_period() or self.dev_polling_time
archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0]) or self.dev_archive_time archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0]) or self.dev_archive_time
self.add_attribute_to_archiver(attr_fullname,polling_period=polling_period, self.add_attribute_to_archiver(attr_fullname,polling_period=polling_period,
...@@ -210,18 +218,20 @@ class Archiver(): ...@@ -210,18 +218,20 @@ class Archiver():
def remove_attributes_in_error(self,exclude:list=[],es_name:str=None): def remove_attributes_in_error(self,exclude:list=[],es_name:str=None):
""" """
Remove from the subscriber's list all the attributes currently in error (not being archived) Remove from the subscribers list all the attributes currently in error (not being archived)
""" """
if es_name is not None: if es_name is not None:
es_list = [es_name]
else:
es_list = self.get_subscribers()
for es_name in es_list:
es = DeviceProxy(es_name) es = DeviceProxy(es_name)
else: attributes_nok = es.AttributeNokList or []
es = self.es exclude_list = [a.lower() for a in exclude]
attributes_nok = es.AttributeNokList or [] attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list]
exclude_list = [a.lower() for a in exclude] for a in attrs_list:
attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list] attr_fullname = reduce_attribute_name(a)
for a in attrs_list: self.remove_attribute_from_archiver(attr_fullname)
attr_fullname = reduce_attribute_name(a)
self.remove_attribute_from_archiver(attr_fullname)
def start_archiving_attribute(self, attribute_name:str): def start_archiving_attribute(self, attribute_name:str):
""" """
...@@ -281,27 +291,34 @@ class Archiver(): ...@@ -281,27 +291,34 @@ class Archiver():
def get_subscriber_attributes(self,es_name:str = None): def get_subscriber_attributes(self,es_name:str = None):
""" """
Return the list of attributes managed by the event subscriber Return the list of attributes managed by the event subscribers
""" """
attrs = []
if es_name is not None: if es_name is not None:
es_list = [es_name]
else:
es_list = self.get_subscribers()
for es_name in es_list:
es = DeviceProxy(es_name) es = DeviceProxy(es_name)
else: attrs.extend(es.AttributeList)
es = self.es
attrs = es.AttributeList or []
return attrs return attrs
def get_subscriber_errors(self,es_name:str = None): def get_subscriber_errors(self,es_name:str = None):
""" """
Return a dictionary of the attributes currently in error, defined as AttributeName -> AttributeError Return a dictionary of the attributes currently in error, defined as AttributeName -> AttributeError
""" """
attrs = []
errs = []
if es_name is not None: if es_name is not None:
es_list = [es_name]
else:
es_list = self.get_subscribers()
for es_name in es_list:
es = DeviceProxy(es_name) es = DeviceProxy(es_name)
else: attrs.extend(es.AttributeList)
es = self.es errs.extend(es.AttributeErrorList)
attrs = es.AttributeList or []
errs = es.AttributeErrorList or []
return dict((a,e) for a,e in zip(attrs,errs) if e) or {} return dict((a,e) for a,e in zip(attrs,errs) if e) or {}
def get_attribute_errors(self,attribute_name:str): def get_attribute_errors(self,attribute_name:str):
""" """
Return the error related to the attribute Return the error related to the attribute
...@@ -321,19 +338,34 @@ class Archiver(): ...@@ -321,19 +338,34 @@ class Archiver():
if es_name is not None: if es_name is not None:
es = DeviceProxy(es_name) es = DeviceProxy(es_name)
else: else:
es = self.es es = DeviceProxy(self.get_next_subscriber())
if use_freq: if use_freq:
return str(es.AttributeRecordFreq)+(' events/period' ) return str(es.AttributeRecordFreq)+(' events/period' )
else: else:
return len(es.AttributeList or []) return len(es.AttributeList or [])
def get_attribute_subscriber(self,attribute_name:str):
"""
Given an attribute name, return the event subscriber associated with it
"""
parse_attribute_name(attribute_name)
# If the ConfManager manages more than one subscriber
if len(self.get_subscribers())>1:
for es_name in self.get_subscribers():
# Search the attribute in the subscriber list
if attribute_name.lower() in list(DeviceProxy(es_name).AttributeList):
return es_name
else:
return self.get_next_subscriber()
def get_attribute_freq(self,attribute_name:str): def get_attribute_freq(self,attribute_name:str):
""" """
Return the attribute archiving frequency in events/minute Return the attribute archiving frequency in events/minute
""" """
parse_attribute_name(attribute_name) parse_attribute_name(attribute_name)
if self.is_attribute_archived(attribute_name): if self.is_attribute_archived(attribute_name):
freq_dict = dict((a,r) for a,r in zip(self.es.AttributeList,self.es.AttributeRecordFreqList)) es = DeviceProxy(self.get_attribute_subscriber(attribute_name))
freq_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeRecordFreqList))
for f in freq_dict: for f in freq_dict:
if attribute_name.lower() in f: if attribute_name.lower() in f:
return freq_dict.get(f,0.) return freq_dict.get(f,0.)
...@@ -346,7 +378,8 @@ class Archiver(): ...@@ -346,7 +378,8 @@ class Archiver():
""" """
parse_attribute_name(attribute_name) parse_attribute_name(attribute_name)
if self.is_attribute_archived(attribute_name): if self.is_attribute_archived(attribute_name):
fail_dict = dict((a,r) for a,r in zip(self.es.AttributeList,self.es.AttributeFailureFreqList)) es = DeviceProxy(self.get_attribute_subscriber(attribute_name))
fail_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeFailureFreqList))
for f in fail_dict: for f in fail_dict:
if attribute_name.lower() in f: if attribute_name.lower() in f:
return fail_dict.get(f,0.) return fail_dict.get(f,0.)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment