Skip to content
Snippets Groups Projects
Commit d6051a2f authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

merge master into L2SS-480

parents b31d6721 e15625f7
No related branches found
No related tags found
1 merge request!220Resolve L2SS-480 "Delays to beam weights"
......@@ -9,7 +9,8 @@ nbconvert
notebook-as-pdf
python-logstash-async
PyMySQL[rsa]
psycopg2-binary >= 2.9.2 #LGPL
sqlalchemy
pyvisa
pyvisa-py
opcua
\ No newline at end of file
opcua
......@@ -7,6 +7,9 @@
version: '2'
volumes:
prometheus-data: {}
services:
prometheus:
image: prometheus
......@@ -15,6 +18,8 @@ services:
container_name: ${CONTAINER_NAME_PREFIX}prometheus
networks:
- control
volumes:
- prometheus-data:/prometheus
ports:
- "9090:9090"
logging:
......
FROM prom/prometheus
COPY prometheus.yml /etc/prometheus/prometheus.yml
CMD ["--config.file=/etc/prometheus/prometheus.yml", "--storage.tsdb.path=/prometheus", "--web.console.libraries=/usr/share/prometheus/console_libraries", "--web.console.templates=/usr/share/prometheus/consoles", "--storage.tsdb.retention.time=31d"]
......@@ -4,6 +4,7 @@
asyncua >= 0.9.90 # LGPLv3
PyMySQL[rsa] >= 1.0.2 # MIT
psycopg2-binary >= 2.9.2 #LGPL
sqlalchemy >= 1.4.26 #MIT
snmp >= 0.1.7 # GPL3
h5py >= 3.5.0 # BSD
......
#! /usr/bin/env python3
#from logging import raiseExceptions
import logging
from tango import DeviceProxy, AttributeProxy
from datetime import datetime, timedelta
import time
import json, os
from sqlalchemy import create_engine, and_
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound
from .archiver_base import *
logger = logging.getLogger()
def parse_attribute_name(attribute_name:str):
def attribute_name_from_url(attribute_name:str):
"""
For some operations Tango attribute must be transformed from the form 'tango://db:port/domain/family/name/attribute'
to canonical 'domain/family/name/attribute'
"""
chunk_num = len(attribute_name.split('/'))
if (chunk_num==7 and attribute_name.split('/')[0]=='tango:'):
if attribute_name.startswith('tango://'):
return '/'.join(attribute_name.split('/')[3:])
else:
if (chunk_num!=4):
raise AttributeFormatException
else:
return attribute_name
def parse_device_name(device_name:str, tango_host:str = 'databaseds:10000'):
if len(attribute_name.split('/')) != 4:
raise ValueError(f"Expected attribute of format 'domain/family/name/attribute', got {attribute_name}")
return attribute_name
def device_name_url(device_name:str, tango_host:str = 'databaseds:10000'):
"""
For some operations Tango devices must be transformed from the form 'domain/family/name'
to 'tango://db:port/domain/family/name'
"""
chunk_num = len(device_name.split('/'))
if (chunk_num==3):
return 'tango://'+tango_host+'/'+device_name
elif (chunk_num==6 and device_name.split('/')[0]=='tango:'):
if device_name.startswith('tango://'):
return device_name
if len(device_name.split('/')) != 3:
raise ValueError(f"Expected device name of format 'domain/family/name', got {device_name}")
return f"tango://{tango_host}/{device_name}"
def split_tango_name(tango_fqname:str, tango_type:str):
"""
Helper function to split device or attribute Tango full qualified names
into its components
"""
if tango_type.lower() == 'device':
try:
domain, family, member = tango_fqname.split('/')
return domain, family, member
except ValueError as e:
raise ValueError(f"Could not parse device name {tango_fqname}. Please provide FQDN, e.g. STAT/Device/1") from e
elif tango_type.lower() == 'attribute':
try:
domain, family, member, name = tango_fqname.split('/')
return domain, family, member, name
except ValueError as e:
raise ValueError(f"Could not parse attribute name {tango_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute") from e
else:
raise Exception(f'{device_name} is a wrong device name')
raise ValueError(f"Invalid value: {tango_type}. Please provide 'device' or 'attribute'.")
class Archiver():
"""
......@@ -52,7 +64,7 @@ class Archiver():
dev_polling_time = None
dev_archive_time = None
def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbpp/confmanager01', context: str = 'RUN'):
def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbppts/confmanager01', context: str = 'RUN'):
self.cm_name = cm_name
self.cm = DeviceProxy(cm_name)
try:
......@@ -67,7 +79,7 @@ class Archiver():
try:
self.apply_selector()
except Exception as e:
raise Exception("Error in selecting configuration! Archiving framework will not be updated!") from e
raise Exception("Error in selecting configuration. Archiving framework will not be updated.") from e
def get_db_config(self, device_name:str):
"""
......@@ -177,7 +189,7 @@ class Archiver():
es_state = es.state() # ping the device server
if 'FAULT' in str(es_state):
raise Exception(f"{es_name} is in FAULT state")
self.cm.ArchiverAdd(parse_device_name(es_name))
self.cm.ArchiverAdd(device_name_url(es_name))
except Exception as e:
if 'already_present' in str(e):
logger.warning(f"Subscriber {es_name} already present in Configuration Manager")
......@@ -191,7 +203,7 @@ class Archiver():
The ConfigurationManager and EventSubscriber devices must be already up and running.
The archiving-DBMS must be already properly configured.
"""
attribute_name = parse_attribute_name(attribute_name)
attribute_name = attribute_name_from_url(attribute_name)
try:
self.cm.write_attribute('SetAttributeName', attribute_name)
self.cm.write_attribute('SetArchiver', es_name or self.get_next_subscriber())
......@@ -237,7 +249,7 @@ class Archiver():
"""
Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list.
"""
attribute_name = parse_attribute_name(attribute_name)
attribute_name = attribute_name_from_url(attribute_name)
try:
self.cm.AttributeStop(attribute_name)
self.cm.AttributeRemove(attribute_name)
......@@ -278,7 +290,7 @@ class Archiver():
exclude_list = [a.lower() for a in exclude]
attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list]
for a in attrs_list:
attr_fullname = parse_attribute_name(a)
attr_fullname = attribute_name_from_url(a)
self.remove_attribute_from_archiver(attr_fullname)
def start_archiving_attribute(self, attribute_name:str):
......@@ -286,7 +298,7 @@ class Archiver():
Starts the archiving of the attribute passed as input.
The attribute must be already present in the subscriber's list
"""
attribute_name = parse_attribute_name(attribute_name)
attribute_name = attribute_name_from_url(attribute_name)
try:
self.cm.AttributeStart(attribute_name)
except Exception as e:
......@@ -300,7 +312,7 @@ class Archiver():
Stops the archiving of the attribute passed as input.
The attribute must be already present in the subscriber's list
"""
attribute_name = parse_attribute_name(attribute_name)
attribute_name = attribute_name_from_url(attribute_name)
try:
self.cm.AttributeStop(attribute_name)
except Exception as e:
......@@ -313,14 +325,14 @@ class Archiver():
"""
Check if an attribute is in the archiving list
"""
attribute_name = parse_attribute_name(attribute_name)
attribute_name = attribute_name_from_url(attribute_name)
attributes = self.cm.AttributeSearch(attribute_name.lower())
if len(attributes)>1:
# Handle case same attribute_name r/rw
if len(attributes)==2 and (attributes[0].endswith(attributes[1]+'w') or attributes[1].endswith(attributes[0]+'w')):
return True
else:
raise Exception(f"Multiple Attributes Matched! {attributes}")
raise Exception(f"Multiple Attributes Matched: {attributes}")
elif len(attributes)==1:
return True
else:
......@@ -371,7 +383,7 @@ class Archiver():
"""
Return the error related to the attribute
"""
attribute_name = parse_attribute_name(attribute_name)
attribute_name = attribute_name_from_url(attribute_name)
errs_dict = self.get_subscriber_errors()
for e in errs_dict:
if attribute_name in e:
......@@ -396,7 +408,7 @@ class Archiver():
"""
Given an attribute name, return the event subscriber associated with it
"""
attribute_name = parse_attribute_name(attribute_name)
attribute_name = attribute_name_from_url(attribute_name)
# If the ConfManager manages more than one subscriber
if len(self.get_subscribers())>1:
for es_name in self.get_subscribers():
......@@ -411,7 +423,7 @@ class Archiver():
"""
Return the attribute archiving frequency in events/minute
"""
attribute_name = parse_attribute_name(attribute_name)
attribute_name = attribute_name_from_url(attribute_name)
if self.is_attribute_archived(attribute_name):
es = DeviceProxy(self.get_attribute_subscriber(attribute_name))
freq_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeRecordFreqList))
......@@ -425,7 +437,7 @@ class Archiver():
"""
Return the attribute failure archiving frequency in events/minute
"""
attribute_name = parse_attribute_name(attribute_name)
attribute_name = attribute_name_from_url(attribute_name)
if self.is_attribute_archived(attribute_name):
es = DeviceProxy(self.get_attribute_subscriber(attribute_name))
fail_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeFailureFreqList))
......@@ -461,168 +473,5 @@ class Selector():
data = json.load(f)
f.close()
except FileNotFoundError as e:
raise Exception("JSON configuration file not found!") from e
raise
return data
class Retriever():
"""
The Retriever class implements retrieve operations on a given DBMS
"""
def __init__(self, cm_name: str = 'archiving/hdbpp/confmanager01'):
self.cm_name = cm_name
self.session = self.connect_to_archiving_db()
def get_db_credentials(self):
"""
Retrieves the DB credentials from the Tango properties of Configuration Manager
"""
cm = DeviceProxy(self.cm_name)
config_list = cm.get_property('LibConfiguration')['LibConfiguration'] # dictionary {'LibConfiguration': list of strings}
host = str([s for s in config_list if "host" in s][0].split('=')[1])
dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1])
port = str([s for s in config_list if "port" in s][0].split('=')[1])
user = str([s for s in config_list if "user" in s][0].split('=')[1])
pw = str([s for s in config_list if "password" in s][0].split('=')[1])
return host,dbname,port,user,pw
def connect_to_archiving_db(self):
"""
Returns a session to a MySQL DBMS using default credentials.
"""
host,dbname,port,user,pw = self.get_db_credentials()
engine = create_engine('mysql+pymysql://'+user+':'+pw+'@'+host+':'+port+'/'+dbname)
Session = sessionmaker(bind=engine)
return Session()
def get_all_archived_attributes(self):
"""
Returns a list of the archived attributes in the DB.
"""
attrs = self.session.query(Attribute).order_by(Attribute.att_conf_id).all()
# Returns the representation as set in __repr__ method of the mapper class
return attrs
def get_archived_attributes_by_device(self,device_fqname: str):
"""
Takes as input the fully-qualified name of a device and returns a list of its archived attributes
"""
try:
[domain, family, member] = device_fqname.split('/')
except:
raise AttributeFormatException(f"Could not parse device name {device_fqname}. Please provide FQDN, e.g. STAT/Device/1")
attrs = self.session.query(Attribute).filter(and_(Attribute.domain == domain, Attribute.family == family, \
Attribute.member == member)).all()
# Returns the representation as set in __repr__ method of the mapper class
return attrs
def get_attribute_id(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its id.
"""
try:
[domain, family, member, name] = attribute_fqname.split('/')
except:
raise AttributeFormatException(f"Could not parse attribute name {attribute_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute")
try:
result = self.session.query(Attribute.att_conf_id).filter(and_(Attribute.domain == domain, Attribute.family == family, \
Attribute.member == member, Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception("Attribute not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_datatype(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its Data-Type.
Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used
as DB table name suffix in which values are stored.
"""
try:
[domain, family, member, name] = attribute_fqname.split('/')
except:
raise AttributeFormatException(f"Could not parse attribute name {attribute_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute")
try:
result = self.session.query(DataType.data_type).join(Attribute,Attribute.att_conf_data_type_id==DataType.att_conf_data_type_id).\
filter(and_(Attribute.domain == domain, Attribute.family == family, Attribute.member == member, Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception("Attribute not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_value_by_hours(self,attribute_fqname: str, hours: float = 1.0):
"""
Takes as input the attribute fully-qualified name and the number of past hours since the actual time
(e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half).
Returns a list of timestamps and a list of values
"""
attr_id = self.get_attribute_id(attribute_fqname)
attr_datatype = self.get_attribute_datatype(attribute_fqname)
attr_table_name = 'att_'+str(attr_datatype)
# Retrieves the class that maps the DB table given the tablename
base_class = get_class_by_tablename(attr_table_name)
# Retrieves the timestamp
time_now = datetime.now()
time_delta = time_now - timedelta(hours=hours)
# Converts the timestamps in the right format for the query
time_now_db = str(time_now.strftime("%Y-%m-%d %X"))
time_delta_db = str(time_delta.strftime("%Y-%m-%d %X"))
try:
result = self.session.query(base_class).\
join(Attribute,Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \
base_class.data_time <= time_now_db)).order_by(base_class.data_time).all()
except AttributeError as e:
raise Exception(f"Empty result! Attribute {attribute_fqname} not found") from e
return result
def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime):
"""
Takes as input the attribute name and a certain starting and ending point-time.
The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss").
Returns a list of timestamps and a list of values
"""
attr_id = self.get_attribute_id(attribute_fqname)
attr_datatype = self.get_attribute_datatype(attribute_fqname)
attr_table_name = 'att_'+str(attr_datatype)
# Retrieves the class that maps the DB table given the tablename
base_class = get_class_by_tablename(attr_table_name)
try:
result = self.session.query(base_class).\
join(Attribute,Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \
base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all()
except AttributeError as e:
raise Exception(f"Empty result! Attribute {attribute_fqname} not found") from e
return result
def get_masked_fpga_temp(self,start_time: datetime, stop_time: datetime,temp_attr_name:str='LTS/SDP/1/fpga_temp_r',
mask_attr_name:str='LTS/SDP/1/tr_fpga_mask_r'):
"""
Returns a list of SDP/fpga_temp_r values, but only if SDP/tr_fpga_mask_r values are TRUE
"""
mask_values = self.get_attribute_value_by_interval(mask_attr_name,start_time,stop_time)
temp_values = self.get_attribute_value_by_interval(temp_attr_name,start_time,stop_time)
# Since timestamps can be not syncrhonized, remove first or last element from arrays
if len(mask_values)==len(temp_values):
first_mask_datatime = mask_values[0].data_time
first_temp_datatime = temp_values[0].data_time
if (first_mask_datatime>first_temp_datatime):
mask_values = mask_values[:-int(mask_values[0].dim_x_r)]
temp_values = temp_values[int(temp_values[0].dim_x_r):]
elif (first_mask_datatime<first_temp_datatime):
mask_values = mask_values[int(mask_values[0].dim_x_r)]
temp_values = temp_values[:-int(temp_values[0].dim_x_r):]
else:
raise Exception
# Convert DB Array records into Python lists
mask_data = build_array_from_record(mask_values,mask_values[0].dim_x_r)
temp_data = build_array_from_record(temp_values,temp_values[0].dim_x_r)
# Extract only the value from the array
mask_array_values = get_values_from_record(mask_data)
temp_array_values = get_values_from_record(temp_data)
# Multiply the matrix
#masked_values = np.multiply(temp_array_values,mask_array_values)
masked_values = np.ma.masked_array(temp_array_values,mask=np.invert(mask_array_values.astype(bool)))
return masked_values, mask_values, temp_values
......@@ -7,7 +7,7 @@ from sqlalchemy.dialects.mysql import DOUBLE,TIMESTAMP,BLOB, FLOAT, BIGINT
from sqlalchemy.sql.expression import table
from typing import List
from itertools import groupby
import numpy as np
import numpy
#Declarative system used to define classes mapped to relational DB tables
Base = declarative_base()
......@@ -28,10 +28,9 @@ class Attribute(Base):
family = Column(String)
member = Column(String)
name = Column(String)
def __repr__(self):
return f"<Attribute(fullname='{self.att_name}',data_type ='{self.att_conf_data_type_id}',ttl='{self.att_ttl}',facility ='{elf.facility}',domain ='{self.domain}',family ='{self.family}',member ='{self.member}',name ='{self.name}')>"
return f"<Attribute(fullname='{self.att_name}',data_type ='{self.att_conf_data_type_id}',ttl='{self.att_ttl}',facility ='{self.facility}',domain ='{self.domain}',family ='{self.family}',member ='{self.member}',name ='{self.name}')>"
class DataType(Base):
"""
......@@ -906,14 +905,14 @@ def build_array_from_record(rows: List[Array], dim_x: int):
"""
Converts Array database items in Python lists
"""
matrix = np.array([])
matrix = numpy.array([])
for i in range(0,dim_x):
x = np.array([item for item in rows if item.idx==i]) #group records by array index
x = numpy.array([item for item in rows if item.idx==i]) #group records by array index
if i==0:
matrix = np.append(matrix,x) #append first row
matrix = numpy.append(matrix,x) #append first row
else:
matrix = np.vstack([matrix,x]) #stack vertically
result = np.transpose(matrix) #transpose -> each row is a distinct array of value
matrix = numpy.vstack([matrix,x]) #stack vertically
result = numpy.transpose(matrix) #transpose -> each row is a distinct array of value
list_result = result.tolist()
return list_result
......@@ -921,8 +920,8 @@ def get_values_from_record(data_matrix: List[Array]):
"""
Returns a matrix of values from a matrix of Array records
"""
array_matrix = np.matrix(data_matrix)
value_matrix = np.empty(array_matrix.shape)
array_matrix = numpy.matrix(data_matrix)
value_matrix = numpy.empty(array_matrix.shape)
for index in range(array_matrix.size): # for each object element
value_matrix.itemset(index,array_matrix.item(index).value_r) # extract the value from object and put in the matrix
return value_matrix
......
#! /usr/bin/env python3
from sqlalchemy.dialects.postgresql import ARRAY,TIMESTAMP,FLOAT, JSON
from sqlalchemy.dialects.postgresql.base import BYTEA
from sqlalchemy.dialects.postgresql.ranges import INT4RANGE, INT8RANGE
from sqlalchemy.sql.sqltypes import INTEGER, TEXT, Boolean
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.sql.expression import table
from typing import List
from itertools import groupby
import numpy
#Declarative system used to define classes mapped to relational DB tables
Base = declarative_base()
class Attribute(Base):
"""
Class that represents a Tango Attribute mapped to table 'att_conf'
"""
__tablename__ = 'att_conf'
__table_args__ = {'extend_existing': True}
att_conf_id = Column(Integer, primary_key=True)
att_name = Column(String)
att_conf_type_id = Column(Integer)
att_conf_format_id = Column(Integer)
table_name = Column(String)
cs_name = Column(String)
domain = Column(String)
family = Column(String)
member = Column(String)
name = Column(String)
ttl = Column(Integer)
def __repr__(self):
return f"<Attribute(fullname='{self.att_name}',data_type ='{self.att_conf_type_id}',format='{self.att_conf_format_id}',table_name='{self.table_name}',cs_name ='{self.cs_name}',domain ='{self.domain}',family ='{self.family}',member ='{self.member}',name ='{self.name}'),ttl='{self.ttl}'>"
class DataType(Base):
"""
Class that represents a Tango Data Type mapped to table 'att_conf_data_type'
"""
__tablename__ = 'att_conf_type'
__table_args__ = {'extend_existing': True}
att_conf_type_id = Column(Integer, primary_key=True)
type = Column(String)
def __repr__(self):
return f"<DataType(type='{self.type}')>"
class Format(Base):
"""
Class that represents a Tango Format mapped to table 'att_conf_format'
"""
__tablename__ = 'att_conf_format'
__table_args__ = {'extend_existing': True}
att_conf_format_id = Column(Integer, primary_key=True)
format = Column(String)
format_num = Column(Integer)
def __repr__(self):
return f"<Format(format='{self.format}', format_num='{self.format_num}')>"
class Scalar(Base):
"""
Abstract class that represents Super-class of Scalar mapper classes
"""
# In the concrete inheritance use case, it is common that the base class is not represented
# within the database, only the subclasses. In other words, the base class is abstract.
__abstract__ = True
# Primary key is not defined for tables which store values, but SQLAlchemy requires a mandatory
# primary key definition. Anyway, this definition is on Python-side and does not compromise
# DBMS architecture
att_conf_id = Column(Integer, primary_key=True)
data_time = Column(TIMESTAMP, primary_key=True)
quality = Column(Integer)
att_error_desc_id = Column(Integer)
details = Column(JSON)
class Scalar_Boolean(Scalar):
"""
Class that represents a Tango Boolean mapped to table 'att_scalar_devboolean'
"""
__tablename__ = 'att_scalar_devboolean'
__table_args__ = {'extend_existing': True}
value_r = Column(Boolean)
value_w = Column(Boolean)
def __repr__(self):
return f"<Scalar_Boolean(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_Double(Scalar):
"""
Class that represents a Tango Double mapped to table 'att_scalar_devdouble'
"""
__tablename__ = 'att_scalar_devdouble'
__table_args__ = {'extend_existing': True}
value_r = Column(FLOAT)
value_w = Column(FLOAT)
def __repr__(self):
return f"<Scalar_Double(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_Encoded(Scalar):
"""
Class that represents a Tango Encoded mapped to table 'att_scalar_devencoded'
"""
__tablename__ = 'att_scalar_devencoded'
__table_args__ = {'extend_existing': True}
value_r = Column(BYTEA)
value_w = Column(BYTEA)
def __repr__(self):
return f"<Scalar_Encoded(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_Enum(Scalar):
"""
Class that represents a Tango Enum mapped to table 'att_scalar_devenum'
"""
__tablename__ = 'att_scalar_devenum'
__table_args__ = {'extend_existing': True}
value_r_label = Column(TEXT)
value_r = Column(INTEGER)
value_w_label = Column(TEXT)
value_w = Column(INTEGER)
def __repr__(self):
return f"<Scalar_Enum(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r_label='{self.value_r_label}',value_r='{self.value_r}',value_w_label='{self.value_w_label}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_Float(Scalar):
"""
Class that represents a Tango Float mapped to table 'att_scalar_devfloat'
"""
__tablename__ = 'att_scalar_devfloat'
__table_args__ = {'extend_existing': True}
value_r = Column(FLOAT)
value_w = Column(FLOAT)
def __repr__(self):
return f"<Scalar_Float(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_Long(Scalar):
"""
Class that represents a Tango Long mapped to table 'att_scalar_devlong'
"""
__tablename__ = 'att_scalar_devlong'
__table_args__ = {'extend_existing': True}
value_r = Column(INT4RANGE)
value_w = Column(INT4RANGE)
def __repr__(self):
return f"<Scalar_Long(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_Long64(Scalar):
"""
Class that represents a Tango Long64 mapped to table 'att_scalar_devlong64'
"""
__tablename__ = 'att_scalar_devlong64'
__table_args__ = {'extend_existing': True}
value_r = Column(INT8RANGE)
value_w = Column(INT8RANGE)
def __repr__(self):
return f"<Scalar_Long64(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_Short(Scalar):
"""
Class that represents a Tango Short mapped to table 'att_scalar_devshort'
"""
__tablename__ = 'att_scalar_devshort'
__table_args__ = {'extend_existing': True}
value_r = Column(INTEGER)
value_w = Column(INTEGER)
def __repr__(self):
return f"<Scalar_Short(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_State(Scalar):
"""
Class that represents a Tango State mapped to table 'att_scalar_devstate'
"""
__tablename__ = 'att_scalar_devstate'
__table_args__ = {'extend_existing': True}
value_r = Column(INTEGER)
value_w = Column(INTEGER)
def __repr__(self):
return f"<Scalar_State(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_String(Scalar):
"""
Class that represents a Tango String mapped to table 'att_scalar_devstring'
"""
__tablename__ = 'att_scalar_devstring'
__table_args__ = {'extend_existing': True}
value_r = Column(TEXT)
value_w = Column(TEXT)
def __repr__(self):
return f"<Scalar_String(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_UChar(Scalar):
"""
Class that represents a Tango UChar mapped to table 'att_scalar_devuchar'
"""
__tablename__ = 'att_scalar_devuchar'
__table_args__ = {'extend_existing': True}
value_r = Column(INTEGER)
value_w = Column(INTEGER)
def __repr__(self):
return f"<Scalar_UChar(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_ULong(Scalar):
"""
Class that represents a Tango ULong mapped to table 'att_scalar_devulong'
"""
__tablename__ = 'att_scalar_devulong'
__table_args__ = {'extend_existing': True}
value_r = Column(INTEGER)
value_w = Column(INTEGER)
def __repr__(self):
return f"<Scalar_ULong(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_ULong64(Scalar):
"""
Class that represents a Tango ULong64 mapped to table 'att_scalar_devulong64'
"""
__tablename__ = 'att_scalar_devulong64'
__table_args__ = {'extend_existing': True}
value_r = Column(INTEGER)
value_w = Column(INTEGER)
def __repr__(self):
return f"<Scalar_ULong64(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Scalar_UShort(Scalar):
"""
Class that represents a Tango UShort mapped to table 'att_scalar_devushort'
"""
__tablename__ = 'att_scalar_devushort'
__table_args__ = {'extend_existing': True}
value_r = Column(INTEGER)
value_w = Column(INTEGER)
def __repr__(self):
return f"<Scalar_UShort(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array(Base):
"""
Abstract class that represents Super-class of Array mapper classes
"""
__abstract__ = True
# Primary key is not defined for tables which store values, but SQLAlchemy requires a mandatory
# primary key definition. Anyway, this definition is on Python-side and does not compromise
# DBMS architecture
att_conf_id = Column(Integer, primary_key=True)
data_time = Column(TIMESTAMP, primary_key=True)
quality = Column(Integer)
att_error_desc_id = Column(Integer)
details = Column(JSON)
class Array_Boolean(Array):
"""
Class that represents a Tango Boolean Array mapped to table 'att_array_devboolean'
"""
__tablename__ = 'att_array_devboolean'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(Boolean))
value_w = Column(ARRAY(Boolean))
def __repr__(self):
return f"<Array_Boolean(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_Double(Array):
"""
Class that represents a Tango Double Array mapped to table 'att_array_devdouble'
"""
__tablename__ = 'att_array_devdouble'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(FLOAT))
value_w = Column(ARRAY(FLOAT))
def __repr__(self):
return f"<Array_Double(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_Encoded(Array):
"""
Class that represents a Tango Encoded Array mapped to table 'att_array_devencoded'
"""
__tablename__ = 'att_array_devencoded'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(BYTEA))
value_w = Column(ARRAY(BYTEA))
def __repr__(self):
return f"<Array_Encoded(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_Enum(Array):
"""
Class that represents a Tango Enum Array mapped to table 'att_array_devenum'
"""
__tablename__ = 'att_array_devenum'
__table_args__ = {'extend_existing': True}
value_r_label = Column(ARRAY(TEXT))
value_r = Column(ARRAY(INTEGER))
value_w_label = Column(ARRAY(TEXT))
value_w = Column(ARRAY(INTEGER))
def __repr__(self):
return f"<Array_Enum(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r_label='{self.value_r_label}',value_r='{self.value_r}',value_w_label='{self.value_w_label}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_Float(Array):
"""
Class that represents a Tango Float Array mapped to table 'att_array_devfloat'
"""
__tablename__ = 'att_array_devfloat'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(FLOAT))
value_w = Column(ARRAY(FLOAT))
def __repr__(self):
return f"<Array_Float(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_Long(Array):
"""
Class that represents a Tango Long Array mapped to table 'att_array_devlong'
"""
__tablename__ = 'att_array_devlong'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(INT4RANGE))
value_w = Column(ARRAY(INT4RANGE))
def __repr__(self):
return f"<Array_Long(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_Long64(Array):
"""
Class that represents a Tango Long64 Array mapped to table 'att_array_devlong64'
"""
__tablename__ = 'att_array_devlong64'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(INT8RANGE))
value_w = Column(ARRAY(INT8RANGE))
def __repr__(self):
return f"<Array_Long64(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_Short(Array):
"""
Class that represents a Tango Short Array mapped to table 'att_array_devshort'
"""
__tablename__ = 'att_array_devshort'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(INTEGER))
value_w = Column(ARRAY(INTEGER))
def __repr__(self):
return f"<Array_Short(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_State(Array):
"""
Class that represents a Tango State Array mapped to table 'att_array_devstate'
"""
__tablename__ = 'att_array_devstate'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(INT4RANGE))
value_w = Column(ARRAY(INT4RANGE))
def __repr__(self):
return f"<Array_State(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_String(Array):
"""
Class that represents a Tango String Array mapped to table 'att_array_devstring'
"""
__tablename__ = 'att_array_devstring'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(TEXT))
value_w = Column(ARRAY(TEXT))
def __repr__(self):
return f"<Array_String(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_UChar(Array):
"""
Class that represents a Tango UChar Array mapped to table 'att_array_devuchar'
"""
__tablename__ = 'att_array_devuchar'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(INTEGER))
value_w = Column(ARRAY(INTEGER))
def __repr__(self):
return f"<Array_UChar(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_ULong(Array):
"""
Class that represents a Tango ULong Array mapped to table 'att_array_devulong'
"""
__tablename__ = 'att_array_devulong'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(INTEGER))
value_w = Column(ARRAY(INTEGER))
def __repr__(self):
return f"<Array_ULong(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_ULong64(Array):
"""
Class that represents a Tango ULong64 Array mapped to table 'att_array_devulong64'
"""
__tablename__ = 'att_array_devulong64'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(INTEGER))
value_w = Column(ARRAY(INTEGER))
def __repr__(self):
return f"<Array_ULong64(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
class Array_UShort(Array):
"""
Class that represents a Tango UShort Array mapped to table 'att_array_devushort'
"""
__tablename__ = 'att_array_devushort'
__table_args__ = {'extend_existing': True}
value_r = Column(ARRAY(INTEGER))
value_w = Column(ARRAY(INTEGER))
def __repr__(self):
return f"<Array_UShort(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>"
def get_class_by_tablename(tablename: str):
"""
Returns class reference mapped to a table.
"""
for mapper in Base.registry.mappers:
c = mapper.class_
classname = c.__name__
if not classname.startswith('_'):
if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
return c
return None
def build_array_from_record(rows: List[Array], dim_x: int):
"""
Converts Array database items in Python lists
"""
matrix = numpy.array([])
for i in range(0,dim_x):
x = numpy.array([item for item in rows if item.idx==i]) #group records by array index
if i==0:
matrix = numpy.append(matrix,x) #append first row
else:
matrix = numpy.vstack([matrix,x]) #stack vertically
result = numpy.transpose(matrix) #transpose -> each row is a distinct array of value
list_result = result.tolist()
return list_result
def get_values_from_record(data_matrix: List[Array]):
"""
Returns a matrix of values from a matrix of Array records
"""
array_matrix = numpy.matrix(data_matrix)
value_matrix = numpy.empty(array_matrix.shape)
for index in range(array_matrix.size): # for each object element
value_matrix.itemset(index,array_matrix.item(index).value_r) # extract the value from object and put in the matrix
return value_matrix
#! /usr/bin/env python3
from tango import DeviceProxy, AttributeProxy
from tangostationcontrol.toolkit.archiver import split_tango_name
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from sqlalchemy import create_engine, and_
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound
import importlib
import numpy
class Retriever(ABC):
"""
The Retriever abstract class implements retrieve operations on a given DBMS
"""
def get_db_credentials(self):
"""
Retrieves the DB credentials from the Tango properties of Configuration Manager
"""
cm = DeviceProxy(self.cm_name)
config_list = list(cm.get_property('LibConfiguration')['LibConfiguration']) # dictionary {'LibConfiguration': list of strings}
if 'connect_string=' in config_list[0]: config_list.pop(0) # possibly remove connect string because it causes errors
host = str([s for s in config_list if "host" in s][0].split('=')[1])
dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1])
port = str([s for s in config_list if "port" in s][0].split('=')[1])
user = str([s for s in config_list if "user" in s][0].split('=')[1])
pw = str([s for s in config_list if "password" in s][0].split('=')[1])
return host,dbname,port,user,pw
def create_session(self,libname:str,user:str,pw:str,host:str,port:str,dbname:str):
"""
Returns a session to a DBMS using default credentials.
"""
connection_string = f"{libname}://{user}:{pw}@{host}:{port}/{dbname}"
engine = create_engine(connection_string)
Session = sessionmaker(bind=engine)
return Session
@abstractmethod
def set_archiver_base(self):
return
@abstractmethod
def connect_to_archiving_db(self):
return
def get_all_archived_attributes(self):
"""
Returns a list of the archived attributes in the DB.
"""
attrs = self.session.query(self.ab.Attribute).order_by(self.ab.Attribute.att_conf_id).all()
# Returns the representation as set in __repr__ method of the mapper class
return attrs
def get_archived_attributes_by_device(self,device_fqname: str):
"""
Takes as input the fully-qualified name of a device and returns a list of its archived attributes
"""
domain, family, member = split_tango_name(device_fqname,"device")
attrs = self.session.query(self.ab.Attribute).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
self.ab.Attribute.member == member)).all()
# Returns the representation as set in __repr__ method of the mapper class
return attrs
def get_attribute_id(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its id.
"""
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
result = self.session.query(self.ab.Attribute.att_conf_id).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception(f"Attribute {attribute_fqname} not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
@abstractmethod
def get_attribute_datatype(self,attribute_fqname: str):
return
def get_attribute_value_by_hours(self, attribute_fqname: str, hours: float, tablename:str):
"""
Takes as input the attribute fully-qualified name and the number of past hours since the actual time
(e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half).
Returns a list of timestamps and a list of values
"""
attr_id = self.get_attribute_id(attribute_fqname)
# Retrieves the class that maps the DB table given the tablename
base_class = self.ab.get_class_by_tablename(tablename)
# Retrieves the timestamp
time_now = datetime.now()
time_delta = time_now - timedelta(hours=hours)
# Converts the timestamps in the right format for the query
time_now_db = str(time_now.strftime("%Y-%m-%d %X"))
time_delta_db = str(time_delta.strftime("%Y-%m-%d %X"))
try:
result = self.session.query(base_class).\
join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \
base_class.data_time <= time_now_db)).order_by(base_class.data_time).all()
except AttributeError as e:
raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e
return result
def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime, tablename:str):
"""
Takes as input the attribute name and a certain starting and ending point-time.
The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss").
Returns a list of timestamps and a list of values
"""
attr_id = self.get_attribute_id(attribute_fqname)
# Retrieves the class that maps the DB table given the tablename
base_class = self.ab.get_class_by_tablename(tablename)
try:
result = self.session.query(base_class).\
join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \
base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all()
except AttributeError as e:
raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e
return result
class RetrieverMySQL(Retriever):
def __init__(self, cm_name: str = 'archiving/hdbpp/confmanager01'):
self.cm_name = cm_name
self.session = self.connect_to_archiving_db()
self.ab = self.set_archiver_base()
def connect_to_archiving_db(self):
"""
Returns a session to a MySQL DBMS using default credentials.
"""
host,dbname,port,user,pw = super().get_db_credentials()
# Set sqlalchemy library connection
if host=='archiver-maria-db':
libname = 'mysql+pymysql'
else:
raise ValueError(f"Invalid hostname: {host}")
Session = super().create_session(libname,user,pw,host,port,dbname)
return Session()
def set_archiver_base(self):
"""
Sets the right mapper class following the DBMS connection
"""
return importlib.import_module('.archiver_base_mysql', package=__package__)
def get_attribute_datatype(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its Data-Type.
Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used
as DB table name suffix in which values are stored.
"""
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
result = self.session.query(self.ab.DataType.data_type).join(self.ab.Attribute,self.ab.Attribute.att_conf_data_type_id==self.ab.DataType.att_conf_data_type_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception(f"Attribute not {attribute_fqname} found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_value_by_hours(self,attribute_fqname: str, hours: float = 1.0):
"""
Takes as input the attribute fully-qualified name and the number of past hours since the actual time
(e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half).
Returns a list of timestamps and a list of values
"""
attr_datatype = self.get_attribute_datatype(attribute_fqname)
# Retrieves the class that maps the DB table given the tablename
tablename = f"att_{attr_datatype}"
return super().get_attribute_value_by_hours(attribute_fqname,hours,tablename)
def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime):
"""
Takes as input the attribute name and a certain starting and ending point-time.
The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss").
Returns a list of timestamps and a list of values
"""
attr_datatype = self.get_attribute_datatype(attribute_fqname)
# Retrieves the class that maps the DB table given the tablename
tablename = f"att_{attr_datatype}"
return super().get_attribute_value_by_interval(attribute_fqname,start_time,stop_time,tablename)
# DRAFT #
def get_masked_fpga_temp(self,start_time: datetime, stop_time: datetime,temp_attr_name:str='stat/sdp/1/fpga_temp_r',
mask_attr_name:str='stat/sdp/1/tr_fpga_mask_r'):
"""
Returns a list of SDP/fpga_temp_r values, but only if SDP/tr_fpga_mask_r values are TRUE
"""
mask_values = self.get_attribute_value_by_interval(mask_attr_name,start_time,stop_time)
temp_values = self.get_attribute_value_by_interval(temp_attr_name,start_time,stop_time)
# Since timestamps can be not syncrhonized, remove first or last element from arrays
if len(mask_values)==len(temp_values):
first_mask_datatime = mask_values[0].data_time
first_temp_datatime = temp_values[0].data_time
if (first_mask_datatime>first_temp_datatime):
mask_values = mask_values[:-int(mask_values[0].dim_x_r)]
temp_values = temp_values[int(temp_values[0].dim_x_r):]
elif (first_mask_datatime<first_temp_datatime):
mask_values = mask_values[int(mask_values[0].dim_x_r)]
temp_values = temp_values[:-int(temp_values[0].dim_x_r):]
else:
raise Exception
# Convert DB Array records into Python lists
mask_data = self.ab.build_array_from_record(mask_values,mask_values[0].dim_x_r)
temp_data = self.ab.build_array_from_record(temp_values,temp_values[0].dim_x_r)
# Extract only the value from the array
mask_array_values = self.ab.get_values_from_record(mask_data)
temp_array_values = self.ab.get_values_from_record(temp_data)
# Multiply the matrix
#masked_values = np.multiply(temp_array_values,mask_array_values)
masked_values = numpy.ma.masked_array(temp_array_values,mask=numpy.invert(mask_array_values.astype(bool)))
return masked_values, mask_values, temp_values
class RetrieverTimescale(Retriever):
def __init__(self, cm_name: str = 'archiving/hdbppts/confmanager01'):
self.cm_name = cm_name
self.session = self.connect_to_archiving_db()
self.ab = self.set_archiver_base()
def connect_to_archiving_db(self):
"""
Returns a session to a MySQL DBMS using default credentials.
"""
host,dbname,port,user,pw = super().get_db_credentials()
# Set sqlalchemy library connection
if host=='archiver-timescale':
libname = 'postgresql+psycopg2'
else:
raise ValueError(f"Invalid hostname: {host}")
Session = super().create_session(libname,user,pw,host,port,dbname)
return Session()
def set_archiver_base(self):
"""
Sets the right mapper class following the DBMS connection
"""
return importlib.import_module('.archiver_base_ts', package=__package__)
def get_attribute_datatype(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its Data-Type.
Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used
as DB table name suffix in which values are stored.
"""
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
result = self.session.query(self.ab.DataType.type).join(self.ab.Attribute,self.ab.Attribute.att_conf_type_id==self.ab.DataType.att_conf_type_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception(f"Attribute not {attribute_fqname} found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_format(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its format.
Formats are basically three: Scalar, Spectrum and Image.
* Works only for POSTGRESQL *
"""
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
result = self.session.query(self.ab.Format.format).join(self.ab.Attribute,self.ab.Attribute.att_conf_format_id==self.ab.Format.att_conf_format_id).\
filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception("Attribute not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_tablename(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns the tablename where it is stored.
* Works only for POSTGRESQL *
"""
domain, family, member, name = split_tango_name(attribute_fqname,"attribute")
try:
result = self.session.query(self.ab.Attribute.table_name).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \
self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one()
return result[0]
except TypeError as e:
raise Exception("Attribute not found!") from e
except NoResultFound as e:
raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e
def get_attribute_value_by_hours(self, attribute_fqname: str, hours: float = 1.0):
"""
Takes as input the attribute fully-qualified name and the number of past hours since the actual time
(e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half).
Returns a list of timestamps and a list of values
"""
tablename = self.get_attribute_tablename(attribute_fqname)
return super().get_attribute_value_by_hours(attribute_fqname,hours,tablename)
def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime):
"""
Takes as input the attribute name and a certain starting and ending point-time.
The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss").
Returns a list of timestamps and a list of values
"""
tablename = self.get_attribute_tablename(attribute_fqname)
return super().get_attribute_value_by_interval(attribute_fqname,start_time,stop_time,tablename)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment