From de0d1cb430e1764c319407bc3a11dbdae5de48e8 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Mon, 5 Aug 2024 13:14:53 +0200
Subject: [PATCH] Add monitoring to LTA ingest transfer service

---
 .../lib/ingesttransferserver.py               | 43 +++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py
index 275bc8d13d7..7d1ab76729d 100644
--- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py
+++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py
@@ -34,6 +34,7 @@ from lofar.messaging import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME, BusListener,
 from lofar.messaging import LofarMessage, CommandMessage, EventMessage
 from lofar.common import isProductionEnvironment
 from lofar.common import dbcredentials
+from lofar.common import metrics
 from lofar.common.datetimeutils import totalSeconds
 from lofar.common.util import humanreadablesize
 from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT, INGEST_NOTIFICATION_PREFIX
@@ -45,6 +46,7 @@ from lofar.lta.ingest.client.rpc import IngestRPC
 from lofar.lta.ingest.server.ltaclient import *
 import psutil
 from lofar.common.util import waitForInterrupt
+from prometheus_client import Gauge, Counter
 
 logger = logging.getLogger(__name__)
 
@@ -61,6 +63,9 @@ def _getBytesSent():
         logger.warning("Cannot get network interface info: %s", e)
         return 0
 
+metric_bytes_sent = Gauge("ingest_bytes_sent", "Number of bytes sent over the network interfaces towards the LTA")
+metric_bytes_sent.set_function(lambda: _getBytesSent())
+
 class IngestTransferServer:
     def __init__(self,
                  exchange = DEFAULT_BUSNAME,
@@ -91,6 +96,23 @@ class IngestTransferServer:
         self.__prev_used_bandwidth = 0.0
         self.__running_jobs_log_timestamp = datetime.utcnow()
 
+        # register Prometheus metrics
+        metric_nr_running_jobs = Gauge("ingest_running_jobs", "Number of jobs currently running")
+        metric_nr_running_jobs.set_function(lambda: len(self.__running_jobs))
+
+        def nr_pipelines_with_status(pipeline_status: int):
+            with self.__lock:
+                pipelines = [job_thread_dict['pipeline'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' in job_thread_dict]
+                return len([pipeline for pipeline in pipelines if pipeline.status == pipeline_status])
+
+        metric_nr_running_pipelines = Gauge("ingest_running_pipelines", "Number of pipelines currently running", labelnames=["status"])
+        metric_nr_running_pipelines.labels(status="initializing").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_INITIALIZING))
+        metric_nr_running_pipelines.labels(status="transferring").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_TRANSFERRING))
+        metric_nr_running_pipelines.labels(status="finalizing").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_FINALIZING))
+        metric_nr_running_pipelines.labels(status="finished").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_FINISHED))
+
+        self.metric_not_enough_resources_reasons = Counter("ingest_not_enough_resources", "Which resources were too tight when a pipeline needed to be started", labelnames=["resource"])
+
     def start_job(self, job_dict):
         if not self.enoughResourcesAvailable():
             raise ResourceWarning("Not enough resources available to start new job: %s" % job_dict)
@@ -160,6 +182,7 @@ class IngestTransferServer:
                     logger.warning('resources: not enough bandwith available to start new jobs, using %s, max %s' %
                                          (humanreadablesize(used_bandwidth, 'bps'),
                                           humanreadablesize(MAX_USED_BANDWITH_TO_START_NEW_JOBS, 'bps')))
+                    self.metric_not_enough_resources_reasons.labels(resource="bandwidth").inc()
                     return False
             else:
                 # wrapped around 0, just store for next iteration, do not compute anything
@@ -172,6 +195,7 @@ class IngestTransferServer:
             if idle_cpu_percentage < 5:
                 logger.warning('resources: not enough cpu power available to start new jobs, cpu_idle %s%%' %
                                      idle_cpu_percentage)
+                self.metric_not_enough_resources_reasons.labels(resource="cpu").inc()
                 return False
 
             # only start new jobs if system load is not too high
@@ -183,6 +207,7 @@ class IngestTransferServer:
                 logger.warning('resources: system load too high (%s > %s), cannot start new jobs' %
                                      (short_load_avg,
                                       allowed_load))
+                self.metric_not_enough_resources_reasons.labels(resource="load").inc()
                 return False
 
             # only allow 1 job at the time if swapping
@@ -190,6 +215,7 @@ class IngestTransferServer:
             logger.debug("resources: current swap_memory_percentage = %s%%", swap_memory_percentage)
             if swap_memory_percentage > 5 and len(self.__running_jobs) > 0:
                 logger.warning('resources: system swapping. not enough memory available to start new jobs')
+                self.metric_not_enough_resources_reasons.labels(resource="memory").inc()
                 return False
 
             # only start new jobs if number of processes is not too high
@@ -206,6 +232,7 @@ class IngestTransferServer:
                                         (current_user,
                                         current_num_user_procs,
                                         allowed_num_user_procs))
+                    self.metric_not_enough_resources_reasons.labels(resource="process_count").inc()
                     return False
             except Exception as e:
                 logger.exception(e)
@@ -231,11 +258,13 @@ class IngestTransferServer:
                                          len(initializing_pipelines) + len(starting_threads),
                                          len(transferring_pipelines),
                                          self.max_nr_of_parallel_jobs))
+                    self.metric_not_enough_resources_reasons.labels(resource="parallel_jobs").inc()
                     return False
 
                 if num_finalizing_transfers >= 2 * self.max_nr_of_parallel_jobs:
                     logger.warning('resources: already waiting for %d jobs to finish (updating status/SIP to MoM and LTA). not starting new jobs until some jobs finished...' %
                                         (len(finalizing_pipelines),))
+                    self.metric_not_enough_resources_reasons.labels(resource="parallel_jobs").inc()
                     return False
 
         except Exception as e:
@@ -247,6 +276,7 @@ class IngestTransferServer:
                 return True
             else:
                 logger.warning("already running %d jobs, assuming for safety we cannot run more jobs...", num_running_jobs)
+                self.metric_not_enough_resources_reasons.labels(resource="error_fallback").inc()
                 return False
 
         return True
@@ -368,6 +398,16 @@ def main():
                       help = "Name of credentials for MoM user/pass (see ~/.lofar/dbcredentials) [default=%default]")
     parser.add_option('-V', '--verbose', dest = 'verbose', action = 'store_true', help = 'verbose logging')
 
+    group = OptionGroup(parser, "Monitoring options")
+    parser.add_option_group(group)
+    group.add_option(
+        "--prometheus_port",
+        dest="prometheus_port",
+        type="int",
+        default=8000,
+        help="Port to open for Prometheus end point, default: %default",
+    )
+
     (options, args) = parser.parse_args()
 
     logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
@@ -385,6 +425,9 @@ def main():
     ltacreds = dbcredentials.DBCredentials().get(options.lta_credentials)
     momcreds = dbcredentials.DBCredentials().get(options.mom_credentials)
 
+    # initialise prometheus end point
+    metrics.start_metrics_server(options.prometheus_port)
+
     with IngestTransferServer(exchange = options.exchange, broker = options.broker,
                               mom_credentials = momcreds, lta_credentials = ltacreds,
                               max_nr_of_parallel_jobs = options.max_nr_of_parallel_jobs):
-- 
GitLab