diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py index b7540a611a6402c36daa2b8023916711a93eb44c..b14de7a563087b2ff74ac9b3fd25951b91f37ca1 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py @@ -78,7 +78,7 @@ class IngestJobManager: exchange=self._tobus.exchange, broker=self._tobus.broker) ingest_service = RPCService(DEFAULT_INGEST_SERVICENAME, IngestServiceMessageHandler, {'job_manager': self}, - exchange=self._tobus.exchange, broker=self._tobus.broker, num_threads=1) + exchange=self._tobus.exchange, broker=self._tobus.broker, num_threads=4) # open exchange connections... with incoming_jobs_listener, ingest_event_listener, ingest_service, self._tobus: @@ -108,7 +108,7 @@ class IngestJobManager: else: logger.info('%s jobs are running', len(producing_jads)) - time.sleep(2) + time.sleep(1) except KeyboardInterrupt: break except Exception as e: @@ -262,7 +262,8 @@ class IngestJobManager: for job_admin_dict in job_admin_dicts: self.addNewJob(job_admin_dict, check_non_todo_dirs=False, add_old_jobs_from_disk=False) - logger.info('added %d existing %s jobs for %s %s', len(job_admin_dicts), jobState2String(status), job_type, job_group_id) + if job_admin_dicts: + logger.info('added %d existing %s jobs for %s %s', len(job_admin_dicts), jobState2String(status), job_type, job_group_id) logger.info('finished scanning jobs in %s, found %s jobs', self.__jobs_dir, len(self.__job_admin_dicts)) @@ -531,7 +532,7 @@ class IngestJobManager: return sorted(list(set([jad['job'].get('job_group_id', 'unknown_group') for jad in list(self.__job_admin_dicts.values())]))) def __putStalledJobsBackToToDo(self): - if datetime.utcnow() - self.__last_putStalledJobsBackToToDo_timestamp < timedelta(seconds=15): + if datetime.utcnow() - self.__last_putStalledJobsBackToToDo_timestamp < timedelta(seconds=60): return logger.debug("checking stalled jobs...") @@ -710,7 +711,7 @@ class IngestJobManager: # test if the managed_job_queue is empty enough, and if our administration agrees try: scheduled_jads = self.getJobAdminDicts(status=JobScheduled) - return len(scheduled_jads) < 1 + return len(scheduled_jads) < 5 except Exception as e: logger.exception('canProduceNextJob: %s', e) if 'No active session' in str(e):