Skip to content
Snippets Groups Projects
Commit feac6666 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-699: bit more responsive

parent c930acae
No related branches found
No related tags found
No related merge requests found
......@@ -78,7 +78,7 @@ class IngestJobManager:
exchange=self._tobus.exchange, broker=self._tobus.broker)
ingest_service = RPCService(DEFAULT_INGEST_SERVICENAME, IngestServiceMessageHandler, {'job_manager': self},
exchange=self._tobus.exchange, broker=self._tobus.broker, num_threads=1)
exchange=self._tobus.exchange, broker=self._tobus.broker, num_threads=4)
# open exchange connections...
with incoming_jobs_listener, ingest_event_listener, ingest_service, self._tobus:
......@@ -108,7 +108,7 @@ class IngestJobManager:
else:
logger.info('%s jobs are running', len(producing_jads))
time.sleep(2)
time.sleep(1)
except KeyboardInterrupt:
break
except Exception as e:
......@@ -262,7 +262,8 @@ class IngestJobManager:
for job_admin_dict in job_admin_dicts:
self.addNewJob(job_admin_dict, check_non_todo_dirs=False, add_old_jobs_from_disk=False)
logger.info('added %d existing %s jobs for %s %s', len(job_admin_dicts), jobState2String(status), job_type, job_group_id)
if job_admin_dicts:
logger.info('added %d existing %s jobs for %s %s', len(job_admin_dicts), jobState2String(status), job_type, job_group_id)
logger.info('finished scanning jobs in %s, found %s jobs', self.__jobs_dir, len(self.__job_admin_dicts))
......@@ -531,7 +532,7 @@ class IngestJobManager:
return sorted(list(set([jad['job'].get('job_group_id', 'unknown_group') for jad in list(self.__job_admin_dicts.values())])))
def __putStalledJobsBackToToDo(self):
if datetime.utcnow() - self.__last_putStalledJobsBackToToDo_timestamp < timedelta(seconds=15):
if datetime.utcnow() - self.__last_putStalledJobsBackToToDo_timestamp < timedelta(seconds=60):
return
logger.debug("checking stalled jobs...")
......@@ -710,7 +711,7 @@ class IngestJobManager:
# test if the managed_job_queue is empty enough, and if our administration agrees
try:
scheduled_jads = self.getJobAdminDicts(status=JobScheduled)
return len(scheduled_jads) < 1
return len(scheduled_jads) < 5
except Exception as e:
logger.exception('canProduceNextJob: %s', e)
if 'No active session' in str(e):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment