# tango-prometheus-client.py
import time
import argparse
from prometheus_client.core import GaugeMetricFamily, REGISTRY, CounterMetricFamily
from prometheus_client import start_http_server
from tango import Database, DeviceProxy, CmdArgType as ArgType, AttrDataFormat, DevState, DevFailed
import logging
import json
import fnmatch
from logstash_async.handler import AsynchronousLogstashHandler, LogstashFormatter

# Module-wide logging setup: everything in this file logs through the root logger.
logger = logging.getLogger()
# Root logger accepts DEBUG and up; basicConfig installs the default stream handler with this format.
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)

# log to memory only, with a limit of 600 seconds. this container is not important enough to keep
# all logs around for, and as a scraper must be robust against overloading on all sides
# NOTE(review): "elk":5959 is presumably the Logstash endpoint of the deployment -- confirm.
handler = AsynchronousLogstashHandler("elk", 5959, database_path=None, event_ttl=600)
# Only INFO and above is forwarded through this handler; DEBUG stays on the stream handler.
handler.setLevel(logging.INFO)
logger.addHandler(handler)

""" Functions to parse and apply policy files. """

class ArchiverPolicy(object):
    """ Select the devices and attributes to process, as described by a policy.

        A policy is a dict with two (optional) keys:

          "default": a config set that applies to every selected device,
          "devices": maps a device-name pattern (fnmatch syntax, matched
                     case-insensitively) to a device-specific config set.

        A config set may hold an "include" and an "exclude" list of
        attribute-name patterns (fnmatch syntax). """

    EMPTY_POLICY = {
      # default policy
      "default": {
      },
      # device-specific policies
      "devices": {
      }
    }

    @staticmethod
    def load_config(resource: str) -> dict:
        """ Load a policy from the JSON file at path `resource`. """
        with open(resource) as fd:
            return json.load(fd)

    def __init__(self, config: dict = None):
        # a missing (or empty) config falls back to the shared empty policy
        self.config = config or self.EMPTY_POLICY

    def _device_config_sets(self, device_name: str) -> list:
        """ Return the config sets of all device patterns in the policy that
            match `device_name`, in policy order. """
        name = device_name.lower()
        return [config_set
                for pattern, config_set in self.config.get("devices", {}).items()
                if fnmatch.fnmatch(name, pattern.lower())]

    def device_list(self) -> list:
        """ Retrieve the device list from TangoDB """
        device_list = []
        db = Database()
        server_list = db.get_server_list()  # e.g. SDP/STAT, RECV/STAT
        for server in server_list:
            # https://pytango.readthedocs.io/en/stable/database.html#tango.Database.get_device_class_list
            # the result alternates device name, class name; keep the even (device name) entries
            class_list = db.get_device_class_list(server)
            for dev_name in class_list[::2]:
                if "dserver" in dev_name:
                    continue
                device_list.append(dev_name.lower())
        return device_list

    def devices(self) -> list:
        """ Filter the device list from TangoDB following the lofar2-policy file.

            Returns the lower-case names of all devices in TangoDB that match
            at least one device pattern of the policy. Each device is listed
            once, even if several patterns match it. """
        # Devices list from TangoDB
        db_devices = self.device_list()
        # Device patterns listed in policy file
        patterns = [pattern.lower() for pattern in self.config.get("devices", {})]
        # Match device names fetched from DB against device patterns in policy file
        devices = []
        seen = set()
        for pattern in patterns:
            for db_dev in db_devices:
                if db_dev not in seen and fnmatch.fnmatch(db_dev, pattern):
                    seen.add(db_dev)
                    devices.append(db_dev)
        return devices

    def attribute_list(self, device_name: str, attribute_list: list) -> list:
        """ Return the sorted names of the attributes selected for the given device.

            `attribute_list` holds the attribute names the device offers. The
            default config set is applied first, followed by the config set of
            every device pattern that matches `device_name`. Within each set,
            the "include" patterns add attributes and the "exclude" patterns
            then remove them again.

            Whether the device is covered is decided from the policy alone, so
            this does not query TangoDB. Returns an empty list if no device
            pattern matches. """

        device_sets = self._device_config_sets(device_name)
        if not device_sets:
            return []

        # materialise once, as the names are iterated once per pattern
        candidates = list(attribute_list)
        attributes = set()

        for config_set in [self.config.get("default", {})] + device_sets:
            # include all specified attributes by pattern,
            for include in config_set.get("include", []):
                attributes.update(attr for attr in candidates if fnmatch.fnmatch(attr, include))

            # then, remove any explicitly excluded attributes
            for exclude in config_set.get("exclude", []):
                attributes.difference_update(attr for attr in candidates if fnmatch.fnmatch(attr, exclude))

        return sorted(attributes)

class CustomCollector(object):
    """ Prometheus collector that exposes Tango device attributes as metrics.

        On every scrape, the attributes selected by the policy are read from
        each selected device and exposed as "device_attribute" samples. The
        time spent per device, and in total, is exposed as "device_scraping". """

    def __init__(self, config, station, proxy_timeout=250):
        """ `config` is the policy dict, `station` the value of the station
            label, and `proxy_timeout` the device proxy timeout in ms. """
        self.station = station
        self.policy = ArchiverPolicy(config)
        self.proxy_timeout = proxy_timeout

    @staticmethod
    def _failure_reason(exc):
        """ Return a single-line description of exception `exc`, for logging.

            Uses the `desc` of the first exception argument if there is one,
            and falls back to repr(exc), so that logging a failure cannot
            itself raise and abort the scrape. """
        try:
            reason = exc.args[0].desc
        except (IndexError, AttributeError):
            reason = repr(exc)

        return str(reason).replace("\n", " ")

    def _to_metric(self, dev, attr_info, x, y, idx, value):
        """ Convert the given values to a (labels, value) pair, used to construct a Metric.

            Returns None if the data type of the attribute is not supported. """

        if attr_info.data_type in [ArgType.DevShort, ArgType.DevLong, ArgType.DevUShort, ArgType.DevULong, ArgType.DevLong64, ArgType.DevULong64, ArgType.DevInt, ArgType.DevFloat, ArgType.DevDouble]:
            data_type = 'float'
            str_value = ''
            float_value = float(value)
        elif attr_info.data_type == ArgType.DevBoolean:
            data_type = 'bool'
            str_value = ''
            float_value = int(value)
        elif attr_info.data_type == ArgType.DevString:
            # strings are exposed through the str_value label; the sample value is their length
            data_type = 'string'
            str_value = str(value)
            float_value = len(str_value)
        elif attr_info.data_type == ArgType.DevEnum:
            # expose the enum label as str_value, and the enum index as the sample value
            attr_config = dev.get_attribute_config(attr_info.name)
            data_type = 'enum'
            str_value = str(attr_config.enum_labels[value])
            float_value = int(value)
        elif attr_info.data_type == ArgType.DevState:
            data_type = 'state'
            str_value = ''
            float_value = int(value)
        else:
            return None

        # (labels, value)
        return ([self.station, dev.dev_name(), attr_info.name, str_value, data_type, f"{x:02}", f"{y:02}", f"{idx:03}"], float_value)

    def metrics_scalar(self, dev, attr_info, attr_value):
        """ Return all metrics for a given SCALAR attribute. """

        new_metric = self._to_metric(dev, attr_info, 0, 0, 0, attr_value.value)
        return [new_metric] if new_metric is not None else []

    def metrics_spectrum(self, dev, attr_info, attr_value):
        """ Return all metrics for a given SPECTRUM attribute. """

        metrics = []
        for x in range(int(attr_value.dim_x)):
            new_metric = self._to_metric(dev, attr_info, x, 0, x, attr_value.value[x])
            if new_metric is not None:
                metrics.append(new_metric)

        return metrics

    def metrics_image(self, dev, attr_info, attr_value):
        """ Return all metrics for a given IMAGE attribute. """

        dim_x = int(attr_value.dim_x)
        dim_y = int(attr_value.dim_y)

        metrics = []
        for y in range(dim_y):
            for x in range(dim_x):
                # NOTE: We switch x and y in the annotation, to allow queries to combine 1D and 2D
                #       arrays in their first dimension using the same label (x). We effectively
                #       expose the array as [x][y] instead of [y][x].
                new_metric = self._to_metric(dev, attr_info, y, x, y * dim_x + x, attr_value.value[y][x])
                if new_metric is not None:
                    metrics.append(new_metric)

        return metrics

    def metrics(self, dev, attr_info, attr_value):
        """ Return all metrics for a given attribute. """

        if attr_info.data_format == AttrDataFormat.SCALAR:
            return self.metrics_scalar(dev, attr_info, attr_value)
        elif attr_info.data_format == AttrDataFormat.SPECTRUM:
            return self.metrics_spectrum(dev, attr_info, attr_value)
        elif attr_info.data_format == AttrDataFormat.IMAGE:
            return self.metrics_image(dev, attr_info, attr_value)
        else:
            return []

    def device_metrics(self, device_name):
        """ Return all metrics for a given device, as configured.

            Failures to read or convert a single attribute are logged, and do
            not prevent the other attributes from being scraped. Failures to
            reach the device itself propagate to the caller. """

        dev = DeviceProxy(device_name)
        dev.set_timeout_millis(self.proxy_timeout)

        # obtain extended info about all attributes
        attr_infos = {attr_info.name: attr_info for attr_info in dev.attribute_list_query()}

        # read the state once, so the state we test is also the state we log
        state = dev.state()

        if state not in [DevState.STANDBY, DevState.ON, DevState.ALARM, DevState.DISABLE]:
            logger.error(f"Error processing device {device_name}: it is in state {state}")

            # at least log state & status
            attrs_to_scrape = ["State", "Status"]
        else:
            # obtain list of attributes to scrape
            attrs_to_scrape = self.policy.attribute_list(device_name, attr_infos.keys())

        logger.info(f"Processing device {device_name} attributes {attrs_to_scrape}")

        # scrape each attribute
        metrics = []
        for attr_name in attrs_to_scrape:
            try:
                attr_value = dev.read_attribute(attr_name)

                metrics.extend(self.metrics(dev, attr_infos[attr_name], attr_value))
            except DevFailed as e:
                logger.error(f"Error processing device {device_name} attribute {attr_name}: {self._failure_reason(e)}")
            except Exception:
                logger.exception(f"Error processing device {device_name} attribute {attr_name}")

        return metrics

    def collect(self):
        """ Yield all scraped metrics from all devices, as configured.

            Devices that fail are logged and skipped. Their scraping duration
            is still reported. """

        logger.info("Start scraping")
        # durations are measured on the monotonic clock, which wall-clock adjustments do not affect
        scrape_begin = time.monotonic()

        attribute_metrics = GaugeMetricFamily("device_attribute", 'Device attribute value', labels=['station', 'device', 'name', 'str_value', 'type', 'x', 'y', 'idx'])
        scraping_metrics = GaugeMetricFamily("device_scraping", 'Device scraping duration', labels=['station', 'device'])

        for device_name in self.policy.devices():
            logger.debug(f"Processing device {device_name}")
            dev_scrape_begin = time.monotonic()

            try:
                for metric in self.device_metrics(device_name):
                    attribute_metrics.add_metric(*metric)
            except DevFailed as e:
                logger.error(f"Error processing device {device_name}: {self._failure_reason(e)}")
            except Exception:
                logger.exception(f"Error processing device {device_name}")

            dev_scrape_duration = time.monotonic() - dev_scrape_begin

            logger.info(f"Done processing device {device_name}. Took {dev_scrape_duration} seconds.")

            scraping_metrics.add_metric([self.station, device_name], dev_scrape_duration)

        scrape_duration = time.monotonic() - scrape_begin
        logger.info(f"Done scraping. Took {scrape_duration} seconds.")

        scraping_metrics.add_metric([self.station, "total"], scrape_duration)

        yield attribute_metrics
        yield scraping_metrics

if __name__ == '__main__':
    import sys

    # command-line options, as (flags, settings) pairs
    cli_options = (
        (('-c', '--config'), dict(type=str, required=True, help='configuration file')),
        (('-t', '--timeout'), dict(type=int, required=False, default=250, help='device proxy timeout (ms)')),
        (('-p', '--port'), dict(type=int, required=False, default=8000, help='HTTP server port to open')),
    )

    parser = argparse.ArgumentParser()
    for flags, settings in cli_options:
        parser.add_argument(*flags, **settings)
    args = parser.parse_args()

    # the policy describing which devices and attributes to scrape
    config = ArchiverPolicy.load_config(args.config)

    # the station name is a free property in the Tango DB; without it we cannot label our metrics
    db = Database()
    try:
        station_property = db.get_property("station", "name")
        station = station_property["name"][0]
    except Exception:
        logger.exception("Could not determine station name")
        sys.exit(1)

    collector = CustomCollector(config, station=station, proxy_timeout=args.timeout)

    # expose the metrics endpoint first, then hook our collector into the default registry
    logger.info("Starting server")
    start_http_server(args.port)

    logger.info("Registering collector")
    REGISTRY.register(collector)

    # the HTTP server does the work from here on; just keep the process alive
    logger.info("Idling")
    while True:
        time.sleep(1)