Commit 0d574628 authored by Jorrit Schaap

SW-720: minor changes in scheduling new jobs

parent 869141d5
......@@ -25,7 +25,8 @@ from lofar.messaging.config import DEFAULT_BUSNAME, DEFAULT_BROKER
from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.server.config import DEFAULT_INGEST_SERVICENAME, DEFAULT_INGEST_INCOMING_JOB_SUBJECT, DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT
from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, FINISHED_NOTIFICATION_BCC_MAILING_LIST, DEFAULT_JOB_PRIORITY
from lofar.messaging import LofarMessage, CommandMessage, EventMessage, ToBus, RPCService, ServiceMessageHandler, AbstractMessageHandler, BusListener, nr_of_messages_in_queue, adaptNameToEnvironment
from lofar.messaging import LofarMessage, CommandMessage, EventMessage, ToBus, RPCService, ServiceMessageHandler, AbstractMessageHandler, BusListener, adaptNameToEnvironment
from lofar.messaging.messagebus import nr_of_messages_in_queue
from lofar.common.util import humanreadablesize
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
......@@ -383,7 +384,10 @@ class IngestJobManager:
job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['started_at'] = datetime.utcnow()
if new_status == JobProduced or new_status == JobTransferFailed:
job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['finished_at'] = datetime.utcnow()
try:
job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['finished_at'] = datetime.utcnow()
except:
pass
if lta_site:
job_admin_dict['lta_site'] = lta_site
......@@ -616,6 +620,13 @@ class IngestJobManager:
if jad_a.get('job_group_id', 0) > jad_b.get('job_group_id', 0):
return 1
# everything above equal? sort on next sort criterion, the dataproduct name,
# which in effect sorts on otdb id first, and then on subband
if jad_a.get('dataproduct', '') < jad_b.get('dataproduct', ''):
return -1
if jad_a.get('dataproduct', '') > jad_b.get('dataproduct', ''):
return 1
# everything above equal? sort on next sort criterion, the updated_at timestamp
# least recent updated_at job goes first
# if no updated_at timestamp available, use 'now'
......@@ -729,9 +740,12 @@ class IngestJobManager:
def canProduceNextJob(self):
# test if the managed_job_queue is empty enough, and if our administration agrees
try:
if len(self.getJobAdminDicts(status=JobScheduled) > 0:
return False
# HACK: hardcoded queue name
job_for_transfer_queue_name = adaptNameToEnvironment("lofar.queue.for.ingesttransferserver.BusListener.on.LTA.Ingest.job_for_transfer")
return nr_of_messages_in_queue(job_for_transfer_queue_name, self._tobus.broker) <= 2
return nr_of_messages_in_queue(job_for_transfer_queue_name, self._tobus.broker) == 0
except Exception as e:
logger.exception('canProduceNextJob: %s', e)
if 'No active session' in str(e):
......@@ -743,25 +757,21 @@ class IngestJobManager:
start_producing_timestamp = datetime.utcnow()
while self.canProduceNextJob() and datetime.utcnow() - start_producing_timestamp < timedelta(seconds=5):
job_admin_dict = self.getNextJobToRun()
if job_admin_dict:
if os.path.exists(job_admin_dict.get('path')):
msg = CommandMessage(content=job_admin_dict.get('job_xml'), subject=DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT, ttl=60)
msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY)
self._tobus.send(msg)
logger.info('submitted job %s to exchange \'%s\' at %s', job_admin_dict['job']['JobId'], self._tobus.exchange, self._tobus.broker)
self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled)
else:
job_id = job_admin_dict['job']['JobId']
logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path'))
del self.__job_admin_dicts[job_id]
# do a little sleep to allow the ingesttransferserver consumers to pick up the submitted job
# so the queue is empty again
# and we can submit yet another job
time.sleep(0.1)
else:
if not job_admin_dict:
return
if os.path.exists(job_admin_dict.get('path')):
self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled)
msg = CommandMessage(content=job_admin_dict.get('job_xml'), subject=DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT, ttl=60)
msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY)
self._tobus.send(msg)
logger.info('submitted job %s to exchange \'%s\' at %s', job_admin_dict['job']['JobId'], self._tobus.exchange, self._tobus.broker)
else:
job_id = job_admin_dict['job']['JobId']
logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path'))
del self.__job_admin_dicts[job_id]
# rate limit at 10 jobs/sec
time.sleep(0.1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment