import time
import argparse
import logging
import json
import fnmatch

from prometheus_client.core import GaugeMetricFamily, REGISTRY, CounterMetricFamily
from prometheus_client import start_http_server
from tango import Database, DeviceProxy, CmdArgType as ArgType, AttrDataFormat, DevState, DevFailed
from logstash_async.handler import AsynchronousLogstashHandler, LogstashFormatter

logger = logging.getLogger()
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)

# log to memory only, with a limit of 600 seconds. this container is not important enough to keep
# all logs around for, and as a scraper must be robust against overloading on all sides
handler = AsynchronousLogstashHandler("elk", 5959, database_path=None, event_ttl=600)
handler.setLevel(logging.INFO)
logger.addHandler(handler)


class ArchiverPolicy(object):
    """ Parse and apply policy files that describe which devices and which
        of their attributes are to be scraped. """

    EMPTY_POLICY = {
        # default policy
        "default": {
        },
        # device-specific policies
        "devices": {
        }
    }

    @staticmethod
    def load_config(resource: str) -> dict:
        """ Load a JSON policy file from disk. """
        with open(resource) as fd:
            return json.load(fd)

    def __init__(self, config: dict = None):
        self.config = config or self.EMPTY_POLICY

    def device_list(self) -> list:
        """ Retrieve the (lower-cased) device list from TangoDB. """

        device_list = []

        db = Database()
        server_list = db.get_server_list()  # e.g. SDP/STAT, RECV/STAT

        for server in server_list:
            # https://pytango.readthedocs.io/en/stable/database.html#tango.Database.get_device_class_list
            class_list = db.get_device_class_list(server)

            # class_list alternates device name / class name; [::2] takes the device names
            for cls in class_list[::2]:
                # skip the Tango administration devices
                if "dserver" in cls:
                    continue

                device_list.append(cls.lower())

        return device_list

    def devices(self) -> list:
        """ Filter the device list from TangoDB following the lofar2-policy file.

            Returns the sorted list of devices that match at least one
            (wildcard) device name in the policy file. """

        # Devices list from TangoDB
        db_devices = self.device_list()

        # Device name patterns listed in policy file
        config_devices = [k.lower() for k in self.config["devices"].keys()]

        # Match device names fetched from DB against device names in policy file.
        # Collect into a set so a device matching several patterns is only
        # scraped once.
        devices = set()
        for config_dev in config_devices:
            for db_dev in db_devices:
                if fnmatch.fnmatch(db_dev, config_dev):
                    devices.add(db_dev)

        return sorted(devices)

    def attribute_list(self, device_name: str, attribute_list: list) -> list:
        """ Return the sorted list of attributes to archive for the given
            device, according to the policy.

            :param device_name:    lower-cased Tango device name.
            :param attribute_list: the attributes the device actually exposes. """

        if device_name not in self.devices():
            return []

        # Apply the default policy first, then every device-specific policy
        # whose (possibly wildcarded) name matches this device.
        # NOTE: we match by pattern rather than by exact dict lookup, to stay
        # consistent with devices() above -- an exact lookup raises KeyError
        # for wildcard or non-lowercase policy keys.
        config_sets = [self.config["default"]]
        for pattern, config_set in self.config["devices"].items():
            if fnmatch.fnmatch(device_name, pattern.lower()):
                config_sets.append(config_set)

        attributes = set()

        for config_set in config_sets:
            # include all specified attributes by pattern,
            for include in config_set.get("include", []):
                for attr in attribute_list:
                    if fnmatch.fnmatch(attr, include):
                        attributes.add(attr)

            # then, remove any explicitly excluded attributes
            for exclude in config_set.get("exclude", []):
                for attr in attribute_list:
                    if fnmatch.fnmatch(attr, exclude):
                        attributes.discard(attr)

        return sorted(attributes)


class CustomCollector(object):
    """ Prometheus collector that scrapes attribute values from the Tango
        devices selected by the archiver policy. """

    def __init__(self, config, station, proxy_timeout=250):
        self.station = station
        self.policy = ArchiverPolicy(config)
        self.proxy_timeout = proxy_timeout

    def _to_metric(self, dev, attr_info, x, y, idx, value):
        """ Convert the given values to a (labels, value) pair, used to
            construct a Metric. Returns None for unsupported data types. """

        if attr_info.data_type in [ArgType.DevShort, ArgType.DevLong, ArgType.DevUShort, ArgType.DevULong, ArgType.DevLong64, ArgType.DevULong64, ArgType.DevInt, ArgType.DevFloat, ArgType.DevDouble]:
            data_type = 'float'
            str_value = ''
            float_value = float(value)
        elif attr_info.data_type == ArgType.DevBoolean:
            data_type = 'bool'
            str_value = ''
            float_value = int(value)
        elif attr_info.data_type == ArgType.DevString:
            # strings cannot be a metric value: expose the text as a label,
            # and its length as the value
            data_type = 'string'
            str_value = str(value)
            float_value = len(str_value)
        elif attr_info.data_type == ArgType.DevEnum:
            # expose both the numeric value and its textual label
            attr_config = dev.get_attribute_config(attr_info.name)
            data_type = 'enum'
            str_value = str(attr_config.enum_labels[value])
            float_value = int(value)
        elif attr_info.data_type == ArgType.DevState:
            data_type = 'state'
            str_value = ''
            float_value = int(value)
        else:
            # unsupported data type: emit no metric for this attribute
            return None

        # (labels, value)
        return ([self.station, dev.dev_name(), attr_info.name, str_value, data_type, f"{x:02}", f"{y:02}", f"{idx:03}"], float_value)

    def metrics_scalar(self, dev, attr_info, attr_value):
        """ Return all metrics for a given SCALAR attribute. """
        new_metric = self._to_metric(dev, attr_info, 0, 0, 0, attr_value.value)
        return [new_metric] if new_metric else []

    def metrics_spectrum(self, dev, attr_info, attr_value):
        """ Return all metrics for a given SPECTRUM attribute. """
        metrics = []
        for x in range(int(attr_value.dim_x)):
            new_metric = self._to_metric(dev, attr_info, x, 0, x, attr_value.value[x])
            if new_metric:
                metrics.append(new_metric)

        return metrics

    def metrics_image(self, dev, attr_info, attr_value):
        """ Return all metrics for a given IMAGE attribute. """
        metrics = []
        for y in range(int(attr_value.dim_y)):
            for x in range(int(attr_value.dim_x)):
                # NOTE: We switch x and y in the annotation, to allow queries
                # to combine 1D and 2D arrays in their first dimension using
                # the same label (x). We effectively expose the array
                # as [x][y] instead of [y][x].
                new_metric = self._to_metric(dev, attr_info, y, x, y * attr_value.dim_x + x, attr_value.value[y][x])
                if new_metric:
                    metrics.append(new_metric)

        return metrics

    def metrics(self, dev, attr_info, attr_value):
        """ Return all metrics for a given attribute, dispatching on its
            data format. """

        if attr_info.data_format == AttrDataFormat.SCALAR:
            return self.metrics_scalar(dev, attr_info, attr_value)
        elif attr_info.data_format == AttrDataFormat.SPECTRUM:
            return self.metrics_spectrum(dev, attr_info, attr_value)
        elif attr_info.data_format == AttrDataFormat.IMAGE:
            return self.metrics_image(dev, attr_info, attr_value)
        else:
            return []

    def device_metrics(self, device_name):
        """ Return all metrics for a given device, as configured. """

        dev = DeviceProxy(device_name)
        dev.set_timeout_millis(self.proxy_timeout)

        # obtain extended info about all attributes
        attr_infos = {attr_info.name: attr_info for attr_info in dev.attribute_list_query()}

        if dev.state() not in [DevState.STANDBY, DevState.ON, DevState.ALARM, DevState.DISABLE]:
            logger.error(f"Error processing device {device_name}: it is in state {dev.state()}")

            # at least log state & status
            attrs_to_scrape = ["State", "Status"]
        else:
            # obtain list of attributes to scrape
            attrs_to_scrape = self.policy.attribute_list(device_name, attr_infos.keys())

        logger.info(f"Processing device {device_name} attributes {attrs_to_scrape}")

        # scrape each attribute; a failure on one attribute must not abort the rest
        metrics = []
        for attr_name in attrs_to_scrape:
            try:
                attr_value = dev.read_attribute(attr_name)
                metrics.extend(self.metrics(dev, attr_infos[attr_name], attr_value))
            except DevFailed as e:
                reason = e.args[0].desc.replace("\n", " ")
                logger.error(f"Error processing device {device_name} attribute {attr_name}: {reason}")
            except Exception:
                logger.exception(f"Error processing device {device_name} attribute {attr_name}")

        return metrics

    def collect(self):
        """ Yield all scraped metrics from all devices, as configured. """

        logger.info("Start scraping")
        scrape_begin = time.time()

        attribute_metrics = GaugeMetricFamily("device_attribute", 'Device attribute value', labels=['station', 'device', 'name', 'str_value', 'type', 'x', 'y', 'idx'])
        scraping_metrics = GaugeMetricFamily("device_scraping", 'Device scraping duration', labels=['station', 'device'])

        for device_name in self.policy.devices():
            logger.debug(f"Processing device {device_name}")
            dev_scrape_begin = time.time()

            try:
                metrics = self.device_metrics(device_name)
                for metric in metrics:
                    attribute_metrics.add_metric(*metric)
            except DevFailed as e:
                reason = e.args[0].desc.replace("\n", " ")
                logger.error(f"Error processing device {device_name}: {reason}")
            except Exception:
                logger.exception(f"Error processing device {device_name}")
            finally:
                # always record the scrape duration, even on failure
                dev_scrape_end = time.time()
                logger.info(f"Done processing device {device_name}. Took {dev_scrape_end - dev_scrape_begin} seconds.")
                scraping_metrics.add_metric([self.station, device_name], dev_scrape_end - dev_scrape_begin)

        scrape_end = time.time()
        logger.info(f"Done scraping. Took {scrape_end - scrape_begin} seconds.")
        scraping_metrics.add_metric([self.station, "total"], scrape_end - scrape_begin)

        yield attribute_metrics
        yield scraping_metrics


if __name__ == '__main__':
    import sys

    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', type=str, required=True, help='configuration file')
    parser.add_argument('-t', '--timeout', type=int, required=False, default=250, help='device proxy timeout (ms)')
    parser.add_argument('-p', '--port', type=int, required=False, default=8000, help='HTTP server port to open')
    args = parser.parse_args()

    config = ArchiverPolicy.load_config(args.config)

    db = Database()
    try:
        # the station name is stored as a free-floating property in the Tango DB
        station = db.get_property("station","name")["name"][0]
    except Exception:
        logger.exception("Could not determine station name")
        sys.exit(1)

    collector = CustomCollector(config, station=station, proxy_timeout=args.timeout)

    logger.info("Starting server")
    start_http_server(args.port)

    logger.info("Registering collector")
    REGISTRY.register(collector)

    # the HTTP server runs in its own thread; keep the main thread alive
    logger.info("Idling")
    while True:
        time.sleep(1)