From 1e1a7e80784dd34056fc6fea1c770c70a3cca490 Mon Sep 17 00:00:00 2001 From: stedif <stefano.difrischia@inaf.it> Date: Mon, 13 Dec 2021 15:22:58 +0100 Subject: [PATCH] L2SS-528: move retriever in separate file --- .../tangostationcontrol/toolkit/archiver.py | 219 ----------------- .../tangostationcontrol/toolkit/retriever.py | 224 ++++++++++++++++++ 2 files changed, 224 insertions(+), 219 deletions(-) create mode 100644 tangostationcontrol/tangostationcontrol/toolkit/retriever.py diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py index d56ec7d3c..d30a0aa36 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py @@ -3,15 +3,9 @@ import logging from tango import DeviceProxy, AttributeProxy -from datetime import datetime, timedelta import time import json, os -from sqlalchemy import create_engine, and_ -from sqlalchemy.orm import sessionmaker -from sqlalchemy.orm.exc import NoResultFound -import importlib -import numpy logger = logging.getLogger() @@ -482,216 +476,3 @@ class Selector(): except FileNotFoundError as e: raise return data - -class Retriever(): - """ - The Retriever class implements retrieve operations on a given DBMS - """ - def __init__(self, cm_name: str = 'archiving/hdbppts/confmanager01'): - self.cm_name = cm_name - self.session, self.dbms = self.connect_to_archiving_db() - self.ab = self.set_archiver_base() - - - def get_db_credentials(self): - """ - Retrieves the DB credentials from the Tango properties of Configuration Manager - """ - cm = DeviceProxy(self.cm_name) - config_list = list(cm.get_property('LibConfiguration')['LibConfiguration']) # dictionary {'LibConfiguration': list of strings} - if 'connect_string=' in config_list[0]: config_list.pop(0) # possibly remove connect string because it causes errors - host = str([s for s in config_list if "host" in s][0].split('=')[1]) - dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1]) - port = str([s for s in config_list if "port" in s][0].split('=')[1]) - user = str([s for s in config_list if "user" in s][0].split('=')[1]) - pw = str([s for s in config_list if "password" in s][0].split('=')[1]) - return host,dbname,port,user,pw - - def connect_to_archiving_db(self): - """ - Returns a session to a MySQL DBMS using default credentials. - """ - host,dbname,port,user,pw = self.get_db_credentials() - # Set sqlalchemy library connection - if host=='archiver-maria-db': - libname = 'mysql+pymysql' - dbms = 'mysql' - elif host=='archiver-timescale': - libname = 'postgresql+psycopg2' - dbms = 'postgres' - else: - raise ValueError(f"Invalid hostname: {host}") - engine = create_engine(libname+'://'+user+':'+pw+'@'+host+':'+port+'/'+dbname) - Session = sessionmaker(bind=engine) - return Session(),dbms - - def set_archiver_base(self): - """ - Sets the right mapper class following the DBMS connection - """ - if self.dbms == 'postgres': - ab = importlib.import_module('.archiver_base_ts', package=__package__) - elif self.dbms == 'mysql': - ab = importlib.import_module('.archiver_base_mysql', package=__package__) - return ab - - def get_all_archived_attributes(self): - """ - Returns a list of the archived attributes in the DB. - """ - attrs = self.session.query(self.ab.Attribute).order_by(self.ab.Attribute.att_conf_id).all() - # Returns the representation as set in __repr__ method of the mapper class - return attrs - - def get_archived_attributes_by_device(self,device_fqname: str): - """ - Takes as input the fully-qualified name of a device and returns a list of its archived attributes - """ - domain, family, member = split_tango_name(device_fqname,"device") - attrs = self.session.query(self.ab.Attribute).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \ - self.ab.Attribute.member == member)).all() - # Returns the representation as set in __repr__ method of the mapper class - return attrs - - def get_attribute_id(self,attribute_fqname: str): - """ - Takes as input the fully-qualified name of an attribute and returns its id. - """ - domain, family, member, name = split_tango_name(attribute_fqname,"attribute") - try: - result = self.session.query(self.ab.Attribute.att_conf_id).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \ - self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() - return result[0] - except TypeError as e: - raise Exception(f"Attribute {attribute_fqname} not found!") from e - except NoResultFound as e: - raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e - - def get_attribute_datatype(self,attribute_fqname: str): - """ - Takes as input the fully-qualified name of an attribute and returns its Data-Type. - Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used - as DB table name suffix in which values are stored. - """ - domain, family, member, name = split_tango_name(attribute_fqname,"attribute") - try: - if self.dbms=='mysql': - result = self.session.query(self.ab.DataType.data_type).join(self.ab.Attribute,self.ab.Attribute.att_conf_data_type_id==self.ab.DataType.att_conf_data_type_id).\ - filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() - elif self.dbms=='postgres': - result = self.session.query(self.ab.DataType.type).join(self.ab.Attribute,self.ab.Attribute.att_conf_type_id==self.ab.DataType.att_conf_type_id).\ - filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() - return result[0] - except TypeError as e: - raise Exception(f"Attribute not {attribute_fqname} found!") from e - except NoResultFound as e: - raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e - - def get_attribute_format(self,attribute_fqname: str): - """ - Takes as input the fully-qualified name of an attribute and returns its format. - Formats are basically three: Scalar, Spectrum and Image. - * Works only for POSTGRESQL * - """ - domain, family, member, name = split_tango_name(attribute_fqname,"attribute") - try: - result = self.session.query(self.ab.Format.format).join(self.ab.Attribute,self.ab.Attribute.att_conf_format_id==self.ab.Format.att_conf_format_id).\ - filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() - return result[0] - except TypeError as e: - raise Exception("Attribute not found!") from e - except NoResultFound as e: - raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e - - def get_attribute_tablename(self,attribute_fqname: str): - domain, family, member, name = split_tango_name(attribute_fqname,"attribute") - try: - result = self.session.query(self.ab.Attribute.table_name).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \ - self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() - return result[0] - except TypeError as e: - raise Exception("Attribute not found!") from e - except NoResultFound as e: - raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e - - def get_attribute_value_by_hours(self,attribute_fqname: str, hours: float = 1.0): - """ - Takes as input the attribute fully-qualified name and the number of past hours since the actual time - (e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half). - Returns a list of timestamps and a list of values - """ - attr_id = self.get_attribute_id(attribute_fqname) - attr_datatype = self.get_attribute_datatype(attribute_fqname) - # Retrieves the class that maps the DB table given the tablename - if self.dbms=='mysql': - tablename = f"att_{attr_datatype}" - elif self.dbms=='postgres': - tablename = self.get_attribute_tablename(attribute_fqname) - base_class = self.ab.get_class_by_tablename(tablename) - # Retrieves the timestamp - time_now = datetime.now() - time_delta = time_now - timedelta(hours=hours) - # Converts the timestamps in the right format for the query - time_now_db = str(time_now.strftime("%Y-%m-%d %X")) - time_delta_db = str(time_delta.strftime("%Y-%m-%d %X")) - try: - result = self.session.query(base_class).\ - join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\ - filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \ - base_class.data_time <= time_now_db)).order_by(base_class.data_time).all() - except AttributeError as e: - raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e - return result - - def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime): - """ - Takes as input the attribute name and a certain starting and ending point-time. - The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss"). - Returns a list of timestamps and a list of values - """ - attr_id = self.get_attribute_id(attribute_fqname) - attr_datatype = self.get_attribute_datatype(attribute_fqname) - # Retrieves the class that maps the DB table given the tablename - if self.dbms=='mysql': - tablename = f"att_{attr_datatype}" - elif self.dbms=='postgres': - tablename = self.get_attribute_tablename(attribute_fqname) - base_class = self.ab.get_class_by_tablename(tablename) - try: - result = self.session.query(base_class).\ - join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\ - filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \ - base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all() - except AttributeError as e: - raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e - return result - - def get_masked_fpga_temp(self,start_time: datetime, stop_time: datetime,temp_attr_name:str='stat/sdp/1/fpga_temp_r', - mask_attr_name:str='stat/sdp/1/tr_fpga_mask_r'): - """ - Returns a list of SDP/fpga_temp_r values, but only if SDP/tr_fpga_mask_r values are TRUE - """ - mask_values = self.get_attribute_value_by_interval(mask_attr_name,start_time,stop_time) - temp_values = self.get_attribute_value_by_interval(temp_attr_name,start_time,stop_time) - # Since timestamps can be not syncrhonized, remove first or last element from arrays - if len(mask_values)==len(temp_values): - first_mask_datatime = mask_values[0].data_time - first_temp_datatime = temp_values[0].data_time - if (first_mask_datatime>first_temp_datatime): - mask_values = mask_values[:-int(mask_values[0].dim_x_r)] - temp_values = temp_values[int(temp_values[0].dim_x_r):] - elif (first_mask_datatime<first_temp_datatime): - mask_values = mask_values[int(mask_values[0].dim_x_r)] - temp_values = temp_values[:-int(temp_values[0].dim_x_r):] - else: - raise Exception - # Convert DB Array records into Python lists - mask_data = self.ab.build_array_from_record(mask_values,mask_values[0].dim_x_r) - temp_data = self.ab.build_array_from_record(temp_values,temp_values[0].dim_x_r) - # Extract only the value from the array - mask_array_values = self.ab.get_values_from_record(mask_data) - temp_array_values = self.ab.get_values_from_record(temp_data) - # Multiply the matrix - #masked_values = np.multiply(temp_array_values,mask_array_values) - masked_values = numpy.ma.masked_array(temp_array_values,mask=numpy.invert(mask_array_values.astype(bool))) - return masked_values, mask_values, temp_values diff --git a/tangostationcontrol/tangostationcontrol/toolkit/retriever.py b/tangostationcontrol/tangostationcontrol/toolkit/retriever.py new file mode 100644 index 000000000..190c014da --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/toolkit/retriever.py @@ -0,0 +1,224 @@ +#! /usr/bin/env python3 + +from tango import DeviceProxy, AttributeProxy +from tangostationcontrol.toolkit.archiver import * + +from datetime import datetime, timedelta +from sqlalchemy import create_engine, and_ +from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm.exc import NoResultFound +import importlib +import numpy + +class Retriever(): + """ + The Retriever class implements retrieve operations on a given DBMS + """ + def __init__(self, cm_name: str = 'archiving/hdbppts/confmanager01'): + self.cm_name = cm_name + self.session, self.dbms = self.connect_to_archiving_db() + self.ab = self.set_archiver_base() + + + def get_db_credentials(self): + """ + Retrieves the DB credentials from the Tango properties of Configuration Manager + """ + cm = DeviceProxy(self.cm_name) + config_list = list(cm.get_property('LibConfiguration')['LibConfiguration']) # dictionary {'LibConfiguration': list of strings} + if 'connect_string=' in config_list[0]: config_list.pop(0) # possibly remove connect string because it causes errors + host = str([s for s in config_list if "host" in s][0].split('=')[1]) + dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1]) + port = str([s for s in config_list if "port" in s][0].split('=')[1]) + user = str([s for s in config_list if "user" in s][0].split('=')[1]) + pw = str([s for s in config_list if "password" in s][0].split('=')[1]) + return host,dbname,port,user,pw + + def connect_to_archiving_db(self): + """ + Returns a session to a MySQL DBMS using default credentials. + """ + host,dbname,port,user,pw = self.get_db_credentials() + # Set sqlalchemy library connection + if host=='archiver-maria-db': + libname = 'mysql+pymysql' + dbms = 'mysql' + elif host=='archiver-timescale': + libname = 'postgresql+psycopg2' + dbms = 'postgres' + else: + raise ValueError(f"Invalid hostname: {host}") + engine = create_engine(libname+'://'+user+':'+pw+'@'+host+':'+port+'/'+dbname) + Session = sessionmaker(bind=engine) + return Session(),dbms + + def set_archiver_base(self): + """ + Sets the right mapper class following the DBMS connection + """ + if self.dbms == 'postgres': + ab = importlib.import_module('.archiver_base_ts', package=__package__) + elif self.dbms == 'mysql': + ab = importlib.import_module('.archiver_base_mysql', package=__package__) + return ab + + def get_all_archived_attributes(self): + """ + Returns a list of the archived attributes in the DB. + """ + attrs = self.session.query(self.ab.Attribute).order_by(self.ab.Attribute.att_conf_id).all() + # Returns the representation as set in __repr__ method of the mapper class + return attrs + + def get_archived_attributes_by_device(self,device_fqname: str): + """ + Takes as input the fully-qualified name of a device and returns a list of its archived attributes + """ + domain, family, member = split_tango_name(device_fqname,"device") + attrs = self.session.query(self.ab.Attribute).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \ + self.ab.Attribute.member == member)).all() + # Returns the representation as set in __repr__ method of the mapper class + return attrs + + def get_attribute_id(self,attribute_fqname: str): + """ + Takes as input the fully-qualified name of an attribute and returns its id. + """ + domain, family, member, name = split_tango_name(attribute_fqname,"attribute") + try: + result = self.session.query(self.ab.Attribute.att_conf_id).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \ + self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() + return result[0] + except TypeError as e: + raise Exception(f"Attribute {attribute_fqname} not found!") from e + except NoResultFound as e: + raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e + + def get_attribute_datatype(self,attribute_fqname: str): + """ + Takes as input the fully-qualified name of an attribute and returns its Data-Type. + Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used + as DB table name suffix in which values are stored. + """ + domain, family, member, name = split_tango_name(attribute_fqname,"attribute") + try: + if self.dbms=='mysql': + result = self.session.query(self.ab.DataType.data_type).join(self.ab.Attribute,self.ab.Attribute.att_conf_data_type_id==self.ab.DataType.att_conf_data_type_id).\ + filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() + elif self.dbms=='postgres': + result = self.session.query(self.ab.DataType.type).join(self.ab.Attribute,self.ab.Attribute.att_conf_type_id==self.ab.DataType.att_conf_type_id).\ + filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() + return result[0] + except TypeError as e: + raise Exception(f"Attribute not {attribute_fqname} found!") from e + except NoResultFound as e: + raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e + + def get_attribute_format(self,attribute_fqname: str): + """ + Takes as input the fully-qualified name of an attribute and returns its format. + Formats are basically three: Scalar, Spectrum and Image. + * Works only for POSTGRESQL * + """ + domain, family, member, name = split_tango_name(attribute_fqname,"attribute") + try: + result = self.session.query(self.ab.Format.format).join(self.ab.Attribute,self.ab.Attribute.att_conf_format_id==self.ab.Format.att_conf_format_id).\ + filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() + return result[0] + except TypeError as e: + raise Exception("Attribute not found!") from e + except NoResultFound as e: + raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e + + def get_attribute_tablename(self,attribute_fqname: str): + domain, family, member, name = split_tango_name(attribute_fqname,"attribute") + try: + result = self.session.query(self.ab.Attribute.table_name).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \ + self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() + return result[0] + except TypeError as e: + raise Exception("Attribute not found!") from e + except NoResultFound as e: + raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e + + def get_attribute_value_by_hours(self,attribute_fqname: str, hours: float = 1.0): + """ + Takes as input the attribute fully-qualified name and the number of past hours since the actual time + (e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half). + Returns a list of timestamps and a list of values + """ + attr_id = self.get_attribute_id(attribute_fqname) + attr_datatype = self.get_attribute_datatype(attribute_fqname) + # Retrieves the class that maps the DB table given the tablename + if self.dbms=='mysql': + tablename = f"att_{attr_datatype}" + elif self.dbms=='postgres': + tablename = self.get_attribute_tablename(attribute_fqname) + base_class = self.ab.get_class_by_tablename(tablename) + # Retrieves the timestamp + time_now = datetime.now() + time_delta = time_now - timedelta(hours=hours) + # Converts the timestamps in the right format for the query + time_now_db = str(time_now.strftime("%Y-%m-%d %X")) + time_delta_db = str(time_delta.strftime("%Y-%m-%d %X")) + try: + result = self.session.query(base_class).\ + join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\ + filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \ + base_class.data_time <= time_now_db)).order_by(base_class.data_time).all() + except AttributeError as e: + raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e + return result + + def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime): + """ + Takes as input the attribute name and a certain starting and ending point-time. + The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss"). + Returns a list of timestamps and a list of values + """ + attr_id = self.get_attribute_id(attribute_fqname) + attr_datatype = self.get_attribute_datatype(attribute_fqname) + # Retrieves the class that maps the DB table given the tablename + if self.dbms=='mysql': + tablename = f"att_{attr_datatype}" + elif self.dbms=='postgres': + tablename = self.get_attribute_tablename(attribute_fqname) + base_class = self.ab.get_class_by_tablename(tablename) + try: + result = self.session.query(base_class).\ + join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\ + filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \ + base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all() + except AttributeError as e: + raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e + return result + + def get_masked_fpga_temp(self,start_time: datetime, stop_time: datetime,temp_attr_name:str='stat/sdp/1/fpga_temp_r', + mask_attr_name:str='stat/sdp/1/tr_fpga_mask_r'): + """ + Returns a list of SDP/fpga_temp_r values, but only if SDP/tr_fpga_mask_r values are TRUE + """ + mask_values = self.get_attribute_value_by_interval(mask_attr_name,start_time,stop_time) + temp_values = self.get_attribute_value_by_interval(temp_attr_name,start_time,stop_time) + # Since timestamps can be not syncrhonized, remove first or last element from arrays + if len(mask_values)==len(temp_values): + first_mask_datatime = mask_values[0].data_time + first_temp_datatime = temp_values[0].data_time + if (first_mask_datatime>first_temp_datatime): + mask_values = mask_values[:-int(mask_values[0].dim_x_r)] + temp_values = temp_values[int(temp_values[0].dim_x_r):] + elif (first_mask_datatime<first_temp_datatime): + mask_values = mask_values[int(mask_values[0].dim_x_r)] + temp_values = temp_values[:-int(temp_values[0].dim_x_r):] + else: + raise Exception + # Convert DB Array records into Python lists + mask_data = self.ab.build_array_from_record(mask_values,mask_values[0].dim_x_r) + temp_data = self.ab.build_array_from_record(temp_values,temp_values[0].dim_x_r) + # Extract only the value from the array + mask_array_values = self.ab.get_values_from_record(mask_data) + temp_array_values = self.ab.get_values_from_record(temp_data) + # Multiply the matrix + #masked_values = np.multiply(temp_array_values,mask_array_values) + masked_values = numpy.ma.masked_array(temp_array_values,mask=numpy.invert(mask_array_values.astype(bool))) + return masked_values, mask_values, temp_values -- GitLab