Skip to content
Snippets Groups Projects
Commit 624fc1a6 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-234-inspect-archive-from-python' into 'master'

L2SS-234: Inspect archive from python

Closes L2SS-234

See merge request !59
parents 649b4bdf f0f4eb70
No related branches found
No related tags found
1 merge request!59L2SS-234: Inspect archive from python
......@@ -17,7 +17,7 @@ sys.path.append(parentdir)
# PyTango imports
from tango import DevState
from tango.server import run, Device, attribute, command
from numpy import random
from numpy import random, double
__all__ = ["Random_Data", "main"]
......
#! /usr/bin/env python3
from clients.attribute_wrapper import attribute_wrapper
from tango import DeviceProxy
from datetime import datetime, timedelta
from sqlalchemy import create_engine, and_
from sqlalchemy.orm import sessionmaker
from .archiver_base import *
def add_attribute_to_archiver(attribute: str, polling_period: float, event_period: float, archive_manager: str = 'archiving/hdbpp/confmanager01', archiver: str = 'archiving/hdbpp/eventsubscriber01'):
am = DeviceProxy(archive_manager)
am.write_attribute('SetAttributeName', attribute)
am.write_attribute('SetArchiver', archiver)
am.write_attribute('SetStrategy', 'ALWAYS')
am.write_attribute('SetPollingPeriod', int(polling_period))
am.write_attribute('SetPeriodEvent', int(event_period))
am.AttributeAdd()
am.AttributeStart(attribute)
def remove_attribute_from_archiver(attribute: str, archive_manager: str = 'archiving/hdbpp/confmanager01'):
am = DeviceProxy(archive_manager)
am.AttributeStop(attribute)
am.AttributeRemove(attribute)
class Archiver():
"""
The Archiver class implements the basic operations to perform attributes archiving
"""
def __init__(self, cm_name: str = 'archiving/hdbpp/confmanager01', es_name: str = 'archiving/hdbpp/eventsubscriber01'):
self.cm_name = cm_name
self.cm = DeviceProxy(cm_name)
self.es_name = es_name
self.es = DeviceProxy(es_name)
def add_attribute_to_archiver(self, attribute: str, polling_period: float = 1000, event_period: float = 1000, strategy: str = 'ALWAYS'):
"""
Takes as input the attribute name, polling period (ms), event period (ms) and archiving strategy,
and adds the selected attribute to the subscriber's list of archiving attributes.
The ConfigurationManager and EventSubscriber devices must be already up and running.
The archiving-DBMS must be already properly configured.
"""
self.cm.write_attribute('SetAttributeName', attribute)
self.cm.write_attribute('SetArchiver', self.es_name)
self.cm.write_attribute('SetStrategy', strategy)
self.cm.write_attribute('SetPollingPeriod', int(polling_period))
self.cm.write_attribute('SetPeriodEvent', int(event_period))
self.cm.AttributeAdd()
def remove_attribute_from_archiver(self, attribute: str):
"""
Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list.
"""
self.cm.AttributeStop(attribute)
self.cm.AttributeRemove(attribute)
class Retriever():
"""
The Retriever class implements retrieve operations on a given DBMS
"""
def __init__(self, cm_name: str = 'archiving/hdbpp/confmanager01'):
self.cm_name = cm_name
self.session = self.connect_to_archiving_db()
def get_db_credentials(self):
"""
Retrieves the DB credentials from the Tango properties of Configuration Manager
"""
cm = DeviceProxy(self.cm_name)
config_list = cm.get_property('LibConfiguration')['LibConfiguration'] # dictionary {'LibConfiguration': list of strings}
host = str([s for s in config_list if "host" in s][0].split('=')[1])
dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1])
port = str([s for s in config_list if "port" in s][0].split('=')[1])
user = str([s for s in config_list if "user" in s][0].split('=')[1])
pw = str([s for s in config_list if "password" in s][0].split('=')[1])
return host,dbname,port,user,pw
def connect_to_archiving_db(self):
"""
Returns a session to a MySQL DBMS using default credentials.
"""
host,dbname,port,user,pw = self.get_db_credentials()
engine = create_engine('mysql+pymysql://'+user+':'+pw+'@'+host+':'+port+'/'+dbname)
Session = sessionmaker(bind=engine)
return Session()
def get_all_archived_attributes(self):
"""
Returns a list of the archived attributes in the DB.
"""
attrs = self.session.query(Attribute).order_by(Attribute.att_conf_id).all()
# Returns the representation as set in __repr__ method of the mapper class
return attrs
def get_archived_attributes_by_device(self,device_fqname: str):
"""
Takes as input the fully-qualified name of a device and returns a list of its archived attributes
"""
try:
[domain, family, member] = device_fqname.split('/')
except:
print("Device name error. Use FQDN - eg: LTS/Device/1")
return
attrs = self.session.query(Attribute).filter(and_(Attribute.domain == domain, Attribute.family == family, \
Attribute.member == member)).all()
# Returns the representation as set in __repr__ method of the mapper class
return attrs
def get_attribute_id(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its id.
"""
try:
[domain, family, member, name] = attribute_fqname.split('/')
except:
print("Attribute name error. Use FQDN - eg: LTS/Device/1/Attribute")
return
try:
result = self.session.query(Attribute.att_conf_id).filter(and_(Attribute.domain == domain, Attribute.family == family, \
Attribute.member == member, Attribute.name == name)).one()
return result[0]
except TypeError:
print("Attribute not found!")
return
def get_attribute_datatype(self,attribute_fqname: str):
"""
Takes as input the fully-qualified name of an attribute and returns its Data-Type.
Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used
as DB table name suffix in which values are stored.
"""
try:
[domain, family, member, name] = attribute_fqname.split('/')
except:
print("Attribute name error. Use FQDN - eg: LTS/Device/1/Attribute")
return
try:
result = self.session.query(DataType.data_type).join(Attribute,Attribute.att_conf_data_type_id==DataType.att_conf_data_type_id).\
filter(and_(Attribute.domain == domain, Attribute.family == family, Attribute.member == member, Attribute.name == name)).one()
return result[0]
except TypeError:
print("Attribute not found!")
return
def get_attribute_value_by_hours(self,attribute_fqname: str, hours: float = 1.0):
"""
Takes as input the attribute fully-qualified name and the number of past hours since the actual time
(e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half).
Returns a list of timestamps and a list of values
"""
attr_id = self.get_attribute_id(attribute_fqname)
attr_datatype = self.get_attribute_datatype(attribute_fqname)
attr_table_name = 'att_'+str(attr_datatype)
# Retrieves the class that maps the DB table given the tablename
base_class = get_class_by_tablename(attr_table_name)
# Retrieves the timestamp
time_now = datetime.now()
time_delta = time_now - timedelta(hours=hours)
# Converts the timestamps in the right format for the query
time_now_db = str(time_now.strftime("%Y-%m-%d %X"))
time_delta_db = str(time_delta.strftime("%Y-%m-%d %X"))
result = self.session.query(base_class).\
join(Attribute,Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \
base_class.data_time <= time_now_db)).order_by(base_class.data_time).all()
return result
def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime):
'''
Takes as input the attribute name and a certain starting and ending point-time.
The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss").
Returns a list of timestamps and a list of values
'''
attr_id = self.get_attribute_id(attribute_fqname)
attr_datatype = self.get_attribute_datatype(attribute_fqname)
attr_table_name = 'att_'+str(attr_datatype)
# Retrieves the class that maps the DB table given the tablename
base_class = get_class_by_tablename(attr_table_name)
result = self.session.query(base_class).\
join(Attribute,Attribute.att_conf_id==base_class.att_conf_id).\
filter(and_(Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \
base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all()
return result
This diff is collapsed.
......@@ -4,3 +4,5 @@ opcua >= 0.98.13
astropy
python-logstash-async
gitpython
PyMySQL[rsa]
sqlalchemy
......@@ -25,6 +25,9 @@ COPY jupyter-kernels /usr/local/share/jupyter/kernels/
RUN sudo pip3 install python-logstash-async
COPY jupyter-notebook /usr/local/bin/jupyter-notebook
#Install further python modules
RUN sudo pip3 install PyMySQL[rsa] sqlalchemy
# Add Tini. Tini operates as a process subreaper for jupyter. This prevents kernel crashes.
ENV TINI_VERSION v0.6.0
ENV JUPYTER_RUNTIME_DIR=/tmp
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment