diff --git a/docker-compose/jupyter/requirements.txt b/docker-compose/jupyter/requirements.txt index 3be9c3d4c8f1052920b577d77442a348c09aa7ca..7d8be5631439e6b5dbae530ce06ac64cdd6cc542 100644 --- a/docker-compose/jupyter/requirements.txt +++ b/docker-compose/jupyter/requirements.txt @@ -9,7 +9,8 @@ nbconvert notebook-as-pdf python-logstash-async PyMySQL[rsa] +psycopg2-binary >= 2.9.2 #LGPL sqlalchemy pyvisa pyvisa-py -opcua \ No newline at end of file +opcua diff --git a/docker-compose/prometheus.yml b/docker-compose/prometheus.yml index 1e9ce6f1aa2cd050565f48a4b991865641fd1566..e7924c1a7219adc16e1a3c1780b0bcc43773b3c0 100644 --- a/docker-compose/prometheus.yml +++ b/docker-compose/prometheus.yml @@ -7,6 +7,9 @@ version: '2' +volumes: + prometheus-data: {} + services: prometheus: image: prometheus @@ -15,6 +18,8 @@ services: container_name: ${CONTAINER_NAME_PREFIX}prometheus networks: - control + volumes: + - prometheus-data:/prometheus ports: - "9090:9090" logging: diff --git a/docker-compose/prometheus/Dockerfile b/docker-compose/prometheus/Dockerfile index cc1494f98dbce6c66e437b001af2a88320ca0ffa..ad8e5165b06b55a3ca1e273d09ee2fbf6c69db1c 100644 --- a/docker-compose/prometheus/Dockerfile +++ b/docker-compose/prometheus/Dockerfile @@ -1,3 +1,5 @@ FROM prom/prometheus COPY prometheus.yml /etc/prometheus/prometheus.yml + +CMD ["--config.file=/etc/prometheus/prometheus.yml", "--storage.tsdb.path=/prometheus", "--web.console.libraries=/usr/share/prometheus/console_libraries", "--web.console.templates=/usr/share/prometheus/consoles", "--storage.tsdb.retention.time=31d"] diff --git a/tangostationcontrol/requirements.txt b/tangostationcontrol/requirements.txt index 64b992037fe5697a8c90a0518ad41628c6c23faa..9f910e1dae2d3ddd31e6c70c1bcffce55e0a92b0 100644 --- a/tangostationcontrol/requirements.txt +++ b/tangostationcontrol/requirements.txt @@ -4,6 +4,7 @@ asyncua >= 0.9.90 # LGPLv3 PyMySQL[rsa] >= 1.0.2 # MIT +psycopg2-binary >= 2.9.2 #LGPL sqlalchemy >= 1.4.26 #MIT snmp >= 0.1.7 # GPL3 h5py >= 3.5.0 # BSD diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py index 6c0cd3baa8e2c708d39858191bc56e73e84a9eff..7562e88620c897bbc09c35fce92f87540d9bb04b 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py @@ -1,47 +1,59 @@ #! /usr/bin/env python3 -#from logging import raiseExceptions import logging from tango import DeviceProxy, AttributeProxy -from datetime import datetime, timedelta import time import json, os -from sqlalchemy import create_engine, and_ -from sqlalchemy.orm import sessionmaker -from sqlalchemy.orm.exc import NoResultFound -from .archiver_base import * logger = logging.getLogger() -def parse_attribute_name(attribute_name:str): +def attribute_name_from_url(attribute_name:str): """ For some operations Tango attribute must be transformed from the form 'tango://db:port/domain/family/name/attribute' to canonical 'domain/family/name/attribute' """ - chunk_num = len(attribute_name.split('/')) - if (chunk_num==7 and attribute_name.split('/')[0]=='tango:'): + if attribute_name.startswith('tango://'): return '/'.join(attribute_name.split('/')[3:]) - else: - if (chunk_num!=4): - raise AttributeFormatException - else: - return attribute_name -def parse_device_name(device_name:str, tango_host:str = 'databaseds:10000'): + if len(attribute_name.split('/')) != 4: + raise ValueError(f"Expected attribute of format 'domain/family/name/attribute', got {attribute_name}") + + return attribute_name + +def device_name_url(device_name:str, tango_host:str = 'databaseds:10000'): """ For some operations Tango devices must be transformed from the form 'domain/family/name' to 'tango://db:port/domain/family/name' """ - chunk_num = len(device_name.split('/')) - if (chunk_num==3): - return 'tango://'+tango_host+'/'+device_name - elif (chunk_num==6 and device_name.split('/')[0]=='tango:'): + if device_name.startswith('tango://'): return device_name + + if len(device_name.split('/')) != 3: + raise ValueError(f"Expected device name of format 'domain/family/name', got {device_name}") + + return f"tango://{tango_host}/{device_name}" + +def split_tango_name(tango_fqname:str, tango_type:str): + """ + Helper function to split device or attribute Tango full qualified names + into its components + """ + if tango_type.lower() == 'device': + try: + domain, family, member = tango_fqname.split('/') + return domain, family, member + except ValueError as e: + raise ValueError(f"Could not parse device name {tango_fqname}. Please provide FQDN, e.g. STAT/Device/1") from e + elif tango_type.lower() == 'attribute': + try: + domain, family, member, name = tango_fqname.split('/') + return domain, family, member, name + except ValueError as e: + raise ValueError(f"Could not parse attribute name {tango_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute") from e else: - raise Exception(f'{device_name} is a wrong device name') - + raise ValueError(f"Invalid value: {tango_type}. Please provide 'device' or 'attribute'.") class Archiver(): """ @@ -52,7 +64,7 @@ class Archiver(): dev_polling_time = None dev_archive_time = None - def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbpp/confmanager01', context: str = 'RUN'): + def __init__(self, selector_filename:str = None, cm_name: str = 'archiving/hdbppts/confmanager01', context: str = 'RUN'): self.cm_name = cm_name self.cm = DeviceProxy(cm_name) try: @@ -67,7 +79,7 @@ class Archiver(): try: self.apply_selector() except Exception as e: - raise Exception("Error in selecting configuration! Archiving framework will not be updated!") from e + raise Exception("Error in selecting configuration. Archiving framework will not be updated.") from e def get_db_config(self, device_name:str): """ @@ -177,7 +189,7 @@ class Archiver(): es_state = es.state() # ping the device server if 'FAULT' in str(es_state): raise Exception(f"{es_name} is in FAULT state") - self.cm.ArchiverAdd(parse_device_name(es_name)) + self.cm.ArchiverAdd(device_name_url(es_name)) except Exception as e: if 'already_present' in str(e): logger.warning(f"Subscriber {es_name} already present in Configuration Manager") @@ -191,7 +203,7 @@ class Archiver(): The ConfigurationManager and EventSubscriber devices must be already up and running. The archiving-DBMS must be already properly configured. """ - attribute_name = parse_attribute_name(attribute_name) + attribute_name = attribute_name_from_url(attribute_name) try: self.cm.write_attribute('SetAttributeName', attribute_name) self.cm.write_attribute('SetArchiver', es_name or self.get_next_subscriber()) @@ -237,7 +249,7 @@ class Archiver(): """ Stops the data archiving of the attribute passed as input, and remove it from the subscriber's list. """ - attribute_name = parse_attribute_name(attribute_name) + attribute_name = attribute_name_from_url(attribute_name) try: self.cm.AttributeStop(attribute_name) self.cm.AttributeRemove(attribute_name) @@ -278,7 +290,7 @@ class Archiver(): exclude_list = [a.lower() for a in exclude] attrs_list = [a.lower() for a in list(attributes_nok) if a.lower() not in exclude_list] for a in attrs_list: - attr_fullname = parse_attribute_name(a) + attr_fullname = attribute_name_from_url(a) self.remove_attribute_from_archiver(attr_fullname) def start_archiving_attribute(self, attribute_name:str): @@ -286,7 +298,7 @@ class Archiver(): Starts the archiving of the attribute passed as input. The attribute must be already present in the subscriber's list """ - attribute_name = parse_attribute_name(attribute_name) + attribute_name = attribute_name_from_url(attribute_name) try: self.cm.AttributeStart(attribute_name) except Exception as e: @@ -300,7 +312,7 @@ class Archiver(): Stops the archiving of the attribute passed as input. The attribute must be already present in the subscriber's list """ - attribute_name = parse_attribute_name(attribute_name) + attribute_name = attribute_name_from_url(attribute_name) try: self.cm.AttributeStop(attribute_name) except Exception as e: @@ -313,14 +325,14 @@ class Archiver(): """ Check if an attribute is in the archiving list """ - attribute_name = parse_attribute_name(attribute_name) + attribute_name = attribute_name_from_url(attribute_name) attributes = self.cm.AttributeSearch(attribute_name.lower()) if len(attributes)>1: # Handle case same attribute_name r/rw if len(attributes)==2 and (attributes[0].endswith(attributes[1]+'w') or attributes[1].endswith(attributes[0]+'w')): return True else: - raise Exception(f"Multiple Attributes Matched! {attributes}") + raise Exception(f"Multiple Attributes Matched: {attributes}") elif len(attributes)==1: return True else: @@ -371,7 +383,7 @@ class Archiver(): """ Return the error related to the attribute """ - attribute_name = parse_attribute_name(attribute_name) + attribute_name = attribute_name_from_url(attribute_name) errs_dict = self.get_subscriber_errors() for e in errs_dict: if attribute_name in e: @@ -396,7 +408,7 @@ class Archiver(): """ Given an attribute name, return the event subscriber associated with it """ - attribute_name = parse_attribute_name(attribute_name) + attribute_name = attribute_name_from_url(attribute_name) # If the ConfManager manages more than one subscriber if len(self.get_subscribers())>1: for es_name in self.get_subscribers(): @@ -411,7 +423,7 @@ class Archiver(): """ Return the attribute archiving frequency in events/minute """ - attribute_name = parse_attribute_name(attribute_name) + attribute_name = attribute_name_from_url(attribute_name) if self.is_attribute_archived(attribute_name): es = DeviceProxy(self.get_attribute_subscriber(attribute_name)) freq_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeRecordFreqList)) @@ -425,7 +437,7 @@ class Archiver(): """ Return the attribute failure archiving frequency in events/minute """ - attribute_name = parse_attribute_name(attribute_name) + attribute_name = attribute_name_from_url(attribute_name) if self.is_attribute_archived(attribute_name): es = DeviceProxy(self.get_attribute_subscriber(attribute_name)) fail_dict = dict((a,r) for a,r in zip(es.AttributeList,es.AttributeFailureFreqList)) @@ -461,168 +473,5 @@ class Selector(): data = json.load(f) f.close() except FileNotFoundError as e: - raise Exception("JSON configuration file not found!") from e + raise return data - -class Retriever(): - """ - The Retriever class implements retrieve operations on a given DBMS - """ - def __init__(self, cm_name: str = 'archiving/hdbpp/confmanager01'): - self.cm_name = cm_name - self.session = self.connect_to_archiving_db() - - def get_db_credentials(self): - """ - Retrieves the DB credentials from the Tango properties of Configuration Manager - """ - cm = DeviceProxy(self.cm_name) - config_list = cm.get_property('LibConfiguration')['LibConfiguration'] # dictionary {'LibConfiguration': list of strings} - host = str([s for s in config_list if "host" in s][0].split('=')[1]) - dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1]) - port = str([s for s in config_list if "port" in s][0].split('=')[1]) - user = str([s for s in config_list if "user" in s][0].split('=')[1]) - pw = str([s for s in config_list if "password" in s][0].split('=')[1]) - return host,dbname,port,user,pw - - def connect_to_archiving_db(self): - """ - Returns a session to a MySQL DBMS using default credentials. - """ - host,dbname,port,user,pw = self.get_db_credentials() - engine = create_engine('mysql+pymysql://'+user+':'+pw+'@'+host+':'+port+'/'+dbname) - Session = sessionmaker(bind=engine) - return Session() - - def get_all_archived_attributes(self): - """ - Returns a list of the archived attributes in the DB. - """ - attrs = self.session.query(Attribute).order_by(Attribute.att_conf_id).all() - # Returns the representation as set in __repr__ method of the mapper class - return attrs - - def get_archived_attributes_by_device(self,device_fqname: str): - """ - Takes as input the fully-qualified name of a device and returns a list of its archived attributes - """ - try: - [domain, family, member] = device_fqname.split('/') - except: - raise AttributeFormatException(f"Could not parse device name {device_fqname}. Please provide FQDN, e.g. STAT/Device/1") - attrs = self.session.query(Attribute).filter(and_(Attribute.domain == domain, Attribute.family == family, \ - Attribute.member == member)).all() - # Returns the representation as set in __repr__ method of the mapper class - return attrs - - def get_attribute_id(self,attribute_fqname: str): - """ - Takes as input the fully-qualified name of an attribute and returns its id. - """ - try: - [domain, family, member, name] = attribute_fqname.split('/') - except: - raise AttributeFormatException(f"Could not parse attribute name {attribute_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute") - try: - result = self.session.query(Attribute.att_conf_id).filter(and_(Attribute.domain == domain, Attribute.family == family, \ - Attribute.member == member, Attribute.name == name)).one() - return result[0] - except TypeError as e: - raise Exception("Attribute not found!") from e - except NoResultFound as e: - raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e - - def get_attribute_datatype(self,attribute_fqname: str): - """ - Takes as input the fully-qualified name of an attribute and returns its Data-Type. - Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used - as DB table name suffix in which values are stored. - """ - try: - [domain, family, member, name] = attribute_fqname.split('/') - except: - raise AttributeFormatException(f"Could not parse attribute name {attribute_fqname}. Please provide FQDN, e.g. STAT/Device/1/Attribute") - try: - result = self.session.query(DataType.data_type).join(Attribute,Attribute.att_conf_data_type_id==DataType.att_conf_data_type_id).\ - filter(and_(Attribute.domain == domain, Attribute.family == family, Attribute.member == member, Attribute.name == name)).one() - return result[0] - except TypeError as e: - raise Exception("Attribute not found!") from e - except NoResultFound as e: - raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e - - def get_attribute_value_by_hours(self,attribute_fqname: str, hours: float = 1.0): - """ - Takes as input the attribute fully-qualified name and the number of past hours since the actual time - (e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half). - Returns a list of timestamps and a list of values - """ - attr_id = self.get_attribute_id(attribute_fqname) - attr_datatype = self.get_attribute_datatype(attribute_fqname) - attr_table_name = 'att_'+str(attr_datatype) - # Retrieves the class that maps the DB table given the tablename - base_class = get_class_by_tablename(attr_table_name) - # Retrieves the timestamp - time_now = datetime.now() - time_delta = time_now - timedelta(hours=hours) - # Converts the timestamps in the right format for the query - time_now_db = str(time_now.strftime("%Y-%m-%d %X")) - time_delta_db = str(time_delta.strftime("%Y-%m-%d %X")) - try: - result = self.session.query(base_class).\ - join(Attribute,Attribute.att_conf_id==base_class.att_conf_id).\ - filter(and_(Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \ - base_class.data_time <= time_now_db)).order_by(base_class.data_time).all() - except AttributeError as e: - raise Exception(f"Empty result! Attribute {attribute_fqname} not found") from e - return result - - def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime): - """ - Takes as input the attribute name and a certain starting and ending point-time. - The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss"). - Returns a list of timestamps and a list of values - """ - attr_id = self.get_attribute_id(attribute_fqname) - attr_datatype = self.get_attribute_datatype(attribute_fqname) - attr_table_name = 'att_'+str(attr_datatype) - # Retrieves the class that maps the DB table given the tablename - base_class = get_class_by_tablename(attr_table_name) - try: - result = self.session.query(base_class).\ - join(Attribute,Attribute.att_conf_id==base_class.att_conf_id).\ - filter(and_(Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \ - base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all() - except AttributeError as e: - raise Exception(f"Empty result! Attribute {attribute_fqname} not found") from e - return result - - def get_masked_fpga_temp(self,start_time: datetime, stop_time: datetime,temp_attr_name:str='LTS/SDP/1/fpga_temp_r', - mask_attr_name:str='LTS/SDP/1/tr_fpga_mask_r'): - """ - Returns a list of SDP/fpga_temp_r values, but only if SDP/tr_fpga_mask_r values are TRUE - """ - mask_values = self.get_attribute_value_by_interval(mask_attr_name,start_time,stop_time) - temp_values = self.get_attribute_value_by_interval(temp_attr_name,start_time,stop_time) - # Since timestamps can be not syncrhonized, remove first or last element from arrays - if len(mask_values)==len(temp_values): - first_mask_datatime = mask_values[0].data_time - first_temp_datatime = temp_values[0].data_time - if (first_mask_datatime>first_temp_datatime): - mask_values = mask_values[:-int(mask_values[0].dim_x_r)] - temp_values = temp_values[int(temp_values[0].dim_x_r):] - elif (first_mask_datatime<first_temp_datatime): - mask_values = mask_values[int(mask_values[0].dim_x_r)] - temp_values = temp_values[:-int(temp_values[0].dim_x_r):] - else: - raise Exception - # Convert DB Array records into Python lists - mask_data = build_array_from_record(mask_values,mask_values[0].dim_x_r) - temp_data = build_array_from_record(temp_values,temp_values[0].dim_x_r) - # Extract only the value from the array - mask_array_values = get_values_from_record(mask_data) - temp_array_values = get_values_from_record(temp_data) - # Multiply the matrix - #masked_values = np.multiply(temp_array_values,mask_array_values) - masked_values = np.ma.masked_array(temp_array_values,mask=np.invert(mask_array_values.astype(bool))) - return masked_values, mask_values, temp_values diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver_base.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver_base_mysql.py similarity index 98% rename from tangostationcontrol/tangostationcontrol/toolkit/archiver_base.py rename to tangostationcontrol/tangostationcontrol/toolkit/archiver_base_mysql.py index 4440957bb8546a9e42638a0a6b441e43119fa601..4224b2349e7554d51a0a918fc22f70a01c022cdf 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver_base.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver_base_mysql.py @@ -7,7 +7,7 @@ from sqlalchemy.dialects.mysql import DOUBLE,TIMESTAMP,BLOB, FLOAT, BIGINT from sqlalchemy.sql.expression import table from typing import List from itertools import groupby -import numpy as np +import numpy #Declarative system used to define classes mapped to relational DB tables Base = declarative_base() @@ -28,10 +28,9 @@ class Attribute(Base): family = Column(String) member = Column(String) name = Column(String) - - + def __repr__(self): - return f"<Attribute(fullname='{self.att_name}',data_type ='{self.att_conf_data_type_id}',ttl='{self.att_ttl}',facility ='{elf.facility}',domain ='{self.domain}',family ='{self.family}',member ='{self.member}',name ='{self.name}')>" + return f"<Attribute(fullname='{self.att_name}',data_type ='{self.att_conf_data_type_id}',ttl='{self.att_ttl}',facility ='{self.facility}',domain ='{self.domain}',family ='{self.family}',member ='{self.member}',name ='{self.name}')>" class DataType(Base): """ @@ -906,14 +905,14 @@ def build_array_from_record(rows: List[Array], dim_x: int): """ Converts Array database items in Python lists """ - matrix = np.array([]) + matrix = numpy.array([]) for i in range(0,dim_x): - x = np.array([item for item in rows if item.idx==i]) #group records by array index + x = numpy.array([item for item in rows if item.idx==i]) #group records by array index if i==0: - matrix = np.append(matrix,x) #append first row + matrix = numpy.append(matrix,x) #append first row else: - matrix = np.vstack([matrix,x]) #stack vertically - result = np.transpose(matrix) #transpose -> each row is a distinct array of value + matrix = numpy.vstack([matrix,x]) #stack vertically + result = numpy.transpose(matrix) #transpose -> each row is a distinct array of value list_result = result.tolist() return list_result @@ -921,8 +920,8 @@ def get_values_from_record(data_matrix: List[Array]): """ Returns a matrix of values from a matrix of Array records """ - array_matrix = np.matrix(data_matrix) - value_matrix = np.empty(array_matrix.shape) + array_matrix = numpy.matrix(data_matrix) + value_matrix = numpy.empty(array_matrix.shape) for index in range(array_matrix.size): # for each object element value_matrix.itemset(index,array_matrix.item(index).value_r) # extract the value from object and put in the matrix return value_matrix diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver_base_ts.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver_base_ts.py new file mode 100644 index 0000000000000000000000000000000000000000..0480777234333dc7e8e36e2f0cd6db519a06453c --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver_base_ts.py @@ -0,0 +1,473 @@ +#! /usr/bin/env python3 + +from sqlalchemy.dialects.postgresql import ARRAY,TIMESTAMP,FLOAT, JSON +from sqlalchemy.dialects.postgresql.base import BYTEA +from sqlalchemy.dialects.postgresql.ranges import INT4RANGE, INT8RANGE +from sqlalchemy.sql.sqltypes import INTEGER, TEXT, Boolean +from sqlalchemy.orm import declarative_base +from sqlalchemy import Column, Integer, String +from sqlalchemy.sql.expression import table +from typing import List +from itertools import groupby +import numpy + +#Declarative system used to define classes mapped to relational DB tables +Base = declarative_base() + +class Attribute(Base): + """ + Class that represents a Tango Attribute mapped to table 'att_conf' + """ + __tablename__ = 'att_conf' + __table_args__ = {'extend_existing': True} + + att_conf_id = Column(Integer, primary_key=True) + att_name = Column(String) + att_conf_type_id = Column(Integer) + att_conf_format_id = Column(Integer) + table_name = Column(String) + cs_name = Column(String) + domain = Column(String) + family = Column(String) + member = Column(String) + name = Column(String) + ttl = Column(Integer) + + def __repr__(self): + return f"<Attribute(fullname='{self.att_name}',data_type ='{self.att_conf_type_id}',format='{self.att_conf_format_id}',table_name='{self.table_name}',cs_name ='{self.cs_name}',domain ='{self.domain}',family ='{self.family}',member ='{self.member}',name ='{self.name}'),ttl='{self.ttl}'>" + +class DataType(Base): + """ + Class that represents a Tango Data Type mapped to table 'att_conf_data_type' + """ + __tablename__ = 'att_conf_type' + __table_args__ = {'extend_existing': True} + + att_conf_type_id = Column(Integer, primary_key=True) + type = Column(String) + + def __repr__(self): + return f"<DataType(type='{self.type}')>" + +class Format(Base): + """ + Class that represents a Tango Format mapped to table 'att_conf_format' + """ + __tablename__ = 'att_conf_format' + __table_args__ = {'extend_existing': True} + + att_conf_format_id = Column(Integer, primary_key=True) + format = Column(String) + format_num = Column(Integer) + + def __repr__(self): + return f"<Format(format='{self.format}', format_num='{self.format_num}')>" + +class Scalar(Base): + """ + Abstract class that represents Super-class of Scalar mapper classes + """ + # In the concrete inheritance use case, it is common that the base class is not represented + # within the database, only the subclasses. In other words, the base class is abstract. + __abstract__ = True + + # Primary key is not defined for tables which store values, but SQLAlchemy requires a mandatory + # primary key definition. Anyway, this definition is on Python-side and does not compromise + # DBMS architecture + att_conf_id = Column(Integer, primary_key=True) + data_time = Column(TIMESTAMP, primary_key=True) + quality = Column(Integer) + att_error_desc_id = Column(Integer) + details = Column(JSON) + +class Scalar_Boolean(Scalar): + """ + Class that represents a Tango Boolean mapped to table 'att_scalar_devboolean' + """ + __tablename__ = 'att_scalar_devboolean' + __table_args__ = {'extend_existing': True} + value_r = Column(Boolean) + value_w = Column(Boolean) + + def __repr__(self): + return f"<Scalar_Boolean(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_Double(Scalar): + """ + Class that represents a Tango Double mapped to table 'att_scalar_devdouble' + """ + __tablename__ = 'att_scalar_devdouble' + __table_args__ = {'extend_existing': True} + value_r = Column(FLOAT) + value_w = Column(FLOAT) + + def __repr__(self): + return f"<Scalar_Double(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_Encoded(Scalar): + """ + Class that represents a Tango Encoded mapped to table 'att_scalar_devencoded' + """ + __tablename__ = 'att_scalar_devencoded' + __table_args__ = {'extend_existing': True} + value_r = Column(BYTEA) + value_w = Column(BYTEA) + + def __repr__(self): + return f"<Scalar_Encoded(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_Enum(Scalar): + """ + Class that represents a Tango Enum mapped to table 'att_scalar_devenum' + """ + __tablename__ = 'att_scalar_devenum' + __table_args__ = {'extend_existing': True} + value_r_label = Column(TEXT) + value_r = Column(INTEGER) + value_w_label = Column(TEXT) + value_w = Column(INTEGER) + + def __repr__(self): + return f"<Scalar_Enum(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r_label='{self.value_r_label}',value_r='{self.value_r}',value_w_label='{self.value_w_label}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_Float(Scalar): + """ + Class that represents a Tango Float mapped to table 'att_scalar_devfloat' + """ + __tablename__ = 'att_scalar_devfloat' + __table_args__ = {'extend_existing': True} + value_r = Column(FLOAT) + value_w = Column(FLOAT) + + def __repr__(self): + return f"<Scalar_Float(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_Long(Scalar): + """ + Class that represents a Tango Long mapped to table 'att_scalar_devlong' + """ + __tablename__ = 'att_scalar_devlong' + __table_args__ = {'extend_existing': True} + value_r = Column(INT4RANGE) + value_w = Column(INT4RANGE) + + def __repr__(self): + return f"<Scalar_Long(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_Long64(Scalar): + """ + Class that represents a Tango Long64 mapped to table 'att_scalar_devlong64' + """ + __tablename__ = 'att_scalar_devlong64' + __table_args__ = {'extend_existing': True} + value_r = Column(INT8RANGE) + value_w = Column(INT8RANGE) + + def __repr__(self): + return f"<Scalar_Long64(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_Short(Scalar): + """ + Class that represents a Tango Short mapped to table 'att_scalar_devshort' + """ + __tablename__ = 'att_scalar_devshort' + __table_args__ = {'extend_existing': True} + value_r = Column(INTEGER) + value_w = Column(INTEGER) + + def __repr__(self): + return f"<Scalar_Short(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_State(Scalar): + """ + Class that represents a Tango State mapped to table 'att_scalar_devstate' + """ + __tablename__ = 'att_scalar_devstate' + __table_args__ = {'extend_existing': True} + value_r = Column(INTEGER) + value_w = Column(INTEGER) + + def __repr__(self): + return f"<Scalar_State(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_String(Scalar): + """ + Class that represents a Tango String mapped to table 'att_scalar_devstring' + """ + __tablename__ = 'att_scalar_devstring' + __table_args__ = {'extend_existing': True} + value_r = Column(TEXT) + value_w = Column(TEXT) + + def __repr__(self): + return f"<Scalar_String(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_UChar(Scalar): + """ + Class that represents a Tango UChar mapped to table 'att_scalar_devuchar' + """ + __tablename__ = 'att_scalar_devuchar' + __table_args__ = {'extend_existing': True} + value_r = Column(INTEGER) + value_w = Column(INTEGER) + + def __repr__(self): + return f"<Scalar_UChar(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_ULong(Scalar): + """ + Class that represents a Tango ULong mapped to table 'att_scalar_devulong' + """ + __tablename__ = 'att_scalar_devulong' + __table_args__ = {'extend_existing': True} + value_r = Column(INTEGER) + value_w = Column(INTEGER) + + def __repr__(self): + return f"<Scalar_ULong(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_ULong64(Scalar): + """ + Class that represents a Tango ULong64 mapped to table 'att_scalar_devulong64' + """ + __tablename__ = 'att_scalar_devulong64' + __table_args__ = {'extend_existing': True} + value_r = Column(INTEGER) + value_w = Column(INTEGER) + + def __repr__(self): + return f"<Scalar_ULong64(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Scalar_UShort(Scalar): + """ + Class that represents a Tango UShort mapped to table 'att_scalar_devushort' + """ + __tablename__ = 'att_scalar_devushort' + __table_args__ = {'extend_existing': True} + value_r = Column(INTEGER) + value_w = Column(INTEGER) + + def __repr__(self): + return f"<Scalar_UShort(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array(Base): + """ + Abstract class that represents Super-class of Array mapper classes + """ + __abstract__ = True + # Primary key is not defined for tables which store values, but SQLAlchemy requires a mandatory + # primary key definition. Anyway, this definition is on Python-side and does not compromise + # DBMS architecture + att_conf_id = Column(Integer, primary_key=True) + data_time = Column(TIMESTAMP, primary_key=True) + quality = Column(Integer) + att_error_desc_id = Column(Integer) + details = Column(JSON) + +class Array_Boolean(Array): + """ + Class that represents a Tango Boolean Array mapped to table 'att_array_devboolean' + """ + __tablename__ = 'att_array_devboolean' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(Boolean)) + value_w = Column(ARRAY(Boolean)) + + def __repr__(self): + return f"<Array_Boolean(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_Double(Array): + """ + Class that represents a Tango Double Array mapped to table 'att_array_devdouble' + """ + __tablename__ = 'att_array_devdouble' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(FLOAT)) + value_w = Column(ARRAY(FLOAT)) + + def __repr__(self): + return f"<Array_Double(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_Encoded(Array): + """ + Class that represents a Tango Encoded Array mapped to table 'att_array_devencoded' + """ + __tablename__ = 'att_array_devencoded' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(BYTEA)) + value_w = Column(ARRAY(BYTEA)) + + def __repr__(self): + return f"<Array_Encoded(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_Enum(Array): + """ + Class that represents a Tango Enum Array mapped to table 'att_array_devenum' + """ + __tablename__ = 'att_array_devenum' + __table_args__ = {'extend_existing': True} + value_r_label = Column(ARRAY(TEXT)) + value_r = Column(ARRAY(INTEGER)) + value_w_label = Column(ARRAY(TEXT)) + value_w = Column(ARRAY(INTEGER)) + + def __repr__(self): + return f"<Array_Enum(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r_label='{self.value_r_label}',value_r='{self.value_r}',value_w_label='{self.value_w_label}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_Float(Array): + """ + Class that represents a Tango Float Array mapped to table 'att_array_devfloat' + """ + __tablename__ = 'att_array_devfloat' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(FLOAT)) + value_w = Column(ARRAY(FLOAT)) + + def __repr__(self): + return f"<Array_Float(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_Long(Array): + """ + Class that represents a Tango Long Array mapped to table 'att_array_devlong' + """ + __tablename__ = 'att_array_devlong' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(INT4RANGE)) + value_w = Column(ARRAY(INT4RANGE)) + + def __repr__(self): + return f"<Array_Long(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_Long64(Array): + """ + Class that represents a Tango Long64 Array mapped to table 'att_array_devlong64' + """ + __tablename__ = 'att_array_devlong64' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(INT8RANGE)) + value_w = Column(ARRAY(INT8RANGE)) + + def __repr__(self): + return f"<Array_Long64(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_Short(Array): + """ + Class that represents a Tango Short Array mapped to table 'att_array_devshort' + """ + __tablename__ = 'att_array_devshort' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(INTEGER)) + value_w = Column(ARRAY(INTEGER)) + + def __repr__(self): + return f"<Array_Short(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_State(Array): + """ + Class that represents a Tango State Array mapped to table 'att_array_devstate' + """ + __tablename__ = 'att_array_devstate' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(INT4RANGE)) + value_w = Column(ARRAY(INT4RANGE)) + + def __repr__(self): + return f"<Array_State(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_String(Array): + """ + Class that represents a Tango String Array mapped to table 'att_array_devstring' + """ + __tablename__ = 'att_array_devstring' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(TEXT)) + value_w = Column(ARRAY(TEXT)) + + def __repr__(self): + return f"<Array_String(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_UChar(Array): + """ + Class that represents a Tango UChar Array mapped to table 'att_array_devuchar' + """ + __tablename__ = 'att_array_devuchar' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(INTEGER)) + value_w = Column(ARRAY(INTEGER)) + + def __repr__(self): + return f"<Array_UChar(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_ULong(Array): + """ + Class that represents a Tango ULong Array mapped to table 'att_array_devulong' + """ + __tablename__ = 'att_array_devulong' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(INTEGER)) + value_w = Column(ARRAY(INTEGER)) + + def __repr__(self): + return f"<Array_ULong(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_ULong64(Array): + """ + Class that represents a Tango ULong64 Array mapped to table 'att_array_devulong64' + """ + __tablename__ = 'att_array_devulong64' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(INTEGER)) + value_w = Column(ARRAY(INTEGER)) + + def __repr__(self): + return f"<Array_ULong64(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +class Array_UShort(Array): + """ + Class that represents a Tango UShort Array mapped to table 'att_array_devushort' + """ + __tablename__ = 'att_array_devushort' + __table_args__ = {'extend_existing': True} + value_r = Column(ARRAY(INTEGER)) + value_w = Column(ARRAY(INTEGER)) + + def __repr__(self): + return f"<Array_UShort(att_conf_id='{self.att_conf_id}',data_time='{self.data_time}',value_r='{self.value_r}',value_w='{self.value_w}',quality='{self.quality}',att_error_desc_id='{self.att_error_desc_id}',details='{self.details}')>" + +def get_class_by_tablename(tablename: str): + """ + Returns class reference mapped to a table. + """ + for mapper in Base.registry.mappers: + c = mapper.class_ + classname = c.__name__ + if not classname.startswith('_'): + if hasattr(c, '__tablename__') and c.__tablename__ == tablename: + return c + return None + +def build_array_from_record(rows: List[Array], dim_x: int): + """ + Converts Array database items in Python lists + """ + matrix = numpy.array([]) + for i in range(0,dim_x): + x = numpy.array([item for item in rows if item.idx==i]) #group records by array index + if i==0: + matrix = numpy.append(matrix,x) #append first row + else: + matrix = numpy.vstack([matrix,x]) #stack vertically + result = numpy.transpose(matrix) #transpose -> each row is a distinct array of value + list_result = result.tolist() + return list_result + +def get_values_from_record(data_matrix: List[Array]): + """ + Returns a matrix of values from a matrix of Array records + """ + array_matrix = numpy.matrix(data_matrix) + value_matrix = numpy.empty(array_matrix.shape) + for index in range(array_matrix.size): # for each object element + value_matrix.itemset(index,array_matrix.item(index).value_r) # extract the value from object and put in the matrix + return value_matrix + diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver_config/__init__.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver_config/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tangostationcontrol/tangostationcontrol/toolkit/retriever.py b/tangostationcontrol/tangostationcontrol/toolkit/retriever.py new file mode 100644 index 0000000000000000000000000000000000000000..b84802cde128ca2f256f5f18ffb78dec3b9ea29f --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/toolkit/retriever.py @@ -0,0 +1,314 @@ +#! /usr/bin/env python3 + +from tango import DeviceProxy, AttributeProxy +from tangostationcontrol.toolkit.archiver import split_tango_name + +from abc import ABC, abstractmethod +from datetime import datetime, timedelta +from sqlalchemy import create_engine, and_ +from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm.exc import NoResultFound +import importlib +import numpy + +class Retriever(ABC): + """ + The Retriever abstract class implements retrieve operations on a given DBMS + """ + + def get_db_credentials(self): + """ + Retrieves the DB credentials from the Tango properties of Configuration Manager + """ + cm = DeviceProxy(self.cm_name) + config_list = list(cm.get_property('LibConfiguration')['LibConfiguration']) # dictionary {'LibConfiguration': list of strings} + if 'connect_string=' in config_list[0]: config_list.pop(0) # possibly remove connect string because it causes errors + host = str([s for s in config_list if "host" in s][0].split('=')[1]) + dbname = str([s for s in config_list if "dbname" in s][0].split('=')[1]) + port = str([s for s in config_list if "port" in s][0].split('=')[1]) + user = str([s for s in config_list if "user" in s][0].split('=')[1]) + pw = str([s for s in config_list if "password" in s][0].split('=')[1]) + return host,dbname,port,user,pw + + def create_session(self,libname:str,user:str,pw:str,host:str,port:str,dbname:str): + """ + Returns a session to a DBMS using default credentials. + """ + connection_string = f"{libname}://{user}:{pw}@{host}:{port}/{dbname}" + engine = create_engine(connection_string) + Session = sessionmaker(bind=engine) + return Session + + @abstractmethod + def set_archiver_base(self): + return + + @abstractmethod + def connect_to_archiving_db(self): + return + + def get_all_archived_attributes(self): + """ + Returns a list of the archived attributes in the DB. + """ + attrs = self.session.query(self.ab.Attribute).order_by(self.ab.Attribute.att_conf_id).all() + # Returns the representation as set in __repr__ method of the mapper class + return attrs + + def get_archived_attributes_by_device(self,device_fqname: str): + """ + Takes as input the fully-qualified name of a device and returns a list of its archived attributes + """ + domain, family, member = split_tango_name(device_fqname,"device") + attrs = self.session.query(self.ab.Attribute).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \ + self.ab.Attribute.member == member)).all() + # Returns the representation as set in __repr__ method of the mapper class + return attrs + + def get_attribute_id(self,attribute_fqname: str): + """ + Takes as input the fully-qualified name of an attribute and returns its id. + """ + domain, family, member, name = split_tango_name(attribute_fqname,"attribute") + try: + result = self.session.query(self.ab.Attribute.att_conf_id).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \ + self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() + return result[0] + except TypeError as e: + raise Exception(f"Attribute {attribute_fqname} not found!") from e + except NoResultFound as e: + raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e + + @abstractmethod + def get_attribute_datatype(self,attribute_fqname: str): + return + + def get_attribute_value_by_hours(self, attribute_fqname: str, hours: float, tablename:str): + """ + Takes as input the attribute fully-qualified name and the number of past hours since the actual time + (e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half). + Returns a list of timestamps and a list of values + """ + attr_id = self.get_attribute_id(attribute_fqname) + # Retrieves the class that maps the DB table given the tablename + base_class = self.ab.get_class_by_tablename(tablename) + # Retrieves the timestamp + time_now = datetime.now() + time_delta = time_now - timedelta(hours=hours) + # Converts the timestamps in the right format for the query + time_now_db = str(time_now.strftime("%Y-%m-%d %X")) + time_delta_db = str(time_delta.strftime("%Y-%m-%d %X")) + try: + result = self.session.query(base_class).\ + join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\ + filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= time_delta_db, \ + base_class.data_time <= time_now_db)).order_by(base_class.data_time).all() + except AttributeError as e: + raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e + return result + + def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime, tablename:str): + """ + Takes as input the attribute name and a certain starting and ending point-time. + The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss"). + Returns a list of timestamps and a list of values + """ + attr_id = self.get_attribute_id(attribute_fqname) + # Retrieves the class that maps the DB table given the tablename + base_class = self.ab.get_class_by_tablename(tablename) + try: + result = self.session.query(base_class).\ + join(self.ab.Attribute,self.ab.Attribute.att_conf_id==base_class.att_conf_id).\ + filter(and_(self.ab.Attribute.att_conf_id == attr_id,base_class.data_time >= str(start_time), \ + base_class.data_time <= str(stop_time))).order_by(base_class.data_time).all() + except AttributeError as e: + raise Exception(f"Empty result: Attribute {attribute_fqname} not found") from e + return result + +class RetrieverMySQL(Retriever): + + def __init__(self, cm_name: str = 'archiving/hdbpp/confmanager01'): + self.cm_name = cm_name + self.session = self.connect_to_archiving_db() + self.ab = self.set_archiver_base() + + def connect_to_archiving_db(self): + """ + Returns a session to a MySQL DBMS using default credentials. + """ + host,dbname,port,user,pw = super().get_db_credentials() + # Set sqlalchemy library connection + if host=='archiver-maria-db': + libname = 'mysql+pymysql' + else: + raise ValueError(f"Invalid hostname: {host}") + Session = super().create_session(libname,user,pw,host,port,dbname) + return Session() + + def set_archiver_base(self): + """ + Sets the right mapper class following the DBMS connection + """ + return importlib.import_module('.archiver_base_mysql', package=__package__) + + def get_attribute_datatype(self,attribute_fqname: str): + """ + Takes as input the fully-qualified name of an attribute and returns its Data-Type. + Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used + as DB table name suffix in which values are stored. + """ + domain, family, member, name = split_tango_name(attribute_fqname,"attribute") + try: + result = self.session.query(self.ab.DataType.data_type).join(self.ab.Attribute,self.ab.Attribute.att_conf_data_type_id==self.ab.DataType.att_conf_data_type_id).\ + filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() + return result[0] + except TypeError as e: + raise Exception(f"Attribute not {attribute_fqname} found!") from e + except NoResultFound as e: + raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e + + def get_attribute_value_by_hours(self,attribute_fqname: str, hours: float = 1.0): + """ + Takes as input the attribute fully-qualified name and the number of past hours since the actual time + (e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half). + Returns a list of timestamps and a list of values + """ + attr_datatype = self.get_attribute_datatype(attribute_fqname) + # Retrieves the class that maps the DB table given the tablename + tablename = f"att_{attr_datatype}" + return super().get_attribute_value_by_hours(attribute_fqname,hours,tablename) + + + def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime): + """ + Takes as input the attribute name and a certain starting and ending point-time. + The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss"). + Returns a list of timestamps and a list of values + """ + attr_datatype = self.get_attribute_datatype(attribute_fqname) + # Retrieves the class that maps the DB table given the tablename + tablename = f"att_{attr_datatype}" + return super().get_attribute_value_by_interval(attribute_fqname,start_time,stop_time,tablename) + + # DRAFT # + def get_masked_fpga_temp(self,start_time: datetime, stop_time: datetime,temp_attr_name:str='stat/sdp/1/fpga_temp_r', + mask_attr_name:str='stat/sdp/1/tr_fpga_mask_r'): + """ + Returns a list of SDP/fpga_temp_r values, but only if SDP/tr_fpga_mask_r values are TRUE + """ + mask_values = self.get_attribute_value_by_interval(mask_attr_name,start_time,stop_time) + temp_values = self.get_attribute_value_by_interval(temp_attr_name,start_time,stop_time) + # Since timestamps can be not syncrhonized, remove first or last element from arrays + if len(mask_values)==len(temp_values): + first_mask_datatime = mask_values[0].data_time + first_temp_datatime = temp_values[0].data_time + if (first_mask_datatime>first_temp_datatime): + mask_values = mask_values[:-int(mask_values[0].dim_x_r)] + temp_values = temp_values[int(temp_values[0].dim_x_r):] + elif (first_mask_datatime<first_temp_datatime): + mask_values = mask_values[int(mask_values[0].dim_x_r)] + temp_values = temp_values[:-int(temp_values[0].dim_x_r):] + else: + raise Exception + # Convert DB Array records into Python lists + mask_data = self.ab.build_array_from_record(mask_values,mask_values[0].dim_x_r) + temp_data = self.ab.build_array_from_record(temp_values,temp_values[0].dim_x_r) + # Extract only the value from the array + mask_array_values = self.ab.get_values_from_record(mask_data) + temp_array_values = self.ab.get_values_from_record(temp_data) + # Multiply the matrix + #masked_values = np.multiply(temp_array_values,mask_array_values) + masked_values = numpy.ma.masked_array(temp_array_values,mask=numpy.invert(mask_array_values.astype(bool))) + return masked_values, mask_values, temp_values + +class RetrieverTimescale(Retriever): + + def __init__(self, cm_name: str = 'archiving/hdbppts/confmanager01'): + self.cm_name = cm_name + self.session = self.connect_to_archiving_db() + self.ab = self.set_archiver_base() + + def connect_to_archiving_db(self): + """ + Returns a session to a MySQL DBMS using default credentials. + """ + host,dbname,port,user,pw = super().get_db_credentials() + # Set sqlalchemy library connection + if host=='archiver-timescale': + libname = 'postgresql+psycopg2' + else: + raise ValueError(f"Invalid hostname: {host}") + Session = super().create_session(libname,user,pw,host,port,dbname) + return Session() + + def set_archiver_base(self): + """ + Sets the right mapper class following the DBMS connection + """ + return importlib.import_module('.archiver_base_ts', package=__package__) + + def get_attribute_datatype(self,attribute_fqname: str): + """ + Takes as input the fully-qualified name of an attribute and returns its Data-Type. + Data Type name indicates the type (e.g. string, int, ...) and the read/write property. The name is used + as DB table name suffix in which values are stored. + """ + domain, family, member, name = split_tango_name(attribute_fqname,"attribute") + try: + result = self.session.query(self.ab.DataType.type).join(self.ab.Attribute,self.ab.Attribute.att_conf_type_id==self.ab.DataType.att_conf_type_id).\ + filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() + return result[0] + except TypeError as e: + raise Exception(f"Attribute not {attribute_fqname} found!") from e + except NoResultFound as e: + raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e + + def get_attribute_format(self,attribute_fqname: str): + """ + Takes as input the fully-qualified name of an attribute and returns its format. + Formats are basically three: Scalar, Spectrum and Image. + * Works only for POSTGRESQL * + """ + domain, family, member, name = split_tango_name(attribute_fqname,"attribute") + try: + result = self.session.query(self.ab.Format.format).join(self.ab.Attribute,self.ab.Attribute.att_conf_format_id==self.ab.Format.att_conf_format_id).\ + filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() + return result[0] + except TypeError as e: + raise Exception("Attribute not found!") from e + except NoResultFound as e: + raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e + + def get_attribute_tablename(self,attribute_fqname: str): + """ + Takes as input the fully-qualified name of an attribute and returns the tablename where it is stored. + * Works only for POSTGRESQL * + """ + domain, family, member, name = split_tango_name(attribute_fqname,"attribute") + try: + result = self.session.query(self.ab.Attribute.table_name).filter(and_(self.ab.Attribute.domain == domain, self.ab.Attribute.family == family, \ + self.ab.Attribute.member == member, self.ab.Attribute.name == name)).one() + return result[0] + except TypeError as e: + raise Exception("Attribute not found!") from e + except NoResultFound as e: + raise Exception(f"No records of attribute {attribute_fqname} found in DB") from e + + def get_attribute_value_by_hours(self, attribute_fqname: str, hours: float = 1.0): + """ + Takes as input the attribute fully-qualified name and the number of past hours since the actual time + (e.g. hours=1 retrieves values in the last hour, hours=8.5 retrieves values in the last eight hours and half). + Returns a list of timestamps and a list of values + """ + tablename = self.get_attribute_tablename(attribute_fqname) + return super().get_attribute_value_by_hours(attribute_fqname,hours,tablename) + + def get_attribute_value_by_interval(self,attribute_fqname: str, start_time: datetime, stop_time: datetime): + """ + Takes as input the attribute name and a certain starting and ending point-time. + The datetime format is pretty flexible (e.g. "YYYY-MM-dd hh:mm:ss"). + Returns a list of timestamps and a list of values + """ + tablename = self.get_attribute_tablename(attribute_fqname) + return super().get_attribute_value_by_interval(attribute_fqname,start_time,stop_time,tablename) + \ No newline at end of file