Skip to content
Snippets Groups Projects
Commit 4b50e2cf authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'master' into L2SS-480-delays-to-beam-weights

parents 31bc380b abf64b5a
No related branches found
No related tags found
1 merge request!220Resolve L2SS-480 "Delays to beam weights"
......@@ -70,47 +70,32 @@ class attribute_wrapper(attribute):
max_dim_y = 0
if access == AttrWriteType.READ_WRITE:
""" if the attribute is of READ_WRITE type, assign the RW and write function to it"""
""" if the attribute is of READ_WRITE type, assign the write function to it"""
@only_in_states([DevState.STANDBY, DevState.ON], log=False)
@fault_on_error()
def read_RW(device):
# print("read_RW {}, {}x{}, {}, {}".format(me.name, me.dim_x, me.dim_y, me.attr_type, me.value))
def write_func_wrapper(device, value):
"""
read_RW returns the value that was last written to the attribute
"""
try:
return device.value_dict[self]
except Exception as e:
raise Exception(f"Attribute read_RW function error, attempted to read value_dict with key: `{self}`, are you sure this exists?") from e
@only_in_states([DevState.STANDBY, DevState.ON], log=False)
@fault_on_error()
def write_RW(device, value):
"""
_write_RW writes a value to this attribute
write_func_wrapper writes a value to this attribute
"""
self.write_function(value)
device.value_dict[self] = value
self.fget = read_RW
self.fset = write_RW
self.fset = write_func_wrapper
else:
""" if the attribute is of READ type, assign the read function to it"""
""" Assign the read function to the attribute"""
@only_in_states([DevState.STANDBY, DevState.ON], log=False)
@fault_on_error()
def read_R(device):
"""
_read_R reads the attribute value, stores it and returns it"
"""
device.value_dict[self] = self.read_function()
return device.value_dict[self]
@only_in_states([DevState.STANDBY, DevState.ON], log=False)
@fault_on_error()
def read_func_wrapper(device):
"""
read_func_wrapper reads the attribute value, stores it and returns it"
"""
device.value_dict[self] = self.read_function()
return device.value_dict[self]
self.fget = read_R
self.fget = read_func_wrapper
super().__init__(dtype=datatype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, **kwargs)
......
......@@ -308,7 +308,7 @@ class TestAttributeTypes(base.TestCase):
if test_type == "scalar":
expected = numpy.zeros((1,), dtype=dtype)
val = proxy.scalar_RW
val = proxy.scalar_R
elif test_type == "spectrum":
expected = numpy.zeros(spectrum_dims, dtype=dtype)
val = proxy.spectrum_R
......
......@@ -73,8 +73,12 @@ class test_client(CommClient):
"""
takes all gathered data to configure and return the correct read and write functions
"""
self.value = numpy.zeros(dims, dtype)
if dtype == str and dims == (1,):
self.value = ''
elif dims == (1,):
self.value = dtype(0)
else:
self.value = numpy.zeros(dims, dtype)
def read_function():
logger.debug("from read_function, reading {} array of type {}".format(dims, dtype))
......
......@@ -2,7 +2,7 @@
import logging
from tango import DeviceProxy, AttributeProxy
from tango import DeviceProxy, AttributeProxy, DevState, DevFailed
import time
import json, os
......@@ -55,6 +55,27 @@ def split_tango_name(tango_fqname:str, tango_type:str):
else:
raise ValueError(f"Invalid value: {tango_type}. Please provide 'device' or 'attribute'.")
def warn_if_attribute_not_found():
"""
Log a warning if an exception is thrown indicating access to an non-existing attribute
was requested, and swallow the exception.
"""
def inner(func):
@wraps(func)
def warn_wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except DevFailed as e:
if e.args[0].reason == 'Attribute not found':
logger.warning(f"Attribute {attribute_name} not found!")
else:
raise
return warn_wrapper
return inner
class Archiver():
"""
The Archiver class implements the basic operations to perform attributes archiving
......@@ -68,12 +89,11 @@ class Archiver():
self.cm_name = cm_name
self.cm = DeviceProxy(cm_name)
try:
cm_state = self.cm.state() # ping the device server
if 'FAULT' in str(cm_state):
raise Exception("Configuration Manager is in FAULT state")
if self.cm.state() == DevState.FAULT:
raise Exception(f"Configuration Manager {cm_name} is in FAULT state")
except Exception as e:
raise Exception("Connection failed with Configuration Manager device") from e
self.es_list = [es_name for es_name in self.get_subscribers(from_db=False)] or []
raise Exception(f"Connection failed with Configuration Manager {cm_name}") from e
self.es_list = [es_name for es_name in self.get_subscribers(from_db=False)]
self.cm.write_attribute('Context',context) # Set default Context Archiving for all the subscribers
self.selector = Selector() if selector_filename is None else Selector(selector_filename) # Create selector for customized strategies
try:
......@@ -81,27 +101,25 @@ class Archiver():
except Exception as e:
raise Exception("Error in selecting configuration. Archiving framework will not be updated.") from e
def get_db_config(self, device_name:str):
def get_db_config(self, device_name:str) -> dict:
"""
Retrieve the DB credentials from the Tango properties of Configuration Manager or EventSubscribers
"""
device = DeviceProxy(device_name)
config_list = device.get_property('LibConfiguration')['LibConfiguration'] # dictionary {'LibConfiguration': list of strings}
host = str([s for s in config_list if "host" in s][0].split('=')[1])
libname = str([s for s in config_list if "libname" in s][0].split('=')[1])
dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1])
port = str([s for s in config_list if "port" in s][0].split('=')[1])
user = str([s for s in config_list if "user" in s][0].split('=')[1])
pw = str([s for s in config_list if "password" in s][0].split('=')[1])
return [host,libname,dbname,port,user,pw]
# example LibConfiguration property value:
# ['connect_string= user=postgres password=password host=archiver-timescale port=5432 dbname=hdb', 'host=archiver-timescale', 'libname=libhdb++timescale.so', 'dbname=hdb', 'port=5432', 'user=postgres', 'password=password']
config_strs = device.get_property('LibConfiguration')['LibConfiguration']
config = dict(config_str.split("=",1) for config_str in config_strs)
return config
def get_hdbpp_libname(self, device_name:str):
"""
Get the hdbpp library name used by the Configuration Manager or by the EventSubscribers
Useful in the case of different DBMS architectures (e.g. MySQL, TimescaleDB)
"""
config_list = self.get_db_config(device_name)
return config_list[1]
config = self.get_db_config(device_name)
return config["libname"]
def get_subscribers(self, from_db:bool=False):
"""
......@@ -186,15 +204,14 @@ class Archiver():
es_name = last_es_name[:-2]+'0'+str(last_es_idx+1)
try:
es = DeviceProxy(es_name)
es_state = es.state() # ping the device server
if 'FAULT' in str(es_state):
raise Exception(f"{es_name} is in FAULT state")
if es.state() == DevState.FAULT:
raise Exception(f"Event Subscriber {es_name} is in FAULT state")
self.cm.ArchiverAdd(device_name_url(es_name))
except Exception as e:
if 'already_present' in str(e):
logger.warning(f"Subscriber {es_name} already present in Configuration Manager")
except DevFailed as e:
if e.args[0].reason == "Archiver already present":
logger.warning(f"Event Subscriber {es_name} already present in Configuration Manager")
else:
raise Exception from e
raise
def add_attribute_to_archiver(self, attribute_name: str, polling_period: int, event_period: int, strategy: str = 'RUN', es_name:str=None):
"""
......@@ -212,11 +229,11 @@ class Archiver():
self.cm.write_attribute('SetPeriodEvent', event_period)
self.cm.AttributeAdd()
logger.info(f"Attribute {attribute_name} added to archiving list!")
except Exception as e:
if 'already archived' not in str(e).lower():
raise Exception from e
else:
except DevFailed as e:
if e.args[0].reason == 'Already archived':
logger.warning(f"Attribute {attribute_name} already in archiving list!")
else:
raise
def add_attributes_by_device(self,device_name,global_archive_period:int = None, es_name:str=None, exclude:list = []):
"""
......@@ -244,21 +261,17 @@ class Archiver():
raise Exception from e
else:
logger.warning(f"Attribute {attr_fullname} will not be archived because polling is set to FALSE!")
@warn_if_attribute_not_found()
def remove_attribute_from_archiver(self, attribute_name:str):
"""
Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list.
"""
attribute_name = attribute_name_from_url(attribute_name)
try:
self.cm.AttributeStop(attribute_name)
self.cm.AttributeRemove(attribute_name)
logger.warning(f"Attribute {attribute_name} removed!")
except Exception as e:
if 'attribute not found' not in str(e).lower():
raise Exception from e
else:
logger.warning(f"Attribute {attribute_name} not found in archiving list!")
self.cm.AttributeStop(attribute_name)
self.cm.AttributeRemove(attribute_name)
logger.warning(f"Attribute {attribute_name} removed!")
def remove_attributes_by_device(self,device_name:str,exclude:list=[]):
"""
......@@ -293,33 +306,25 @@ class Archiver():
attr_fullname = attribute_name_from_url(a)
self.remove_attribute_from_archiver(attr_fullname)
@warn_if_attribute_not_found()
def start_archiving_attribute(self, attribute_name:str):
"""
Starts the archiving of the attribute passed as input.
The attribute must be already present in the subscriber's list
"""
attribute_name = attribute_name_from_url(attribute_name)
try:
self.cm.AttributeStart(attribute_name)
except Exception as e:
if 'attribute not found' not in str(e).lower():
raise Exception from e
else:
logger.warning(f"Attribute {attribute_name} not found!")
self.cm.AttributeStart(attribute_name)
@warn_if_attribute_not_found()
def stop_archiving_attribute(self, attribute_name:str):
"""
Stops the archiving of the attribute passed as input.
The attribute must be already present in the subscriber's list
"""
attribute_name = attribute_name_from_url(attribute_name)
try:
self.cm.AttributeStop(attribute_name)
except Exception as e:
if 'attribute not found' not in str(e).lower():
raise Exception from e
else:
logger.warning(f"Attribute {attribute_name} not found!")
self.cm.AttributeStop(attribute_name)
def is_attribute_archived(self,attribute_name:str):
"""
......@@ -369,15 +374,13 @@ class Archiver():
"""
attrs = []
errs = []
if es_name is not None:
es_list = [es_name]
else:
es_list = self.get_subscribers()
es_list = [es_name] if es_name else self.get_subscribers()
for es_name in es_list:
es = DeviceProxy(es_name)
attrs.extend(list(es.AttributeList or []))
errs.extend(list(es.AttributeErrorList or []))
return dict((a,e) for a,e in zip(attrs,errs) if e) or {}
return {a: e for a,e in zip(attrs,errs) if e}
def get_attribute_errors(self,attribute_name:str):
"""
......@@ -387,7 +390,7 @@ class Archiver():
errs_dict = self.get_subscriber_errors()
for e in errs_dict:
if attribute_name in e:
return errs_dict.get(e)
return errs_dict[e]
return None
def get_subscriber_load(self,use_freq:bool=True,es_name:str = None):
......@@ -395,10 +398,7 @@ class Archiver():
Return the estimated load of an archiver, in frequency of records or number
of attributes
"""
if es_name is not None:
es = DeviceProxy(es_name)
else:
es = DeviceProxy(self.get_next_subscriber())
es = DeviceProxy(es_name or self.get_next_subscriber())
if use_freq:
return str(es.AttributeRecordFreq)+(' events/period' )
else:
......@@ -426,10 +426,10 @@ class Archiver():
attribute_name = attribute_name_from_url(attribute_name)
if self.is_attribute_archived(attribute_name):
es = DeviceProxy(self.get_attribute_subscriber(attribute_name))
freq_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeRecordFreqList))
freq_dict = {a: r for a,r in zip(es.AttributeList,es.AttributeRecordFreqList)}
for f in freq_dict:
if attribute_name.lower() in f:
return freq_dict.get(f,0.)
return freq_dict[f]
else:
logger.warning(f"Attribute {attribute_name} not found!")
......@@ -440,10 +440,10 @@ class Archiver():
attribute_name = attribute_name_from_url(attribute_name)
if self.is_attribute_archived(attribute_name):
es = DeviceProxy(self.get_attribute_subscriber(attribute_name))
fail_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeFailureFreqList))
fail_dict = {a: r for a,r in zip(es.AttributeList,es.AttributeFailureFreqList)}
for f in fail_dict:
if attribute_name.lower() in f:
return fail_dict.get(f,0.)
return fail_dict[f]
else:
logger.warning(f"Attribute {attribute_name} not found!")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment