Skip to content
Snippets Groups Projects
Select Git revision
  • bb33e9bb045c0461104e36e1c180d0f5dc9ed1dc
  • main default protected
  • test-10rc4
  • test-pytango-10rc2
  • bassa-main-patch-68330
  • fix-hfd5-close-error
  • offer-ports-constant
  • fix-timestamps-refactor-matrix-construction
  • bugfixes
  • fix-bst-dimensions
  • fix-bsts
  • L2SS-1465-add-bst-metadata
  • L2SS-1464-add-antennafield-to-filename
  • L2SS-1451-simplify-cmdline
  • L2SS-1425-dont-rewrite-whole-dict
  • testing-specify-file
  • also-record-tilebeam-tracking-enabled
  • dont-modulo-16
  • remove-parse-device-attributes
  • support-higher-gn-indices
  • L2SS-1080_get_statistics_writer_package_version
  • v0.23
  • v0.22.3 protected
  • v0.22.dev4 protected
  • v0.22.dev3 protected
  • v0.22.2 protected
  • v0.22.1 protected
  • v0.22.dev2 protected
  • v0.22
  • v0.22.dev1 protected
  • v0.22.dev0 protected
  • v0.21
  • v0.21.dev6 protected
  • v0.21.dev5 protected
  • v0.21.dev4 protected
  • v0.21.dev3 protected
  • v0.21.dev2 protected
  • v0.21.dev1 protected
  • v0.21.dev0 protected
  • v0.20
  • v0.20.dev1 protected
41 results

_utils.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    tango-prometheus-client.py 9.61 KiB
    import time
    import argparse
    from prometheus_client.core import GaugeMetricFamily, REGISTRY, CounterMetricFamily
    from prometheus_client import start_http_server
    from tango import Database, DeviceProxy, CmdArgType as ArgType, AttrDataFormat, DevState, DevFailed
    import logging
    import json
    import fnmatch
    from logstash_async.handler import AsynchronousLogstashHandler, LogstashFormatter
    
    logger = logging.getLogger()
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)
    
    # log to memory only, with a limit of 600 seconds. this container is not important enough to keep
    # all logs around for, and as a scraper must be robust against overloading on all sides
    handler = AsynchronousLogstashHandler("elk", 5959, database_path=None, event_ttl=600)
    handler.setLevel(logging.INFO)
    logger.addHandler(handler)
    
    """ Functions to parse and apply policy files. """
    
    class ArchiverPolicy(object):
        EMPTY_POLICY = {
          # default policy
          "default": {
          },
          # device-specific policies
          "devices": {
          }
        }
    
        @staticmethod
        def load_config(resource: str) -> dict:
            with open(resource) as fd:
                return json.load(fd)
    
        def __init__(self, config: dict = None):
            self.config = config or self.EMPTY_POLICY
    
        def devices(self) -> list:
            return list(self.config["devices"].keys())
    
        def attribute_list(self, device_name: str, attribute_list: list) -> dict:
            """ Return the full set of archiving policy for the given device. """
    
            if device_name not in self.devices():
                return {}
    
            attributes = set()
    
            for config_set in [self.config["default"], self.config["devices"][device_name]]:
                # include all specified attributes by pattern,
                for include in config_set.get("include", []):
                    for attr in attribute_list:
                        if fnmatch.fnmatch(attr, include):
                            attributes.add(attr)
    
                # then, remove any explicitly excluded attributes
                for exclude in config_set.get("exclude", []):
                    for attr in attribute_list:
                        if fnmatch.fnmatch(attr, exclude) and attr in attributes:
                            attributes.remove(attr)
    
            return sorted(list(attributes))
    
    class CustomCollector(object):
        def __init__(self, station, config, proxy_timeout=250):
            self.station = station
            self.policy = ArchiverPolicy(config)
            self.proxy_timeout = proxy_timeout
    
        @staticmethod
        def _to_metric(dev, attr_info, x, y, idx, value):
            """ Convert the given values to a (labels, value) pair, used to construct a Metric. """
    
            if attr_info.data_type in [ArgType.DevShort, ArgType.DevLong, ArgType.DevUShort, ArgType.DevULong, ArgType.DevLong64, ArgType.DevULong64, ArgType.DevInt, ArgType.DevFloat, ArgType.DevDouble]:
                data_type = 'float'
                str_value = ''
                float_value = float(value)
            elif attr_info.data_type == ArgType.DevBoolean:
                data_type = 'bool'
                str_value = ''
                float_value = int(value)
            elif attr_info.data_type == ArgType.DevString:
                data_type = 'string'
                str_value = str(value)
                float_value = len(str_value)
            elif attr_info.data_type == ArgType.DevEnum:
                attr_config = dev.get_attribute_config(attr_info.name)
                data_type = 'enum'
                str_value = str(attr_config.enum_labels[value])
                float_value = int(value)
            elif attr_info.data_type == ArgType.DevState:
                data_type = 'state'
                str_value = ''
                float_value = int(value)
            else:
                return None
    
            # (labels, value)
            return ([self.station, dev.dev_name(), attr_info.name, str_value, data_type, f"{x:02}", f"{y:02}", f"{idx:03}"], float_value)
    
        def metrics_scalar(self, dev, attr_info, attr_value):
            """ Return all metrics for a given SCALAR attribute. """
    
            new_metric = self._to_metric(dev, attr_info, 0, 0, 0, attr_value.value)
            return [new_metric] if new_metric else []
    
        def metrics_spectrum(self, dev, attr_info, attr_value):
            """ Return all metrics for a given SPECTRUM attribute. """
    
            metrics = []
            for x in range(int(attr_value.dim_x)):
                new_metric = self._to_metric(dev, attr_info, x, 0, x, attr_value.value[x])
                metrics.append(new_metric) if new_metric else None
    
            return metrics
    
        def metrics_image(self, dev, attr_info, attr_value):
            """ Return all metrics for a given IMAGE attribute. """
    
            metrics = []
            for y in range(int(attr_value.dim_y)): 
                for x in range(int(attr_value.dim_x)):
                    """ NOTE: We switch x and y in the annotation, to allow queries to combine 1D and 2D arrays in their first dimension using the same label (x). We effectively expose
                              the array as [x][y] instead of [y][x]. """
    
                    new_metric = self._to_metric(dev, attr_info, y, x, y * attr_value.dim_x + x, attr_value.value[y][x])
                    metrics.append(new_metric) if new_metric else None
    
            return metrics
    
        def metrics(self, dev, attr_info, attr_value):
            """ Return all metrics for a given attribute. """
    
            if attr_info.data_format == AttrDataFormat.SCALAR:
                return self.metrics_scalar(dev, attr_info, attr_value)
            elif attr_info.data_format == AttrDataFormat.SPECTRUM:
                return self.metrics_spectrum(dev, attr_info, attr_value)
            elif attr_info.data_format == AttrDataFormat.IMAGE:
                return self.metrics_image(dev, attr_info, attr_value)
            else:
                return []
    
        def device_metrics(self, device_name):
            """ Return all metrics for a given device, as configured. """
    
            dev = DeviceProxy(device_name)
            dev.set_timeout_millis(self.proxy_timeout)
    
            # obtain extended info about all attributes
            attr_infos = {attr_info.name: attr_info for attr_info in dev.attribute_list_query()}
    
            if dev.state() not in [DevState.STANDBY, DevState.ON, DevState.ALARM]:
                logger.error(f"Error processing device {device_name}: it is in state {dev.state()}")
    
                # at least log state & status
                attrs_to_scrape = ["State", "Status"]
            else:
                # obtain list of attributes to scrape
                attrs_to_scrape = self.policy.attribute_list(device_name, attr_infos.keys())
    
            logger.info(f"Processing device {device_name} attributes {attrs_to_scrape}")
    
            # scrape each attribute
            metrics = []
            for attr_name in attrs_to_scrape:
                try:
                    attr_value = dev.read_attribute(attr_name)
    
                    metrics.extend(self.metrics(dev, attr_infos[attr_name], attr_value))
                except DevFailed as e:
                    reason = e.args[0].desc.replace("\n", " ")
                    logger.error(f"Error processing device {device_name} attribute {attr_name}: {reason}")
                except Exception as e: 
                    logger.exception(f"Error processing device {device_name} attribute {attr_name}")
    
            return metrics
    
        def collect(self):
            """ Yield all scraped metrics from all devices, as configured. """
    
            logger.info("Start scraping")
            scrape_begin = time.time()
    
            attribute_metrics = GaugeMetricFamily("device_attribute", 'Device attribute value', labels=['station', 'device', 'name', 'str_value', 'type', 'x', 'y', 'idx'])
            scraping_metrics = GaugeMetricFamily("device_scraping", 'Device scraping duration', labels=['station', 'device'])
    
            for device_name in self.policy.devices():
                logger.debug(f"Processing device {device_name}")
                dev_scrape_begin = time.time()
    
                try:
                    metrics = self.device_metrics(device_name)
                    for metric in metrics:
                        attribute_metrics.add_metric(*metric)
                except DevFailed as e: 
                    reason = e.args[0].desc.replace("\n", " ")
                    logger.error(f"Error processing device {device_name}: {reason}")
                except Exception as e:
                    logger.exception(f"Error processing device {device_name}")
                finally:
                    dev_scrape_end = time.time()
    
                logger.info(f"Done processing device {device_name}. Took {dev_scrape_end - dev_scrape_begin} seconds.")
    
                scraping_metrics.add_metric([self.station, device_name], dev_scrape_end - dev_scrape_begin)
    
            scrape_end = time.time()
            logger.info(f"Done scraping. Took {scrape_end - scrape_begin} seconds.")
    
            scraping_metrics.add_metric(["total"], scrape_end - scrape_begin)
            
            yield attribute_metrics
            yield scraping_metrics
    
    if __name__ == '__main__':
        import sys
    
        parser = argparse.ArgumentParser()
        parser.add_argument('-c', '--config', type=str, required=True, help='configuration file')
        parser.add_argument('-t', '--timeout', type=int, required=False, default=250, help='device proxy timeout (ms)')
        parser.add_argument('-p', '--port', type=int, required=False, default=8000, help='HTTP server port to open')
        args = parser.parse_args()
    
        config = ArchiverPolicy.load_config(args.config)
    
        db = Database()
        try:
            station = db.get_property("station","name")["name"][0]
        except Exception as e:
            logger.exception("Could not determine station name")
            sys.exit(1)
    
        collector = CustomCollector(config, station=station, proxy_timeout=args.timeout)
    
        logger.info("Starting server")
        start_http_server(args.port)
    
        logger.info("Registering collector")
        REGISTRY.register(collector)
    
        logger.info("Idling")
        while True:
            time.sleep(1)