diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py index 5a123de2a2afd9aef6e45672fe8e2605dc6fbead..64adacc22bb063429542298fd128efb55c7b8d1d 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py @@ -634,6 +634,12 @@ class IngestJobManager: with self.__lock: def getNextJobByStatus(status, min_age=None, exclude_job_group_ids=[]): + # TODO: replace jad_compare_func with fully implemented jad_sort_value_func + # needed for python2 -> python3 sorting changes + # Right now, we just sort on prio + def jad_sort_value_func(jad): + return jad['job'].get('priority', DEFAULT_JOB_PRIORITY) + def jad_compare_func(jad_a, jad_b): # sort on priority first if jad_a['job'].get('priority', DEFAULT_JOB_PRIORITY) != jad_b['job'].get('priority', DEFAULT_JOB_PRIORITY): @@ -676,7 +682,7 @@ class IngestJobManager: # filter out jad's from exclude_job_group_ids job_admin_dicts = [jad for jad in job_admin_dicts if 'job_group_id' not in jad['job'] or jad['job']['job_group_id'] not in exclude_job_group_ids] - job_admin_dicts = sorted(job_admin_dicts, key=cmp_to_key(jad_compare_func)) + job_admin_dicts = sorted(job_admin_dicts, key=jad_sort_value_func, reverse=True) if job_admin_dicts: logger.info('%s jobs with status %s waiting', len(job_admin_dicts), jobState2String(status)) return job_admin_dicts[0] @@ -757,12 +763,8 @@ class IngestJobManager: def canProduceNextJob(self): # test if the managed_job_queue is empty enough, and if our administration agrees try: - with self.__jobs_for_transfer_queue_peeker: - num_scheduled = self.__jobs_for_transfer_queue_peeker.nr_of_messages_in_queue(0.01) - if num_scheduled <= 1: - scheduled_jads = self.getJobAdminDicts(status=JobScheduled) - return len(scheduled_jads) <= 1 - return False + scheduled_jads = self.getJobAdminDicts(status=JobScheduled) + return len(scheduled_jads) < 1 except Exception as e: logger.exception('canProduceNextJob: %s', e) if 'No active session' in str(e):