Skip to content
Snippets Groups Projects

Archive more devices in general by using wild-card matching, avoid archiving...

Merged Jan David Mol requested to merge prometheus-exporter-fixes into master
3 files
+ 154
66
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -2,31 +2,39 @@ import time
@@ -2,31 +2,39 @@ import time
import argparse
import argparse
from prometheus_client.core import GaugeMetricFamily, REGISTRY, CounterMetricFamily
from prometheus_client.core import GaugeMetricFamily, REGISTRY, CounterMetricFamily
from prometheus_client import start_http_server
from prometheus_client import start_http_server
from tango import Database, DeviceProxy, CmdArgType as ArgType, AttrDataFormat, DevState, DevFailed
from tango import (
 
Database,
 
DeviceProxy,
 
CmdArgType as ArgType,
 
AttrDataFormat,
 
DevState,
 
DevFailed,
 
)
import logging
import logging
import json
import json
import fnmatch
import fnmatch
from logstash_async.handler import AsynchronousLogstashHandler, LogstashFormatter
from logstash_async.handler import AsynchronousLogstashHandler, LogstashFormatter
# Root logger for the exporter; console output is formatted by basicConfig and
# includes everything from DEBUG upwards.
logger = logging.getLogger()
logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s", level=logging.DEBUG)

# log to memory only, with a limit of 600 seconds. this container is not important enough to keep
# all logs around for, and as a scraper must be robust against overloading on all sides
handler = AsynchronousLogstashHandler(
    "logstash", 5959, database_path=None, event_ttl=600
)
# Only INFO and above is forwarded to logstash. The handler is attached exactly
# once, so each record is shipped a single time.
handler.setLevel(logging.INFO)
logger.addHandler(handler)
""" Functions to parse and apply policy files. """
""" Functions to parse and apply policy files. """
 
class ArchiverPolicy(object):
class ArchiverPolicy(object):
EMPTY_POLICY = {
EMPTY_POLICY = {
# default policy
# default policy
"default": {
"default": {},
},
# device-specific policies
# device-specific policies
"devices": {},
"devices": {
}
}
}
@staticmethod
@staticmethod
@@ -38,7 +46,7 @@ class ArchiverPolicy(object):
@@ -38,7 +46,7 @@ class ArchiverPolicy(object):
self.config = config or self.EMPTY_POLICY
self.config = config or self.EMPTY_POLICY
def device_list(self) -> list:
def device_list(self) -> list:
""" Retrieve the device list from TangoDB """
"""Retrieve the device list from TangoDB"""
device_list = []
device_list = []
db = Database()
db = Database()
server_list = db.get_server_list() # e.g. SDP/STAT, RECV/STAT
server_list = db.get_server_list() # e.g. SDP/STAT, RECV/STAT
@@ -52,7 +60,7 @@ class ArchiverPolicy(object):
@@ -52,7 +60,7 @@ class ArchiverPolicy(object):
return device_list
return device_list
def devices(self) -> list:
def devices(self) -> list:
""" Filter the device list from TangoDB following the lofar2-policy file """
"""Filter the device list from TangoDB following the lofar2-policy file"""
# Devices list from TangoDB
# Devices list from TangoDB
db_devices = self.device_list()
db_devices = self.device_list()
# Devices listed in policy file
# Devices listed in policy file
@@ -66,14 +74,21 @@ class ArchiverPolicy(object):
@@ -66,14 +74,21 @@ class ArchiverPolicy(object):
return devices
return devices
def attribute_list(self, device_name: str, attribute_list: list) -> dict:
def attribute_list(self, device_name: str, attribute_list: list) -> dict:
""" Return the full set of archiving policy for the given device. """
"""Return the full set of archiving policy for the given device."""
if device_name not in self.devices():
if device_name not in self.devices():
return {}
return {}
attributes = set()
attributes = set()
for config_set in [self.config["default"], self.config["devices"][device_name]]:
# find config(s) matching our device
 
configs = [self.config["default"]] + [
 
config
 
for config_dev, config in self.config["devices"].items()
 
if fnmatch.fnmatch(device_name, config_dev)
 
]
 
 
for config_set in configs:
# include all specified attributes by pattern,
# include all specified attributes by pattern,
for include in config_set.get("include", []):
for include in config_set.get("include", []):
for attr in attribute_list:
for attr in attribute_list:
@@ -88,6 +103,7 @@ class ArchiverPolicy(object):
@@ -88,6 +103,7 @@ class ArchiverPolicy(object):
return sorted(list(attributes))
return sorted(list(attributes))
 
class CustomCollector(object):
class CustomCollector(object):
def __init__(self, config, station, proxy_timeout=250):
def __init__(self, config, station, proxy_timeout=250):
self.station = station
self.station = station
@@ -95,43 +111,65 @@ class CustomCollector(object):
@@ -95,43 +111,65 @@ class CustomCollector(object):
self.proxy_timeout = proxy_timeout
self.proxy_timeout = proxy_timeout
def _to_metric(self, dev, attr_info, x, y, idx, value):
    """Convert the given values to a (labels, value) pair, used to construct a Metric.

    Args:
        dev: device the value belongs to; provides ``dev_name()`` and, for
            enum attributes, ``get_attribute_config()``.
        attr_info: attribute description; ``data_type`` selects the
            conversion and ``name`` is used as a label.
        x: first position label, rendered zero-padded to two digits.
        y: second position label, rendered zero-padded to two digits.
        idx: index label, rendered zero-padded to three digits.
        value: the single value to convert.

    Returns:
        A tuple ``(labels, float_value)`` where ``labels`` is the list
        ``[station, device, name, str_value, type, x, y, idx]``, or ``None``
        when the attribute's data type is not one of the handled types.
    """
    if attr_info.data_type in [
        ArgType.DevShort,
        ArgType.DevLong,
        ArgType.DevUShort,
        ArgType.DevULong,
        ArgType.DevLong64,
        ArgType.DevULong64,
        ArgType.DevInt,
        ArgType.DevFloat,
        ArgType.DevDouble,
    ]:
        data_type = "float"
        str_value = ""
        float_value = float(value)
    elif attr_info.data_type == ArgType.DevBoolean:
        data_type = "bool"
        str_value = ""
        float_value = int(value)
    elif attr_info.data_type == ArgType.DevString:
        # strings carry their text in a label; the metric value is the string length
        data_type = "string"
        str_value = str(value)
        float_value = len(str_value)
    elif attr_info.data_type == ArgType.DevEnum:
        # enums expose the label text alongside the numeric value
        attr_config = dev.get_attribute_config(attr_info.name)
        data_type = "enum"
        str_value = str(attr_config.enum_labels[value])
        float_value = int(value)
    elif attr_info.data_type == ArgType.DevState:
        data_type = "state"
        str_value = ""
        float_value = int(value)
    else:
        # unsupported data type: callers drop the value
        return None

    # (labels, value)
    return (
        [
            self.station,
            dev.dev_name(),
            attr_info.name,
            str_value,
            data_type,
            f"{x:02}",
            f"{y:02}",
            f"{idx:03}",
        ],
        float_value,
    )
def metrics_scalar(self, dev, attr_info, attr_value):
    """Return all metrics for a given SCALAR attribute.

    The single value ``attr_value.value`` is converted through
    ``self._to_metric`` at position x=0, y=0, idx=0.

    Returns:
        A one-element list holding the converted metric, or an empty list
        when the conversion yields nothing.
    """
    new_metric = self._to_metric(dev, attr_info, 0, 0, 0, attr_value.value)
    return [new_metric] if new_metric else []
def metrics_spectrum(self, dev, attr_info, attr_value):
def metrics_spectrum(self, dev, attr_info, attr_value):
""" Return all metrics for a given SPECTRUM attribute. """
"""Return all metrics for a given SPECTRUM attribute."""
metrics = []
metrics = []
for x in range(int(attr_value.dim_x)):
for x in range(int(attr_value.dim_x)):
@@ -141,21 +179,28 @@ class CustomCollector(object):
@@ -141,21 +179,28 @@ class CustomCollector(object):
return metrics
return metrics
def metrics_image(self, dev, attr_info, attr_value):
    """Return all metrics for a given IMAGE attribute.

    Walks ``attr_value.value`` row by row (``dim_y`` rows of ``dim_x``
    elements) and converts every element through ``self._to_metric``.
    Elements for which the conversion yields nothing are left out.

    Returns:
        The list of converted metrics, in row-major order.
    """
    metrics = []

    # NOTE: We switch x and y in the annotation, to allow queries to combine
    # 1D and 2D arrays in their first dimension using the same label (x).
    # We effectively expose the array as [x][y] instead of [y][x].
    for y in range(int(attr_value.dim_y)):
        for x in range(int(attr_value.dim_x)):
            new_metric = self._to_metric(
                dev,
                attr_info,
                y,
                x,
                y * attr_value.dim_x + x,  # flat row-major index
                attr_value.value[y][x],
            )
            if new_metric:
                metrics.append(new_metric)

    return metrics
def metrics(self, dev, attr_info, attr_value):
def metrics(self, dev, attr_info, attr_value):
""" Return all metrics for a given attribute. """
"""Return all metrics for a given attribute."""
if attr_info.data_format == AttrDataFormat.SCALAR:
if attr_info.data_format == AttrDataFormat.SCALAR:
return self.metrics_scalar(dev, attr_info, attr_value)
return self.metrics_scalar(dev, attr_info, attr_value)
@@ -167,16 +212,25 @@ class CustomCollector(object):
@@ -167,16 +212,25 @@ class CustomCollector(object):
return []
return []
def device_metrics(self, device_name):
def device_metrics(self, device_name):
""" Return all metrics for a given device, as configured. """
"""Return all metrics for a given device, as configured."""
dev = DeviceProxy(device_name)
dev = DeviceProxy(device_name)
dev.set_timeout_millis(self.proxy_timeout)
dev.set_timeout_millis(self.proxy_timeout)
# obtain extended info about all attributes
# obtain extended info about all attributes
attr_infos = {attr_info.name: attr_info for attr_info in dev.attribute_list_query()}
attr_infos = {
attr_info.name: attr_info for attr_info in dev.attribute_list_query()
if dev.state() not in [DevState.STANDBY, DevState.ON, DevState.ALARM, DevState.DISABLE]:
}
logger.warning(f"Error processing device {device_name}: it is in state {dev.state()}")
 
if dev.state() not in [
 
DevState.STANDBY,
 
DevState.ON,
 
DevState.ALARM,
 
DevState.DISABLE,
 
]:
 
logger.warning(
 
f"Error processing device {device_name}: it is in state {dev.state()}"
 
)
# at least log state & status
# at least log state & status
attrs_to_scrape = ["State", "Status"]
attrs_to_scrape = ["State", "Status"]
@@ -195,20 +249,30 @@ class CustomCollector(object):
@@ -195,20 +249,30 @@ class CustomCollector(object):
metrics.extend(self.metrics(dev, attr_infos[attr_name], attr_value))
metrics.extend(self.metrics(dev, attr_infos[attr_name], attr_value))
except DevFailed as e:
except DevFailed as e:
reason = e.args[0].desc.replace("\n", " ")
reason = e.args[0].desc.replace("\n", " ")
logger.warning(f"Error processing device {device_name} attribute {attr_name}: {reason}")
logger.warning(
 
f"Error processing device {device_name} attribute {attr_name}: {reason}"
 
)
except Exception as e:
except Exception as e:
logger.exception(f"Error processing device {device_name} attribute {attr_name}")
logger.exception(
 
f"Error processing device {device_name} attribute {attr_name}"
 
)
return metrics
return metrics
def collect(self):
def collect(self):
""" Yield all scraped metrics from all devices, as configured. """
"""Yield all scraped metrics from all devices, as configured."""
logger.info("Start scraping")
logger.info("Start scraping")
scrape_begin = time.time()
scrape_begin = time.time()
attribute_metrics = GaugeMetricFamily("device_attribute", 'Device attribute value', labels=['station', 'device', 'name', 'str_value', 'type', 'x', 'y', 'idx'])
attribute_metrics = GaugeMetricFamily(
scraping_metrics = GaugeMetricFamily("device_scraping", 'Device scraping duration', labels=['station', 'device'])
"device_attribute",
 
"Device attribute value",
 
labels=["station", "device", "name", "str_value", "type", "x", "y", "idx"],
 
)
 
scraping_metrics = GaugeMetricFamily(
 
"device_scraping", "Device scraping duration", labels=["station", "device"]
 
)
for device_name in self.policy.devices():
for device_name in self.policy.devices():
logger.debug(f"Processing device {device_name}")
logger.debug(f"Processing device {device_name}")
@@ -226,9 +290,13 @@ class CustomCollector(object):
@@ -226,9 +290,13 @@ class CustomCollector(object):
finally:
finally:
dev_scrape_end = time.time()
dev_scrape_end = time.time()
logger.info(f"Done processing device {device_name}. Took {dev_scrape_end - dev_scrape_begin} seconds.")
logger.info(
 
f"Done processing device {device_name}. Took {dev_scrape_end - dev_scrape_begin} seconds."
 
)
scraping_metrics.add_metric([self.station, device_name], dev_scrape_end - dev_scrape_begin)
scraping_metrics.add_metric(
 
[self.station, device_name], dev_scrape_end - dev_scrape_begin
 
)
scrape_end = time.time()
scrape_end = time.time()
logger.info(f"Done scraping. Took {scrape_end - scrape_begin} seconds.")
logger.info(f"Done scraping. Took {scrape_end - scrape_begin} seconds.")
@@ -238,20 +306,39 @@ class CustomCollector(object):
@@ -238,20 +306,39 @@ class CustomCollector(object):
yield attribute_metrics
yield attribute_metrics
yield scraping_metrics
yield scraping_metrics
if __name__ == '__main__':
 
if __name__ == "__main__":
import sys
import sys
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', type=str, required=True, help='configuration file')
parser.add_argument(
parser.add_argument('-t', '--timeout', type=int, required=False, default=250, help='device proxy timeout (ms)')
"-c", "--config", type=str, required=True, help="configuration file"
parser.add_argument('-p', '--port', type=int, required=False, default=8000, help='HTTP server port to open')
)
 
parser.add_argument(
 
"-t",
 
"--timeout",
 
type=int,
 
required=False,
 
default=250,
 
help="device proxy timeout (ms)",
 
)
 
parser.add_argument(
 
"-p",
 
"--port",
 
type=int,
 
required=False,
 
default=8000,
 
help="HTTP server port to open",
 
)
args = parser.parse_args()
args = parser.parse_args()
config = ArchiverPolicy.load_config(args.config)
config = ArchiverPolicy.load_config(args.config)
db = Database()
db = Database()
try:
try:
station = db.get_device_property("STAT/StationManager/1","Station_Name")["Station_Name"][0]
station = db.get_device_property("STAT/StationManager/1", "Station_Name")[
 
"Station_Name"
 
][0]
except Exception as e:
except Exception as e:
logger.exception("Could not determine station name")
logger.exception("Could not determine station name")
sys.exit(1)
sys.exit(1)
Loading