From 2d9326b1528ff4a00d6c09a9e362076e2ddfaa16 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Wed, 13 Apr 2022 07:00:59 +0200
Subject: [PATCH] L2SS-758: Refactor tango-prometheus-exporter, add a
 configuration file with the attributes to parse
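
The policy file selects which attributes to scrape per device: glob patterns of
attributes to include or exclude, applied on top of a default policy. For
example (excerpt from lofar2-policy.json below):

    {
        "default": { "include": ["*"], "exclude": ["*_RW"] },
        "devices": { "STAT/SDP/1": { "exclude": ["FPGA_scrap_*"] } }
    }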

---
 .../tango-prometheus-exporter/Dockerfile      |  10 +-
 .../code/tango-prometheus-client.py           | 223 ++++++++++++++++++
 .../lofar2-policy.json                        |  62 +++++
 3 files changed, 291 insertions(+), 4 deletions(-)
 create mode 100644 docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py
 create mode 100644 docker-compose/tango-prometheus-exporter/lofar2-policy.json

diff --git a/docker-compose/tango-prometheus-exporter/Dockerfile b/docker-compose/tango-prometheus-exporter/Dockerfile
index 1df83afa6..1a75c4aa1 100644
--- a/docker-compose/tango-prometheus-exporter/Dockerfile
+++ b/docker-compose/tango-prometheus-exporter/Dockerfile
@@ -1,15 +1,17 @@
 FROM tangocs/tango-pytango
 
+# curl is needed by pip
 USER root
-
 RUN apt-get update && apt-get install curl -y
 
 USER tango
 
-ADD ska-tango-grafana-exporter/exporter/code /code
-RUN pip install -r /code/pip-requirements.txt
+COPY pip-requirements.txt /tmp/
+RUN pip install -r /tmp/pip-requirements.txt
 
+ADD code /code
+COPY lofar2-policy.json /code/
 WORKDIR /code
 ENV PYTHONPATH '/code/'
 
-CMD ["python", "-u", "/code/collector.py"]
+CMD ["python", "-u", "/code/tango-prometheus-client.py", "--config=/code/lofar2-policy.json", "--timeout=250"]
diff --git a/docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py b/docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py
new file mode 100644
index 000000000..1483ca3bc
--- /dev/null
+++ b/docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py
@@ -0,0 +1,223 @@
+import argparse
+import fnmatch
+import json
+import logging
+import time
+
+from logstash_async.handler import AsynchronousLogstashHandler
+from prometheus_client import start_http_server
+from prometheus_client.core import GaugeMetricFamily, REGISTRY
+from tango import DeviceProxy, CmdArgType as ArgType, AttrDataFormat, DevState, DevFailed
+
+logger = logging.getLogger()
+logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)
+
+# Log to memory only (database_path=None), with an event TTL of 600 seconds. This container is
+# not important enough to keep all logs around for, and as a scraper it must be robust against
+# overloading on all sides.
+handler = AsynchronousLogstashHandler("elk", 5959, database_path=None, event_ttl=600)
+handler.setLevel(logging.INFO)
+logger.addHandler(handler)
+
+""" Functions to parse and apply policy files. """
+
+class ArchiverPolicy(object):
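+    """ Holds the scraping policy: which device attributes to scrape, expressed as
+        include/exclude glob patterns per device on top of a default (see lofar2-policy.json). """
+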
+    EMPTY_POLICY = {
+      # default policy
+      "default": {
+      },
+      # device-specific policies
+      "devices": {
+      }
+    }
+
+    @staticmethod
+    def load_config(resource: str) -> dict:
+        with open(resource) as fd:
+            return json.load(fd)
+
+    def __init__(self, config: dict = None):
+        self.config = config or self.EMPTY_POLICY
+
+    def devices(self) -> list:
+        return list(self.config["devices"].keys())
+
+    def attribute_list(self, device_name: str, attribute_list: list) -> list:
+        """ Return the full list of attributes to scrape for the given device, sorted by name. """
+
+        if device_name not in self.devices():
+            return {}
+
+        attributes = set()
+
+        for config_set in [self.config["default"], self.config["devices"][device_name]]:
+            # include all specified attributes by pattern,
+            for include in config_set.get("include", []):
+                for attr in attribute_list:
+                    if fnmatch.fnmatch(attr, include):
+                        attributes.add(attr)
+
+            # then, remove any explicitly excluded attributes
+            for exclude in config_set.get("exclude", []):
+                for attr in attribute_list:
+                    if fnmatch.fnmatch(attr, exclude) and attr in attributes:
+                        attributes.remove(attr)
+
+        return sorted(attributes)
+
+class CustomCollector(object):
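+    """ Prometheus collector that reads the policy-selected attributes from all configured
+        Tango devices and exposes them as metrics on every collection. """
+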
+    def __init__(self, config, proxy_timeout=250):
+        self.policy = ArchiverPolicy(config)
+        self.proxy_timeout = proxy_timeout
+
+    @staticmethod
+    def _to_metric(dev, attr_info, x, y, value):
+        """ Convert the given values to a (labels, value) pair, used to construct a Metric. """
+
+        if attr_info.data_type in [ArgType.DevShort, ArgType.DevLong, ArgType.DevUShort, ArgType.DevULong, ArgType.DevLong64, ArgType.DevULong64, ArgType.DevInt, ArgType.DevFloat, ArgType.DevDouble]:
+            data_type = 'float'
+            str_value = ''
+            float_value = float(value)
+        elif attr_info.data_type == ArgType.DevBoolean:
+            data_type = 'bool'
+            str_value = ''
+            float_value = int(value)
+        elif attr_info.data_type == ArgType.DevString:
+            data_type = 'string'
+            str_value = str(value)
+            float_value = 1
+        elif attr_info.data_type == ArgType.DevEnum:
+            attr_config = dev.get_attribute_config(attr_info.name)
+            data_type = 'enum'
+            str_value = str(attr_config.enum_labels[value])
+            float_value = int(value)
+        elif attr_info.data_type == ArgType.DevState:
+            data_type = 'state'
+            str_value = ''
+            float_value = int(value)
+        else:
+            return None
+
+        # (labels, value)
+        return ([dev.dev_name(), attr_info.name, str_value, data_type, f"{x:02}", f"{y:02}"], float_value)
+
+    def metrics_scalar(self, dev, attr_info, attr_value):
+        new_metric = self._to_metric(dev, attr_info, 0, 0, attr_value.value)
+        return [new_metric] if new_metric else []
+
+    def metrics_spectrum(self, dev, attr_info, attr_value):
+        metrics = []
+        for x in range(int(attr_value.dim_x)):
+            new_metric = self._to_metric(dev, attr_info, x, 0, attr_value.value[x])
+            if new_metric:
+                metrics.append(new_metric)
+
+        return metrics
+
+    def metrics_image(self, dev, attr_info, attr_value):
+        metrics = []
+
+        # NOTE: We switch x and y in the annotation, to allow queries to combine 1D and 2D
+        #       arrays in their first dimension using the same label (x). We effectively
+        #       expose the array as [x][y] instead of [y][x].
+        for y in range(int(attr_value.dim_y)):
+            for x in range(int(attr_value.dim_x)):
+                new_metric = self._to_metric(dev, attr_info, x, y, attr_value.value[y][x])
+
+                if new_metric:
+                    metrics.append(new_metric)
+
+        return metrics
+
+    def metrics(self, dev, attr_info, attr_value):
+        if attr_info.data_format == AttrDataFormat.SCALAR:
+            return self.metrics_scalar(dev, attr_info, attr_value)
+        elif attr_info.data_format == AttrDataFormat.SPECTRUM:
+            return self.metrics_spectrum(dev, attr_info, attr_value)
+        elif attr_info.data_format == AttrDataFormat.IMAGE:
+            return self.metrics_image(dev, attr_info, attr_value)
+        else:
+            return []
+
+    def device_metrics(self, device_name):
+        dev = DeviceProxy(device_name)
+        dev.set_timeout_millis(self.proxy_timeout)
+
+        # obtain extended info about all attributes
+        attr_infos = {attr_info.name: attr_info for attr_info in dev.attribute_list_query()}
+
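+        # Scrape the full policy-selected attribute set only if the device is in STANDBY,
+        # ON or ALARM; otherwise, fall back to scraping just State and Status.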
+        if dev.state() not in [DevState.STANDBY, DevState.ON, DevState.ALARM]:
+            logger.error(f"Error processing device {device_name}: it is in state {dev.state()}")
+
+            # at least log state & status
+            attrs_to_scrape = ["State", "Status"]
+        else:
+            # obtain list of attributes to scrape
+            attrs_to_scrape = self.policy.attribute_list(device_name, attr_infos.keys())
+
+        logger.info(f"Processing device {device_name} attributes {attrs_to_scrape}")
+
+        # scrape each attribute
+        metrics = []
+        for attr_name in attrs_to_scrape:
+            try:
+                attr_value = dev.read_attribute(attr_name)
+
+                metrics.extend(self.metrics(dev, attr_infos[attr_name], attr_value))
+            except DevFailed as e:
+                reason = e.args[0].desc.replace("\n", " ")
+                logger.error(f"Error processing device {device_name} attribute {attr_name}: {reason}")
+            except Exception:
+                logger.exception(f"Error processing device {device_name} attribute {attr_name}")
+
+        return metrics
+
+    def collect(self):
+        """ Yield all scraped metrics. """
+
+        logger.info("Start scraping")
+        scrape_begin = time.time()
+
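+        # One gauge per scraped attribute element, plus a gauge tracking the scrape
+        # duration per device (and a "total" entry for the whole scrape).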
+        attribute_metrics = GaugeMetricFamily("device_attribute", 'Device attribute value', labels=['device', 'name', 'str_value', 'type', 'x', 'y'])
+        scraping_metrics = GaugeMetricFamily("device_scraping", 'Device scraping duration', labels=['device'])
+
+        for device_name in self.policy.devices():
+            logger.debug(f"Processing device {device_name}")
+            dev_scrape_begin = time.time()
+
+            try:
+                metrics = self.device_metrics(device_name)
+                for metric in metrics:
+                    attribute_metrics.add_metric(*metric)
+            except DevFailed as e:
+                reason = e.args[0].desc.replace("\n", " ")
+                logger.error(f"Error processing device {device_name}: {reason}")
+            except Exception:
+                logger.exception(f"Error processing device {device_name}")
+            finally:
+                dev_scrape_end = time.time()
+
+            logger.info(f"Done processing device {device_name}. Took {dev_scrape_end - dev_scrape_begin} seconds.")
+
+            scraping_metrics.add_metric([device_name], dev_scrape_end - dev_scrape_begin)
+
+        scrape_end = time.time()
+        logger.info(f"Done scraping. Took {scrape_end - scrape_begin} seconds.")
+
+        scraping_metrics.add_metric(["total"], scrape_end - scrape_begin)
+
+        yield attribute_metrics
+        yield scraping_metrics
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-c', '--config', type=str, required=True, help='configuration file')
+    parser.add_argument('-t', '--timeout', type=int, required=False, default=250, help='device proxy timeout (ms)')
+    args = parser.parse_args()
+
+    config = ArchiverPolicy.load_config(args.config)
+    collector = CustomCollector(config, proxy_timeout=args.timeout)
+
+    logger.info("Starting server")
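+    # Expose all registered collectors over HTTP on port 8000 for Prometheus to scrape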
+    start_http_server(8000)
+
+    logger.info("Registering collector")
+    REGISTRY.register(collector)
+
+    logger.info("Idling")
+    while True:
+        time.sleep(1)
diff --git a/docker-compose/tango-prometheus-exporter/lofar2-policy.json b/docker-compose/tango-prometheus-exporter/lofar2-policy.json
new file mode 100644
index 000000000..b3ee2cdba
--- /dev/null
+++ b/docker-compose/tango-prometheus-exporter/lofar2-policy.json
@@ -0,0 +1,62 @@
+{
+    "default": {
+        "include": ["*"],
+        "exclude": ["*_RW"]
+    },
+
+    "devices": {
+        "STAT/APSCT/1": {
+        },
+        "STAT/APSPU/1": {
+        },
+        "STAT/Beamlet/1": {
+            "exclude": [
+                "FPGA_bf_weights_*"
+            ]
+        },
+        "STAT/Boot/1": {
+        },
+        "STAT/DigitalBeam/1": {
+        },
+        "STAT/Docker/1": {
+        },
+        "STAT/PDU/1": {
+        },
+        "STAT/RECV/1": {
+            "exclude": [
+                "*_ITRF_R",
+                "*_ITRF_offsets_R"
+            ]
+        },
+        "STAT/SDP/1": {
+            "exclude": [
+                "FPGA_subband_weights_*",
+                "FPGA_signal_input_samples_delay_*",
+                "FPGA_jesd204b_*",
+                "FPGA_scrap_*",
+                "FPGA_wg_amplitude_*",
+                "FPGA_wg_frequency_*",
+                "FPGA_wg_phase_*"
+            ]
+        },
+        "STAT/SST/1": {
+            "exclude": [
+                "sst_R",
+                "sst_timestamp_R",
+                "last_packet_R",
+                "integration_interval_R",
+                "subbands_calibrated_R"
+            ]
+        },
+        "STAT/TileBeam/1": {
+        },
+        "STAT/UNB2/1": {
+        },
+        "STAT/XST/1": {
+            "exclude": [
+                "last_packet_R",
+                "xst_*_R"
+            ]
+        }
+    }
+}
-- 
GitLab