From 51e96e720d97132712f5a5246b4de8f2e4d6ffa1 Mon Sep 17 00:00:00 2001
From: Jorrit Schaap <schaap@astron.nl>
Date: Fri, 31 Mar 2017 13:46:30 +0000
Subject: [PATCH] Task #10339: speed improvements; change mail subject for
 removed jobs

---
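The stalled-job check in the main loop is now rate limited: the full scan over
the job admin dicts runs at most about once per minute instead of on every
loop iteration. Below is a minimal, self-contained sketch of that throttling
pattern; the names are simplified and illustrative, and only the 1-minute
check interval and the 15-minute stall threshold are taken from this patch.

    from datetime import datetime, timedelta

    class StalledJobChecker:
        def __init__(self):
            # job_id -> timestamp of the last observed progress update
            self.last_progress = {}
            self._last_check = datetime.utcnow()

        def get_stalled_job_ids(self):
            # cheap early-out: do the real scan at most once per minute
            if datetime.utcnow() - self._last_check < timedelta(minutes=1):
                return []

            now = datetime.utcnow()
            stalled = [job_id for job_id, timestamp in self.last_progress.items()
                       if now - timestamp > timedelta(minutes=15)]

            self._last_check = datetime.utcnow()
            return stalled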
 .../lib/ingestjobmanagementserver.py          | 117 +++++++++---------
 1 file changed, 61 insertions(+), 56 deletions(-)
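getJobAdminDicts() now filters in a single pass and accepts either one status
value or a list of statuses, so getDoneJobAdminDicts() and
getNotDoneJobAdminDicts() no longer have to concatenate several per-status
queries under the lock. A small standalone sketch of that filter, using plain
dicts and made-up status constants (the real constants and job admin dicts
live in the ingest modules):

    # illustrative status constants; the real values come from the ingest job module
    JobToDo, JobScheduled, JobProducing, JobRetry, JobFailed, JobProduced, JobRemoved = range(7)

    def filter_job_admin_dicts(job_admin_dicts, job_group_id=None, status=None):
        jads = list(job_admin_dicts.values())

        if job_group_id is not None:
            job_group_id = str(job_group_id)
            jads = [jad for jad in jads
                    if str(jad['job'].get('job_group_id')) == job_group_id]

        if status is not None:
            # accept a single (int) status or any iterable of statuses
            statuses = {status} if isinstance(status, int) else set(status)
            jads = [jad for jad in jads if jad['status'] in statuses]

        return jads

    # e.g. the 'done' jobs of one export group:
    # filter_job_admin_dicts(jads, job_group_id=42, status=[JobFailed, JobProduced, JobRemoved])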

diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py
index 813875f98f5..4ff7f711ae7 100644
--- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py
+++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py
@@ -103,6 +103,7 @@ class IngestJobManager:
         self.__jobs_for_transfer_queue_peeker = FromBus(jobs_for_transfer_queue_name, broker=broker)
 
         self.__running_jobs_log_timestamp = datetime.utcnow()
+        self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()
 
     def quit(self):
         self.__running = False
@@ -123,12 +124,12 @@ class IngestJobManager:
             logger.info('starting to produce jobs')
             while self.__running:
                 try:
-                    #produce next job
+                    #produce next jobs
                     self.produceNextJobsIfPossible()
 
                     #receive any jobs from mom/user_ingest/eor/etc
                     receive_start = datetime.utcnow()
-                    msg = self.__incoming_job_queue.receive(timeout=0.1)
+                    msg = self.__incoming_job_queue.receive(timeout=0.01)
                     while msg:
                         logger.debug("received msg on job queue %s: %s", self.__incoming_job_queue.address, msg)
                         self.__incoming_job_queue.ack(msg)
@@ -152,26 +153,25 @@ class IngestJobManager:
                             break
 
                         #see if there are any more jobs to receive and process them, else jump out of while loop
-                        msg = self.__incoming_job_queue.receive(timeout=0.1)
+                        msg = self.__incoming_job_queue.receive(timeout=0.01)
 
                     # check if producing jobs are actually making progress
                     # maybe the ingest transfer server was shut down in the middle of a transfer?
                     # the ingest transfer server is stateless, so it won't restart that job itself (by design)
                     # while transferring, progress updates are given at very regular intervals
                     # so it is easy for us to detect here if the job is progressing or stalled (see onJobProgress)
-                    self.putStalledJobsBackToToDo()
+                    self.__putStalledJobsBackToToDo()
 
 
                     #report on running jobs
                     if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds=10):
                         self.__running_jobs_log_timestamp = datetime.utcnow()
-                        with self.__lock:
-                            producing_jads = self.getJobAdminDicts(status=JobProducing)
-                            if producing_jads:
-                                if len(producing_jads) == 1:
-                                    logger.info('1 job is running')
-                                else:
-                                    logger.info('%s jobs are running', len(producing_jads))
+                        producing_jads = self.getJobAdminDicts(status=JobProducing)
+                        if producing_jads:
+                            if len(producing_jads) == 1:
+                                logger.info('1 job is running')
+                            else:
+                                logger.info('%s jobs are running', len(producing_jads))
 
                 except KeyboardInterrupt:
                     break
@@ -528,7 +528,10 @@ class IngestJobManager:
         with self.__lock:
             return sorted(list(set([jad['job'].get('job_group_id', 'unknown_group') for jad in self.__job_admin_dicts.values()])))
 
-    def putStalledJobsBackToToDo(self):
+    def __putStalledJobsBackToToDo(self):
+        if datetime.utcnow() - self.__last_putStalledJobsBackToToDo_timestamp < timedelta(minutes=1):
+            return
+
         with self.__lock:
             now = datetime.utcnow()
             threshold = timedelta(minutes=15)
@@ -540,6 +543,9 @@ class IngestJobManager:
                 logger.info('putting job %s back to ToDo because it did not make any progress during the last 15min', jad['job']['JobId'])
                 self.updateJobStatus(jad['job']['JobId'], JobToDo)
 
+            self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()
+
+
     def getNextJobToRun(self):
         '''get the next job to run.
         examine all 'to_do' and 'retry' jobs
@@ -655,7 +661,7 @@ class IngestJobManager:
         # test if the managed_job_queue is empty
         try:
             with self.__jobs_for_transfer_queue_peeker:
-                num_scheduled = self.__jobs_for_transfer_queue_peeker.nr_of_messages_in_queue(0.1)
+                num_scheduled = self.__jobs_for_transfer_queue_peeker.nr_of_messages_in_queue(0.01)
                 return (num_scheduled == 0)
         except Exception as e:
             logger.error('canProduceNextJob: %s', e)
@@ -663,28 +669,26 @@ class IngestJobManager:
 
     def produceNextJobsIfPossible(self):
         start_producing_timestamp = datetime.utcnow()
-        with self.__lock:
-            while self.canProduceNextJob() and datetime.utcnow() - start_producing_timestamp < timedelta(seconds=5):
-                job_admin_dict = self.getNextJobToRun()
-                if job_admin_dict:
-                    if os.path.exists(job_admin_dict.get('path')):
-                        msg = CommandMessage(content=job_admin_dict.get('job_xml'))
-                        msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY)
-                        self.__jobs_for_transfer_queue.send(msg)
-                        logger.info('submitted job %s to queue %s at %s', job_admin_dict['job']['JobId'], self.__jobs_for_transfer_queue.address, self.__jobs_for_transfer_queue.broker)
-                        self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled)
-                    else:
-                        job_id = job_admin_dict['job']['JobId']
-                        logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path'))
-                        with self.__lock:
-                            del self.__job_admin_dicts[job_id]
-
-                    #do a little sleep to allow the ingesttransferserver consumers to pick up the submitted job
-                    #so the queue is empty again
-                    #and we can submit yet another job
-                    time.sleep(0.1)
+        while self.canProduceNextJob() and datetime.utcnow() - start_producing_timestamp < timedelta(seconds=5):
+            job_admin_dict = self.getNextJobToRun()
+            if job_admin_dict:
+                if os.path.exists(job_admin_dict.get('path')):
+                    msg = CommandMessage(content=job_admin_dict.get('job_xml'))
+                    msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY)
+                    self.__jobs_for_transfer_queue.send(msg)
+                    logger.info('submitted job %s to queue %s at %s', job_admin_dict['job']['JobId'], self.__jobs_for_transfer_queue.address, self.__jobs_for_transfer_queue.broker)
+                    self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled)
                 else:
-                    return
+                    job_id = job_admin_dict['job']['JobId']
+                    logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path'))
+                    with self.__lock: del self.__job_admin_dicts[job_id]
+
+                #do a little sleep to allow the ingesttransferserver consumers to pick up the submitted job
+                #so the queue is empty again
+                #and we can submit yet another job
+                time.sleep(0.1)
+            else:
+                return
 
     def onJobStarted(self, job_notification_dict):
         self.__notification_listener._logJobNotification('started ', job_notification_dict);
@@ -732,34 +736,27 @@ class IngestJobManager:
         return []
 
     def getDoneJobAdminDicts(self, job_group_id=None):
-        with self.__lock:
-            return (self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed) +
-                    self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced) +
-                    self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved))
+        return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobFailed, JobProduced, JobRemoved])
 
     def getNotDoneJobAdminDicts(self, job_group_id=None):
-        with self.__lock:
-            return (self.getJobAdminDicts(job_group_id=job_group_id, status=JobToDo) +
-                    self.getJobAdminDicts(job_group_id=job_group_id, status=JobScheduled) +
-                    self.getJobAdminDicts(job_group_id=job_group_id, status=JobProducing) +
-                    self.getJobAdminDicts(job_group_id=job_group_id, status=JobRetry))
+        return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobToDo, JobScheduled, JobProducing, JobRetry])
 
     def getJobAdminDicts(self, job_group_id=None, status=None):
         with self.__lock:
-            if job_group_id != None and status != None:
-                return [jad for jad in self.__job_admin_dicts.values()
-                        if jad['status'] == status
-                        and str(jad['job'].get('job_group_id')) == str(job_group_id)]
-
-            if status != None:
-                return [jad for jad in self.__job_admin_dicts.values()
-                        if jad['status'] == status]
+            jads = list(self.__job_admin_dicts.values())
 
             if job_group_id != None:
-                return [jad for jad in self.__job_admin_dicts.values()
-                        if str(jad['job'].get('job_group_id')) == str(job_group_id)]
+                job_group_id = str(job_group_id)
+                jads = [jad for jad in jads if str(jad['job'].get('job_group_id')) == job_group_id]
+
+            if status != None:
+                if isinstance(status, int):
+                    jads = [jad for jad in jads if jad['status'] == status]
+                else:
+                    statuses = set(status)
+                    jads = [jad for jad in jads if jad['status'] in statuses]
 
-            return [jad for jad in self.__job_admin_dicts.values()]
+            return jads
 
     def getStatusReportDict(self):
         with self.__lock:
@@ -1017,8 +1014,16 @@ Total Files: %(total)i
             #make it a csv list of addresses
             mailing_list = ','.join(mailing_list)
 
-            subject = "Ingest Export job %s finished %s"% (job_group_id,
-                                                           'successfully' if success else 'with errors')
+            subject = "Ingest Export job %s " % (job_group_id,)
+            if success:
+                subject += 'finished successfully'
+            elif removed_group_jads:
+                if len(removed_group_jads) == len(done_group_jads):
+                    subject += 'was removed completely before transfer'
+                else:
+                    subject += 'was removed partially before transfer'
+            else:
+                subject += 'finished with errors'
 
             if os.system('echo "%s"|mailx -s "%s" %s' % (report, subject, mailing_list)) == 0:
                 logger.info('sent notification email for export job %s to %s', job_group_id, mailing_list)
-- 
GitLab