From 9eb58d0036edd9556e76fbfcbdb7abf3ccb4b137 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Mon, 5 Aug 2024 14:00:10 +0200
Subject: [PATCH] Added metrics to LTA ingest service

---
 .../lib/ingestpipeline.py                     | 38 +++++++++++++++----
 .../lib/ingesttransferserver.py               | 13 ++++---
 2 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py
index 49fbc109434..60b5332022a 100755
--- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py
+++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py
@@ -7,7 +7,7 @@ import random
 import socket
 import re
 import getpass
-
+from prometheus_client import Gauge, Counter
 
 from lofar.lta.ingest.common.job import *
 from lofar.lta.ingest.server.sip import validateSIPAgainstSchema, addIngestInfoToSIP
@@ -17,6 +17,7 @@ from lofar.lta.ingest.server.ltaclient import *
 from lofar.lta.ingest.client.rpc import IngestTMSSRPC
 from lofar.common.util import humanreadablesize
 from lofar.common import isProductionEnvironment
+from lofar.common.metrics import metric_track_duration
 from lofar.common.subprocess_utils import communicate_returning_strings
 from lofar.messaging import EventMessage, ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME
 from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX
@@ -25,6 +26,13 @@ from lofar.common.dbcredentials import DBCredentials
 
 logger = logging.getLogger(__name__)
 
+#---------------------- Prometheus Metrics ----------------------------------------
+
+metric_nr_transfers_in_progress = Gauge("ingest_transfers_in_progress", "Count how many transfers are currently in progress", labelnames=["site"])
+metric_transfer_durations = Histogram("ingest_transfer_durations", "How long transfers are taking", labelnames=["site"])
+metric_nr_transfer_exceptions = Counter("ingest_transfer_exceptions", "Number of exceptions raised during transfer", labelnames=["site", "reason"])
+metric_nr_bytes_transferred = Counter("ingest_bytes_ingested", "Number of payload bytes ingested into the LTA (sum of ingested file sizes)", labelnames=["site"])
+
 #---------------------- Custom Exception ----------------------------------------
 
 PipelineJobFailedError      = 1
@@ -87,6 +95,7 @@ class IngestPipeline():
         self.SecondaryUri        = ''
         self.lta_site            = ''
 
+    @metric_track_duration(prefix="ingest_")
     def GetStorageTicket(self):
         do_check_already_in_lta=isProductionEnvironment()
         result = self.ltaClient.GetStorageTicket(self.Project, self.FileName, self.FileSize, self.ArchiveId, self.JobId, self.ObsId, do_check_already_in_lta, self.Type)
@@ -126,6 +135,7 @@ class IngestPipeline():
             elif 'psnc' in self.PrimaryUri:
                 self.lta_site = 'poznan'
 
+    @metric_track_duration(prefix="ingest_")
     def TransferFile(self):
         try:
             logger.info('Starting file transfer for %s ' % self.JobId)
@@ -155,12 +165,14 @@ class IngestPipeline():
                 # old hack, is needed to support dynspec / pulsar archiving scripts
                 self.Location = os.path.join(self.Location, self.job['Source'])
 
-            cp = LtaCp(self.Location,
-                       self.PrimaryUri,
-                       globus_timeout=self.globus_timeout,
-                       progress_callback=progress_callback)
+            with metric_track_duration.labels(site=self.lta_site).time(),
+                 metric_nr_transfers_in_progress.labels(site=self.lta_site).track_inprogress():
+                cp = LtaCp(self.Location,
+                           self.PrimaryUri,
+                           globus_timeout=self.globus_timeout,
+                           progress_callback=progress_callback)
 
-            transfer_result = cp.transfer(force=True)
+                transfer_result = cp.transfer(force=True)
 
             self.status = IngestPipeline.STATUS_FINALIZING
 
@@ -193,8 +205,11 @@ class IngestPipeline():
                                         message='transfer finished',
                                         percentage_done=100.0,
                                         total_bytes_transfered=int(self.FileSize))
-            except ValueError:
+
+                metric_nr_bytes_transferred.inc(self.FileSize)
+            except ValueError, TypeError:
                 pass
+
             elapsed = time.time() - start
 
             try:
@@ -208,14 +223,19 @@ class IngestPipeline():
             if isinstance(exp, LtacpException):
                 if '550 File not found' in exp.value:
                     logger.error('Destination directory does not exist. Creating %s in LTA for %s' % (self.PrimaryUri, self.JobId))
+                    metric_nr_transfer_exceptions.labels(site=self.lta_site,reason="target_directory_not_found").inc()
 
                     if create_missing_directories(self.PrimaryUri) == 0:
                         logger.info('Created path %s in LTA for %s' % (self.PrimaryUri, self.JobId))
                 elif 'source path' in exp.value and 'does not exist' in exp.value:
+                    metric_nr_transfer_exceptions.labels(site=self.lta_site,reason="source_path_not_found").inc()
                     raise PipelineError(exp.value, PipelineNoSourceError)
+                else:
+                    metric_nr_transfer_exceptions.labels(site=self.lta_site,reason="other").inc()
 
             raise Exception('transfer failed for %s: %s' % (self.JobId, str(exp)))
 
+    @metric_track_duration(prefix="ingest_")
     def SendChecksumsToLTA(self):
         result = self.ltaClient.SendChecksums(self.JobId, self.Project, self.ticket, self.FileSize, self.PrimaryUri, self.SecondaryUri, self.MD5Checksum, self.Adler32Checksum)
         if not result.get('error'):
@@ -223,6 +243,7 @@ class IngestPipeline():
             self.PrimaryUri   = result['primary_uri']
             self.SecondaryUri = result.get('secondary_uri')
 
+    @metric_track_duration(prefix="ingest_")
     def SendStatusToLTA(self, lta_state_id):
         if self.ticket:
             self.ltaClient.UpdateUriState(self.JobId, self.Project, self.ticket, self.PrimaryUri, lta_state_id)
@@ -264,6 +285,7 @@ class IngestPipeline():
 
         logger.info('SIP for %s is valid, can proceed with transfer' % (self.JobId,))
 
+    @metric_track_duration(prefix="ingest_")
     def GetSIP(self):
         try:
             if self.Type == "TMSS":
@@ -332,6 +354,7 @@ class IngestPipeline():
             else:
                 raise
 
+    @metric_track_duration(prefix="ingest_")
     def SendSIPToLTA(self):
         try:
             self.ltaClient.SendSIP(self.JobId, self.SIP, self.ticket)
@@ -339,6 +362,7 @@ class IngestPipeline():
             logger.error('SendSIPToLTA exception: %s', e)
             raise PipelineError(str(e), PipelineJobFailedError)
 
+    @metric_track_duration(prefix="ingest_")
     def RollBack(self):
         try:
             logger.info('rolling back file transfer for %s', self.JobId)
diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py
index 7d1ab76729d..b6fac9edbd0 100644
--- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py
+++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py
@@ -34,7 +34,7 @@ from lofar.messaging import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME, BusListener,
 from lofar.messaging import LofarMessage, CommandMessage, EventMessage
 from lofar.common import isProductionEnvironment
 from lofar.common import dbcredentials
-from lofar.common import metrics
+from lofar.common.metrics import start_metrics_server
 from lofar.common.datetimeutils import totalSeconds
 from lofar.common.util import humanreadablesize
 from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT, INGEST_NOTIFICATION_PREFIX
@@ -63,10 +63,13 @@ def _getBytesSent():
         logger.warning("Cannot get network interface info: %s", e)
         return 0
 
-metric_bytes_sent = Counter("ingest_bytes_sent", "Number of bytes sent over the network interfaces towards the LTA")
+metric_bytes_sent = Counter("ingest_eth_bytes_sent", "Number of bytes sent over the network interfaces towards the LTA")
 metric_bytes_sent.set_function(lambda: _getBytesSent())
 
 class IngestTransferServer:
+    metric_is_alive = Counter("alive", "Gets incremented periodically while the service is alive")
+    metric_not_enough_resources_reasons = Counter("ingest_not_enough_resources", "Which resources were too tight when a pipeline needed to be started", labels=["resource"])
+
     def __init__(self,
                  exchange = DEFAULT_BUSNAME,
                  mom_credentials = None,
@@ -105,14 +108,12 @@ class IngestTransferServer:
                 pipelines = [job_thread_dict['pipeline'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' in job_thread_dict]
                 return len([pipeline for pipeline in pipelines if pipeline.status == pipeline_status])
 
-        metric_nr_running_pipelines = Gauge("ingest_running_pipelines", "Number of pipelines currently running", labels=["status"])
+        metric_nr_running_pipelines = Gauge("ingest_running_pipelines", "Number of pipelines currently running", labelnames=["status"])
         metric_nr_running_pipelines.labels(status="initializing").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_INITIALIZING))
         metric_nr_running_pipelines.labels(status="transferring").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_TRANSFERRING))
         metric_nr_running_pipelines.labels(status="finalizing").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_FINALIZING))
         metric_nr_running_pipelines.labels(status="finished").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_FINISHED))
 
-        self.metric_not_enough_resources_reasons = Counter("ingest_not_enough_resources", "Which resources were too tight when a pipeline needed to be started", labels=["resource"])
-
     def start_job(self, job_dict):
         if not self.enoughResourcesAvailable():
             raise ResourceWarning("Not enough resources available to start new job: %s" % job_dict)
@@ -308,6 +309,8 @@ class IngestTransferServer:
         with self.event_bus, self.incoming_jobs_listener:
             while self.is_running:
                 try:
+                    metric_is_alive.inc()
+
                     self.__clearFinishedJobs()
 
                     with self.__lock:
-- 
GitLab