diff --git a/devices/toolkit/archiver.py b/devices/toolkit/archiver.py index 685d5401e75a370c1e93a875a24bf632d882a302..2734926313dafe3b6f7be1b9e39b94fd7d6ccdcb 100644 --- a/devices/toolkit/archiver.py +++ b/devices/toolkit/archiver.py @@ -48,7 +48,7 @@ class Archiver(): dev_polling_time = None dev_archive_time = None - def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbpp/confmanager01', es_name: str = 'archiving/hdbpp/eventsubscriber01', context: str = 'RUN'): + def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbpp/confmanager01', context: str = 'RUN'): self.cm_name = cm_name self.cm = DeviceProxy(cm_name) try: @@ -57,8 +57,7 @@ class Archiver(): raise Exception("Configuration Manager is in FAULT state") except Exception as e: raise Exception("Connection failed with Configuration Manager device") from e - self.es_name = es_name - self.es = DeviceProxy(es_name) + self.es_list = [DeviceProxy(es_name) for es_name in self.get_subscribers(from_db=True)] or [] self.cm.write_attribute('Context',context) # Set default Context Archiving for all the subscribers self.selector = Selector() if selector_filename is None else Selector(selector_filename) # Create selector for customized strategies try: @@ -88,6 +87,14 @@ class Archiver(): es_list = self.cm.ArchiverList or [] return es_list + def get_next_subscriber(self): + """ + Return the first available Event Subscriber for archiving operations + TODO: the choice among subscribers should be done analysing their load + """ + es_list = self.get_subscribers() + return es_list[0] + def apply_selector(self): """ Apply the customized strategy defined by the selector @@ -139,7 +146,7 @@ class Archiver(): parse_attribute_name(attribute_name) try: self.cm.write_attribute('SetAttributeName', attribute_name) - self.cm.write_attribute('SetArchiver', self.es_name) + self.cm.write_attribute('SetArchiver', self.get_next_subscriber()) self.cm.write_attribute('SetStrategy', strategy) self.cm.write_attribute('SetPollingPeriod', polling_period) self.cm.write_attribute('SetPeriodEvent', event_period) @@ -164,7 +171,8 @@ class Archiver(): attr_proxy = AttributeProxy(attr_fullname) if attr_proxy.is_polled() is True: # if not polled attribute is also not archived try: - if self.es.AttributeList is None or not(self.cm.AttributeSearch(a)): + es = DeviceProxy(self.get_next_subscriber()) + if es.AttributeList is None or not(self.cm.AttributeSearch(a)): polling_period = attr_proxy.get_poll_period() or self.dev_polling_time archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0]) or self.dev_archive_time self.add_attribute_to_archiver(attr_fullname,polling_period=polling_period, @@ -210,18 +218,20 @@ class Archiver(): def remove_attributes_in_error(self,exclude:list=[],es_name:str=None): """ - Remove from the subscriber's list all the attributes currently in error (not being archived) + Remove from the subscribers list all the attributes currently in error (not being archived) """ if es_name is not None: + es_list = [es_name] + else: + es_list = self.get_subscribers() + for es_name in es_list: es = DeviceProxy(es_name) - else: - es = self.es - attributes_nok = es.AttributeNokList or [] - exclude_list = [a.lower() for a in exclude] - attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list] - for a in attrs_list: - attr_fullname = reduce_attribute_name(a) - self.remove_attribute_from_archiver(attr_fullname) + attributes_nok = es.AttributeNokList or [] + exclude_list = [a.lower() for a in exclude] + attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list] + for a in attrs_list: + attr_fullname = reduce_attribute_name(a) + self.remove_attribute_from_archiver(attr_fullname) def start_archiving_attribute(self, attribute_name:str): """ @@ -281,27 +291,34 @@ class Archiver(): def get_subscriber_attributes(self,es_name:str = None): """ - Return the list of attributes managed by the event subscriber + Return the list of attributes managed by the event subscribers """ + attrs = [] if es_name is not None: + es_list = [es_name] + else: + es_list = self.get_subscribers() + for es_name in es_list: es = DeviceProxy(es_name) - else: - es = self.es - attrs = es.AttributeList or [] + attrs.extend(es.AttributeList) return attrs def get_subscriber_errors(self,es_name:str = None): """ Return a dictionary of the attributes currently in error, defined as AttributeName -> AttributeError """ + attrs = [] + errs = [] if es_name is not None: + es_list = [es_name] + else: + es_list = self.get_subscribers() + for es_name in es_list: es = DeviceProxy(es_name) - else: - es = self.es - attrs = es.AttributeList or [] - errs = es.AttributeErrorList or [] + attrs.extend(es.AttributeList) + errs.extend(es.AttributeErrorList) return dict((a,e) for a,e in zip(attrs,errs) if e) or {} - + def get_attribute_errors(self,attribute_name:str): """ Return the error related to the attribute @@ -321,19 +338,34 @@ class Archiver(): if es_name is not None: es = DeviceProxy(es_name) else: - es = self.es + es = DeviceProxy(self.get_next_subscriber()) if use_freq: return str(es.AttributeRecordFreq)+(' events/period' ) else: return len(es.AttributeList or []) + def get_attribute_subscriber(self,attribute_name:str): + """ + Given an attribute name, return the event subscriber associated with it + """ + parse_attribute_name(attribute_name) + # If the ConfManager manages more than one subscriber + if len(self.get_subscribers())>1: + for es_name in self.get_subscribers(): + # Search the attribute in the subscriber list + if attribute_name.lower() in list(DeviceProxy(es_name).AttributeList): + return es_name + else: + return self.get_next_subscriber() + def get_attribute_freq(self,attribute_name:str): """ Return the attribute archiving frequency in events/minute """ parse_attribute_name(attribute_name) if self.is_attribute_archived(attribute_name): - freq_dict = dict((a,r) for a,r in zip(self.es.AttributeList,self.es.AttributeRecordFreqList)) + es = DeviceProxy(self.get_attribute_subscriber(attribute_name)) + freq_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeRecordFreqList)) for f in freq_dict: if attribute_name.lower() in f: return freq_dict.get(f,0.) @@ -346,7 +378,8 @@ class Archiver(): """ parse_attribute_name(attribute_name) if self.is_attribute_archived(attribute_name): - fail_dict = dict((a,r) for a,r in zip(self.es.AttributeList,self.es.AttributeFailureFreqList)) + es = DeviceProxy(self.get_attribute_subscriber(attribute_name)) + fail_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeFailureFreqList)) for f in fail_dict: if attribute_name.lower() in f: return fail_dict.get(f,0.)