Select Git revision
SKAAlarmHandler.rst
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
archiver.py 6.36 KiB
#! /usr/bin/env python3
from logging import error
from .lofar2_config import configure_logging
from tango import DeviceProxy
from datetime import datetime, timedelta
import mysql.connector
from sqlalchemy import create_engine, and_
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.session import Session
from .archiver_base import *
def add_attribute_to_archiver(
    attribute: str,
    polling_period: float,
    event_period: float,
    archive_manager: str = 'archiving/hdbpp/confmanager01',
    archiver: str = 'archiving/hdbpp/eventsubscriber01',
):
    """
    Register an attribute with the archiving system so that its data gets archived.

    attribute       -- name of the attribute to archive
    polling_period  -- polling period in ms (truncated to an integer)
    event_period    -- event period in ms (truncated to an integer)
    archive_manager -- device name of the ConfigurationManager
    archiver        -- device name of the EventSubscriber

    Both the ConfigurationManager and the EventSubscriber devices have to be
    up and running, and the archiving DBMS has to be configured beforehand.
    """
    manager = DeviceProxy(archive_manager)

    # Textual settings first, written in the same order every time.
    for setting, value in (
        ('SetAttributeName', attribute),
        ('SetArchiver', archiver),
        ('SetStrategy', 'ALWAYS'),
    ):
        manager.write_attribute(setting, value)

    # The periods are written as integers.
    manager.write_attribute('SetPollingPeriod', int(polling_period))
    manager.write_attribute('SetPeriodEvent', int(event_period))

    # AttributeStart(attribute) is deliberately not called: the archiving of
    # the attribute starts even without it.
    manager.AttributeAdd()
def remove_attribute_from_archiver(
    attribute: str,
    archive_manager: str = 'archiving/hdbpp/confmanager01',
):
    """
    Stop archiving the given attribute and drop it from the subscriber's list.

    attribute       -- name of the attribute to stop archiving
    archive_manager -- device name of the ConfigurationManager
    """
    manager = DeviceProxy(archive_manager)
    # Stop first, then remove.
    manager.AttributeStop(attribute)
    manager.AttributeRemove(attribute)
def connect_to_archiving_db(host: str = 'archiver-maria-db', port: int = 3306, user: str = 'root', password: str = 'secret', database: str = 'hdbpp'):
    """
    Returns a new session to a MySQL DBMS using default or user-defined credentials.

    host     -- hostname of the database server
    port     -- TCP port of the database server
    user     -- database user name (plain text, not URL-encoded)
    password -- database password (plain text, not URL-encoded)
    database -- name of the database (schema) to use

    Every call builds a new engine and session. The caller owns the returned
    session and should close() it when done.
    """
    # Function-scope import: only this function needs it.
    from urllib.parse import quote

    # Credentials are percent-encoded so that characters with a meaning in a
    # URL ('@', '/', ':', '%', ...) cannot corrupt the connection string.
    # quote(..., safe='') is used rather than quote_plus so that spaces become
    # '%20' and never '+'.
    url = 'mysql+mysqlconnector://{}:{}@{}:{}/{}'.format(
        quote(user, safe=''),
        quote(password, safe=''),
        host,
        port,
        database,
    )
    engine = create_engine(url)
    # Named session_factory to avoid shadowing the imported Session class.
    session_factory = sessionmaker(bind=engine)
    return session_factory()
def get_all_archived_attributes():
    """
    Fetch every archived attribute stored in the DB, sorted by att_conf_id.

    Returns a list of Attribute mapper objects; how each one is displayed is
    determined by the __repr__ method of the mapper class.
    The session opened for the query is not closed here.
    """
    db_session = connect_to_archiving_db()
    ordered_query = db_session.query(Attribute).order_by(Attribute.att_conf_id)
    return ordered_query.all()
def get_attribute_id(attribute_fqname: str):
    """
    Takes as input the fully-qualified name of an attribute and returns its id.

    The name must consist of exactly four '/'-separated parts:
    domain/family/member/name (e.g. LTS/Device/1/Attribute).

    Returns the att_conf_id of the attribute, or None (after printing a
    message) when the name is malformed or no such attribute is archived.
    If several rows match, the exception raised by the query is propagated.
    """
    # Validate the name BEFORE opening a DB session, so that malformed input
    # never costs a database connection.
    try:
        domain, family, member, name = attribute_fqname.split('/')
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: not a str; ValueError: not exactly 4 parts.
        print("Attribute name error. Use FQDN - eg: LTS/Device/1/Attribute")
        return None

    # Function-scope import: only this function needs it.
    from sqlalchemy.orm.exc import NoResultFound

    session = connect_to_archiving_db()
    try:
        result = session.query(Attribute.att_conf_id).filter(
            and_(
                Attribute.domain == domain,
                Attribute.family == family,
                Attribute.member == member,
                Attribute.name == name,
            )
        ).one()
        return result[0]
    except (NoResultFound, TypeError):
        # .one() signals "no row" with NoResultFound; TypeError is still
        # caught for compatibility with the previous handler.
        print("Attribute not found!")
        return None
    finally:
        # Only a scalar is returned, so the session can be released safely.
        session.close()
def get_attribute_datatype(attribute_fqname: str):
    """
    Takes as input the fully-qualified name of an attribute and returns its Data-Type.

    Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used
    as DB table name suffix in which values are stored.

    The attribute name must consist of exactly four '/'-separated parts:
    domain/family/member/name (e.g. LTS/Device/1/Attribute).

    Returns the data type name, or None (after printing a message) when the
    name is malformed or no such attribute is archived. If several rows
    match, the exception raised by the query is propagated.
    """
    # Validate the name BEFORE opening a DB session, so that malformed input
    # never costs a database connection.
    try:
        domain, family, member, name = attribute_fqname.split('/')
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: not a str; ValueError: not exactly 4 parts.
        print("Attribute name error. Use FQDN - eg: LTS/Device/1/Attribute")
        return None

    # Function-scope import: only this function needs it.
    from sqlalchemy.orm.exc import NoResultFound

    session = connect_to_archiving_db()
    try:
        result = session.query(DataType.data_type).join(
            Attribute,
            Attribute.att_conf_data_type_id == DataType.att_conf_data_type_id,
        ).filter(
            and_(
                Attribute.domain == domain,
                Attribute.family == family,
                Attribute.member == member,
                Attribute.name == name,
            )
        ).one()
        return result[0]
    except (NoResultFound, TypeError):
        # .one() signals "no row" with NoResultFound; TypeError is still
        # caught for compatibility with the previous handler.
        print("Attribute not found!")
        return None
    finally:
        # Only a scalar is returned, so the session can be released safely.
        session.close()
def get_attribute_value_by_hours(attribute_fqname: str, hours: float = 1.0):
    """
    Takes as input the attribute fully-qualified name and the number of past hours since the actual time
    (e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half).

    Returns the list of archived rows (instances of the class mapped to the
    attribute's value table) whose data_time falls within the interval,
    ordered by data_time. Returns an empty list when the attribute lookup
    yields no id or no data type.
    """
    # The lookups return None (after printing a message) on failure; stop
    # here instead of going on to look up the table name 'att_None'.
    attr_id = get_attribute_id(attribute_fqname)
    if attr_id is None:
        return []
    attr_datatype = get_attribute_datatype(attribute_fqname)
    if attr_datatype is None:
        return []

    attr_table_name = 'att_' + str(attr_datatype)
    # Retrieves the class that maps the DB table given the tablename
    base_class = get_class_by_tablename(attr_table_name)

    # Time window: [now - hours, now], taken from the local clock.
    time_now = datetime.now()
    time_delta = time_now - timedelta(hours=hours)
    # Explicit %H:%M:%S instead of %X: %X follows the process locale and may
    # not produce a timestamp the database can parse.
    db_time_format = "%Y-%m-%d %H:%M:%S"
    time_now_db = time_now.strftime(db_time_format)
    time_delta_db = time_delta.strftime(db_time_format)

    # The session is left open on purpose: the returned rows are ORM objects
    # that were loaded through it.
    session = connect_to_archiving_db()
    result = session.query(base_class).join(
        Attribute,
        Attribute.att_conf_id == base_class.att_conf_id,
    ).filter(
        and_(
            Attribute.att_conf_id == attr_id,
            base_class.data_time >= time_delta_db,
            base_class.data_time <= time_now_db,
        )
    ).order_by(base_class.data_time).all()
    return result
def get_attribute_value_by_interval(attribute_fqname: str, start_time: datetime, stop_time: datetime):
    """
    Takes as input the attribute name and a certain starting and ending point-time.
    The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss"): both
    bounds are converted with str() before being compared against data_time.

    Returns the list of archived rows (instances of the class mapped to the
    attribute's value table) whose data_time lies between start_time and
    stop_time (both included), ordered by data_time. Returns an empty list
    when the attribute lookup yields no id or no data type.
    """
    # The lookups return None (after printing a message) on failure; stop
    # here instead of going on to look up the table name 'att_None'.
    attr_id = get_attribute_id(attribute_fqname)
    if attr_id is None:
        return []
    attr_datatype = get_attribute_datatype(attribute_fqname)
    if attr_datatype is None:
        return []

    attr_table_name = 'att_' + str(attr_datatype)
    # Retrieves the class that maps the DB table given the tablename
    base_class = get_class_by_tablename(attr_table_name)

    # The session is left open on purpose: the returned rows are ORM objects
    # that were loaded through it.
    session = connect_to_archiving_db()
    result = session.query(base_class).join(
        Attribute,
        Attribute.att_conf_id == base_class.att_conf_id,
    ).filter(
        and_(
            Attribute.att_conf_id == attr_id,
            base_class.data_time >= str(start_time),
            base_class.data_time <= str(stop_time),
        )
    ).order_by(base_class.data_time).all()
    return result