Commit 2142426c authored by Jorrit Schaap's avatar Jorrit Schaap

SW-778: only scan disk if no jobs for same group/status haven't been loaded...

SW-778: only scan disk if no jobs for same group/status haven't been loaded into memory yet. Major speedup.
parent 16de5da4
......@@ -160,6 +160,34 @@ class IngestJobManager:
logger.debug('found %d xml files in %s', len(xml_files), dp)
if job_id:
# opening, parsing, and checking each file is very expensive
# if we are looking for a specifix job_id, then try to find quickly based on filename, and leave early when found.
# if not found this way, then just scan all files.
logger.info('quick scan for job_id: %s', job_id)
possible_xml_files_for_job_id = [f for f in xml_files if job_id in f]
logger.debug('possible_xml_files_for_job_id: %s', possible_xml_files_for_job_id)
for path in possible_xml_files_for_job_id:
with open(path) as file:
file_content = file.read()
job = parseJobXml(file_content)
logger.info('job %s \njob_id %s', job, job_id)
if job and job_id == job.get('JobId'):
job_admin_dict = {'path': path,
'job': job,
'job_xml': file_content,
'runs': {},
'created_at': datetime.fromtimestamp(os.lstat(path).st_ctime),
'updated_at': datetime.fromtimestamp(os.lstat(path).st_ctime)}
if job_status is not None:
job_admin_dict['status'] = job_status
# found the specific file for this job_id, nothing more to search for, leave immediately
logger.info('quick scan for job_id returing %s', job_id, job_admin_dict)
return [job_admin_dict]
for path in xml_files:
try:
with open(path) as file:
......@@ -283,26 +311,33 @@ class IngestJobManager:
job_group_id,
jobState2String(job_admin_dict.get('status', JobToDo)))
if check_non_todo_dirs:
# check if this job is already in memory
matching_known_jads = []
for status in [JobScheduled, JobProducing, JobFailed, JobProduced, JobRemoved, JobRetry]:
for status in [JobScheduled, JobProducing, JobFailed, JobProduced, JobRetry]:
matching_known_jads_for_status = self.getJobAdminDicts(job_group_id=job_group_id, status=status)
matching_known_jads_for_status = [jad for jad in matching_known_jads_for_status if jad['job']['JobId'] == job_id]
matching_known_jads_for_status_for_job_id = [jad for jad in matching_known_jads_for_status if jad['job']['JobId'] == job_id]
if matching_known_jads_for_status_for_job_id:
matching_known_jads += matching_known_jads_for_status_for_job_id
else:
logger.info("no jobs for job_id=%s group_id=%s status=%s found in memory.", job_id,
job_group_id,
jobState2String(status))
if not matching_known_jads_for_status:
logger.info("no jobs for group_id=%s status=%s found in memory. Checking disk...", job_group_id, jobState2String(status))
matching_known_jads_for_status = self.getJobAdminDictsFromDisk(job_status=status,
job_type=job_type,
job_group_id=job_group_id,
job_id=job_id)
if matching_known_jads_for_status:
matching_known_jads += matching_known_jads_for_status
logger.info("found %d jobs for group %s for status=%s on disk", len(matching_known_jads), job_group_id, jobState2String(status))
if not matching_known_jads:
#if not in memory, check on disk
for status in [JobScheduled, JobProducing, JobFailed, JobProduced, JobRemoved, JobRetry]:
matching_known_jads_for_status = self.getJobAdminDictsFromDisk(job_status=status,
job_type=job_type,
job_group_id=job_group_id,
job_id=job_id)
if matching_known_jads_for_status:
matching_known_jads += matching_known_jads_for_status
if matching_known_jads_for_status:
matching_known_jads += matching_known_jads_for_status
# remove job from 'done' directories if present (this is a resubmitted job)
for done_jad in matching_known_jads:
......@@ -740,7 +775,7 @@ class IngestJobManager:
def canProduceNextJob(self):
# test if the managed_job_queue is empty enough, and if our administration agrees
try:
if len(self.getJobAdminDicts(status=JobScheduled) > 0:
if len(self.getJobAdminDicts(status=JobScheduled)) > 0:
return False
# HACK: hardcoded queue name
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment