diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py index 7562e88620c897bbc09c35fce92f87540d9bb04b..715ca039db4aaf03121f3b1c15815de7746a133e 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py @@ -2,7 +2,7 @@ import logging -from tango import DeviceProxy, AttributeProxy +from tango import DeviceProxy, AttributeProxy, DevState, DevFailed import time import json, os @@ -55,6 +55,27 @@ def split_tango_name(tango_fqname:str, tango_type:str): else: raise ValueError(f"Invalid value: {tango_type}. Please provide 'device' or 'attribute'.") + +def warn_if_attribute_not_found(): + """ + Log a warning if an exception is thrown indicating access to an non-existing attribute + was requested, and swallow the exception. + """ + def inner(func): + @wraps(func) + def warn_wrapper(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except DevFailed as e: + if e.args[0].reason == 'Attribute not found': + logger.warning(f"Attribute {attribute_name} not found!") + else: + raise + + return warn_wrapper + + return inner + class Archiver(): """ The Archiver class implements the basic operations to perform attributes archiving @@ -68,12 +89,11 @@ class Archiver(): self.cm_name = cm_name self.cm = DeviceProxy(cm_name) try: - cm_state = self.cm.state() # ping the device server - if 'FAULT' in str(cm_state): - raise Exception("Configuration Manager is in FAULT state") + if self.cm.state() == DevState.FAULT: + raise Exception(f"Configuration Manager {cm_name} is in FAULT state") except Exception as e: - raise Exception("Connection failed with Configuration Manager device") from e - self.es_list = [es_name for es_name in self.get_subscribers(from_db=False)] or [] + raise Exception(f"Connection failed with Configuration Manager {cm_name}") from e + self.es_list = [es_name for es_name in self.get_subscribers(from_db=False)] self.cm.write_attribute('Context',context) # Set default Context Archiving for all the subscribers self.selector = Selector() if selector_filename is None else Selector(selector_filename) # Create selector for customized strategies try: @@ -81,27 +101,25 @@ class Archiver(): except Exception as e: raise Exception("Error in selecting configuration. Archiving framework will not be updated.") from e - def get_db_config(self, device_name:str): + def get_db_config(self, device_name:str) -> dict: """ Retrieve the DB credentials from the Tango properties of Configuration Manager or EventSubscribers """ device = DeviceProxy(device_name) - config_list = device.get_property('LibConfiguration')['LibConfiguration'] # dictionary {'LibConfiguration': list of strings} - host = str([s for s in config_list if "host" in s][0].split('=')[1]) - libname = str([s for s in config_list if "libname" in s][0].split('=')[1]) - dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1]) - port = str([s for s in config_list if "port" in s][0].split('=')[1]) - user = str([s for s in config_list if "user" in s][0].split('=')[1]) - pw = str([s for s in config_list if "password" in s][0].split('=')[1]) - return [host,libname,dbname,port,user,pw] + # example LibConfiguration property value: + # ['connect_string= user=postgres password=password host=archiver-timescale port=5432 dbname=hdb', 'host=archiver-timescale', 'libname=libhdb++timescale.so', 'dbname=hdb', 'port=5432', 'user=postgres', 'password=password'] + config_strs = device.get_property('LibConfiguration')['LibConfiguration'] + + config = dict(config_str.split("=",1) for config_str in config_strs) + return config def get_hdbpp_libname(self, device_name:str): """ Get the hdbpp library name used by the Configuration Manager or by the EventSubscribers Useful in the case of different DBMS architectures (e.g. MySQL, TimescaleDB) """ - config_list = self.get_db_config(device_name) - return config_list[1] + config = self.get_db_config(device_name) + return config["libname"] def get_subscribers(self, from_db:bool=False): """ @@ -186,15 +204,14 @@ class Archiver(): es_name = last_es_name[:-2]+'0'+str(last_es_idx+1) try: es = DeviceProxy(es_name) - es_state = es.state() # ping the device server - if 'FAULT' in str(es_state): - raise Exception(f"{es_name} is in FAULT state") + if es.state() == DevState.FAULT: + raise Exception(f"Event Subscriber {es_name} is in FAULT state") self.cm.ArchiverAdd(device_name_url(es_name)) - except Exception as e: - if 'already_present' in str(e): - logger.warning(f"Subscriber {es_name} already present in Configuration Manager") + except DevFailed as e: + if e.args[0].reason == "Archiver already present": + logger.warning(f"Event Subscriber {es_name} already present in Configuration Manager") else: - raise Exception from e + raise def add_attribute_to_archiver(self, attribute_name: str, polling_period: int, event_period: int, strategy: str = 'RUN', es_name:str=None): """ @@ -212,11 +229,11 @@ class Archiver(): self.cm.write_attribute('SetPeriodEvent', event_period) self.cm.AttributeAdd() logger.info(f"Attribute {attribute_name} added to archiving list!") - except Exception as e: - if 'already archived' not in str(e).lower(): - raise Exception from e - else: + except DevFailed as e: + if e.args[0].reason == 'Already archived': logger.warning(f"Attribute {attribute_name} already in archiving list!") + else: + raise def add_attributes_by_device(self,device_name,global_archive_period:int = None, es_name:str=None, exclude:list = []): """ @@ -244,21 +261,17 @@ class Archiver(): raise Exception from e else: logger.warning(f"Attribute {attr_fullname} will not be archived because polling is set to FALSE!") - + + @warn_if_attribute_not_found() def remove_attribute_from_archiver(self, attribute_name:str): """ Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list. """ attribute_name = attribute_name_from_url(attribute_name) - try: - self.cm.AttributeStop(attribute_name) - self.cm.AttributeRemove(attribute_name) - logger.warning(f"Attribute {attribute_name} removed!") - except Exception as e: - if 'attribute not found' not in str(e).lower(): - raise Exception from e - else: - logger.warning(f"Attribute {attribute_name} not found in archiving list!") + + self.cm.AttributeStop(attribute_name) + self.cm.AttributeRemove(attribute_name) + logger.warning(f"Attribute {attribute_name} removed!") def remove_attributes_by_device(self,device_name:str,exclude:list=[]): """ @@ -293,33 +306,25 @@ class Archiver(): attr_fullname = attribute_name_from_url(a) self.remove_attribute_from_archiver(attr_fullname) + @warn_if_attribute_not_found() def start_archiving_attribute(self, attribute_name:str): """ Starts the archiving of the attribute passed as input. The attribute must be already present in the subscriber's list """ attribute_name = attribute_name_from_url(attribute_name) - try: - self.cm.AttributeStart(attribute_name) - except Exception as e: - if 'attribute not found' not in str(e).lower(): - raise Exception from e - else: - logger.warning(f"Attribute {attribute_name} not found!") + + self.cm.AttributeStart(attribute_name) + @warn_if_attribute_not_found() def stop_archiving_attribute(self, attribute_name:str): """ Stops the archiving of the attribute passed as input. The attribute must be already present in the subscriber's list """ attribute_name = attribute_name_from_url(attribute_name) - try: - self.cm.AttributeStop(attribute_name) - except Exception as e: - if 'attribute not found' not in str(e).lower(): - raise Exception from e - else: - logger.warning(f"Attribute {attribute_name} not found!") + + self.cm.AttributeStop(attribute_name) def is_attribute_archived(self,attribute_name:str): """ @@ -369,15 +374,13 @@ class Archiver(): """ attrs = [] errs = [] - if es_name is not None: - es_list = [es_name] - else: - es_list = self.get_subscribers() + es_list = [es_name] if es_name else self.get_subscribers() + for es_name in es_list: es = DeviceProxy(es_name) attrs.extend(list(es.AttributeList or [])) errs.extend(list(es.AttributeErrorList or [])) - return dict((a,e) for a,e in zip(attrs,errs) if e) or {} + return {a: e for a,e in zip(attrs,errs) if e} def get_attribute_errors(self,attribute_name:str): """ @@ -387,7 +390,7 @@ class Archiver(): errs_dict = self.get_subscriber_errors() for e in errs_dict: if attribute_name in e: - return errs_dict.get(e) + return errs_dict[e] return None def get_subscriber_load(self,use_freq:bool=True,es_name:str = None): @@ -395,10 +398,7 @@ class Archiver(): Return the estimated load of an archiver, in frequency of records or number of attributes """ - if es_name is not None: - es = DeviceProxy(es_name) - else: - es = DeviceProxy(self.get_next_subscriber()) + es = DeviceProxy(es_name or self.get_next_subscriber()) if use_freq: return str(es.AttributeRecordFreq)+(' events/period' ) else: @@ -426,10 +426,10 @@ class Archiver(): attribute_name = attribute_name_from_url(attribute_name) if self.is_attribute_archived(attribute_name): es = DeviceProxy(self.get_attribute_subscriber(attribute_name)) - freq_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeRecordFreqList)) + freq_dict = {a: r for a,r in zip(es.AttributeList,es.AttributeRecordFreqList)} for f in freq_dict: if attribute_name.lower() in f: - return freq_dict.get(f,0.) + return freq_dict[f] else: logger.warning(f"Attribute {attribute_name} not found!") @@ -440,10 +440,10 @@ class Archiver(): attribute_name = attribute_name_from_url(attribute_name) if self.is_attribute_archived(attribute_name): es = DeviceProxy(self.get_attribute_subscriber(attribute_name)) - fail_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeFailureFreqList)) + fail_dict = {a: r for a,r in zip(es.AttributeList,es.AttributeFailureFreqList)} for f in fail_dict: if attribute_name.lower() in f: - return fail_dict.get(f,0.) + return fail_dict[f] else: logger.warning(f"Attribute {attribute_name} not found!")