From 8ec6cbc9860ff33925f60854f1ed5ac738fa2ec4 Mon Sep 17 00:00:00 2001 From: stedif <stefano.difrischia@inaf.it> Date: Tue, 9 Aug 2022 15:48:32 +0200 Subject: [PATCH] L2SS-780: add archiverpolicy integration test --- .../test_tango_prometheus_client.py | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/prometheus/test_tango_prometheus_client.py b/tangostationcontrol/tangostationcontrol/integration_test/default/prometheus/test_tango_prometheus_client.py index 620c25476..c0de4abd2 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/prometheus/test_tango_prometheus_client.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/prometheus/test_tango_prometheus_client.py @@ -8,14 +8,45 @@ # See LICENSE.txt for more info. from tangostationcontrol.integration_test.base import BaseIntegrationTestCase +from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy + +from tango import Database import importlib import sys sys.path.append('/opt/lofar/tango/docker-compose/tango-prometheus-exporter/code') tpc = importlib.import_module('tango-prometheus-client') class TestPrometheusClient(BaseIntegrationTestCase): + + CONFIG = tpc.ArchiverPolicy.load_config('/opt/lofar/tango/docker-compose/tango-prometheus-exporter/lofar2-policy.json') def setUp(self): super().setUp() - self.policy = tpc.ArchiverPolicy() - self.assertIsNotNone(self.policy) + db = Database() + station = db.get_property("station","name")["name"][0] + custom_collector = tpc.CustomCollector(self.CONFIG, station) + self.assertIsNotNone(custom_collector) + return custom_collector + + def setup_recv_proxy(self, device_name='stat/recv/1'): + # setup RECV + recv_proxy = TestDeviceProxy(device_name) + recv_proxy.off() + recv_proxy.warm_boot() + recv_proxy.set_defaults() + return recv_proxy + + def test_archiver_policy_attribute_list(self): + """ Test if the full set of archiving policy for the given device is retrieved """ + device_name = 'stat/recv/1' + recv_proxy = self.setup_recv_proxy(device_name) + policy = tpc.ArchiverPolicy(self.CONFIG) + attribute_list = policy.policy.attribute_list(device_name, recv_proxy.get_attribute_list()) + include = policy.config['devices']['stat/recv/1']['include'] # attribute that must be included + for i in include: + if '*' not in i: # exclude wildcard + self.assertIn(i, attribute_list) + exclude = policy.config['devices']['stat/recv/1']['exclude'] # attribute that must be excluded + for e in exclude: + if '*' not in e: # exclude wildcard + self.assertNotIn(e, attribute_list) -- GitLab