Skip to content
Snippets Groups Projects
Commit 071fb7fa authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

L2SS-234: Update Archiver class with SqlAlchemy framework

parent 3a17cfc9
No related branches found
No related tags found
1 merge request: !59 "L2SS-234: Inspect archive from python"
......@@ -4,7 +4,12 @@ from logging import error
from .lofar2_config import configure_logging
from tango import DeviceProxy
from datetime import datetime, timedelta
import mysql.connector as sql
import mysql.connector
from sqlalchemy import create_engine, and_
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.session import Session
from util.archiver_base import *
def add_attribute_to_archiver(attribute: str, polling_period: float, event_period: float, archive_manager: str = 'archiving/hdbpp/confmanager01', archiver: str = 'archiving/hdbpp/eventsubscriber01'):
"""
......@@ -20,124 +25,113 @@ def add_attribute_to_archiver(attribute: str, polling_period: float, event_perio
am.write_attribute('SetPollingPeriod', int(polling_period))
am.write_attribute('SetPeriodEvent', int(event_period))
am.AttributeAdd()
am.AttributeStart(attribute) #the attribute archiving starts even without this line
#am.AttributeStart(attribute) #the attribute archiving starts even without this line
def remove_attribute_from_archiver(attribute: str, archive_manager: str = 'archiving/hdbpp/confmanager01'):
    """
    Stop the data archiving of the attribute passed as input, and remove it
    from the subscriber's list.

    :param attribute: fully-qualified attribute name (e.g. 'LTS/Device/1/Attribute')
    :param archive_manager: Tango device name of the HDB++ configuration manager
    """
    am = DeviceProxy(archive_manager)
    am.AttributeStop(attribute)
    am.AttributeRemove(attribute)
def connect_to_archiving_db(host: str = 'archiver-maria-db', port: int = 3306, user: str = 'root', password: str = 'secret', database: str = 'hdbpp'):
    """
    Return a SQLAlchemy session bound to the MySQL archiving DBMS,
    using default or user-defined credentials.

    :param host: DB host name
    :param port: DB port
    :param user: DB user name
    :param password: DB password
    :param database: DB schema name (HDB++ archive)
    :return: a new ``sqlalchemy.orm.Session`` instance
    """
    engine = create_engine('mysql+mysqlconnector://%s:%s@%s:%d/%s'
                           % (user, password, host, port, database))
    # Use a distinct local name so the imported `Session` class is not shadowed.
    session_factory = sessionmaker(bind=engine)
    return session_factory()
def get_all_archived_attributes():
    """
    Return the list of the archived attributes in the DB, ordered by id.

    :return: list of ``Attribute`` mapper instances; their printable form is
             defined by the ``__repr__`` method of the mapper class.
    """
    session = connect_to_archiving_db()
    attrs = session.query(Attribute).order_by(Attribute.att_conf_id).all()
    return attrs
def get_attribute_id(attribute_fqname: str):
    """
    Take as input the fully-qualified name of an attribute and return its id.

    :param attribute_fqname: e.g. 'LTS/Device/1/Attribute'
    :return: the attribute's ``att_conf_id``, or None when the name is
             malformed or the attribute is not archived.
    """
    # Validate the name before opening a DB session: unpacking raises
    # ValueError unless there are exactly four '/'-separated fields.
    try:
        domain, family, member, name = attribute_fqname.split('/')
    except ValueError:
        print("Attribute name error. Use FQDN - eg: LTS/Device/1/Attribute")
        return None
    session = connect_to_archiving_db()
    # one_or_none() yields None for a missing attribute; .one() would raise
    # NoResultFound, which the previous `except TypeError` never caught
    # (TypeError belonged to the old mysql-connector fetchone()[0] code).
    result = session.query(Attribute.att_conf_id).filter(and_(
        Attribute.domain == domain, Attribute.family == family,
        Attribute.member == member, Attribute.name == name)).one_or_none()
    if result is None:
        print("Attribute not found!")
        return None
    return result[0]
def get_attribute_datatype(attribute_fqname: str):
    """
    Take as input the fully-qualified name of an attribute and return its data-type name.

    The data-type name indicates the value type and the read/write property
    (e.g. 'scalar_devdouble_ro'); it is used as the suffix of the DB table
    in which the attribute values are stored.

    :param attribute_fqname: e.g. 'LTS/Device/1/Attribute'
    :return: the data-type name string, or None when the name is malformed
             or the attribute is not archived.
    """
    # Validate the name before opening a DB session: unpacking raises
    # ValueError unless there are exactly four '/'-separated fields.
    try:
        domain, family, member, name = attribute_fqname.split('/')
    except ValueError:
        print("Attribute name error. Use FQDN - eg: LTS/Device/1/Attribute")
        return None
    session = connect_to_archiving_db()
    # one_or_none() yields None for a missing attribute; .one() would raise
    # NoResultFound, which the previous `except TypeError` never caught.
    result = session.query(DataType.data_type).join(
        Attribute, Attribute.att_conf_data_type_id == DataType.att_conf_data_type_id).filter(and_(
            Attribute.domain == domain, Attribute.family == family,
            Attribute.member == member, Attribute.name == name)).one_or_none()
    if result is None:
        print("Attribute not found!")
        return None
    return result[0]
def get_attribute_value_by_hours(attribute_fqname: str, hours: float = 1.0):
    """
    Take as input the attribute fully-qualified name and the number of past hours
    since the actual time (e.g. hours=1 retrieves values in the last hour,
    hours=8.5 retrieves values in the last eight hours and a half).

    :param attribute_fqname: e.g. 'LTS/Device/1/Attribute'
    :param hours: size of the time window, in hours, ending now
    :return: list of archived-value mapper instances, ordered by timestamp
    """
    attr_id = get_attribute_id(attribute_fqname)
    attr_datatype = get_attribute_datatype(attribute_fqname)
    attr_table_name = 'att_' + str(attr_datatype)
    # Retrieve the class that maps the DB table given the table name.
    base_class = get_class_by_tablename(attr_table_name)
    session = connect_to_archiving_db()
    # Compute the time-window boundaries and format them for the query
    # ("YYYY-MM-DD hh:mm:ss"); strftime already returns a str.
    time_now = datetime.now()
    time_delta = time_now - timedelta(hours=hours)
    time_now_db = time_now.strftime("%Y-%m-%d %X")
    time_delta_db = time_delta.strftime("%Y-%m-%d %X")
    result = session.query(base_class).\
        join(Attribute, Attribute.att_conf_id == base_class.att_conf_id).\
        filter(and_(Attribute.att_conf_id == attr_id, base_class.data_time >= time_delta_db,
                    base_class.data_time <= time_now_db)).order_by(base_class.data_time).all()
    return result
def get_attribute_value_by_interval(attribute_fqname: str, start_time: datetime, stop_time: datetime):
    """
    Take as input the attribute fully-qualified name and a certain starting and
    ending point in time. The datetime format is pretty flexible
    (e.g. "YYYY-MM-dd hh:mm:ss").

    :param attribute_fqname: e.g. 'LTS/Device/1/Attribute'
    :param start_time: beginning of the time window (inclusive)
    :param stop_time: end of the time window (inclusive)
    :return: list of archived-value mapper instances, ordered by timestamp
    """
    attr_id = get_attribute_id(attribute_fqname)
    attr_datatype = get_attribute_datatype(attribute_fqname)
    attr_table_name = 'att_' + str(attr_datatype)
    # Retrieve the class that maps the DB table given the table name.
    base_class = get_class_by_tablename(attr_table_name)
    session = connect_to_archiving_db()
    result = session.query(base_class).\
        join(Attribute, Attribute.att_conf_id == base_class.att_conf_id).\
        filter(and_(Attribute.att_conf_id == attr_id, base_class.data_time >= str(start_time),
                    base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all()
    return result
#! /usr/bin/env python3
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.dialects.mysql import DOUBLE,TIMESTAMP
from sqlalchemy.sql.expression import table
#Declarative system used to define classes mapped to relational DB tables
Base = declarative_base()
class Attribute(Base):
    """
    Declarative mapper class for a Tango Attribute, mapped to table 'att_conf'.
    """
    __tablename__ = 'att_conf'
    # extend_existing allows the table definition to be re-declared when this
    # module is imported more than once against the same metadata.
    __table_args__ = {'extend_existing': True}
    att_conf_id = Column(Integer, primary_key=True)
    # Full attribute name, e.g. 'tango://databaseds:10000/lts/randomdata/1/rnd2'
    att_name = Column(String)
    # Id of the attribute's data type (see the DataType mapper / table 'att_conf_data_type')
    att_conf_data_type_id = Column(Integer)
    # att_ttl — presumably the archive time-to-live; confirm against the HDB++ schema
    att_ttl = Column(Integer)
    # Facility prefix, e.g. 'tango://databaseds:10000'
    facility = Column(String)
    # The four '/'-separated components of the fully-qualified attribute name
    domain = Column(String)
    family = Column(String)
    member = Column(String)
    name = Column(String)

    def __repr__(self):
        return "<Attribute(fullname='%s',data_type ='%s',ttl='%s',facility ='%s',domain ='%s',family ='%s',member ='%s',name ='%s')>" \
            % (self.att_name,self.att_conf_data_type_id,self.att_ttl,self.facility,self.domain,self.family,self.member,self.name)
class DataType(Base):
    """
    Declarative mapper class for a Tango Data Type, mapped to table 'att_conf_data_type'.
    """
    __tablename__ = 'att_conf_data_type'
    # extend_existing allows the table definition to be re-declared when this
    # module is imported more than once against the same metadata.
    __table_args__ = {'extend_existing': True}
    att_conf_data_type_id = Column(Integer, primary_key=True)
    # Type name, e.g. 'scalar_devdouble_ro'; also used as the value-table
    # suffix (table 'att_' + data_type holds the archived values)
    data_type = Column(String)

    def __repr__(self):
        return "<DataType(type='%s')>" \
            % (self.data_type)
class Scalar_Double_RO(Base):
    """
    Declarative mapper class for a Tango Scalar Double Read-Only value,
    mapped to table 'att_scalar_devdouble_ro'.
    """
    __tablename__ = 'att_scalar_devdouble_ro'
    # extend_existing allows the table definition to be re-declared when this
    # module is imported more than once against the same metadata.
    __table_args__ = {'extend_existing': True}
    # A primary key is not defined for this kind of table in the DB, but
    # SQLAlchemy requires a mandatory primary-key definition. This definition
    # is Python-side only and does not compromise the DBMS architecture.
    att_conf_id = Column(Integer, primary_key=True)
    data_time = Column(TIMESTAMP)    # event timestamp
    recv_time = Column(TIMESTAMP)    # time the value was received by the archiver
    insert_time = Column(TIMESTAMP, primary_key=True)  # time the row was inserted in the DB
    value_r = Column(DOUBLE, primary_key=True)         # the archived (read) value
    quality = Column(Integer)
    att_error_desc_id = Column(Integer)

    def __repr__(self):
        return "<Scalar_Double_RO(att_conf_id='%s',data_time='%s',recv_time='%s',insert_time='%s',value_r='%s',quality='%s',att_error_desc_id='%s')>" \
            % (self.att_conf_id,self.data_time,self.recv_time,self.insert_time,self.value_r,self.quality,self.att_error_desc_id)
def get_class_by_tablename(tablename: str):
    """
    Return the mapper class whose table name equals *tablename*.

    Scans every class registered on ``Base``, skipping private ones, and
    returns None when no mapper matches.
    """
    for mapper in Base.registry.mappers:
        mapped = mapper.class_
        # Ignore private/internal classes.
        if mapped.__name__.startswith('_'):
            continue
        if hasattr(mapped, '__tablename__') and mapped.__tablename__ == tablename:
            return mapped
    return None
\ No newline at end of file
......@@ -24,7 +24,7 @@ RUN sudo pip3 install python-logstash-async
COPY jupyter-notebook /usr/local/bin/jupyter-notebook
#Install further python modules
RUN sudo pip3 install mysql-connector-python
RUN sudo pip3 install mysql-connector-python sqlalchemy
# Add Tini. Tini operates as a process subreaper for jupyter. This prevents kernel crashes.
ENV TINI_VERSION v0.6.0
......
This diff is collapsed.
%% Cell type:code id:c7dd05cd tags:
``` python
import sys
sys.path.append('/hosthome/tango/devices')
from util.archiver import *
from util.archiver_base import *
import mysql.connector
```
%% Cell type:code id:57834b5e tags:
``` python
dev_rand = DeviceProxy("LTS/RandomData/1")
dev_rand.get_attribute_list()
```
%% Cell type:code id:6816f78f tags:
``` python
cm = DeviceProxy("archiving/hdbpp/confmanager01")
```
%% Cell type:code id:7bda559b tags:
``` python
get_all_archived_attributes()
```
%% Cell type:code id:f0ef38a4 tags:
``` python
add_attribute_to_archiver('lts/randomdata/1/rnd7',3000,1000)
```
%% Cell type:code id:7d0746e7 tags:
``` python
remove_attribute_from_archiver('lts/randomdata/1/rnd7')
```
%% Cell type:code id:129a75c6 tags:
``` python
get_all_archived_attributes()
```
%% Output
[<Attribute(fullname='tango://databaseds:10000/lts/randomdata/1/rnd2',data_type ='37',ttl='0',facility ='tango://databaseds:10000',domain ='lts',family ='randomdata',member ='1',name ='rnd2')>,
<Attribute(fullname='tango://databaseds:10000/lts/randomdata/1/rnd3',data_type ='37',ttl='0',facility ='tango://databaseds:10000',domain ='lts',family ='randomdata',member ='1',name ='rnd3')>,
<Attribute(fullname='tango://databaseds:10000/lts/randomdata/1/rnd15',data_type ='37',ttl='0',facility ='tango://databaseds:10000',domain ='lts',family ='randomdata',member ='1',name ='rnd15')>,
<Attribute(fullname='tango://databaseds:10000/lts/randomdata/1/rnd7',data_type ='37',ttl='0',facility ='tango://databaseds:10000',domain ='lts',family ='randomdata',member ='1',name ='rnd7')>]
%% Cell type:code id:5f9865ee tags:
``` python
main_att = 'lts/randomdata/1/rnd7'
get_attribute_id(main_att)
```
%% Output
4
%% Cell type:code id:2a3707a3 tags:
``` python
get_attribute_datatype(main_att)
```
%% Output
'scalar_devdouble_ro'
%% Cell type:code id:18100623 tags:
``` python
get_attribute_value_by_hours(main_att,24)
```
%% Output
[<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:21.892781',recv_time='2021-06-23 15:00:23.040862',insert_time='2021-06-23 15:00:23.042065',value_r='0.9263038657',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:24.909897',recv_time='2021-06-23 15:00:24.911329',insert_time='2021-06-23 15:00:24.912696',value_r='0.6869173892',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:27.909636',recv_time='2021-06-23 15:00:27.910767',insert_time='2021-06-23 15:00:27.912330',value_r='0.1070759253',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:30.909940',recv_time='2021-06-23 15:00:30.911529',insert_time='2021-06-23 15:00:30.913542',value_r='0.2699634793',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:33.910365',recv_time='2021-06-23 15:00:33.910897',insert_time='2021-06-23 15:00:33.911735',value_r='0.8323236082',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:36.910232',recv_time='2021-06-23 15:00:36.911389',insert_time='2021-06-23 15:00:36.912912',value_r='0.7979368397',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:39.909182',recv_time='2021-06-23 15:00:39.909785',insert_time='2021-06-23 15:00:39.910372',value_r='0.2178505902',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:42.909868',recv_time='2021-06-23 15:00:42.911390',insert_time='2021-06-23 15:00:42.913124',value_r='0.6949464171',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:45.909946',recv_time='2021-06-23 15:00:45.911036',insert_time='2021-06-23 15:00:45.912787',value_r='0.1093199257',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:48.909615',recv_time='2021-06-23 15:00:48.910722',insert_time='2021-06-23 15:00:48.912259',value_r='0.9768430390',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:51.909200',recv_time='2021-06-23 15:00:51.910290',insert_time='2021-06-23 15:00:51.911793',value_r='0.8888621550',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:54.909296',recv_time='2021-06-23 15:00:54.910679',insert_time='2021-06-23 15:00:54.912332',value_r='0.3799629383',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='4',data_time='2021-06-23 15:00:57.909804',recv_time='2021-06-23 15:00:57.910850',insert_time='2021-06-23 15:00:57.912311',value_r='0.7681945847',quality='2',att_error_desc_id='None')>]
%% Cell type:code id:ab476d57 tags:
``` python
get_attribute_value_by_interval('lts/randomdata/1/rnd15', '2021-06-21 13:20:00', '2021-06-23 15:21:00')
```
%% Output
[<Scalar_Double_RO(att_conf_id='3',data_time='2021-06-21 14:50:06.179562',recv_time='2021-06-21 14:50:07.180444',insert_time='2021-06-21 14:50:07.204869',value_r='0.1400088842',quality='2',att_error_desc_id='None')>,
<Scalar_Double_RO(att_conf_id='3',data_time='2021-06-21 14:50:10.213110',recv_time='2021-06-21 14:50:10.214549',insert_time='2021-06-21 14:50:10.216117',value_r='0.6627579896',quality='2',att_error_desc_id='None')>]
%% Cell type:code id:21c9b91b tags:
``` python
```
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment