archiver.py
    #! /usr/bin/env python3
    
    from logging import error
    from .lofar2_config import configure_logging
    from tango import DeviceProxy
    from datetime import datetime, timedelta
    
    import mysql.connector
    from sqlalchemy import create_engine, and_
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.orm.exc import NoResultFound
    from .archiver_base import *
    
    def add_attribute_to_archiver(attribute: str, polling_period: float, event_period: float, archive_manager: str = 'archiving/hdbpp/confmanager01', archiver: str = 'archiving/hdbpp/eventsubscriber01'):
        """
        Takes as input the attribute name, polling period (ms) and event period (ms), and starts the data archiving
        for the selected attribute.
        The ConfigurationManager and EventSubscriber devices must be already up and running.
        The archiving-DBMS must be already properly configured.
        """
        am = DeviceProxy(archive_manager)
        am.write_attribute('SetAttributeName', attribute)
        am.write_attribute('SetArchiver', archiver)
        am.write_attribute('SetStrategy', 'ALWAYS')
        am.write_attribute('SetPollingPeriod', int(polling_period))
        am.write_attribute('SetPeriodEvent', int(event_period))
        am.AttributeAdd()
        #am.AttributeStart(attribute)    #the attribute archiving starts even without this line
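
    # Usage sketch (kept as a comment so it is not executed on import): archive a
    # hypothetical attribute at a 1 s polling period and a 5 s archive-event period.
    # The attribute FQDN below is an illustrative assumption, not a device defined
    # by this module.
    #
    #   add_attribute_to_archiver('LTS/SDP/1/fpga_temp_R',
    #                             polling_period=1000, event_period=5000)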
    
    def remove_attribute_from_archiver(attribute: str, archive_manager: str = 'archiving/hdbpp/confmanager01'):
        """
        Stops the data archiving of the attribute passed as input and removes it from the subscriber's list.
        """
        am = DeviceProxy(archive_manager)
        am.AttributeStop(attribute)
        am.AttributeRemove(attribute)
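
    # Usage sketch: stop archiving the same hypothetical attribute used in the
    # example above.
    #
    #   remove_attribute_from_archiver('LTS/SDP/1/fpga_temp_R')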
    
    def connect_to_archiving_db(host: str = 'archiver-maria-db', port: int = 3306, user: str = 'root', password: str = 'secret', database: str = 'hdbpp'):
        """
        Returns a session to a MySQL DBMS using default or user-defined credentials.
        """
        engine = create_engine(f'mysql+mysqlconnector://{user}:{password}@{host}:{port}/{database}')
        session_factory = sessionmaker(bind=engine)
        return session_factory()
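
    # Usage sketch: open a session against a non-default DB host. The hostname and
    # password below are placeholders; the defaults match the archiver-maria-db
    # container assumed elsewhere in this module.
    #
    #   session = connect_to_archiving_db(host='my-archiver-db', password='my-secret')
    #   # ... run queries via the session ...
    #   session.close()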
    
    def get_all_archived_attributes():
        """
        Returns a list of the archived attributes in the DB.
        """
        session = connect_to_archiving_db()
        attrs = session.query(Attribute).order_by(Attribute.att_conf_id).all()
        # Returns the representation as set in __repr__ method of the mapper class
        return attrs
    
    def get_attribute_id(attribute_fqname: str):
        """
        Takes as input the fully-qualified name of an attribute and returns its id.
        """
        session = connect_to_archiving_db()
        try:
            [domain, family, member, name] = attribute_fqname.split('/')
        except ValueError:
            print("Attribute name error. Use FQDN - eg: LTS/Device/1/Attribute")
            return
        try:
            result = session.query(Attribute.att_conf_id).filter(and_(Attribute.domain == domain, Attribute.family == family, \
                                    Attribute.member == member, Attribute.name == name)).one()
            return result[0]
        except NoResultFound:
            print("Attribute not found!")
            return
    
    def get_attribute_datatype(attribute_fqname: str):
        """
        Takes as input the fully-qualified name of an attribute and returns its Data-Type.
        Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used
        as DB table name suffix in which values are stored.
        """
        session = connect_to_archiving_db()
        try:
            [domain, family, member, name] = attribute_fqname.split('/')
        except ValueError:
            print("Attribute name error. Use FQDN - eg: LTS/Device/1/Attribute")
            return
        try:
            result = session.query(DataType.data_type).join(Attribute,Attribute.att_conf_data_type_id==DataType.att_conf_data_type_id).\
                        filter(and_(Attribute.domain == domain, Attribute.family == family, Attribute.member == member, Attribute.name == name)).one()
            return result[0]
        except NoResultFound:
            print("Attribute not found!")
            return
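
    # Example sketch of how the returned data type is used downstream: for a
    # read-only scalar double attribute the HDB++ type name is typically
    # 'scalar_devdouble_ro', so its values live in the table
    # 'att_scalar_devdouble_ro' (the functions below build the table name as
    # 'att_' + datatype). The FQDN is a placeholder.
    #
    #   datatype = get_attribute_datatype('LTS/SDP/1/fpga_temp_R')
    #   table_name = 'att_' + str(datatype)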
    
    def get_attribute_value_by_hours(attribute_fqname: str, hours: float = 1.0):
        """
        Takes as input the attribute fully-qualified name and a number of hours counted back from the current time
        (e.g. hours=1 retrieves the values of the last hour, hours=8.5 those of the last eight and a half hours).
        Returns the matching archived rows (timestamp and value), ordered by time.
        """
        attr_id = get_attribute_id(attribute_fqname)
        attr_datatype = get_attribute_datatype(attribute_fqname)
        attr_table_name = 'att_'+str(attr_datatype)
        # Retrieves the class that maps the DB table given the tablename
        base_class = get_class_by_tablename(attr_table_name)
        session = connect_to_archiving_db()
        # Compute the time window [now - hours, now]
        time_now = datetime.now()
        time_delta = time_now - timedelta(hours=hours)
        # Convert the timestamps to the string format expected by the DB query
        time_now_db = time_now.strftime("%Y-%m-%d %X")
        time_delta_db = time_delta.strftime("%Y-%m-%d %X")
        result = session.query(base_class).\
                join(Attribute,Attribute.att_conf_id==base_class.att_conf_id).\
                filter(and_(Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \
                           base_class.data_time <= time_now_db)).order_by(base_class.data_time).all()
        #timestamp = [item[0].strftime("%Y-%m-%d %X:%f") for item in result]
        #value = [item[1] for item in result]
        return result
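
    # Usage sketch: fetch the last half hour of data and unpack timestamps and
    # values. The 'value_r' column name assumes the standard HDB++ MySQL table
    # layout for read attributes as mapped in archiver_base; the FQDN is a
    # placeholder.
    #
    #   rows = get_attribute_value_by_hours('LTS/SDP/1/fpga_temp_R', hours=0.5)
    #   timestamps = [row.data_time for row in rows]
    #   values = [row.value_r for row in rows]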
    
    def get_attribute_value_by_interval(attribute_fqname: str, start_time: datetime, stop_time: datetime):
        """
        Takes as input the attribute fully-qualified name and a start and a stop time.
        Both datetime objects and strings in a DB-compatible format (e.g. "YYYY-MM-dd hh:mm:ss") are accepted.
        Returns the matching archived rows (timestamp and value), ordered by time.
        """
        attr_id = get_attribute_id(attribute_fqname)
        attr_datatype = get_attribute_datatype(attribute_fqname)
        attr_table_name = 'att_'+str(attr_datatype)
        # Retrieves the class that maps the DB table given the tablename
        base_class = get_class_by_tablename(attr_table_name)
        session = connect_to_archiving_db()
        result = session.query(base_class).\
                join(Attribute,Attribute.att_conf_id==base_class.att_conf_id).\
                    filter(and_(Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \
                            base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all()
        #timestamp = [item[0].strftime("%Y-%m-%d %X:%f") for item in result]
        #value = [item[1] for item in result]
        return result
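
    # Usage sketch: retrieve one day of archived data between two explicit points
    # in time. Strings work as well, since the query converts the bounds with
    # str(); the FQDN is again a placeholder.
    #
    #   from datetime import datetime
    #   rows = get_attribute_value_by_interval('LTS/SDP/1/fpga_temp_R',
    #                                          datetime(2021, 7, 1, 0, 0, 0),
    #                                          datetime(2021, 7, 2, 0, 0, 0))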