Skip to content
Snippets Groups Projects
Commit ec6354fb authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-398-archiver-multi-es' into 'master'

Resolve L2SS-398 "Archiver multi es"

Closes L2SS-398

See merge request !161
parents 6f6f374e 9f22b4bc
No related branches found
No related tags found
1 merge request!161Resolve L2SS-398 "Archiver multi es"
...@@ -49,6 +49,25 @@ services: ...@@ -49,6 +49,25 @@ services:
syslog-format: rfc3164 syslog-format: rfc3164
tag: "{{.Name}}" tag: "{{.Name}}"
restart: unless-stopped restart: unless-stopped
# Additional HDB++ EventSubscriber container, attached to the same ConfigurationManager
# (HdbManager=archiving/hdbpp/confmanager01) as the other archiver services.
hdbpp-es2:
# NOTE(review): the image tag is pinned to a date here, whereas hdbpp-cm uses
# ${TANGO_ARCHIVER_VERSION} - confirm that the pin is intentional.
image: ${LOCAL_DOCKER_REGISTRY_HOST}/${LOCAL_DOCKER_REGISTRY_USER}/tango-archiver:2021-05-28
networks:
- control
container_name: hdbpp-es2
depends_on:
- databaseds
- dsconfig
- archiver-maria-db
environment:
- TANGO_HOST=${TANGO_HOST}
- HdbManager=archiving/hdbpp/confmanager01
# Waits (--timeout=30 --strict) for archiver-maria-db:3306 and for ${TANGO_HOST},
# then starts the event-subscriber device server with instance name "03".
command: >
/bin/bash -c "
wait-for-it.sh archiver-maria-db:3306 --timeout=30 --strict --
wait-for-it.sh ${TANGO_HOST} --timeout=30 --strict --
hdbppes-srv 03"
restart: unless-stopped
hdbpp-cm: hdbpp-cm:
image: ${LOCAL_DOCKER_REGISTRY_HOST}/${LOCAL_DOCKER_REGISTRY_USER}/tango-archiver:${TANGO_ARCHIVER_VERSION} image: ${LOCAL_DOCKER_REGISTRY_HOST}/${LOCAL_DOCKER_REGISTRY_USER}/tango-archiver:${TANGO_ARCHIVER_VERSION}
......
Subproject commit dddb23ff587f6e9c837cdb77e7955e94272eca6f Subproject commit 774d39a40ca19c9d979ad22565e57b4af3e9a831
...@@ -26,6 +26,19 @@ ...@@ -26,6 +26,19 @@
} }
} }
} }
},
"03": {
"HdbEventSubscriber": {
"archiving/hdbpp/eventsubscriber02": {
"attribute_properties": {},
"properties": {
"CheckPeriodicTimeoutDelay": ["5"],
"PollingThreadPeriod": ["3"],
"LibConfiguration": ["host=archiver-maria-db","libname=libhdb++mysql.so.6","dbname=hdbpp","port=3306", "user=tango", "password=tango"],
"polled_attr": []
}
}
}
} }
}, },
"hdbppcm-srv": { "hdbppcm-srv": {
......
This diff is collapsed.
This diff is collapsed.
...@@ -2,9 +2,6 @@ ...@@ -2,9 +2,6 @@
#from logging import raiseExceptions #from logging import raiseExceptions
import logging import logging
import traceback
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tango import DeviceProxy, AttributeProxy from tango import DeviceProxy, AttributeProxy
from datetime import datetime, timedelta from datetime import datetime, timedelta
...@@ -19,15 +16,6 @@ from .archiver_base import * ...@@ -19,15 +16,6 @@ from .archiver_base import *
logger = logging.getLogger() logger = logging.getLogger()
def parse_attribute_name(attribute_name:str): def parse_attribute_name(attribute_name:str):
"""
Parse the attribute names since most of the archiving operations require only
Tango full-qualified names.
"""
chunk_num = len(attribute_name.split('/'))
if (chunk_num!=4):
raise AttributeFormatException
def reduce_attribute_name(attribute_name:str):
""" """
For some operations Tango attribute must be transformed from the form 'tango://db:port/domain/family/name/attribute' For some operations Tango attribute must be transformed from the form 'tango://db:port/domain/family/name/attribute'
to canonical 'domain/family/name/attribute' to canonical 'domain/family/name/attribute'
...@@ -41,6 +29,20 @@ def reduce_attribute_name(attribute_name:str): ...@@ -41,6 +29,20 @@ def reduce_attribute_name(attribute_name:str):
else: else:
return attribute_name return attribute_name
def parse_device_name(device_name:str, tango_host:str = 'databaseds:10000'):
    """
    Return the fully-qualified Tango name of a device.

    Two input forms are accepted:
      * 'domain/family/name'                 -> returned as 'tango://<tango_host>/domain/family/name'
      * 'tango://db:port/domain/family/name' -> returned unchanged

    :param device_name: short or fully-qualified device name
    :param tango_host: 'host:port' of the Tango database; only used for the short form
    :return: the device name in the form 'tango://db:port/domain/family/name'
    :raises ValueError: if the name matches neither form or contains an empty component
    """
    chunks = device_name.split('/')
    # Short form: exactly three non-empty components
    if len(chunks) == 3 and all(chunks):
        return 'tango://' + tango_host + '/' + device_name
    # 'tango://db:port/d/f/n' splits into ['tango:', '', 'db:port', 'd', 'f', 'n']:
    # the empty second chunk is the '//' after the scheme, every other chunk must be non-empty
    if len(chunks) == 6 and chunks[0] == 'tango:' and chunks[1] == '' and all(chunks[2:]):
        return device_name
    # ValueError derives from Exception, so callers catching Exception keep working
    raise ValueError(f'{device_name} is a wrong device name')
class Archiver(): class Archiver():
""" """
The Archiver class implements the basic operations to perform attributes archiving The Archiver class implements the basic operations to perform attributes archiving
...@@ -50,7 +52,7 @@ class Archiver(): ...@@ -50,7 +52,7 @@ class Archiver():
dev_polling_time = None dev_polling_time = None
dev_archive_time = None dev_archive_time = None
def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbpp/confmanager01', es_name: str = 'archiving/hdbpp/eventsubscriber01', context: str = 'RUN'): def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbpp/confmanager01', context: str = 'RUN'):
self.cm_name = cm_name self.cm_name = cm_name
self.cm = DeviceProxy(cm_name) self.cm = DeviceProxy(cm_name)
try: try:
...@@ -59,15 +61,67 @@ class Archiver(): ...@@ -59,15 +61,67 @@ class Archiver():
raise Exception("Configuration Manager is in FAULT state") raise Exception("Configuration Manager is in FAULT state")
except Exception as e: except Exception as e:
raise Exception("Connection failed with Configuration Manager device") from e raise Exception("Connection failed with Configuration Manager device") from e
self.es_name = es_name self.es_list = [es_name for es_name in self.get_subscribers(from_db=False)] or []
self.es = DeviceProxy(es_name)
self.cm.write_attribute('Context',context) # Set default Context Archiving for all the subscribers self.cm.write_attribute('Context',context) # Set default Context Archiving for all the subscribers
self.selector = Selector() if selector_filename is None else Selector(selector_filename) # Create selector for customized strategies self.selector = Selector() if selector_filename is None else Selector(selector_filename) # Create selector for customized strategies
try: try:
self.apply_selector() self.apply_selector()
except Exception as e: except Exception as e:
raise Exception("Error in selecting configuration! Archiving framework will not be updated!") from e raise Exception("Error in selecting configuration! Archiving framework will not be updated!") from e
def get_db_config(self, device_name:str):
    """
    Retrieve the DB credentials from the 'LibConfiguration' Tango property of a
    Configuration Manager or EventSubscriber device.

    The property is a list of 'key=value' strings. Keys are matched exactly, so a value
    that merely contains another key's name (e.g. 'host=support-db' contains 'port')
    cannot be picked up by mistake, and values may themselves contain '='.

    :param device_name: Tango name of the device whose property is read
    :return: list [host, libname, dbname, port, user, password], all strings
    :raises IndexError: if one of the expected keys is missing from the property
    """
    device = DeviceProxy(device_name)
    config_list = device.get_property('LibConfiguration')['LibConfiguration'] # dictionary {'LibConfiguration': list of strings}
    settings = {}
    for entry in config_list:
        key, separator, value = str(entry).partition('=')
        if separator:
            # keep the first occurrence of a key when it is repeated
            settings.setdefault(key.strip(), value)
    expected_keys = ('host', 'libname', 'dbname', 'port', 'user', 'password')
    missing = [key for key in expected_keys if key not in settings]
    if missing:
        # IndexError is kept as the failure type for a missing entry
        raise IndexError(f"LibConfiguration of {device_name} lacks: {', '.join(missing)}")
    return [settings[key] for key in expected_keys]
def get_hdbpp_libname(self, device_name:str):
    """
    Return the hdbpp library name configured for a Configuration Manager or
    EventSubscriber device.
    Handy when several DBMS back-ends are in use (e.g. MySQL, TimescaleDB).
    """
    db_settings = self.get_db_config(device_name)
    # get_db_config orders its result as [host, libname, dbname, port, user, pw]
    libname = db_settings[1]
    return libname
def get_subscribers(self, from_db:bool=False):
    """
    Get the list of Event Subscribers managed by the Configuration Manager.

    The list is read either from the 'ArchiverList' device property (stored in TangoDB)
    or from the 'ArchiverList' device attribute.
    Choose from_db=True only if new subscribers are not added dynamically while
    ConfManager is running.

    :param from_db: read the device property instead of the device attribute
    :return: a plain list of subscriber names, empty if none is registered
    """
    if from_db:
        subscribers = self.cm.get_property('ArchiverList')['ArchiverList']
    else:
        subscribers = self.cm.ArchiverList
    # Always hand back a real list, whatever sequence type the source yields
    return list(subscribers) if subscribers is not None else []
def get_next_subscriber(self):
    """
    Return the name of the Event Subscriber to use for the next archiving operation.

    With a single registered subscriber that one is returned. Otherwise the subscriber
    with the lowest 'AttributeRecordFreq' is chosen; ties go to the first one in the
    Configuration Manager list.

    :return: name of the selected Event Subscriber
    :raises ValueError: if no subscriber is registered in the Configuration Manager
    """
    es_list = list(self.get_subscribers())
    if not es_list:
        raise ValueError("No Event Subscriber is registered in the Configuration Manager")
    # Only one subscriber in ConfManager list: no need to query its load
    if len(es_list) == 1:
        return es_list[0]

    def subscriber_load(es_name):
        # a missing/None frequency counts as no load
        return float(DeviceProxy(es_name).AttributeRecordFreq or 0)

    # Return the subscriber's name with min load
    return min(es_list, key=subscriber_load)
def apply_selector(self): def apply_selector(self):
""" """
Apply the customized strategy defined by the selector Apply the customized strategy defined by the selector
...@@ -109,17 +163,38 @@ class Archiver(): ...@@ -109,17 +163,38 @@ class Archiver():
raise Exception from e raise Exception from e
return env_dict return env_dict
def add_attribute_to_archiver(self, attribute_name: str, polling_period: int, event_period: int, strategy: str = 'RUN'): def add_event_subscriber(self, es_name:str=None):
"""
Add an additional Event Subscriber to the Configuration Manager
"""
# If the subscriber name is not defined, generate the name by incrementing the existing one
if es_name is None:
last_es_name = self.get_subscribers()[-1]
last_es_idx = int(last_es_name[-2:])
es_name = last_es_name[:-2]+'0'+str(last_es_idx+1)
try:
es = DeviceProxy(es_name)
es_state = es.state() # ping the device server
if 'FAULT' in str(es_state):
raise Exception(f"{es_name} is in FAULT state")
self.cm.ArchiverAdd(parse_device_name(es_name))
except Exception as e:
if 'already_present' in str(e):
logger.warning(f"Subscriber {es_name} already present in Configuration Manager")
else:
raise Exception from e
def add_attribute_to_archiver(self, attribute_name: str, polling_period: int, event_period: int, strategy: str = 'RUN', es_name:str=None):
""" """
Takes as input the attribute name, polling period (ms), event period (ms) and archiving strategy, Takes as input the attribute name, polling period (ms), event period (ms) and archiving strategy,
and adds the selected attribute to the subscriber's list of archiving attributes. and adds the selected attribute to the subscriber's list of archiving attributes.
The ConfigurationManager and EventSubscriber devices must be already up and running. The ConfigurationManager and EventSubscriber devices must be already up and running.
The archiving-DBMS must be already properly configured. The archiving-DBMS must be already properly configured.
""" """
parse_attribute_name(attribute_name) attribute_name = parse_attribute_name(attribute_name)
try: try:
self.cm.write_attribute('SetAttributeName', attribute_name) self.cm.write_attribute('SetAttributeName', attribute_name)
self.cm.write_attribute('SetArchiver', self.es_name) self.cm.write_attribute('SetArchiver', es_name or self.get_next_subscriber())
self.cm.write_attribute('SetStrategy', strategy) self.cm.write_attribute('SetStrategy', strategy)
self.cm.write_attribute('SetPollingPeriod', polling_period) self.cm.write_attribute('SetPollingPeriod', polling_period)
self.cm.write_attribute('SetPeriodEvent', event_period) self.cm.write_attribute('SetPeriodEvent', event_period)
...@@ -131,7 +206,7 @@ class Archiver(): ...@@ -131,7 +206,7 @@ class Archiver():
else: else:
logger.warning(f"Attribute {attribute_name} already in archiving list!") logger.warning(f"Attribute {attribute_name} already in archiving list!")
def add_attributes_by_device(self,device_name,global_archive_period:int = None, exclude:list = []): def add_attributes_by_device(self,device_name,global_archive_period:int = None, es_name:str=None, exclude:list = []):
""" """
Add sequentially all the attributes of the selected device in the event subscriber list, if not already present Add sequentially all the attributes of the selected device in the event subscriber list, if not already present
""" """
...@@ -144,11 +219,12 @@ class Archiver(): ...@@ -144,11 +219,12 @@ class Archiver():
attr_proxy = AttributeProxy(attr_fullname) attr_proxy = AttributeProxy(attr_fullname)
if attr_proxy.is_polled() is True: # if not polled attribute is also not archived if attr_proxy.is_polled() is True: # if not polled attribute is also not archived
try: try:
if self.es.AttributeList is None or not(self.cm.AttributeSearch(a)): es = DeviceProxy(es_name or self.get_next_subscriber()) # choose an e.s. or get the first one available
if es.AttributeList is None or not(self.cm.AttributeSearch(a)):
polling_period = attr_proxy.get_poll_period() or self.dev_polling_time polling_period = attr_proxy.get_poll_period() or self.dev_polling_time
archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0]) or self.dev_archive_time archive_period = global_archive_period or int(attr_proxy.get_property('archive_period')['archive_period'][0]) or self.dev_archive_time
self.add_attribute_to_archiver(attr_fullname,polling_period=polling_period, self.add_attribute_to_archiver(attr_fullname,polling_period=polling_period,
event_period=archive_period) event_period=archive_period, es_name = es.name())
#time.sleep(0.5) #time.sleep(0.5)
except IndexError as e: except IndexError as e:
logger.warning(f"Attribute {attr_fullname} will not be archived because archive event period is not defined!") logger.warning(f"Attribute {attr_fullname} will not be archived because archive event period is not defined!")
...@@ -161,7 +237,7 @@ class Archiver(): ...@@ -161,7 +237,7 @@ class Archiver():
""" """
Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list. Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list.
""" """
parse_attribute_name(attribute_name) attribute_name = parse_attribute_name(attribute_name)
try: try:
self.cm.AttributeStop(attribute_name) self.cm.AttributeStop(attribute_name)
self.cm.AttributeRemove(attribute_name) self.cm.AttributeRemove(attribute_name)
...@@ -190,25 +266,27 @@ class Archiver(): ...@@ -190,25 +266,27 @@ class Archiver():
def remove_attributes_in_error(self,exclude:list=[],es_name:str=None): def remove_attributes_in_error(self,exclude:list=[],es_name:str=None):
""" """
Remove from the subscriber's list all the attributes currently in error (not being archived) Remove from the subscribers list all the attributes currently in error (not being archived)
""" """
if es_name is not None: if es_name is not None:
es_list = [es_name]
else:
es_list = self.get_subscribers()
for es_name in es_list:
es = DeviceProxy(es_name) es = DeviceProxy(es_name)
else: attributes_nok = es.AttributeNokList or []
es = self.es exclude_list = [a.lower() for a in exclude]
attributes_nok = es.AttributeNokList or [] attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list]
exclude_list = [a.lower() for a in exclude] for a in attrs_list:
attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list] attr_fullname = parse_attribute_name(a)
for a in attrs_list: self.remove_attribute_from_archiver(attr_fullname)
attr_fullname = reduce_attribute_name(a)
self.remove_attribute_from_archiver(attr_fullname)
def start_archiving_attribute(self, attribute_name:str): def start_archiving_attribute(self, attribute_name:str):
""" """
Starts the archiving of the attribute passed as input. Starts the archiving of the attribute passed as input.
The attribute must be already present in the subscriber's list The attribute must be already present in the subscriber's list
""" """
parse_attribute_name(attribute_name) attribute_name = parse_attribute_name(attribute_name)
try: try:
self.cm.AttributeStart(attribute_name) self.cm.AttributeStart(attribute_name)
except Exception as e: except Exception as e:
...@@ -222,7 +300,7 @@ class Archiver(): ...@@ -222,7 +300,7 @@ class Archiver():
Stops the archiving of the attribute passed as input. Stops the archiving of the attribute passed as input.
The attribute must be already present in the subscriber's list The attribute must be already present in the subscriber's list
""" """
parse_attribute_name(attribute_name) attribute_name = parse_attribute_name(attribute_name)
try: try:
self.cm.AttributeStop(attribute_name) self.cm.AttributeStop(attribute_name)
except Exception as e: except Exception as e:
...@@ -235,7 +313,7 @@ class Archiver(): ...@@ -235,7 +313,7 @@ class Archiver():
""" """
Check if an attribute is in the archiving list Check if an attribute is in the archiving list
""" """
parse_attribute_name(attribute_name) attribute_name = parse_attribute_name(attribute_name)
attributes = self.cm.AttributeSearch(attribute_name.lower()) attributes = self.cm.AttributeSearch(attribute_name.lower())
if len(attributes)>1: if len(attributes)>1:
# Handle case same attribute_name r/rw # Handle case same attribute_name r/rw
...@@ -261,32 +339,39 @@ class Archiver(): ...@@ -261,32 +339,39 @@ class Archiver():
def get_subscriber_attributes(self,es_name:str = None): def get_subscriber_attributes(self,es_name:str = None):
""" """
Return the list of attributes managed by the event subscriber Return the list of attributes managed by the event subscribers
""" """
attrs = []
if es_name is not None: if es_name is not None:
es_list = [es_name]
else:
es_list = self.get_subscribers()
for es_name in es_list:
es = DeviceProxy(es_name) es = DeviceProxy(es_name)
else: attrs.extend(list(es.AttributeList or []))
es = self.es
attrs = es.AttributeList or []
return attrs return attrs
def get_subscriber_errors(self,es_name:str = None): def get_subscriber_errors(self,es_name:str = None):
""" """
Return a dictionary of the attributes currently in error, defined as AttributeName -> AttributeError Return a dictionary of the attributes currently in error, defined as AttributeName -> AttributeError
""" """
attrs = []
errs = []
if es_name is not None: if es_name is not None:
es_list = [es_name]
else:
es_list = self.get_subscribers()
for es_name in es_list:
es = DeviceProxy(es_name) es = DeviceProxy(es_name)
else: attrs.extend(list(es.AttributeList or []))
es = self.es errs.extend(list(es.AttributeErrorList or []))
attrs = es.AttributeList or []
errs = es.AttributeErrorList or []
return dict((a,e) for a,e in zip(attrs,errs) if e) or {} return dict((a,e) for a,e in zip(attrs,errs) if e) or {}
def get_attribute_errors(self,attribute_name:str): def get_attribute_errors(self,attribute_name:str):
""" """
Return the error related to the attribute Return the error related to the attribute
""" """
parse_attribute_name(attribute_name) attribute_name = parse_attribute_name(attribute_name)
errs_dict = self.get_subscriber_errors() errs_dict = self.get_subscriber_errors()
for e in errs_dict: for e in errs_dict:
if attribute_name in e: if attribute_name in e:
...@@ -301,19 +386,35 @@ class Archiver(): ...@@ -301,19 +386,35 @@ class Archiver():
if es_name is not None: if es_name is not None:
es = DeviceProxy(es_name) es = DeviceProxy(es_name)
else: else:
es = self.es es = DeviceProxy(self.get_next_subscriber())
if use_freq: if use_freq:
return str(es.AttributeRecordFreq)+(' events/period' ) return str(es.AttributeRecordFreq)+(' events/period' )
else: else:
return len(es.AttributeList or []) return len(es.AttributeList or [])
def get_attribute_subscriber(self,attribute_name:str):
    """
    Given an attribute name, return the event subscriber associated with it.

    The name is first normalised with parse_attribute_name. If the Configuration Manager
    manages at most one subscriber, the result of get_next_subscriber() is returned.
    Otherwise each subscriber's 'AttributeList' is searched, case-insensitively and by
    substring (because of the Tango naming conventions).

    :param attribute_name: name of the attribute to look up
    :return: name of the subscriber archiving the attribute, or None if no
             subscriber lists it
    """
    attribute_name = parse_attribute_name(attribute_name)
    es_list = self.get_subscribers()  # fetched once and reused below
    if len(es_list) <= 1:
        return self.get_next_subscriber()
    needle = attribute_name.lower()
    for es_name in es_list:
        archived = DeviceProxy(es_name).AttributeList or []
        # lower-case both sides so mixed-case entries still match
        if any(needle in str(a).lower() for a in archived):
            return es_name
    # attribute not found in any subscriber
    return None
def get_attribute_freq(self,attribute_name:str): def get_attribute_freq(self,attribute_name:str):
""" """
Return the attribute archiving frequency in events/minute Return the attribute archiving frequency in events/minute
""" """
parse_attribute_name(attribute_name) attribute_name = parse_attribute_name(attribute_name)
if self.is_attribute_archived(attribute_name): if self.is_attribute_archived(attribute_name):
freq_dict = dict((a,r) for a,r in zip(self.es.AttributeList,self.es.AttributeRecordFreqList)) es = DeviceProxy(self.get_attribute_subscriber(attribute_name))
freq_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeRecordFreqList))
for f in freq_dict: for f in freq_dict:
if attribute_name.lower() in f: if attribute_name.lower() in f:
return freq_dict.get(f,0.) return freq_dict.get(f,0.)
...@@ -324,9 +425,10 @@ class Archiver(): ...@@ -324,9 +425,10 @@ class Archiver():
""" """
Return the attribute failure archiving frequency in events/minute Return the attribute failure archiving frequency in events/minute
""" """
parse_attribute_name(attribute_name) attribute_name = parse_attribute_name(attribute_name)
if self.is_attribute_archived(attribute_name): if self.is_attribute_archived(attribute_name):
fail_dict = dict((a,r) for a,r in zip(self.es.AttributeList,self.es.AttributeFailureFreqList)) es = DeviceProxy(self.get_attribute_subscriber(attribute_name))
fail_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeFailureFreqList))
for f in fail_dict: for f in fail_dict:
if attribute_name.lower() in f: if attribute_name.lower() in f:
return fail_dict.get(f,0.) return fail_dict.get(f,0.)
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
"STAT/SDP/1": { "STAT/SDP/1": {
"environment": "development", "environment": "development",
"include": [], "include": [],
"exclude": [] "exclude": ["FPGA_scrap_R","FPGA_scrap_RW"]
}, },
"STAT/SST/1": { "STAT/SST/1": {
"environment": "development", "environment": "development",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment