Skip to content
Snippets Groups Projects
Commit 51e96e72 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #10339: speed improvements. changes mail subject for removed jobs

parent f4ac69db
No related branches found
No related tags found
No related merge requests found
...@@ -103,6 +103,7 @@ class IngestJobManager: ...@@ -103,6 +103,7 @@ class IngestJobManager:
self.__jobs_for_transfer_queue_peeker = FromBus(jobs_for_transfer_queue_name, broker=broker) self.__jobs_for_transfer_queue_peeker = FromBus(jobs_for_transfer_queue_name, broker=broker)
self.__running_jobs_log_timestamp = datetime.utcnow() self.__running_jobs_log_timestamp = datetime.utcnow()
self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()
def quit(self): def quit(self):
self.__running = False self.__running = False
...@@ -123,12 +124,12 @@ class IngestJobManager: ...@@ -123,12 +124,12 @@ class IngestJobManager:
logger.info('starting to produce jobs') logger.info('starting to produce jobs')
while self.__running: while self.__running:
try: try:
#produce next job #produce next jobs
self.produceNextJobsIfPossible() self.produceNextJobsIfPossible()
#receive any jobs from mom/user_ingest/eor/etc #receive any jobs from mom/user_ingest/eor/etc
receive_start = datetime.utcnow() receive_start = datetime.utcnow()
msg = self.__incoming_job_queue.receive(timeout=0.1) msg = self.__incoming_job_queue.receive(timeout=0.01)
while msg: while msg:
logger.debug("received msg on job queue %s: %s", self.__incoming_job_queue.address, msg) logger.debug("received msg on job queue %s: %s", self.__incoming_job_queue.address, msg)
self.__incoming_job_queue.ack(msg) self.__incoming_job_queue.ack(msg)
...@@ -152,26 +153,25 @@ class IngestJobManager: ...@@ -152,26 +153,25 @@ class IngestJobManager:
break break
#see if there are any more jobs to receive and process them, else jump out of while loop #see if there are any more jobs to receive and process them, else jump out of while loop
msg = self.__incoming_job_queue.receive(timeout=0.1) msg = self.__incoming_job_queue.receive(timeout=0.01)
# check if producing jobs are actually making progress # check if producing jobs are actually making progress
# maybe the ingest transfer server was shut down in the middle of a transer? # maybe the ingest transfer server was shut down in the middle of a transer?
# the ingest transfer server is stateless, so it won't restart that job itself (by design) # the ingest transfer server is stateless, so it won't restart that job itself (by design)
# when transfering at very regular intervals progress updates are given # when transfering at very regular intervals progress updates are given
# so it is easy for us to detect here if the job is progressing or stalled (see onJobProgress) # so it is easy for us to detect here if the job is progressing or stalled (see onJobProgress)
self.putStalledJobsBackToToDo() self.__putStalledJobsBackToToDo()
#report on running jobs #report on running jobs
if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds=10): if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds=10):
self.__running_jobs_log_timestamp = datetime.utcnow() self.__running_jobs_log_timestamp = datetime.utcnow()
with self.__lock: producing_jads = self.getJobAdminDicts(status=JobProducing)
producing_jads = self.getJobAdminDicts(status=JobProducing) if producing_jads:
if producing_jads: if len(producing_jads) == 1:
if len(producing_jads) == 1: logger.info('1 job is running')
logger.info('1 job is running') else:
else: logger.info('%s jobs are running', len(producing_jads))
logger.info('%s jobs are running', len(producing_jads))
except KeyboardInterrupt: except KeyboardInterrupt:
break break
...@@ -528,7 +528,10 @@ class IngestJobManager: ...@@ -528,7 +528,10 @@ class IngestJobManager:
with self.__lock: with self.__lock:
return sorted(list(set([jad['job'].get('job_group_id', 'unknown_group') for jad in self.__job_admin_dicts.values()]))) return sorted(list(set([jad['job'].get('job_group_id', 'unknown_group') for jad in self.__job_admin_dicts.values()])))
def putStalledJobsBackToToDo(self): def __putStalledJobsBackToToDo(self):
if datetime.utcnow() - self.__last_putStalledJobsBackToToDo_timestamp < timedelta(minutes=1):
return
with self.__lock: with self.__lock:
now = datetime.utcnow() now = datetime.utcnow()
threshold = timedelta(minutes=15) threshold = timedelta(minutes=15)
...@@ -540,6 +543,9 @@ class IngestJobManager: ...@@ -540,6 +543,9 @@ class IngestJobManager:
logger.info('putting job %s back to ToDo because it did not make any progress during the last 15min', jad['job']['JobId']) logger.info('putting job %s back to ToDo because it did not make any progress during the last 15min', jad['job']['JobId'])
self.updateJobStatus(jad['job']['JobId'], JobToDo) self.updateJobStatus(jad['job']['JobId'], JobToDo)
self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()
def getNextJobToRun(self): def getNextJobToRun(self):
'''get the next job to run. '''get the next job to run.
examine all 'to_do' and 'retry' jobs examine all 'to_do' and 'retry' jobs
...@@ -655,7 +661,7 @@ class IngestJobManager: ...@@ -655,7 +661,7 @@ class IngestJobManager:
# test if the managed_job_queue is empty # test if the managed_job_queue is empty
try: try:
with self.__jobs_for_transfer_queue_peeker: with self.__jobs_for_transfer_queue_peeker:
num_scheduled = self.__jobs_for_transfer_queue_peeker.nr_of_messages_in_queue(0.1) num_scheduled = self.__jobs_for_transfer_queue_peeker.nr_of_messages_in_queue(0.01)
return (num_scheduled == 0) return (num_scheduled == 0)
except Exception as e: except Exception as e:
logger.error('canProduceNextJob: %s', e) logger.error('canProduceNextJob: %s', e)
...@@ -663,28 +669,26 @@ class IngestJobManager: ...@@ -663,28 +669,26 @@ class IngestJobManager:
def produceNextJobsIfPossible(self): def produceNextJobsIfPossible(self):
start_producing_timestamp = datetime.utcnow() start_producing_timestamp = datetime.utcnow()
with self.__lock: while self.canProduceNextJob() and datetime.utcnow() - start_producing_timestamp < timedelta(seconds=5):
while self.canProduceNextJob() and datetime.utcnow() - start_producing_timestamp < timedelta(seconds=5): job_admin_dict = self.getNextJobToRun()
job_admin_dict = self.getNextJobToRun() if job_admin_dict:
if job_admin_dict: if os.path.exists(job_admin_dict.get('path')):
if os.path.exists(job_admin_dict.get('path')): msg = CommandMessage(content=job_admin_dict.get('job_xml'))
msg = CommandMessage(content=job_admin_dict.get('job_xml')) msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY)
msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY) self.__jobs_for_transfer_queue.send(msg)
self.__jobs_for_transfer_queue.send(msg) logger.info('submitted job %s to queue %s at %s', job_admin_dict['job']['JobId'], self.__jobs_for_transfer_queue.address, self.__jobs_for_transfer_queue.broker)
logger.info('submitted job %s to queue %s at %s', job_admin_dict['job']['JobId'], self.__jobs_for_transfer_queue.address, self.__jobs_for_transfer_queue.broker) self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled)
self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled)
else:
job_id = job_admin_dict['job']['JobId']
logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path'))
with self.__lock:
del self.__job_admin_dicts[job_id]
#do a little sleep to allow the ingesttransferserver consumers to pick up the submitted job
#so the queue is empty again
#and we can submit yet another job
time.sleep(0.1)
else: else:
return job_id = job_admin_dict['job']['JobId']
logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path'))
del self.__job_admin_dicts[job_id]
#do a little sleep to allow the ingesttransferserver consumers to pick up the submitted job
#so the queue is empty again
#and we can submit yet another job
time.sleep(0.1)
else:
return
def onJobStarted(self, job_notification_dict): def onJobStarted(self, job_notification_dict):
self.__notification_listener._logJobNotification('started ', job_notification_dict); self.__notification_listener._logJobNotification('started ', job_notification_dict);
...@@ -732,34 +736,27 @@ class IngestJobManager: ...@@ -732,34 +736,27 @@ class IngestJobManager:
return [] return []
def getDoneJobAdminDicts(self, job_group_id=None): def getDoneJobAdminDicts(self, job_group_id=None):
with self.__lock: return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobFailed, JobProduced, JobRemoved])
return (self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed) +
self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced) +
self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved))
def getNotDoneJobAdminDicts(self, job_group_id=None): def getNotDoneJobAdminDicts(self, job_group_id=None):
with self.__lock: return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobToDo, JobScheduled, JobRetry])
return (self.getJobAdminDicts(job_group_id=job_group_id, status=JobToDo) +
self.getJobAdminDicts(job_group_id=job_group_id, status=JobScheduled) +
self.getJobAdminDicts(job_group_id=job_group_id, status=JobProducing) +
self.getJobAdminDicts(job_group_id=job_group_id, status=JobRetry))
def getJobAdminDicts(self, job_group_id=None, status=None): def getJobAdminDicts(self, job_group_id=None, status=None):
with self.__lock: with self.__lock:
if job_group_id != None and status != None: jads = [jad for jad in self.__job_admin_dicts.values()]
return [jad for jad in self.__job_admin_dicts.values()
if jad['status'] == status
and str(jad['job'].get('job_group_id')) == str(job_group_id)]
if status != None:
return [jad for jad in self.__job_admin_dicts.values()
if jad['status'] == status]
if job_group_id != None: if job_group_id != None:
return [jad for jad in self.__job_admin_dicts.values() job_group_id = str(job_group_id)
if str(jad['job'].get('job_group_id')) == str(job_group_id)] jads = [jad for jad in jads if str(jad['job'].get('job_group_id')) == job_group_id]
if status != None:
if isinstance(status, int):
jads = [jad for jad in jads if jad['status'] == status]
else:
statuses = set(status)
jads = [jad for jad in jads if jad['status'] in statuses]
return [jad for jad in self.__job_admin_dicts.values()] return jads
def getStatusReportDict(self): def getStatusReportDict(self):
with self.__lock: with self.__lock:
...@@ -1017,8 +1014,16 @@ Total Files: %(total)i ...@@ -1017,8 +1014,16 @@ Total Files: %(total)i
#make it a csv list of addresses #make it a csv list of addresses
mailing_list = ','.join(mailing_list) mailing_list = ','.join(mailing_list)
subject = "Ingest Export job %s finished %s"% (job_group_id, subject = "Ingest Export job %s " % (job_group_id,)
'successfully' if success else 'with errors') if success:
subject += 'finished successfully'
elif removed_group_jads :
if len(removed_group_jads) == len(done_group_jads):
subject += 'was removed completely before transfer'
else:
subject += 'was removed partially before transfer'
else:
subject += 'finished with errors'
if os.system('echo "%s"|mailx -s "%s" %s' % (report, subject, mailing_list)) == 0: if os.system('echo "%s"|mailx -s "%s" %s' % (report, subject, mailing_list)) == 0:
logger.info('sent notification email for export job %s to %s', job_group_id, mailing_list) logger.info('sent notification email for export job %s to %s', job_group_id, mailing_list)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment