Commit 3448154f authored by Jan David Mol

Merge branch 'L2SS-758-configurable-tango-exporter' into 'master'

L2SS-758: Tune prometheus archiving

Closes L2SS-758

See merge request !305
parents abdaec73 780677b8
[submodule "docker-compose/tango-prometheus-exporter/ska-tango-grafana-exporter"]
path = docker-compose/tango-prometheus-exporter/ska-tango-grafana-exporter
url = https://git.astron.nl/lofar2.0/ska-tango-grafana-exporter.git
branch = station-control
[submodule "tangostationcontrol/tangostationcontrol/toolkit/libhdbpp-python"]
path = tangostationcontrol/tangostationcontrol/toolkit/libhdbpp-python
url = https://gitlab.com/tango-controls/hdbpp/libhdbpp-python.git
@@ -150,12 +150,10 @@ pull: ## pull the images from the Docker hub
build: ## rebuild images
# docker-compose does not support build dependencies, so manage those here
$(DOCKER_COMPOSE_ARGS) docker-compose -f lofar-device-base.yml -f networks.yml build --progress=plain
$(DOCKER_COMPOSE_ARGS) docker-compose $(COMPOSE_FILE_ARGS) build --parallel --progress=plain $(SERVICE)
build-nocache: ## rebuild images from scratch
# docker-compose does not support build dependencies, so manage those here
$(DOCKER_COMPOSE_ARGS) docker-compose -f lofar-device-base.yml -f networks.yml build --progress=plain
$(DOCKER_COMPOSE_ARGS) docker-compose $(COMPOSE_FILE_ARGS) build --no-cache --progress=plain $(SERVICE)
up: minimal ## start the base TANGO system and prepare requested services
@@ -2,4 +2,4 @@ FROM prom/prometheus
COPY prometheus.yml /etc/prometheus/prometheus.yml
CMD ["--config.file=/etc/prometheus/prometheus.yml", "--storage.tsdb.path=/prometheus", "--web.console.libraries=/usr/share/prometheus/console_libraries", "--web.console.templates=/usr/share/prometheus/consoles", "--storage.tsdb.retention.time=31d"]
CMD ["--config.file=/etc/prometheus/prometheus.yml", "--storage.tsdb.path=/prometheus", "--web.console.libraries=/usr/share/prometheus/console_libraries", "--web.console.templates=/usr/share/prometheus/consoles", "--storage.tsdb.retention.time=5y", "--storage.tsdb.retention.size=500GB", "--web.enable-admin-api"]
FROM tangocs/tango-pytango
# curl is needed by pip
USER root
RUN apt-get update && apt-get install curl -y
USER tango
ADD ska-tango-grafana-exporter/exporter/code /code
RUN pip install -r /code/pip-requirements.txt
COPY code/pip-requirements.txt /tmp/
RUN pip install -r /tmp/pip-requirements.txt
ADD code /code
COPY lofar2-policy.json /code/
WORKDIR /code
ENV PYTHONPATH '/code/'
CMD ["python", "-u", "/code/collector.py"]
CMD ["python", "-u", "/code/tango-prometheus-client.py", "--config=/code/lofar2-policy.json", "--timeout=250"]
import time
import argparse
from prometheus_client.core import GaugeMetricFamily, REGISTRY, CounterMetricFamily
from prometheus_client import start_http_server
from tango import Database, DeviceProxy, CmdArgType as ArgType, AttrDataFormat, DevState, DevFailed
import logging
import json
import fnmatch
from logstash_async.handler import AsynchronousLogstashHandler, LogstashFormatter
logger = logging.getLogger()
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)
# log to memory only, with a limit of 600 seconds. this container is not important enough to keep
# all logs around for, and as a scraper must be robust against overloading on all sides
handler = AsynchronousLogstashHandler("elk", 5959, database_path=None, event_ttl=600)
handler.setLevel(logging.INFO)
logger.addHandler(handler)
""" Functions to parse and apply policy files. """
class ArchiverPolicy(object):
EMPTY_POLICY = {
# default policy
"default": {
},
# device-specific policies
"devices": {
}
}
@staticmethod
def load_config(resource: str) -> dict:
with open(resource) as fd:
return json.load(fd)
def __init__(self, config: dict = None):
self.config = config or self.EMPTY_POLICY
def devices(self) -> list:
return list(self.config["devices"].keys())
def attribute_list(self, device_name: str, attribute_list: list) -> dict:
""" Return the full set of archiving policy for the given device. """
if device_name not in self.devices():
return {}
attributes = set()
for config_set in [self.config["default"], self.config["devices"][device_name]]:
# include all specified attributes by pattern,
for include in config_set.get("include", []):
for attr in attribute_list:
if fnmatch.fnmatch(attr, include):
attributes.add(attr)
# then, remove any explicitly excluded attributes
for exclude in config_set.get("exclude", []):
for attr in attribute_list:
if fnmatch.fnmatch(attr, exclude) and attr in attributes:
attributes.remove(attr)
return sorted(list(attributes))
class CustomCollector(object):
    def __init__(self, config, proxy_timeout=250):
        self.policy = ArchiverPolicy(config)
        self.proxy_timeout = proxy_timeout

    @staticmethod
    def _to_metric(dev, attr_info, x, y, idx, value):
        """ Convert the given values to a (labels, value) pair, used to construct a Metric. """

        if attr_info.data_type in [ArgType.DevShort, ArgType.DevLong, ArgType.DevUShort, ArgType.DevULong, ArgType.DevLong64, ArgType.DevULong64, ArgType.DevInt, ArgType.DevFloat, ArgType.DevDouble]:
            data_type = 'float'
            str_value = ''
            float_value = float(value)
        elif attr_info.data_type == ArgType.DevBoolean:
            data_type = 'bool'
            str_value = ''
            float_value = int(value)
        elif attr_info.data_type == ArgType.DevString:
            data_type = 'string'
            str_value = str(value)
            float_value = len(str_value)
        elif attr_info.data_type == ArgType.DevEnum:
            attr_config = dev.get_attribute_config(attr_info.name)
            data_type = 'enum'
            str_value = str(attr_config.enum_labels[value])
            float_value = int(value)
        elif attr_info.data_type == ArgType.DevState:
            data_type = 'state'
            str_value = ''
            float_value = int(value)
        else:
            return None

        # (labels, value)
        return ([dev.dev_name(), attr_info.name, str_value, data_type, f"{x:02}", f"{y:02}", f"{idx:03}"], float_value)
    def metrics_scalar(self, dev, attr_info, attr_value):
        """ Return all metrics for a given SCALAR attribute. """

        new_metric = self._to_metric(dev, attr_info, 0, 0, 0, attr_value.value)
        return [new_metric] if new_metric else []

    def metrics_spectrum(self, dev, attr_info, attr_value):
        """ Return all metrics for a given SPECTRUM attribute. """

        metrics = []
        for x in range(int(attr_value.dim_x)):
            new_metric = self._to_metric(dev, attr_info, x, 0, x, attr_value.value[x])
            if new_metric:
                metrics.append(new_metric)

        return metrics

    def metrics_image(self, dev, attr_info, attr_value):
        """ Return all metrics for a given IMAGE attribute. """

        metrics = []
        for y in range(int(attr_value.dim_y)):
            for x in range(int(attr_value.dim_x)):
                # NOTE: We switch x and y in the annotation, to allow queries to combine 1D and 2D arrays
                # in their first dimension using the same label (x). We effectively expose the array
                # as [x][y] instead of [y][x].
                new_metric = self._to_metric(dev, attr_info, y, x, y * attr_value.dim_x + x, attr_value.value[y][x])
                if new_metric:
                    metrics.append(new_metric)

        return metrics

    def metrics(self, dev, attr_info, attr_value):
        """ Return all metrics for a given attribute. """

        if attr_info.data_format == AttrDataFormat.SCALAR:
            return self.metrics_scalar(dev, attr_info, attr_value)
        elif attr_info.data_format == AttrDataFormat.SPECTRUM:
            return self.metrics_spectrum(dev, attr_info, attr_value)
        elif attr_info.data_format == AttrDataFormat.IMAGE:
            return self.metrics_image(dev, attr_info, attr_value)
        else:
            return []
    def device_metrics(self, device_name):
        """ Return all metrics for a given device, as configured. """

        dev = DeviceProxy(device_name)
        dev.set_timeout_millis(self.proxy_timeout)

        # obtain extended info about all attributes
        attr_infos = {attr_info.name: attr_info for attr_info in dev.attribute_list_query()}

        if dev.state() not in [DevState.STANDBY, DevState.ON, DevState.ALARM]:
            logger.error(f"Error processing device {device_name}: it is in state {dev.state()}")

            # at least log state & status
            attrs_to_scrape = ["State", "Status"]
        else:
            # obtain list of attributes to scrape
            attrs_to_scrape = self.policy.attribute_list(device_name, attr_infos.keys())

        logger.info(f"Processing device {device_name} attributes {attrs_to_scrape}")

        # scrape each attribute
        metrics = []
        for attr_name in attrs_to_scrape:
            try:
                attr_value = dev.read_attribute(attr_name)
                metrics.extend(self.metrics(dev, attr_infos[attr_name], attr_value))
            except DevFailed as e:
                reason = e.args[0].desc.replace("\n", " ")
                logger.error(f"Error processing device {device_name} attribute {attr_name}: {reason}")
            except Exception as e:
                logger.exception(f"Error processing device {device_name} attribute {attr_name}")

        return metrics
    def collect(self):
        """ Yield all scraped metrics from all devices, as configured. """

        logger.info("Start scraping")
        scrape_begin = time.time()

        attribute_metrics = GaugeMetricFamily("device_attribute", 'Device attribute value', labels=['device', 'name', 'str_value', 'type', 'x', 'y', 'idx'])
        scraping_metrics = GaugeMetricFamily("device_scraping", 'Device scraping duration', labels=['device'])

        for device_name in self.policy.devices():
            logger.debug(f"Processing device {device_name}")
            dev_scrape_begin = time.time()

            try:
                metrics = self.device_metrics(device_name)
                for metric in metrics:
                    attribute_metrics.add_metric(*metric)
            except DevFailed as e:
                reason = e.args[0].desc.replace("\n", " ")
                logger.error(f"Error processing device {device_name}: {reason}")
            except Exception as e:
                logger.exception(f"Error processing device {device_name}")
            finally:
                dev_scrape_end = time.time()
                logger.info(f"Done processing device {device_name}. Took {dev_scrape_end - dev_scrape_begin} seconds.")
                scraping_metrics.add_metric([device_name], dev_scrape_end - dev_scrape_begin)

        scrape_end = time.time()
        logger.info(f"Done scraping. Took {scrape_end - scrape_begin} seconds.")
        scraping_metrics.add_metric(["total"], scrape_end - scrape_begin)

        yield attribute_metrics
        yield scraping_metrics
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', type=str, required=True, help='configuration file')
    parser.add_argument('-t', '--timeout', type=int, required=False, default=250, help='device proxy timeout (ms)')
    parser.add_argument('-p', '--port', type=int, required=False, default=8000, help='HTTP server port to open')
    args = parser.parse_args()

    config = ArchiverPolicy.load_config(args.config)
    collector = CustomCollector(config, proxy_timeout=args.timeout)

    logger.info("Starting server")
    start_http_server(args.port)

    logger.info("Registering collector")
    REGISTRY.register(collector)

    logger.info("Idling")
    while True:
        time.sleep(1)
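Once the collector is registered, start_http_server() serves the metrics in the Prometheus text format on the configured port (8000 by default). A quick sketch to inspect the output, assuming the exporter is already running locally (host and port are assumptions):

# Sketch: print the exporter's own metric families.
import urllib.request

with urllib.request.urlopen("http://localhost:8000/metrics") as resp:
    for line in resp.read().decode().splitlines():
        # device_attribute carries the labels built in _to_metric(); device_scraping holds per-device timings.
        if line.startswith(("device_attribute", "device_scraping")):
            print(line)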
{
"default": {
"include": ["*"],
"exclude": ["*_RW"]
},
"devices": {
"STAT/APSCT/1": {
},
"STAT/APSPU/1": {
},
"STAT/Beamlet/1": {
"exclude": [
"FPGA_beamlet_subband_select_*",
"FPGA_bf_weights_*"
]
},
"STAT/Boot/1": {
},
"STAT/DigitalBeam/1": {
},
"STAT/Docker/1": {
},
"STAT/PDU/1": {
},
"STAT/RECV/1": {
"include": [
"ANT_mask_RW",
"RCU_mask_RW"
],
"exclude": [
"HBAT_BF_delay_steps_*",
"*_ITRF_R",
"*_ITRF_offsets_R"
]
},
"STAT/SDP/1": {
"include": [
"TR_fpga_mask_RW"
],
"exclude": [
"FPGA_subband_weights_*",
"FPGA_signal_input_samples_delay_*",
"FPGA_jesd204b_*",
"FPGA_scrap_*",
"FPGA_wg_amplitude_*",
"FPGA_wg_frequency_*",
"FPGA_wg_phase_*"
]
},
"STAT/SST/1": {
"exclude": [
"sst_R",
"sst_timestamp_R",
"last_packet_R",
"integration_interval_R",
"subbands_calibrated_R"
]
},
"STAT/TileBeam/1": {
},
"STAT/UNB2/1": {
"include": [
"UNB2_mask_RW"
]
},
"STAT/XST/1": {
"exclude": [
"last_packet_R",
"xst_*_R"
]
}
}
}
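To illustrate how this policy resolves: the default block includes every attribute and drops anything matching *_RW, after which a device-specific block can add masks back and strip further patterns. A sketch using the ArchiverPolicy class from the collector above; the attribute names fed in are made up for the example, and the import path is hypothetical (the script's file name contains dashes, so copy the class or use importlib in practice):

# Sketch: resolve the policy for STAT/RECV/1 against a hand-made attribute list.
from tango_prometheus_client import ArchiverPolicy  # hypothetical module name

policy = ArchiverPolicy(ArchiverPolicy.load_config("lofar2-policy.json"))
attrs = ["ANT_mask_RW", "RCU_mask_RW", "RCU_temperature_R", "HBAT_BF_delay_steps_R", "HBAT_PWR_on_RW"]

# "*" includes everything, "*_RW" drops the masks, the device-specific include re-adds ANT_mask_RW and
# RCU_mask_RW, and "HBAT_BF_delay_steps_*" stays excluded.
print(policy.attribute_list("STAT/RECV/1", attrs))
# -> ['ANT_mask_RW', 'RCU_mask_RW', 'RCU_temperature_R']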
Subproject commit e313399d197d266e49d6da0442ea983c6f92adad