diff --git a/.gitmodules b/.gitmodules index ef820d4039a54bf590e5c675c97a718b0681dc6e..295ed3b37fc8ff4b3674422af3b799f3041f45ed 100644 --- a/.gitmodules +++ b/.gitmodules @@ -2,3 +2,6 @@ path = docker-compose/tango-prometheus-exporter/ska-tango-grafana-exporter url = https://git.astron.nl/lofar2.0/ska-tango-grafana-exporter.git branch = station-control +[submodule "tangostationcontrol/tangostationcontrol/toolkit/libhdbpp-python"] + path = tangostationcontrol/tangostationcontrol/toolkit/libhdbpp-python + url = https://gitlab.com/tango-controls/hdbpp/libhdbpp-python diff --git a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/__init__.py b/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/__init__.py deleted file mode 100755 index 1cba9fb1b8d94bbf89b9f49fb84506139d138aad..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .reader import * diff --git a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/abstract.py b/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/abstract.py deleted file mode 100755 index 60d3f04fe3c9c43b523787b22a864c2bcf361ba5..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/abstract.py +++ /dev/null @@ -1,154 +0,0 @@ - -from enum import Enum - -class Aggregator(Enum): - """ - Enum to describe aggregation method to use. - Note that this aggregation functions should - be supported at the backend level. - """ - COUNT = 1 - COUNT_ERRORS = 2 - COUNT_NAN = 3 - FIRST = 4 - LAST = 5 - MIN = 6 - MAX = 7 - AVG = 8 - STD_DEV = 9 - - -class AbstractReader(object): - """ - Subclass this class to create a PyTangoArchiving Reader for your specific DB - - e.g. TimeDBReader(AbstractReader) - """ - - def __init__(self, config='',**kwargs): - ''' - Config can be an string like user:passwd@host - or a json-like dictionary "{'user':'...','password':'...','database':}" - ''' - try: - self.db = YourDb(**(config or kwargs)) - except: - raise Exception('WrongDatabaseConfig') - return - - def get_connection(self): - """ - Return the connection object to avoid a client - to open one for custom queries. - The returned object will be implementation specific. - """ - return self.db - - def get_attributes(self, active=False, pattern=''): - """ - Queries the database for the current list of archived attributes. - arguments: - active: True: only attributes currently archived - False: all attributes, even the one not archiving anymore - pattern: '' :filter for attributes to retrieve - """ - return list() - - def is_attribute_archived(self, attribute, active=False): - """ - Returns if an attribute has values in DB. - - arguments: - attribute: fqdn for the attribute. - active: if true, only check for active attributes, - otherwise check all. - """ - return True - - def get_last_attribute_value(self, attribute): - """ - Returns last value inserted in DB for an attribute - - arguments: - attribute: fqdn for the attribute. - returns: - (epoch, r_value, w_value, quality, error_desc) - """ - - return self.get_last_attributes_values([attribute])[attribute] - - def get_last_attributes_values(self, attributes, columns = 'time, r_value'): - """ - Returns last values inserted in DB for a list of attributes - - arguments: - attribute: fqdn for the attribute. - columns: requested columns separated by commas - returns: - {'att1':(epoch, r_value, w_value, quality, error_desc), - 'att2':(epoch, r_value, w_value, quality, error_desc), - ... - } - """ - - return {attributes[0]: (time.time(), 0., 0., 0, "")} - - def get_attribute_values(self, attribute, - start_date, stop_date=None, - decimate=None, - **params): - """ - Returns attribute values between start and stop dates. - - arguments: - attribute: fqdn for the attribute. - start_date: datetime, beginning of the period to query. - stop_date: datetime, end of the period to query. - if None, now() is used. - decimate: aggregation function to use in the form: - {'timedelta0':(MIN, MAX, ...) - , 'timedelta1':(AVG, COUNT, ...) - , ...} - if None, returns raw data. - returns: - [(epoch0, r_value, w_value, quality, error_desc), - (epoch1, r_value, w_value, quality, error_desc), - ... ] - """ - return self.get_attributes_values([attribute], start_date, stop_date, decimate, False, params)[attribute] - - def get_attributes_values(self, attributes, - start_date, stop_date=None, - decimate=None, - correlate = False, - columns = 'time, r_value', - **params): - """ - Returns attributes values between start and stop dates - , using decimation or not, correlating the values or not. - - arguments: - attributes: a list of the attributes' fqdn - start_date: datetime, beginning of the period to query. - stop_date: datetime, end of the period to query. - if None, now() is used. - decimate: aggregation function to use in the form: - {'timedelta0':(MIN, MAX, ...) - , 'timedelta1':(AVG, COUNT, ...) - , ...} - if None, returns raw data. - correlate: if True, data is generated so that - there is available data for each timestamp of - each attribute. - columns: columns separated by commas - time, r_value, w_value, quality, error_desc - - returns: - {'attr0':[(epoch0, r_value, w_value, quality, error_desc), - (epoch1, r_value, w_value, quality, error_desc), - ... ], - 'attr1':[(...),(...)]} - """ - return {'attr0': [(time.time(), 0., 0., 0, '')] - , 'attr1': [(time.time(), 0., 0., 0, '')]} - diff --git a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/mariadb/__init__.py b/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/mariadb/__init__.py deleted file mode 100755 index 6358f910dd9a3784bac810f91e14ea875b05fc40..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/mariadb/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -try: - from mariadb import MariaDbReader -except: - pass diff --git a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/mariadb/mariadb.py b/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/mariadb/mariadb.py deleted file mode 100755 index 9c21409a0c724e2a226137ef7a6dc99bb4203eb2..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/mariadb/mariadb.py +++ /dev/null @@ -1,304 +0,0 @@ -#!/usr/bin/env python3 - -import re, traceback -from timeutils import * -import abstract -from abstract import AbstractReader - -try: - import pymysql as mariadb -except: - import MySQLdb as mariadb - - - -class MariadbReader(AbstractReader): - """ - read-only API for hdb++ databases, based on PyTangoArchiving AbstractReader - """ - - def __init__(self,config='',**kwargs): - """ - Arguments accepted by pymysql connections: - - :param host: Host where the database server is located - :param user: Username to log in as - :param password: Password to use. - :param database: Database to use, None to not use a particular one. - :param port: MySQL port to use, default is usually OK. (default: 3306) - :param bind_address: When the client has multiple network interfaces, specify - the interface from which to connect to the host. Argument can be - a hostname or an IP address. - :param unix_socket: Optionally, you can use a unix socket rather than TCP/IP. - :param read_timeout: The timeout for reading from the connection in seconds (default: None - no timeout) - :param write_timeout: The timeout for writing to the connection in seconds (default: None - no timeout) - :param charset: Charset you want to use. - :param sql_mode: Default SQL_MODE to use. - :param read_default_file: - Specifies my.cnf file to read these parameters from under the [client] section. - :param conv: - Conversion dictionary to use instead of the default one. - This is used to provide custom marshalling and unmarshaling of types. - See converters. - :param use_unicode: - Whether or not to default to unicode strings. - This option defaults to true for Py3k. - :param client_flag: Custom flags to send to MySQL. Find potential values in constants.CLIENT. - :param cursorclass: Custom cursor class to use. - :param init_command: Initial SQL statement to run when connection is established. - :param connect_timeout: Timeout before throwing an exception when connecting. - (default: 10, min: 1, max: 31536000) - :param ssl: - A dict of arguments similar to mysql_ssl_set()'s parameters. - :param read_default_group: Group to read from in the configuration file. - :param compress: Not supported - :param named_pipe: Not supported - :param autocommit: Autocommit mode. None means use server default. (default: False) - :param local_infile: Boolean to enable the use of LOAD DATA LOCAL command. (default: False) - :param max_allowed_packet: Max size of packet sent to server in bytes. (default: 16MB) - Only used to limit size of "LOAD LOCAL INFILE" data packet smaller than default (16KB). - :param defer_connect: Don't explicitly connect on contruction - wait for connect call. - (default: False) - :param auth_plugin_map: A dict of plugin names to a class that processes that plugin. - The class will take the Connection object as the argument to the constructor. - The class needs an authenticate method taking an authentication packet as - an argument. For the dialog plugin, a prompt(echo, prompt) method can be used - (if no authenticate method) for returning a string from the user. (experimental) - :param server_public_key: SHA256 authenticaiton plugin public key value. (default: None) - :param db: Alias for database. (for compatibility to MySQLdb) - :param passwd: Alias for password. (for compatibility to MySQLdb) - :param binary_prefix: Add _binary prefix on bytes and bytearray. (default: False) - """ - if config and isinstance(config,(str,bytes)): - config = self.parse_config(config) - - - self.config = config or {} - self.config.update(kwargs) - - self.database = self.config.get('database','hdbpp') - self.user = self.config.get('user','') - self.password = self.config.get('password','') - self.port = int(self.config.get('port','3306')) - self.host = self.config.get('host','localhost') - - #print([(k,v) for k,v in self.__dict__.items() - #if k not in type(self).__dict__()]) - - self.db = mariadb.connect(database=self.database, - user=self.user, password=self.password, port=self.port, - host=self.host) - self._cursor = self.db.cursor() - - def __del__(self): - self._cursor.close() - self.db.close() - - def _query(self,query,prune=False): - """ - query: SQL code - """ - #print(query) - self._cursor.execute(query) - if prune: - r,l = [],True - while l: - try: - l = self._cursor.fetchone() - if l and (not r or l[1:] != r[-1][1:]): - r.append(l) - except: - print(r[-1:], l) - traceback.print_exc() - break - return r - else: - return self._cursor.fetchall() - - def parse_config(self,config): - """ - config string as user:password@host:port/database - or dictionary like - """ - try: - if re.match('.*[:].*[@].*',config): - h = config.split('@') - u,p = h[0].split(':') - config = {'user':u,'password':p} - if '/' in h[1]: - config['host'],config['database'] = h[1].split('/') - else: - config['host'] = h[1] - if ':' in config['host']: - config['host'],config['port'] = config['host'].split(':') - else: - if '{' not in config: - config = '{%s}' % config.replace(';',',') - config = dict(eval(config)) - except: - raise Exception('Wrong format in config, should be dict-like') - return config - - def get_attributes(self, active=False, pattern=''): - """ - Queries the database for the current list of archived attributes. - arguments: - active: True: only attributes currently archived - False: all attributes, even the one not archiving anymore - regexp: '' :filter for attributes to retrieve - """ - q = 'select att_name from att_conf' - if pattern: - q += " where att_name like '%s'" % pattern.replace('*','%') - #print(q) - return [str(a[0]).lower() for a in self._query(q) if a] - - def get_attribute_name(self,attribute): - attribute = str(attribute).lower() - if ':' not in attribute: - attribute = '%' + '/' + attribute - - elif '.' not in attribute: - attribute = attribute.rsplit(':',1) - attribute = attribute[0] + '.%' + attribute[1] - - if 'tango' not in attribute: - attribute = '%' + '/' + attribute - - attrs = self.get_attributes(pattern=attribute) - if len(attrs)!=1: - raise Exception('MultipleAttributeMatches') - - return attrs[0] if attrs else '' - - def is_attribute_archived(self, attribute, active=False): - """ - Returns if an attribute has values in DB. - - arguments: - attribute: fqdn for the attribute. - active: if true, only check for active attributes, - otherwise check all. - """ - return bool(self.get_attribute_name(attribute)) - - def get_attribute_id_table(self, attribute=''): - """ - for each matching attribute returns name, ID and table name - """ - q = "select att_name,att_conf_id,data_type " - q += " from att_conf, att_conf_data_type where " - q += "att_conf.att_conf_data_type_id = att_conf_data_type.att_conf_data_type_id" - if attribute: - q += " and att_name like '%s'" % attribute - - return [(a,i,'att_'+t) for (a,i,t) in self._query(q)] - - def get_last_attributes_values(self, attributes, columns = '', n = 1): - """ - Returns last values inserted in DB for a list of attributes - - arguments: - attribute: fqdn for the attribute. - columns: requested columns separated by commas - returns: - {'att1':(epoch, r_value, w_value, quality, error_desc), - 'att2':(epoch, r_value, w_value, quality, error_desc), - ... - } - """ - data = {} - columns = columns or 'data_time, value_r, quality, att_error_desc_id' - - for a in attributes: - try: - a,i,t = self.get_attribute_id_table(a)[0] - tdesc = str(self._query('describe %s'%t)) - tcol = ('int_time' if 'int_time' in tdesc else 'data_time') - cols = ','.join(c for c in columns.split(',') - if c.strip() in tdesc) - data[a] = self._query('select %s from %s where ' - 'att_conf_id = %s order by %s desc limit %s' - % (cols, t, i, tcol, n)) - except: - raise Exception('AttributeNotFound: %s' % a) - - return data - - def get_attributes_values(self, attributes, - start_date, stop_date=None, - decimate=None, - correlate = False, - columns = '', - **params): - """ - Returns attributes values between start and stop dates - , using decimation or not, correlating the values or not. - - arguments: - attributes: a list of the attributes' fqdn - start_date: datetime, beginning of the period to query. - stop_date: datetime, end of the period to query. - if None, now() is used. - decimate: aggregation function to use in the form: - {'timedelta0':(MIN, MAX,...) - , 'timedelta1':(AVG, COUNT,...) - ,...} - if None, returns raw data. - correlate: if True, data is generated so that - there is available data for each timestamp of - each attribute. - columns: columns separated by commas - time, r_value, w_value, quality, error_desc - - returns: - {'attr0':[(epoch0, r_value, w_value, quality, error_desc), - (epoch1, r_value, w_value, quality, error_desc), - ... ], - 'attr1':[(...),(...)]} - """ - data = {} - columns = columns or 'data_time, value_r, quality, att_error_desc_id' - if isinstance(start_date,(int,float)): - start_date = time2str(start_date) - if stop_date is None: - stop_date = now() - if isinstance(stop_date,(int,float)): - stop_date = time2str(stop_date) - - for a in attributes: - try: - a,i,t = self.get_attribute_id_table(a)[0] - tdesc = str(self._query('describe %s'%t)) - tcol = ('int_time' if 'int_time' in tdesc else 'data_time') - if tcol == 'int_time': - b,e = str2time(start_date),str2time(stop_date) - else: - b,e = "'%s'" % start_date, "'%s'" % stop_date - - cols = ','.join(c for c in columns.split(',') - if c.strip() in tdesc) - print(cols) - if 'data_time,' in cols: - cols = cols.replace('data_time,', - 'CAST(UNIX_TIMESTAMP(data_time) AS DOUBLE),') - data[a] = self._query('select %s from %s where ' - 'att_conf_id = %s and %s between %s and %s ' - 'order by data_time' - % (cols, t, i, tcol, b, e), prune=decimate) - except: - import traceback - traceback.print_exc() - #raise Exception('AttributeNotFound: %s' % a) - - return data - - return {'attr0': [(time.time(), 0., 0., 0, '')] - , 'attr1': [(time.time(), 0., 0., 0, '')]} - - -############################################################################## - -if __name__ == '__main__': - abstract.main(apiclass=MariadbReader,timeformatter=time2str) - diff --git a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/mariadb/timeutils.py b/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/mariadb/timeutils.py deleted file mode 100755 index 79a702ba1d7b64d4a37a44c25310d59a55183957..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/mariadb/timeutils.py +++ /dev/null @@ -1,227 +0,0 @@ -######################################################################## -## Time conversion methods from Fandango -######################################################################## - -import time, datetime, re - -END_OF_TIME = 1024*1024*1024*2-1 #Jan 19 04:14:07 2038 - -TIME_UNITS = { 'ns': 1e-9, 'us': 1e-6, 'ms': 1e-3, '': 1, 's': 1, 'm': 60, - 'h': 3600, 'd': 86.4e3, 'w': 604.8e3, 'M': 30*86.4e3, 'y': 31.536e6 } -TIME_UNITS.update((k.upper(),v) for k,v in list(TIME_UNITS.items()) if k!='m') - -#@todo: RAW_TIME should be capable to parse durations as of ISO 8601 -RAW_TIME = ('^(?:P)?([+-]?[0-9]+[.]?(?:[0-9]+)?)(?: )?(%s)$' - % ('|').join(TIME_UNITS)) # e.g. 3600.5 s - -MYSQL_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' -ISO_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S' - -global DEFAULT_TIME_FORMAT -DEFAULT_TIME_FORMAT = MYSQL_TIME_FORMAT - -ALT_TIME_FORMATS = [ ('%s%s%s' % ( - date.replace('-',dash),separator if hour else '',hour)) - for date in ('%Y-%m-%d','%y-%m-%d','%d-%m-%Y', - '%d-%m-%y','%m-%d-%Y','%m-%d-%y') - for dash in ('-','/') - for separator in (' ','T') - for hour in ('%H:%M','%H:%M:%S','%H','')] - -def set_default_time_format(dtf, test = True): - """ - Usages: - - fandango.set_default_time_format('%Y-%m-%d %H:%M:%S') - - or - - fandango.set_default_time_format(fandango.ISO_TIME_FORMAT) - - """ - if test: - str2time(time2str(cad = dtf), cad = dtf) - global DEFAULT_TIME_FORMAT - DEFAULT_TIME_FORMAT = dtf - -def now(): - return time.time() - -def time2tuple(epoch=None, utc=False): - if epoch is None: epoch = now() - elif epoch<0: epoch = now()-epoch - if utc: - return time.gmtime(epoch) - else: - return time.localtime(epoch) - -def tuple2time(tup): - return time.mktime(tup) - -def date2time(date,us=True): - """ - This method would accept both timetuple and timedelta - in order to deal with times coming from different - api's with a single method - """ - try: - t = tuple2time(date.timetuple()) - us = us and getattr(date,'microsecond',0) - if us: t+=us*1e-6 - return t - except Exception as e: - try: - return date.total_seconds() - except: - raise e - -def date2str(date, cad = '', us=False): - #return time.ctime(date2time(date)) - global DEFAULT_TIME_FORMAT - cad = cad or DEFAULT_TIME_FORMAT - t = time.strftime(cad, time2tuple(date2time(date))) - us = us and getattr(date,'microsecond',0) - if us: t+='.%06d'%us - return t - -def time2date(epoch=None): - if epoch is None: epoch = now() - elif epoch<0: epoch = now()-epoch - return datetime.datetime.fromtimestamp(epoch) - -def utcdiff(t=None): - return now() - date2time(datetime.datetime.utcnow()) - -def time2str(epoch=None, cad='', us=False, bt=True, - utc=False, iso=False): - """ - cad: introduce your own custom format (see below) - use DEFAULT_TIME_FORMAT to set a default one - us=False; True to introduce ms precission - bt=True; negative epochs are considered relative from now - utc=False; if True it converts to UTC - iso=False; if True, 'T' will be used to separate date and time - - cad accepts the following formats: - - %a Locale's abbreviated weekday name - %A Locales full weekday name - %b Locales abbreviated month name - %B Locales full month name - %c Locales appropriate date and time representation - %d Day of the month as a decimal number [01,31] - %H Hour (24-hour clock) as a decimal number [00,23] - %I Hour (12-hour clock) as a decimal number [01,12] - %j Day of the year as a decimal number [001,366] - %m Month as a decimal number [01,12] - %M Minute as a decimal number [00,59] - %p Locales equivalent of either AM or PM - %S Second as a decimal number [00,61] - %U Week number of the year (Sunday as the first day of the week) as a decimal number [00,53] - All days in a new year preceding the first Sunday are considered to be in week 0 - %w Weekday as a decimal number [0(Sunday),6] - %W Week number of the year (Monday as the first day of the week) as a decimal number [00,53] - All days in a new year preceding the first Monday are considered to be in week 0 - %x Locales appropriate date representation - %X Locales appropriate time representation - %y Year without century as a decimal number [00,99] - %Y Year with century as a decimal number - %Z Time zone name (no characters if no time zone exists) - %% A literal '%' character - """ - if epoch is None: epoch = now() - elif bt and epoch<0: epoch = now()+epoch - global DEFAULT_TIME_FORMAT - if cad: - cad = 'T'.join(cad.split(' ',1)) if iso else cad - else: - cad = ISO_TIME_FORMAT if iso else DEFAULT_TIME_FORMAT - - t = time.strftime(cad,time2tuple(epoch,utc=utc)) - us = us and epoch%1 - if us: t+='.%06d'%(1e6*us) - return t - -epoch2str = time2str - -def str2time(seq='', cad='', throw=True, relative=False): - """ - :param seq: Date must be in ((Y-m-d|d/m/Y) (H:M[:S]?)) format or -N [d/m/y/s/h] - - See RAW_TIME and TIME_UNITS to see the units used for pattern matching. - - The conversion itself is done by time.strptime method. - - :param cad: You can pass a custom time format - :param relative: negative times will be converted to now()-time - :param throw: if False, None is returned instead of exception - """ - try: - if seq in (None,''): - return time.time() - if 'NOW-' in seq: - seq,relative = seq.replace('NOW',''),True - elif seq=='NOW': - return now() - - t, seq = None, str(seq).strip() - if not cad: - m = re.match(RAW_TIME,seq) - if m: - #Converting from a time(unit) format - value,unit = m.groups() - t = float(value)*TIME_UNITS[unit] - return t # must return here - - #Converting from a date format - ms = re.match('.*(\.[0-9]+)$',seq) #Splitting the decimal part - if ms: - ms,seq = float(ms.groups()[0]),seq.replace(ms.groups()[0],'') - - if t is None: - #tf=None will try default system format - global DEFAULT_TIME_FORMAT - time_fmts = ([cad] if cad else - [DEFAULT_TIME_FORMAT,None] + ALT_TIME_FORMATS) - for tf in time_fmts: - try: - tf = (tf,) if tf else () - t = time.strptime(seq,*tf) - break - except: - pass - - v = time.mktime(t)+(ms or 0) - if relative and v<0: - v = fn.now()-v - return v - except: - if throw: - raise Exception('PARAMS_ERROR','unknown time format: %s' % seq) - else: - return None - - -str2epoch = str2time - -def time2gmt(epoch=None): - if epoch is None: epoch = now() - return tuple2time(time.gmtime(epoch)) - -def timezone(): - t = now() - return old_div(int(t-time2gmt(t)),3600) - -#Auxiliary methods: -def ctime2time(time_struct): - try: - return (float(time_struct.tv_sec)+1e-6*float(time_struct.tv_usec)) - except: - return -1 - -def mysql2time(mysql_time,us=True): - try: - return date2time(mysql_time,us=us) - #t = time.mktime(mysql_time.timetuple()) - except: - return -1 diff --git a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/reader.py b/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/reader.py deleted file mode 100755 index bde0560e93c6d3a731b6ffab4e06d65ae2f781ce..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/reader.py +++ /dev/null @@ -1,238 +0,0 @@ - -import sys -import argparse -import yaml -import logging -import logging.handlers -import datetime -import importlib - - -logger = logging.getLogger('hdbpp_reader') - -############################################################################### - -__usage__ = """ -Usage: - -:> reader : print this help - -:> reader [options] list [pattern] : - returns matching attributes from database - -:> reader [options] <attribute> : - print last value for attribute - -:> reader [options] <attribute> <start> <stop> : - returns values for attribute between given dates - -Options (at least some is needed): - --prompt - --config=user:password@host:port/database - --database= - --host= - --port= - --user= - --password= - -""" - -def reader(apiclass='abstract.AbstractReader', config=None): - """ - Initialize a reader object, based on the specified backend - and a config dictionnary. - """ - - mod_name, class_name = apiclass.rsplit('.', 1) - - module = importlib.import_module(mod_name) - - return getattr(module, class_name)(**config) - -def load_config(config_file): - """ - Load the config file from the given path - Arguments: - config_file : str -- Path and name of the config file to load - Returns: - dict -- dictionary of values from the yaml config file. - """ - - with open(config_file, 'r') as fp: - try: - config = yaml.safe_load(fp) - - except yaml.YAMLError as error: - logger.error("Unable to load the config file: {}. Error: {}".format(config_file, error)) - return None - - logger.info("Loaded config file: {}".format(config_file)) - - # return the dictionary with the configuration in for the script to use - return config - -def parse_config(connect_str): - """ - Parse a connect string into the various element to initiate a connection. - Arguments: - connect_str : str -- user:password@host:port/database - Returns: - dict -- dictionary of values from the connect string. - """ - config = {} - - config_split, config['database'] = connect_str.split('/') - user_pass, host_port = config_split.split('@') - config['user'], config['password'] = user_pass.split(':') - config['host'], config['port'] = host_port.split(':') - - return config - -def add_defaults_to_config(config): - """ - Ensure the defaults for certain config params are part of the configuration - Arguments: - config : dict -- Configuration - """ - if 'database' not in config: - config['database'] = 'hdb' - - if 'host' not in config: - config['host'] = 'localhost' - - if 'port' not in config: - config['port'] = 3306 - -def validate_config(config): - """ - Validate the config. Certain values will be checked for, and if not present - the config is considered not valid and false is returned - Arguments: - config : dict -- dictionary of values that represent the config. - Returns: - bool -- True on success, False otherwise - """ - if len(config) == 0: - logger.error("Invalid configuration, no values loaded.") - return False - - if 'database' not in config: - logger.error("Invalid configuration, no database provided.") - return False - - if 'user' not in config: - logger.error("Invalid configuration, no username provided to connect to the database.") - return False - - if 'password' not in config: - logger.error("Invalid configuration, no password provided to connect to the database.") - return False - - return True - - - -def read(): - parser = argparse.ArgumentParser(description="HDB++ reader") - parser.add_argument("-v", "--version", action="store_true", help="version information") - parser.add_argument("-D", "--debug", action="store_true", help="debug output for development") - parser.add_argument("--syslog", action="store_true", help="send output to syslog") - parser.add_argument("-b", "--backend", default="mariadb.MariadbReader", help="Reader backend.") - parser.add_argument("-P", "--prompt", action="store_false", help="Prompt for connection details") - parser.add_argument("-c", "--config", default=None, help="config file to use") - parser.add_argument("-C", "--connect", help="connect string to connect to the database.") - parser.add_argument("-d", "--database", help="database to connect to.") - parser.add_argument("-H", "--host", help="host to connect to.") - parser.add_argument("-u", "--user", help="User for the database connection.") - parser.add_argument("-p", "--password", help="password for the database connection.") - parser.add_argument("-t", "--timeformat", help="Time format expression.") - - subparsers = parser.add_subparsers(help='sub-command help') - - # create the parser for the "list" command - parser_list = subparsers.add_parser('list', help='List attributes matching the pattern from database') - parser_list.add_argument("-P", '--pattern', type=str, nargs='?', default=None, help='SQL like type pattern') - parser_list.set_defaults(func=list_attributes) - - parser_read = subparsers.add_parser('read', help='Read attribute value.') - parser_read.add_argument('attribute', type=str, nargs='?', default=None, help="Name of the attribute to extract.") - parser_read.add_argument('start', type=datetime.date.fromisoformat, nargs='?', default=None, help="Start date for the query.") - parser_read.add_argument('stop', type=datetime.date.fromisoformat, nargs='?', default=None, help="End date for the query.") - parser_read.set_defaults(func=read_attribute) - - if len(sys.argv) == 1: - parser.print_help(sys.stderr) - sys.exit(1) - - args = parser.parse_args() - - stdout_formatter = logging.Formatter("%(asctime)s hdbpp_reader[%(process)d]: %(message)s", "%Y-%m-%d %H:%M:%S") - stdout_handler = logging.StreamHandler() - stdout_handler.setFormatter(stdout_formatter) - logger.addHandler(stdout_handler) - - if args.syslog: - syslog_formatter = logging.Formatter("hdbpp_reader[%(process)d]: %(message)s") - syslog_handler = logging.handlers.SysLogHandler(address='/dev/log') - syslog_handler.setFormatter(syslog_formatter) - logger.addHandler(syslog_handler) - - if args.debug: - logger.setLevel(logging.DEBUG) - else: - logger.setLevel(logging.INFO) - - if args.version: - print("Version {}.{}.{}".format(str(version_major), str(version_minor), str(version_patch))) - - # Build a config based on the provided information - # A config file is preferred, then a single connect argument, - # then the prompt. - config = {} - - if args.config: - config = load_config(args.config) - elif args.connect: - config = parse_config(args.connect) - elif args.prompt: - config['host'] = input('host (default localhost):') or 'localhost' - config['port'] = input('port (default 3306):') or 3306 - config['database'] = input('database (default hdb):') or 'hdb' - config['user'] = input('user:') - config['password'] = input('password:') - - add_defaults_to_config(config) - - # Check the config. - if not validate_config(config): - return False - - # Build the reader object - _reader = reader(args.backend, config) - - return args.func(_reader, args) - - -def list_attributes(_reader, args): - pattern = args.pattern - print('\n'.join(_reader.get_attributes(pattern=pattern))) - -def read_attribute(_reader, args): - start = args.start - stop = args.stop - attribute = args.attribute - - if start is None: - print(_reader.get_last_attribute_value(attribute)) - else: - datas = _reader.get_attribute_values(attribute, start, stop, - decimate=True) - for data in datas: - data_str = '\t'.join(map(str,data)) - if args.timeformat: - print('%s\t%s' % (timeformat(data[0]), data_str)) - else: - print(data_str) - -if __name__ == '__main__': - read() diff --git a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/timescaledb/__init__.py b/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/timescaledb/__init__.py deleted file mode 100755 index fe377c135a1e8decf67bf6817a9a5fc018996762..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/timescaledb/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .timescaledb import TimescaleDbReader diff --git a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/timescaledb/timescaledb.py b/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/timescaledb/timescaledb.py deleted file mode 100755 index 55871ef75e951498581010e1568a4fcd16941931..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/toolkit/hdbpp_reader/timescaledb/timescaledb.py +++ /dev/null @@ -1,274 +0,0 @@ -#!/usr/bin/env python3 - -import re, traceback -import datetime -import psycopg2 -from psycopg2 import sql - -import logging -import logging.handlers - -from datetime import timedelta -from ..abstract import AbstractReader - - -class TimescaleDbReader(AbstractReader): - """ - read-only API for hdb++ databases, based on PyTangoArchiving AbstractReader - """ - - def __init__(self,config='',**kwargs): - """ - """ - self.logger = logging.getLogger('TimescaledbReader') - - if config and isinstance(config,(str,bytes)): - config = self.parse_config(config) - - - self.config = config or {} - self.config.update(kwargs) - - self.database = self.config.get('database','hdbpp') - self.user = self.config.get('user','') - self.password = self.config.get('password','') - self.port = self.config.get('port', self._get_default_port()) - self.host = self.config.get('host','localhost') - - try: - self.logger.debug("Attempting to connect to server: {}".format(self.host)) - - # attempt to connect to the server - self.db = psycopg2.connect( - user=self.user, - password=self.password, - host=self.host, - port=self.port, - database=self.database) - - self.db.autocommit = True - - self.logger.debug("Connected to database at server: {}".format(self.host)) - - self._cursor = self.db.cursor() - except (Exception, psycopg2.Error) as error: - self.logger.error("Error: {}".format(error)) - - - def __del__(self): - self._cursor.close() - self.db.close() - - def _query(self,query,prune=False): - """ - query: SQL code - """ - #print(query) - self._cursor.execute(query) - if prune: - rows, next_row = [],True - while next_row: - try: - next_row = self._cursor.fetchone() - if next_row and (not next_row or rows[1:] != rows[-1][1:]): - rows.append(next_row) - except: - self.logger.error(rows[-1:], row) - traceback.print_exc() - break - return rows - else: - return self._cursor.fetchall() - - - def _get_default_port(self): - """ - Get the default port to connect to the database - """ - return 5001 - - - def parse_config(self,config): - """ - config string as user:password@host:port/database - or dictionary like - """ - try: - if re.match('.*[:].*[@].*',config): - h = config.split('@') - user, password = h[0].split(':') - config = {'user':user,'password':password} - if '/' in h[1]: - config['host'], config['database'] = h[1].split('/') - else: - config['host'] = h[1] - if ':' in config['host']: - config['host'], config['port'] = config['host'].split(':') - else: - if '{' not in config: - config = '{%s}' % config.replace(';',',') - config = dict(eval(config)) - except: - raise Exception('Wrong format in config, should be dict-like') - - if 'port' in config: - config['port'] = int(config['port']) - - return config - - - def get_attributes(self, active=False, pattern=''): - """ - Queries the database for the current list of archived attributes. - arguments: - active: True: only attributes currently archived - False: all attributes, even the one not archiving anymore - regexp: '' :filter for attributes to retrieve - """ - query = 'SELECT att_name FROM att_conf' - - if pattern: - query += " WHERE att_name LIKE '{}'".format(pattern.replace('*','%')) - - self.logger.debug(query) - - return [str(attribute[0]).lower() for attribute in self._query(query)] - - - def is_attribute_archived(self, attribute, active=False): - """ - Returns true if an attribute has values in DB. - - arguments: - attribute: fqdn for the attribute. - active: if true, only check for active attributes, - otherwise check all. - """ - self._cursor.execute("SELECT att_conf_id FROM att_conf WHERE att_name LIKE %s AND hide=%s;", ('%'+attribute+'%', active)) - - att_id = self._cursor.fetchall() - - # if we get more than one attribute an error occured. - if len(att_id) > 1: - self.logger.debug("Fetched more than 1 attribute with this name {}".format(attribute)) - - return len(att_id) - - def get_last_attributes_values(self, attributes, columns = [], n = 1): - """ - Returns last values inserted in DB for a list of attributes - - arguments: - attribute: fqdn for the attribute. - columns: list of requested columns - returns: - {'att1':(epoch, r_value, w_value, quality, error_desc), - 'att2':(epoch, r_value, w_value, quality, error_desc), - ... - } - """ - data = {} - columns = columns or ["value_r", "value_w", "quality", "att_error_desc_id"] - - for attr in attributes: - try: - # get the att_conf_id and the table where the data is from hdb - self._cursor.execute("SELECT att_conf_id, table_name, att_name FROM att_conf WHERE att_name LIKE %s;", ('%'+attr+'%',)) - - att_id = self._cursor.fetchall() - - if len(att_id) == 0: - self.logger.debug("Attribute {} not found".format(attr)) - raise Exception("Attribute {} not found.".format(attr)) - - elif (len(att_id) > 1): - self.logger.debug("Fetched more than 1 attribute with this name {}".format(attr)) - - else: - self._cursor.execute( - sql.SQL("SELECT data_time, {fields} FROM {table} WHERE att_conf_id=%s ORDER BY data_time DESC LIMIT %s").format( - fields = sql.SQL(',').join([sql.Identifier(field) for field in columns]), - table = sql.Identifier(att_id[0][1])) - , (att_id[0][0], n)) - - if n == 1: - data[attr] = self._cursor.fetchall()[0] - else: - data[attr] = self._cursor.fetchall() - - except (Exception, psycopg2.Error) as error: - self.logger.error("Error extracting data for attribute: {}: {}".format(attr, error)) - raise error - - return data - - def get_attributes_values(self, attributes, - start_date, stop_date=None, - decimate=None, - correlate = False, - columns = [], - **params): - """ - Returns attributes values between start and stop dates - , using decimation or not, correlating the values or not. - - arguments: - attributes: a list of the attributes' fqdn - start_date: datetime, beginning of the period to query. - stop_date: datetime, end of the period to query. - if None, now() is used. - decimate: aggregation function to use in the form: - {'timedelta0':(MIN, MAX, …) - , 'timedelta1':(AVG, COUNT, …) - , …} - if None, returns raw data. - correlate: if True, data is generated so that - there is available data for each timestamp of - each attribute. - columns: list of columns - [time, r_value, w_value, quality, error_desc] - - returns: - {'attr0':[(epoch0, r_value, w_value, quality, error_desc), - (epoch1, r_value, w_value, quality, error_desc), - ... ], - 'attr1':[(…),(…)]} - """ - data = {} - columns = columns or ["value_r", "value_w", "quality", "att_error_desc_id"] - - for attr in attributes: - try: - # get the att_conf_id and the table where the data is from hdb - self._cursor.execute("SELECT att_conf_id, table_name, att_name FROM att_conf WHERE att_name LIKE %s;", ('%'+attr+'%',)) - - att_id = self._cursor.fetchall() - - if (len(att_id) > 1): - self.logger.debug("Fetched more than 1 attribute with this name {}".format(attr)) - else: - if stop_date is None: - stop_date = datetime.datetime.now() - - # extract data. - self.logger.debug("Extracting data for attribute: {} in table: {}".format(att_id[0][2], att_id[0][1])) - - self._cursor.execute( - sql.SQL("SELECT data_time, {fields} FROM {table} WHERE att_conf_id=%s AND data_time BETWEEN %s AND %s ORDER BY data_time DESC").format( - fields = sql.SQL(',').join([sql.Identifier(field) for field in columns]), - table = sql.Identifier(att_id[0][1])) - , (att_id[0][0], start_date, stop_date)) - - data[attr] = self._cursor.fetchall() - - except (Exception, psycopg2.Error) as error: - self.logger.error("Error extracting data for attribute: {}: {}".format(attr, error)) - - return data - - -############################################################################## - -if __name__ == '__main__': - AbstractReader.main(apiclass=TimescaledbReader) -