diff --git a/docker-compose/tango-prometheus-exporter/Dockerfile b/docker-compose/tango-prometheus-exporter/Dockerfile
index 1df83afa690c008f83868c1bc9c8d6c1a09323ef..1a75c4aa11f2c97536821d3c141fa4a8fba26aeb 100644
--- a/docker-compose/tango-prometheus-exporter/Dockerfile
+++ b/docker-compose/tango-prometheus-exporter/Dockerfile
@@ -1,15 +1,17 @@
 FROM tangocs/tango-pytango
 
+# curl is needed by pip
 USER root
-
 RUN apt-get update && apt-get install curl -y
 
 USER tango
 
-ADD ska-tango-grafana-exporter/exporter/code /code
-RUN pip install -r /code/pip-requirements.txt
+COPY pip-requirements.txt /tmp/
+RUN pip install -r /tmp/pip-requirements.txt
+ADD code /code
+COPY lofar2-policy.json /code/
 
 WORKDIR /code
 
 ENV PYTHONPATH '/code/'
-CMD ["python", "-u", "/code/collector.py"]
+CMD ["python", "-u", "/code/tango-prometheus-client.py", "--config=/code/lofar2-policy.json", "--timeout=250"]
diff --git a/docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py b/docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py
new file mode 100644
index 0000000000000000000000000000000000000000..1483ca3bc21936828eb349c1ffdd483b74d65bbd
--- /dev/null
+++ b/docker-compose/tango-prometheus-exporter/code/tango-prometheus-client.py
@@ -0,0 +1,223 @@
+import time
+import argparse
+from prometheus_client.core import GaugeMetricFamily, REGISTRY, CounterMetricFamily
+from prometheus_client import start_http_server
+from tango import Database, DeviceProxy, CmdArgType as ArgType, AttrDataFormat, DevState, DevFailed
+import logging
+import json
+import fnmatch
+from logstash_async.handler import AsynchronousLogstashHandler, LogstashFormatter
+
+logger = logging.getLogger()
+logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)
+
+# Log to memory only, with events expiring after 600 seconds. This container is not important enough
+# to keep all logs around for, and as a scraper it must be robust against overloading on all sides.
+handler = AsynchronousLogstashHandler("elk", 5959, database_path=None, event_ttl=600)
+handler.setLevel(logging.INFO)
+logger.addHandler(handler)
+
+""" Functions to parse and apply policy files. """
+
+class ArchiverPolicy(object):
+    EMPTY_POLICY = {
+        # default policy
+        "default": {
+        },
+        # device-specific policies
+        "devices": {
+        }
+    }
+
+    @staticmethod
+    def load_config(resource: str) -> dict:
+        with open(resource) as fd:
+            return json.load(fd)
+
+    def __init__(self, config: dict = None):
+        self.config = config or self.EMPTY_POLICY
+
+    def devices(self) -> list:
+        return list(self.config["devices"].keys())
+
+    def attribute_list(self, device_name: str, attribute_list: list) -> list:
+        """ Return the list of attributes to scrape for the given device.
""" + + if device_name not in self.devices(): + return {} + + attributes = set() + + for config_set in [self.config["default"], self.config["devices"][device_name]]: + # include all specified attributes by pattern, + for include in config_set.get("include", []): + for attr in attribute_list: + if fnmatch.fnmatch(attr, include): + attributes.add(attr) + + # then, remove any explicitly excluded attributes + for exclude in config_set.get("exclude", []): + for attr in attribute_list: + if fnmatch.fnmatch(attr, exclude) and attr in attributes: + attributes.remove(attr) + + return sorted(list(attributes)) + +class CustomCollector(object): + def __init__(self, config, proxy_timeout=250): + self.policy = ArchiverPolicy(config) + self.proxy_timeout = proxy_timeout + + @staticmethod + def _to_metric(dev, attr_info, x, y, value): + """ Convert the given values to a (labels, value) pair, used to construct a Metric. """ + + if attr_info.data_type in [ArgType.DevShort, ArgType.DevLong, ArgType.DevUShort, ArgType.DevULong, ArgType.DevLong64, ArgType.DevULong64, ArgType.DevInt, ArgType.DevFloat, ArgType.DevDouble]: + data_type = 'float' + str_value = '' + float_value = float(value) + elif attr_info.data_type == ArgType.DevBoolean: + data_type = 'bool' + str_value = '' + float_value = int(value) + elif attr_info.data_type == ArgType.DevString: + data_type = 'string' + str_value = str(value) + float_value = 1 + elif attr_info.data_type == ArgType.DevEnum: + attr_config = dev.get_attribute_config(attr_info.name) + data_type = 'enum' + str_value = str(attr_config.enum_labels[value]) + float_value = int(value) + elif attr_info.data_type == ArgType.DevState: + data_type = 'state' + str_value = '' + float_value = int(value) + else: + return None + + # (labels, value) + return ([dev.dev_name(), attr_info.name, str_value, data_type, f"{x:02}", f"{y:02}"], float_value) + + def metrics_scalar(self, dev, attr_info, attr_value): + new_metric = self._to_metric(dev, attr_info, 0, 0, attr_value.value) + return [new_metric] if new_metric else [] + + def metrics_spectrum(self, dev, attr_info, attr_value): + metrics = [] + for x in range(int(attr_value.dim_x)): + new_metric = self._to_metric(dev, attr_info, x, 0, attr_value.value[x]) + metrics.append(new_metric) if new_metric else None + + return metrics + + def metrics_image(self, dev, attr_info, attr_value): + metrics = [] + for y in range(int(attr_value.dim_y)): + for x in range(int(attr_value.dim_x)): + """ NOTE: We switch x and y in the annotation, to allow queries to combine 1D and 2D arrays in their first dimension using the same label (x). We effectively expose + the array as [x][y] instead of [y][x]. 
""" + + new_metric = self._to_metric(dev, attr_info, x, y, attr_value.value[y][x]) + metrics.append(new_metric) if new_metric else None + + return metrics + + def metrics(self, dev, attr_info, attr_value): + if attr_info.data_format == AttrDataFormat.SCALAR: + return self.metrics_scalar(dev, attr_info, attr_value) + elif attr_info.data_format == AttrDataFormat.SPECTRUM: + return self.metrics_spectrum(dev, attr_info, attr_value) + elif attr_info.data_format == AttrDataFormat.IMAGE: + return self.metrics_image(dev, attr_info, attr_value) + else: + return [] + + def device_metrics(self, device_name): + dev = DeviceProxy(device_name) + dev.set_timeout_millis(self.proxy_timeout) + + # obtain extended info about all attributes + attr_infos = {attr_info.name: attr_info for attr_info in dev.attribute_list_query()} + + if dev.state() not in [DevState.STANDBY, DevState.ON, DevState.ALARM]: + logger.error(f"Error processing device {device_name}: it is in state {dev.state()}") + + # at least log state & status + attrs_to_scrape = ["State", "Status"] + else: + # obtain list of attributes to scrape + attrs_to_scrape = self.policy.attribute_list(device_name, attr_infos.keys()) + + logger.info(f"Processing device {device_name} attributes {attrs_to_scrape}") + + # scrape each attribute + metrics = [] + for attr_name in attrs_to_scrape: + try: + attr_value = dev.read_attribute(attr_name) + + metrics.extend(self.metrics(dev, attr_infos[attr_name], attr_value)) + except DevFailed as e: + reason = e.args[0].desc.replace("\n", " ") + logger.error(f"Error processing device {device_name} attribute {attr_name}: {reason}") + except Exception as e: + logger.exception(f"Error processing device {device_name} attribute {attr_name}") + + return metrics + + def collect(self): + """ Yield all scraped metrics. """ + + logger.info("Start scraping") + scrape_begin = time.time() + + attribute_metrics = GaugeMetricFamily("device_attribute", 'Device attribute value', labels=['device', 'name', 'str_value', 'type', 'x', 'y']) + scraping_metrics = GaugeMetricFamily("device_scraping", 'Device scraping duration', labels=['device']) + + for device_name in self.policy.devices(): + logger.debug(f"Processing device {device_name}") + dev_scrape_begin = time.time() + + try: + metrics = self.device_metrics(device_name) + for metric in metrics: + attribute_metrics.add_metric(*metric) + except DevFailed as e: + reason = e.args[0].desc.replace("\n", " ") + logger.error(f"Error processing device {device_name}: {reason}") + except Exception as e: + logger.exception(f"Error processing device {device_name}") + finally: + dev_scrape_end = time.time() + + logger.info(f"Done processing device {device_name}. Took {dev_scrape_end - dev_scrape_begin} seconds.") + + scraping_metrics.add_metric([device_name], dev_scrape_end - dev_scrape_begin) + + scrape_end = time.time() + logger.info(f"Done scraping. 
+
+        scraping_metrics.add_metric(["total"], scrape_end - scrape_begin)
+
+        yield attribute_metrics
+        yield scraping_metrics
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-c', '--config', type=str, required=True, help='configuration file')
+    parser.add_argument('-t', '--timeout', type=int, required=False, default=250, help='device proxy timeout (ms)')
+    args = parser.parse_args()
+
+    config = ArchiverPolicy.load_config(args.config)
+    collector = CustomCollector(config, proxy_timeout=args.timeout)
+
+    logger.info("Starting server")
+    start_http_server(8000)
+
+    logger.info("Registering collector")
+    REGISTRY.register(collector)
+
+    logger.info("Idling")
+    while True:
+        time.sleep(1)
diff --git a/docker-compose/tango-prometheus-exporter/lofar2-policy.json b/docker-compose/tango-prometheus-exporter/lofar2-policy.json
new file mode 100644
index 0000000000000000000000000000000000000000..b3ee2cdba57b28a8407e21d7340b7bf1de3e4a98
--- /dev/null
+++ b/docker-compose/tango-prometheus-exporter/lofar2-policy.json
@@ -0,0 +1,62 @@
+{
+    "default": {
+        "include": ["*"],
+        "exclude": ["*_RW"]
+    },
+
+    "devices": {
+        "STAT/APSCT/1": {
+        },
+        "STAT/APSPU/1": {
+        },
+        "STAT/Beamlet/1": {
+            "exclude": [
+                "FPGA_bf_weights_*"
+            ]
+        },
+        "STAT/Boot/1": {
+        },
+        "STAT/DigitalBeam/1": {
+        },
+        "STAT/Docker/1": {
+        },
+        "STAT/PDU/1": {
+        },
+        "STAT/RECV/1": {
+            "exclude": [
+                "*_ITRF_R",
+                "*_ITRF_offsets_R"
+            ]
+        },
+        "STAT/SDP/1": {
+            "exclude": [
+                "FPGA_subband_weights_*",
+                "FPGA_signal_input_samples_delay_*",
+                "FPGA_jesd204b_*",
+                "FPGA_scrap_*",
+                "FPGA_wg_amplitude_*",
+                "FPGA_wg_frequency_*",
+                "FPGA_wg_phase_*"
+            ]
+        },
+        "STAT/SST/1": {
+            "exclude": [
+                "sst_R",
+                "sst_timestamp_R",
+                "last_packet_R",
+                "integration_interval_R",
+                "subbands_calibrated_R"
+            ]
+        },
+        "STAT/TileBeam/1": {
+        },
+        "STAT/UNB2/1": {
+        },
+        "STAT/XST/1": {
+            "exclude": [
+                "last_packet_R",
+                "xst_*_R"
+            ]
+        }
+    }
+}
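
For reference, a minimal sketch of how a policy like lofar2-policy.json resolves to a scrape list. It mirrors the fnmatch-based include/exclude logic of ArchiverPolicy.attribute_list (default policy first, then the device-specific policy) without importing the module itself; the device and attribute names are hypothetical and purely illustrative.

import fnmatch

# Toy policy with the same structure as lofar2-policy.json (names below are made up).
policy = {
    "default": {"include": ["*"], "exclude": ["*_RW"]},
    "devices": {"STAT/RECV/1": {"exclude": ["*_ITRF_R"]}},
}
device = "STAT/RECV/1"
attributes = ["RCU_temperature_R", "RCU_band_select_RW", "HBAT_ITRF_R", "State"]

# Apply the default policy, then the device-specific policy; within each,
# include by pattern first, then remove the excluded patterns.
selected = set()
for config_set in [policy["default"], policy["devices"].get(device, {})]:
    for pattern in config_set.get("include", []):
        selected |= {a for a in attributes if fnmatch.fnmatch(a, pattern)}
    for pattern in config_set.get("exclude", []):
        selected -= {a for a in attributes if fnmatch.fnmatch(a, pattern)}

print(sorted(selected))  # ['RCU_temperature_R', 'State']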
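
Likewise, a rough sketch of the sample shape the collector ends up exposing on port 8000: the label set matches the GaugeMetricFamily declared in collect(), while the device, attribute name and value here are hypothetical.

from prometheus_client.core import GaugeMetricFamily

# Same metric family definition as in CustomCollector.collect().
fam = GaugeMetricFamily("device_attribute", "Device attribute value",
                        labels=["device", "name", "str_value", "type", "x", "y"])

# A scalar float attribute is reported at x="00", y="00"; spectrum and image
# attributes get one sample per element. The values below are made up.
fam.add_metric(["STAT/RECV/1", "RCU_temperature_R", "", "float", "00", "00"], 24.5)

# Served by start_http_server(8000) roughly as:
# device_attribute{device="STAT/RECV/1",name="RCU_temperature_R",str_value="",type="float",x="00",y="00"} 24.5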