Skip to content
Snippets Groups Projects
Commit 1e1a7e80 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

L2SS-528: move retriever in separate file

parent 83091a58
No related branches found
No related tags found
1 merge request!193Resolve L2SS-528 "Timescaledb defaults"
......@@ -3,15 +3,9 @@
import logging
from tango import DeviceProxy, AttributeProxy
from datetime import datetime, timedelta
import time
import json, os
from sqlalchemy import create_engine, and_
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound
import importlib
import numpy
logger = logging.getLogger()
......@@ -482,216 +476,3 @@ class Selector():
except FileNotFoundError as e:
raise
return data
class Retriever():
"""
The Retriever class implements retrieve operations on a given DBMS
"""
def __init__(self, cm_name: str = 'archiving/hdbppts/confmanager01'):
self.cm_name = cm_name
self.session, self.dbms = self.connect_to_archiving_db()
self.ab = self.set_archiver_base()
def get_db_credentials(self):
"""
Retrieves the DB credentials from the Tango properties of Configuration Manager
"""
cm = DeviceProxy(self.cm_name)
config_list = list(cm.get_property('LibConfiguration')['LibConfiguration']) # dictionary {'LibConfiguration': list of strings}
if 'connect_string=' in config_list[0]: config_list.pop(0) # possibly remove connect string because it causes errors
host = str([s for s in config_list if "host" in s][0].split('=')[1])
dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1])
port = str([s for s in config_list if "port" in s][0].split('=')[1])
user = str([s for s in config_list if "user" in s][0].split('=')[1])
pw = str([s for s in config_list if "password" in s][0].split('=')[1])
return host,dbname,port,user,pw
def connect_to_archiving_db(self):
"""
Returns a session to a MySQL DBMS using default credentials.
"""
host,dbname,port,user,pw = self.get_db_credentials()
# Set sqlalchemy library connection
if host=='archiver-maria-db':
libname = 'mysql+pymysql'
dbms = 'mysql'
elif host=='archiver-timescale':
libname = 'postgresql+psycopg2'
dbms = 'postgres'
else:
raise ValueError(f"Invalid hostname: {host}")
engine = create_engine(libname+'://'+user+':'+pw+'@'+host+':'+port+'/'+dbname)
Session = sessionmaker(bind=engine)
return Session(),dbms
def set_archiver_base(self):
"""
Sets the right mapper class following the DBMS connection
"""
if self.dbms == 'postgres':
ab = importlib.import_module('.archiver_base_ts', package=__package__)
elif self.dbms == 'mysql':
ab = importlib.import_module('.archiver_base_mysql', package=__package__)
return ab
def get_all_archived_attributes(self):
"""
Returns a list of the archived attributes in the DB.
"""
attrs = self.session.query(self.ab.Attribute).order_by(self.ab.Attribute.att_conf_id).all()
# Returns the representation as set in __repr__ method of the mapper class
return attrs
def get_archived_attributes_by_device(self,device_fqname: str):
"""
Takes as input the fully-qualified name of a device and returns a list of its archived attributes
"""
domain, family, member = split_tango_name(device_fqname,"device")
attrs = self.session.query(self.ab.Attribute).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
self.ab.Attribute.member == member)).all()
# Returns the representation as set in __repr__ method of the mapper class
return attrs
def get_attribute_id(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its id.
"""
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
result = self.session.query(self.ab.Attribute.att_conf_id).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception(f"Attribute {attribute_fqname} not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_datatype(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its Data-Type.
Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used
as DB table name suffix in which values are stored.
"""
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
if self.dbms=='mysql':
result = self.session.query(self.ab.DataType.data_type).join(self.ab.Attribute,self.ab.Attribute.att_conf_data_type_id==self.ab.DataType.att_conf_data_type_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
elif self.dbms=='postgres':
result = self.session.query(self.ab.DataType.type).join(self.ab.Attribute,self.ab.Attribute.att_conf_type_id==self.ab.DataType.att_conf_type_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception(f"Attribute not {attribute_fqname} found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_format(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its format.
Formats are basically three: Scalar, Spectrum and Image.
* Works only for POSTGRESQL *
"""
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
result = self.session.query(self.ab.Format.format).join(self.ab.Attribute,self.ab.Attribute.att_conf_format_id==self.ab.Format.att_conf_format_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception("Attribute not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_tablename(self,attribute_fqname: str):
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
result = self.session.query(self.ab.Attribute.table_name).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception("Attribute not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_value_by_hours(self,attribute_fqname: str, hours: float = 1.0):
"""
Takes as input the attribute fully-qualified name and the number of past hours since the actual time
(e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half).
Returns a list of timestamps and a list of values
"""
attr_id = self.get_attribute_id(attribute_fqname)
attr_datatype = self.get_attribute_datatype(attribute_fqname)
# Retrieves the class that maps the DB table given the tablename
if self.dbms=='mysql':
tablename = f"att_{attr_datatype}"
elif self.dbms=='postgres':
tablename = self.get_attribute_tablename(attribute_fqname)
base_class = self.ab.get_class_by_tablename(tablename)
# Retrieves the timestamp
time_now = datetime.now()
time_delta = time_now - timedelta(hours=hours)
# Converts the timestamps in the right format for the query
time_now_db = str(time_now.strftime("%Y-%m-%d %X"))
time_delta_db = str(time_delta.strftime("%Y-%m-%d %X"))
try:
result = self.session.query(base_class).\
join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \
base_class.data_time <= time_now_db)).order_by(base_class.data_time).all()
except AttributeError as e:
raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e
return result
def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime):
"""
Takes as input the attribute name and a certain starting and ending point-time.
The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss").
Returns a list of timestamps and a list of values
"""
attr_id = self.get_attribute_id(attribute_fqname)
attr_datatype = self.get_attribute_datatype(attribute_fqname)
# Retrieves the class that maps the DB table given the tablename
if self.dbms=='mysql':
tablename = f"att_{attr_datatype}"
elif self.dbms=='postgres':
tablename = self.get_attribute_tablename(attribute_fqname)
base_class = self.ab.get_class_by_tablename(tablename)
try:
result = self.session.query(base_class).\
join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \
base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all()
except AttributeError as e:
raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e
return result
def get_masked_fpga_temp(self,start_time: datetime, stop_time: datetime,temp_attr_name:str='stat/sdp/1/fpga_temp_r',
mask_attr_name:str='stat/sdp/1/tr_fpga_mask_r'):
"""
Returns a list of SDP/fpga_temp_r values, but only if SDP/tr_fpga_mask_r values are TRUE
"""
mask_values = self.get_attribute_value_by_interval(mask_attr_name,start_time,stop_time)
temp_values = self.get_attribute_value_by_interval(temp_attr_name,start_time,stop_time)
# Since timestamps can be not syncrhonized, remove first or last element from arrays
if len(mask_values)==len(temp_values):
first_mask_datatime = mask_values[0].data_time
first_temp_datatime = temp_values[0].data_time
if (first_mask_datatime>first_temp_datatime):
mask_values = mask_values[:-int(mask_values[0].dim_x_r)]
temp_values = temp_values[int(temp_values[0].dim_x_r):]
elif (first_mask_datatime<first_temp_datatime):
mask_values = mask_values[int(mask_values[0].dim_x_r)]
temp_values = temp_values[:-int(temp_values[0].dim_x_r):]
else:
raise Exception
# Convert DB Array records into Python lists
mask_data = self.ab.build_array_from_record(mask_values,mask_values[0].dim_x_r)
temp_data = self.ab.build_array_from_record(temp_values,temp_values[0].dim_x_r)
# Extract only the value from the array
mask_array_values = self.ab.get_values_from_record(mask_data)
temp_array_values = self.ab.get_values_from_record(temp_data)
# Multiply the matrix
#masked_values = np.multiply(temp_array_values,mask_array_values)
masked_values = numpy.ma.masked_array(temp_array_values,mask=numpy.invert(mask_array_values.astype(bool)))
return masked_values, mask_values, temp_values
#! /usr/bin/env python3
from tango import DeviceProxy, AttributeProxy
from tangostationcontrol.toolkit.archiver import *
from datetime import datetime, timedelta
from sqlalchemy import create_engine, and_
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound
import importlib
import numpy
class Retriever():
"""
The Retriever class implements retrieve operations on a given DBMS
"""
def __init__(self, cm_name: str = 'archiving/hdbppts/confmanager01'):
self.cm_name = cm_name
self.session, self.dbms = self.connect_to_archiving_db()
self.ab = self.set_archiver_base()
def get_db_credentials(self):
"""
Retrieves the DB credentials from the Tango properties of Configuration Manager
"""
cm = DeviceProxy(self.cm_name)
config_list = list(cm.get_property('LibConfiguration')['LibConfiguration']) # dictionary {'LibConfiguration': list of strings}
if 'connect_string=' in config_list[0]: config_list.pop(0) # possibly remove connect string because it causes errors
host = str([s for s in config_list if "host" in s][0].split('=')[1])
dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1])
port = str([s for s in config_list if "port" in s][0].split('=')[1])
user = str([s for s in config_list if "user" in s][0].split('=')[1])
pw = str([s for s in config_list if "password" in s][0].split('=')[1])
return host,dbname,port,user,pw
def connect_to_archiving_db(self):
"""
Returns a session to a MySQL DBMS using default credentials.
"""
host,dbname,port,user,pw = self.get_db_credentials()
# Set sqlalchemy library connection
if host=='archiver-maria-db':
libname = 'mysql+pymysql'
dbms = 'mysql'
elif host=='archiver-timescale':
libname = 'postgresql+psycopg2'
dbms = 'postgres'
else:
raise ValueError(f"Invalid hostname: {host}")
engine = create_engine(libname+'://'+user+':'+pw+'@'+host+':'+port+'/'+dbname)
Session = sessionmaker(bind=engine)
return Session(),dbms
def set_archiver_base(self):
"""
Sets the right mapper class following the DBMS connection
"""
if self.dbms == 'postgres':
ab = importlib.import_module('.archiver_base_ts', package=__package__)
elif self.dbms == 'mysql':
ab = importlib.import_module('.archiver_base_mysql', package=__package__)
return ab
def get_all_archived_attributes(self):
"""
Returns a list of the archived attributes in the DB.
"""
attrs = self.session.query(self.ab.Attribute).order_by(self.ab.Attribute.att_conf_id).all()
# Returns the representation as set in __repr__ method of the mapper class
return attrs
def get_archived_attributes_by_device(self,device_fqname: str):
"""
Takes as input the fully-qualified name of a device and returns a list of its archived attributes
"""
domain, family, member = split_tango_name(device_fqname,"device")
attrs = self.session.query(self.ab.Attribute).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
self.ab.Attribute.member == member)).all()
# Returns the representation as set in __repr__ method of the mapper class
return attrs
def get_attribute_id(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its id.
"""
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
result = self.session.query(self.ab.Attribute.att_conf_id).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception(f"Attribute {attribute_fqname} not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_datatype(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its Data-Type.
Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used
as DB table name suffix in which values are stored.
"""
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
if self.dbms=='mysql':
result = self.session.query(self.ab.DataType.data_type).join(self.ab.Attribute,self.ab.Attribute.att_conf_data_type_id==self.ab.DataType.att_conf_data_type_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
elif self.dbms=='postgres':
result = self.session.query(self.ab.DataType.type).join(self.ab.Attribute,self.ab.Attribute.att_conf_type_id==self.ab.DataType.att_conf_type_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception(f"Attribute not {attribute_fqname} found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_format(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its format.
Formats are basically three: Scalar, Spectrum and Image.
* Works only for POSTGRESQL *
"""
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
result = self.session.query(self.ab.Format.format).join(self.ab.Attribute,self.ab.Attribute.att_conf_format_id==self.ab.Format.att_conf_format_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception("Attribute not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_tablename(self,attribute_fqname: str):
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
result = self.session.query(self.ab.Attribute.table_name).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception("Attribute not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_value_by_hours(self,attribute_fqname: str, hours: float = 1.0):
"""
Takes as input the attribute fully-qualified name and the number of past hours since the actual time
(e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half).
Returns a list of timestamps and a list of values
"""
attr_id = self.get_attribute_id(attribute_fqname)
attr_datatype = self.get_attribute_datatype(attribute_fqname)
# Retrieves the class that maps the DB table given the tablename
if self.dbms=='mysql':
tablename = f"att_{attr_datatype}"
elif self.dbms=='postgres':
tablename = self.get_attribute_tablename(attribute_fqname)
base_class = self.ab.get_class_by_tablename(tablename)
# Retrieves the timestamp
time_now = datetime.now()
time_delta = time_now - timedelta(hours=hours)
# Converts the timestamps in the right format for the query
time_now_db = str(time_now.strftime("%Y-%m-%d %X"))
time_delta_db = str(time_delta.strftime("%Y-%m-%d %X"))
try:
result = self.session.query(base_class).\
join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \
base_class.data_time <= time_now_db)).order_by(base_class.data_time).all()
except AttributeError as e:
raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e
return result
def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime):
"""
Takes as input the attribute name and a certain starting and ending point-time.
The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss").
Returns a list of timestamps and a list of values
"""
attr_id = self.get_attribute_id(attribute_fqname)
attr_datatype = self.get_attribute_datatype(attribute_fqname)
# Retrieves the class that maps the DB table given the tablename
if self.dbms=='mysql':
tablename = f"att_{attr_datatype}"
elif self.dbms=='postgres':
tablename = self.get_attribute_tablename(attribute_fqname)
base_class = self.ab.get_class_by_tablename(tablename)
try:
result = self.session.query(base_class).\
join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \
base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all()
except AttributeError as e:
raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e
return result
def get_masked_fpga_temp(self,start_time: datetime, stop_time: datetime,temp_attr_name:str='stat/sdp/1/fpga_temp_r',
mask_attr_name:str='stat/sdp/1/tr_fpga_mask_r'):
"""
Returns a list of SDP/fpga_temp_r values, but only if SDP/tr_fpga_mask_r values are TRUE
"""
mask_values = self.get_attribute_value_by_interval(mask_attr_name,start_time,stop_time)
temp_values = self.get_attribute_value_by_interval(temp_attr_name,start_time,stop_time)
# Since timestamps can be not syncrhonized, remove first or last element from arrays
if len(mask_values)==len(temp_values):
first_mask_datatime = mask_values[0].data_time
first_temp_datatime = temp_values[0].data_time
if (first_mask_datatime>first_temp_datatime):
mask_values = mask_values[:-int(mask_values[0].dim_x_r)]
temp_values = temp_values[int(temp_values[0].dim_x_r):]
elif (first_mask_datatime<first_temp_datatime):
mask_values = mask_values[int(mask_values[0].dim_x_r)]
temp_values = temp_values[:-int(temp_values[0].dim_x_r):]
else:
raise Exception
# Convert DB Array records into Python lists
mask_data = self.ab.build_array_from_record(mask_values,mask_values[0].dim_x_r)
temp_data = self.ab.build_array_from_record(temp_values,temp_values[0].dim_x_r)
# Extract only the value from the array
mask_array_values = self.ab.get_values_from_record(mask_data)
temp_array_values = self.ab.get_values_from_record(temp_data)
# Multiply the matrix
#masked_values = np.multiply(temp_array_values,mask_array_values)
masked_values = numpy.ma.masked_array(temp_array_values,mask=numpy.invert(mask_array_values.astype(bool)))
return masked_values, mask_values, temp_values
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment