Skip to content
Snippets Groups Projects
Commit bd450c67 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

L2SS-528: split archiver base in two versions

parent 5f19629d
No related branches found
No related tags found
1 merge request!193Resolve L2SS-528 "Timescaledb defaults"
#! /usr/bin/env python3 #! /usr/bin/env python3
#from logging import raiseExceptions
import logging import logging
from tango import DeviceProxy, AttributeProxy from tango import DeviceProxy, AttributeProxy
...@@ -11,7 +10,8 @@ import json, os ...@@ -11,7 +10,8 @@ import json, os
from sqlalchemy import create_engine, and_ from sqlalchemy import create_engine, and_
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.orm.exc import NoResultFound
from .archiver_base import * import importlib
import numpy as np
logger = logging.getLogger() logger = logging.getLogger()
...@@ -52,7 +52,7 @@ class Archiver(): ...@@ -52,7 +52,7 @@ class Archiver():
dev_polling_time = None dev_polling_time = None
dev_archive_time = None dev_archive_time = None
def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbpp/confmanager01', context: str = 'RUN'): def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbppts/confmanager01', context: str = 'RUN'):
self.cm_name = cm_name self.cm_name = cm_name
self.cm = DeviceProxy(cm_name) self.cm = DeviceProxy(cm_name)
try: try:
...@@ -468,16 +468,19 @@ class Retriever(): ...@@ -468,16 +468,19 @@ class Retriever():
""" """
The Retriever class implements retrieve operations on a given DBMS The Retriever class implements retrieve operations on a given DBMS
""" """
def __init__(self, cm_name: str = 'archiving/hdbpp/confmanager01'): def __init__(self, cm_name: str = 'archiving/hdbppts/confmanager01'):
self.cm_name = cm_name self.cm_name = cm_name
self.session = self.connect_to_archiving_db() self.session, self.dbms = self.connect_to_archiving_db()
self.ab = self.set_archiver_base()
def get_db_credentials(self): def get_db_credentials(self):
""" """
Retrieves the DB credentials from the Tango properties of Configuration Manager Retrieves the DB credentials from the Tango properties of Configuration Manager
""" """
cm = DeviceProxy(self.cm_name) cm = DeviceProxy(self.cm_name)
config_list = cm.get_property('LibConfiguration')['LibConfiguration'] # dictionary {'LibConfiguration': list of strings} config_list = list(cm.get_property('LibConfiguration')['LibConfiguration']) # dictionary {'LibConfiguration': list of strings}
if 'connect_string=' in config_list[0]: config_list.pop(0) # possibly remove connect string because it causes errors
host = str([s for s in config_list if "host" in s][0].split('=')[1]) host = str([s for s in config_list if "host" in s][0].split('=')[1])
dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1]) dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1])
port = str([s for s in config_list if "port" in s][0].split('=')[1]) port = str([s for s in config_list if "port" in s][0].split('=')[1])
...@@ -490,15 +493,34 @@ class Retriever(): ...@@ -490,15 +493,34 @@ class Retriever():
Returns a session to a MySQL DBMS using default credentials. Returns a session to a MySQL DBMS using default credentials.
""" """
host,dbname,port,user,pw = self.get_db_credentials() host,dbname,port,user,pw = self.get_db_credentials()
engine = create_engine('mysql+pymysql://'+user+':'+pw+'@'+host+':'+port+'/'+dbname) # Set sqlalchemy library connection
if host=='archiver-maria-db':
libname = 'mysql+pymysql'
dbms = 'mysql'
elif host=='archiver-timescale':
libname = 'postgresql+psycopg2'
dbms = 'postgres'
else:
raise Exception(f"Invalid hostname! {host}")
engine = create_engine(libname+'://'+user+':'+pw+'@'+host+':'+port+'/'+dbname)
Session = sessionmaker(bind=engine) Session = sessionmaker(bind=engine)
return Session() return Session(),dbms
def set_archiver_base(self):
"""
Sets the right mapper class following the DBMS connection
"""
if self.dbms == 'postgres':
ab = importlib.import_module('.archiver_base_ts', package=__package__)
elif self.dbms == 'mysql':
ab = importlib.import_module('.archiver_base_mysql', package=__package__)
return ab
def get_all_archived_attributes(self): def get_all_archived_attributes(self):
""" """
Returns a list of the archived attributes in the DB. Returns a list of the archived attributes in the DB.
""" """
attrs = self.session.query(Attribute).order_by(Attribute.att_conf_id).all() attrs = self.session.query(self.ab.Attribute).order_by(self.ab.Attribute.att_conf_id).all()
# Returns the representation as set in __repr__ method of the mapper class # Returns the representation as set in __repr__ method of the mapper class
return attrs return attrs
...@@ -510,8 +532,8 @@ class Retriever(): ...@@ -510,8 +532,8 @@ class Retriever():
[domain, family, member] = device_fqname.split('/') [domain, family, member] = device_fqname.split('/')
except: except:
raise AttributeFormatException(f"Could not parse device name {device_fqname}. Please provide FQDN, e.g. STAT/Device/1") raise AttributeFormatException(f"Could not parse device name {device_fqname}. Please provide FQDN, e.g. STAT/Device/1")
attrs = self.session.query(Attribute).filter(and_(Attribute.domain == domain, Attribute.family == family, \ attrs = self.session.query(self.ab.Attribute).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
Attribute.member == member)).all() self.ab.Attribute.member == member)).all()
# Returns the representation as set in __repr__ method of the mapper class # Returns the representation as set in __repr__ method of the mapper class
return attrs return attrs
...@@ -524,8 +546,8 @@ class Retriever(): ...@@ -524,8 +546,8 @@ class Retriever():
except: except:
raise AttributeFormatException(f"Could not parse attribute name {attribute_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute") raise AttributeFormatException(f"Could not parse attribute name {attribute_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute")
try: try:
result = self.session.query(Attribute.att_conf_id).filter(and_(Attribute.domain == domain, Attribute.family == family, \ result = self.session.query(self.ab.Attribute.att_conf_id).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
Attribute.member == member, Attribute.name == name)).one() self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0] return result[0]
except TypeError as e: except TypeError as e:
raise Exception("Attribute not found!") from e raise Exception("Attribute not found!") from e
...@@ -543,8 +565,45 @@ class Retriever(): ...@@ -543,8 +565,45 @@ class Retriever():
except: except:
raise AttributeFormatException(f"Could not parse attribute name {attribute_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute") raise AttributeFormatException(f"Could not parse attribute name {attribute_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute")
try: try:
result = self.session.query(DataType.data_type).join(Attribute,Attribute.att_conf_data_type_id==DataType.att_conf_data_type_id).\ if self.dbms=='mysql':
filter(and_(Attribute.domain == domain, Attribute.family == family, Attribute.member == member, Attribute.name == name)).one() result = self.session.query(self.ab.DataType.data_type).join(self.ab.Attribute,self.ab.Attribute.att_conf_data_type_id==self.ab.DataType.att_conf_data_type_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
elif self.dbms=='postgres':
result = self.session.query(self.ab.DataType.type).join(self.ab.Attribute,self.ab.Attribute.att_conf_type_id==self.ab.DataType.att_conf_type_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception("Attribute not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_format(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its format.
Formats are basically three: Scalar, Spectrum and Image.
* Works only for POSTGRESQL *
"""
try:
[domain, family, member, name] = attribute_fqname.split('/')
except:
raise AttributeFormatException(f"Could not parse attribute name {attribute_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute")
try:
result = self.session.query(self.ab.Format.format).join(self.ab.Attribute,self.ab.Attribute.att_conf_format_id==self.ab.Format.att_conf_format_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception("Attribute not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_tablename(self,attribute_fqname: str):
try:
[domain, family, member, name] = attribute_fqname.split('/')
except:
raise AttributeFormatException(f"Could not parse attribute name {attribute_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute")
try:
result = self.session.query(self.ab.Attribute.table_name).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0] return result[0]
except TypeError as e: except TypeError as e:
raise Exception("Attribute not found!") from e raise Exception("Attribute not found!") from e
...@@ -559,9 +618,12 @@ class Retriever(): ...@@ -559,9 +618,12 @@ class Retriever():
""" """
attr_id = self.get_attribute_id(attribute_fqname) attr_id = self.get_attribute_id(attribute_fqname)
attr_datatype = self.get_attribute_datatype(attribute_fqname) attr_datatype = self.get_attribute_datatype(attribute_fqname)
attr_table_name = 'att_'+str(attr_datatype)
# Retrieves the class that maps the DB table given the tablename # Retrieves the class that maps the DB table given the tablename
base_class = get_class_by_tablename(attr_table_name) if self.dbms=='mysql':
tablename = 'att_'+str(attr_datatype)
elif self.dbms=='postgres':
tablename = self.get_attribute_tablename(attribute_fqname)
base_class = self.ab.get_class_by_tablename(tablename)
# Retrieves the timestamp # Retrieves the timestamp
time_now = datetime.now() time_now = datetime.now()
time_delta = time_now - timedelta(hours=hours) time_delta = time_now - timedelta(hours=hours)
...@@ -570,8 +632,8 @@ class Retriever(): ...@@ -570,8 +632,8 @@ class Retriever():
time_delta_db = str(time_delta.strftime("%Y-%m-%d %X")) time_delta_db = str(time_delta.strftime("%Y-%m-%d %X"))
try: try:
result = self.session.query(base_class).\ result = self.session.query(base_class).\
join(Attribute,Attribute.att_conf_id==base_class.att_conf_id).\ join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \ filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \
base_class.data_time <= time_now_db)).order_by(base_class.data_time).all() base_class.data_time <= time_now_db)).order_by(base_class.data_time).all()
except AttributeError as e: except AttributeError as e:
raise Exception(f"Empty result! Attribute {attribute_fqname} not found") from e raise Exception(f"Empty result! Attribute {attribute_fqname} not found") from e
...@@ -587,18 +649,22 @@ class Retriever(): ...@@ -587,18 +649,22 @@ class Retriever():
attr_datatype = self.get_attribute_datatype(attribute_fqname) attr_datatype = self.get_attribute_datatype(attribute_fqname)
attr_table_name = 'att_'+str(attr_datatype) attr_table_name = 'att_'+str(attr_datatype)
# Retrieves the class that maps the DB table given the tablename # Retrieves the class that maps the DB table given the tablename
base_class = get_class_by_tablename(attr_table_name) if self.dbms=='mysql':
tablename = 'att_'+str(attr_datatype)
elif self.dbms=='postgres':
tablename = self.get_attribute_tablename(attribute_fqname)
base_class = self.ab.get_class_by_tablename(tablename)
try: try:
result = self.session.query(base_class).\ result = self.session.query(base_class).\
join(Attribute,Attribute.att_conf_id==base_class.att_conf_id).\ join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \ filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \
base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all() base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all()
except AttributeError as e: except AttributeError as e:
raise Exception(f"Empty result! Attribute {attribute_fqname} not found") from e raise Exception(f"Empty result! Attribute {attribute_fqname} not found") from e
return result return result
def get_masked_fpga_temp(self,start_time: datetime, stop_time: datetime,temp_attr_name:str='LTS/SDP/1/fpga_temp_r', def get_masked_fpga_temp(self,start_time: datetime, stop_time: datetime,temp_attr_name:str='stat/sdp/1/fpga_temp_r',
mask_attr_name:str='LTS/SDP/1/tr_fpga_mask_r'): mask_attr_name:str='stat/sdp/1/tr_fpga_mask_r'):
""" """
Returns a list of SDP/fpga_temp_r values, but only if SDP/tr_fpga_mask_r values are TRUE Returns a list of SDP/fpga_temp_r values, but only if SDP/tr_fpga_mask_r values are TRUE
""" """
...@@ -617,11 +683,11 @@ class Retriever(): ...@@ -617,11 +683,11 @@ class Retriever():
else: else:
raise Exception raise Exception
# Convert DB Array records into Python lists # Convert DB Array records into Python lists
mask_data = build_array_from_record(mask_values,mask_values[0].dim_x_r) mask_data = self.ab.build_array_from_record(mask_values,mask_values[0].dim_x_r)
temp_data = build_array_from_record(temp_values,temp_values[0].dim_x_r) temp_data = self.ab.build_array_from_record(temp_values,temp_values[0].dim_x_r)
# Extract only the value from the array # Extract only the value from the array
mask_array_values = get_values_from_record(mask_data) mask_array_values = self.ab.get_values_from_record(mask_data)
temp_array_values = get_values_from_record(temp_data) temp_array_values = self.ab.get_values_from_record(temp_data)
# Multiply the matrix # Multiply the matrix
#masked_values = np.multiply(temp_array_values,mask_array_values) #masked_values = np.multiply(temp_array_values,mask_array_values)
masked_values = np.ma.masked_array(temp_array_values,mask=np.invert(mask_array_values.astype(bool))) masked_values = np.ma.masked_array(temp_array_values,mask=np.invert(mask_array_values.astype(bool)))
......
...@@ -29,9 +29,8 @@ class Attribute(Base): ...@@ -29,9 +29,8 @@ class Attribute(Base):
member = Column(String) member = Column(String)
name = Column(String) name = Column(String)
def __repr__(self): def __repr__(self):
return f"<Attribute(fullname='{self.att_name}',data_type ='{self.att_conf_data_type_id}',ttl='{self.att_ttl}',facility ='{elf.facility}',domain ='{self.domain}',family ='{self.family}',member ='{self.member}',name ='{self.name}')>" return f"<Attribute(fullname='{self.att_name}',data_type ='{self.att_conf_data_type_id}',ttl='{self.att_ttl}',facility ='{self.facility}',domain ='{self.domain}',family ='{self.family}',member ='{self.member}',name ='{self.name}')>"
class DataType(Base): class DataType(Base):
""" """
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment