#!/usr/bin/env python3

# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
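
"""IngestJobManagementServer: the IngestJobManager keeps track of all LTA ingest jobs as
'job admin dicts', persists them as XML files in a per-status directory tree on disk,
and feeds them one by one to the ingest transfer services via the messaging bus."""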

from lofar.lta.ingest.client.ingestbuslistener import IngestEventMessageHandler, IngestEventMesssageBusListener
from lofar.lta.ingest.common.job import *
from lofar.messaging.config import DEFAULT_BUSNAME, DEFAULT_BROKER
from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.server.config import DEFAULT_INGEST_SERVICENAME, DEFAULT_INGEST_INCOMING_JOB_SUBJECT, DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT
from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, FINISHED_NOTIFICATION_BCC_MAILING_LIST, DEFAULT_JOB_PRIORITY
from lofar.messaging import LofarMessage, CommandMessage, EventMessage, ToBus, RPCService, ServiceMessageHandler, AbstractMessageHandler, BusListener
from lofar.common.util import humanreadablesize
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC

import os
import os.path
import fnmatch
import shutil
import time
from random import random
from threading import RLock
from datetime import datetime, timedelta
# note: in Python 3, reduce lives in functools; it is needed by getSubDirs below
from functools import cmp_to_key, reduce

import logging

logger = logging.getLogger()
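

# job status flow, mirrored by the on-disk directory layout (see IngestJobManager.jobStatusBaseDir):
#   to_do -> scheduled -> producing -> done
# a failed transfer moves a job to retry (or to failed after MAX_NR_OF_RETRIES attempts);
# removed and on_hold jobs get their own directories as well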
class IngestJobManager:
    def __init__(self, exchange=DEFAULT_BUSNAME, jobs_dir=JOBS_DIR, max_num_retries=MAX_NR_OF_RETRIES, broker=DEFAULT_BROKER):
        self.__jobs_dir = jobs_dir
        self.__max_num_retries = max_num_retries
        self.__job_admin_dicts = {}
        self.__lock = RLock()
        self.__running = False
        self._tobus = ToBus(exchange=exchange, broker=broker)

        self.__running_jobs_log_timestamp = datetime.utcnow()
        self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()

    def quit(self):
        self.__running = False

    def run(self):
        self.__running = True

        # start with a full jobs-dir scan to retrieve the state from disk
        self.scanJobsdir()

        logger.info('starting listening for new jobs and notifications')

        incoming_jobs_listener = BusListener(IngestIncomingJobsHandler, {'job_manager': self},
                                             exchange=self._tobus.exchange, broker=self._tobus.broker,
                                             routing_key="%s.#" % DEFAULT_INGEST_INCOMING_JOB_SUBJECT)

        ingest_event_listener = IngestEventMesssageBusListener(IngestEventMessageHandlerForJobManager,
                                                               {'job_manager': self},
                                                               exchange=self._tobus.exchange, broker=self._tobus.broker)

        ingest_service = RPCService(DEFAULT_INGEST_SERVICENAME, IngestServiceMessageHandler, {'job_manager': self},
                                    exchange=self._tobus.exchange, broker=self._tobus.broker, num_threads=4)

        with incoming_jobs_listener, ingest_event_listener, ingest_service, self._tobus:
            # ... and run the event loop:
            # produce jobs on the managed job queue for the ingest transfer services,
            # and receive new jobs
            logger.info('starting to produce jobs')

            while self.__running:
                try:
                    # produce the next job(s)
                    self.produceNextJobsIfPossible()

                    # check if producing jobs are actually making progress.
                    # maybe the ingest transfer server was shut down in the middle of a transfer?
                    # the ingest transfer server is stateless, so it won't restart that job itself (by design).
                    # while transferring, progress updates are given at very regular intervals,
                    # so it is easy to detect here whether a job is progressing or stalled (see onJobProgress)
                    self.__putStalledJobsBackToToDo()

                    # report on running jobs
                    if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds=30):
                        self.__running_jobs_log_timestamp = datetime.utcnow()

                        producing_jads = self.getJobAdminDicts(status=JobProducing)
                        if producing_jads:
                            if len(producing_jads) == 1:
                                logger.info('1 job is running')
                            else:
                                logger.info('%s jobs are running', len(producing_jads))

                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(e)

        logger.info('finished producing jobs')
        logger.info('finished listening for new jobs and notifications')

    def nrOfUnfinishedJobs(self):
        return len(self.getNotDoneJobAdminDicts())

    def nrOfJobs(self):
        return len(self.getJobAdminDicts())

    def getJobAdminDictsFromDisk(self, job_status=None, job_type=None, job_group_id=None, job_id=None):
        job_admin_dicts = []

        if job_status is None:
            for status in [JobToDo, JobRetry, JobFailed, JobHold, JobScheduled, JobProducing, JobProduced, JobRemoved]:
                job_admin_dicts += self.getJobAdminDictsFromDisk(job_status=status, job_type=job_type,
                                                                 job_group_id=job_group_id, job_id=job_id)
            return job_admin_dicts

        dir_path = self.jobDir(job_admin_dict=None, job_status=job_status, job_type=job_type, job_group_id=job_group_id)

        if not os.path.isdir(dir_path):
            return job_admin_dicts

        dir_paths = [dir_path]
        dir_paths += IngestJobManager.getSubDirs(dir_path)

        for dp in dir_paths:
            try:
                job_admin_dicts_for_dir = []

                # note: the prefix of this log message is a reconstruction; the original line was lost
                msg = 'scanning %s for jobs' % dp
                if job_type:
                    msg += ' job_type=%s' % job_type
                if job_group_id:
                    msg += ' job_group_id=%s' % job_group_id
                if job_status:
                    msg += ' job_status=%s' % jobState2String(job_status)
                if job_id:
                    msg += ' job_id=%s' % job_id
                logger.info(msg)

                xml_files = [os.path.join(dp, f) for f in os.listdir(dp) if fnmatch.fnmatch(f, '*.xml')]
                logger.debug('found %d xml files in %s', len(xml_files), dp)

                for path in xml_files:
                    try:
                        with open(path) as file:
                            file_content = file.read()
                        job = parseJobXml(file_content)

                        if job:
                            if ((job_id and job_id == job.get('JobId')) or
                                (job_id is None and job_group_id and job_group_id == job.get('job_group_id')) or
                                (job_id is None and job_group_id is None)):
                                job_admin_dict = {'path': path,
                                                  'job': job,
                                                  'job_xml': file_content,
                                                  'runs': {},
                                                  'created_at': datetime.fromtimestamp(os.lstat(path).st_ctime),
                                                  'updated_at': datetime.fromtimestamp(os.lstat(path).st_ctime)}
                                if job_status is not None:
                                    job_admin_dict['status'] = job_status
                                job_admin_dicts_for_dir.append(job_admin_dict)
                    except Exception as e:
                        logger.error(e)

                logger.info('read %d job files from %s', len(job_admin_dicts_for_dir), dp)
                job_admin_dicts += job_admin_dicts_for_dir
            except Exception as e:
                logger.error(e)

        if job_admin_dicts:
            logger.info('read %d job files from %s', len(job_admin_dicts), dir_path)

        return job_admin_dicts
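
    # shape of a 'job admin dict' (jad), the in-memory administration for one job, e.g.:
    #   {'path': '<jobs_dir>/to_do/<type>_<group>/j<job_id>.xml',
    #    'job': <dict parsed from the job xml>, 'job_xml': '<the raw xml>',
    #    'status': JobToDo, 'runs': {<retry_attempt>: {'started_at': ..., 'finished_at': ...}},
    #    'created_at': <datetime>, 'updated_at': <datetime>}
    # (illustrative only; the keys inside 'job' come from parseJobXml)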

    def jobStatusBaseDir(self, jobstatus):
        if jobstatus == JobToDo:
            return os.path.join(self.__jobs_dir, 'to_do')
        if jobstatus == JobRetry:
            return os.path.join(self.__jobs_dir, 'retry')
        if jobstatus == JobFailed:
            return os.path.join(self.__jobs_dir, 'failed')
        if jobstatus == JobHold:
            return os.path.join(self.__jobs_dir, 'on_hold')
        if jobstatus == JobScheduled:
            return os.path.join(self.__jobs_dir, 'scheduled')
        if jobstatus == JobProducing:
            return os.path.join(self.__jobs_dir, 'producing')
        if jobstatus == JobProduced:
            return os.path.join(self.__jobs_dir, 'done')
        if jobstatus == JobRemoved:
            return os.path.join(self.__jobs_dir, 'removed')
        return os.path.join(self.__jobs_dir, 'unknown')

    def jobStatusBaseDirs(self):
        return [self.jobStatusBaseDir(status) for status in [JobToDo, JobRetry, JobFailed, JobHold, JobScheduled,
                                                             JobProducing, JobProduced, JobRemoved]]

    def jobDir(self, job_admin_dict, job_status=None, job_type=None, job_group_id=None, retry_attempt=None):
        if job_admin_dict:
            return self.jobDir(job_admin_dict=None,
                               job_status=job_admin_dict['status'],
                               job_type=job_admin_dict['job'].get('Type', 'unknown'),
                               job_group_id=job_admin_dict['job'].get('job_group_id', 'unknown'),
                               retry_attempt=job_admin_dict.get('retry_attempt', 1))

        base_dir = self.jobStatusBaseDir(job_status)

        if job_status == JobRetry and retry_attempt is not None:
            # retry jobs are grouped in a subdir per attempt
            # (note: the subdir naming below is a reconstruction; the original return line was lost)
            return os.path.join(base_dir, 'attempt_%d' % retry_attempt)
        elif job_status in [JobToDo, JobProduced, JobRemoved, JobFailed]:
            if job_type and job_group_id:
                group_dir = '%s_%s' % (job_type, job_group_id)
                return os.path.join(base_dir, group_dir)

        return base_dir

    def jobPath(self, job_admin_dict):
        dir = self.jobDir(job_admin_dict)
        filename = 'j%s.xml' % job_admin_dict['job']['JobId']
        path = os.path.join(dir, filename)
        return path
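
    # example with hypothetical values: a to_do job with JobId 42, Type 'MoM' and job_group_id 7
    # is stored as <jobs_dir>/to_do/MoM_7/j42.xml, and its file moves along on every status change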

    def scanJobsdir(self):
        with self.__lock:
            logger.info('scanning jobs dirs in %s', self.__jobs_dir)

            for status in [JobToDo, JobScheduled, JobProducing, JobRetry]:
                job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status)
                for job_admin_dict in job_admin_dicts:
                    self.addNewJob(job_admin_dict, check_non_todo_dirs=False, add_old_jobs_from_disk=False)

                if job_admin_dicts:
                    logger.info('added %d existing %s jobs from disk', len(job_admin_dicts), jobState2String(status))

            # which (type, group_id) jobs were read?
            # read the done jobs for these groups as well
            unique_type_groups = set([(jad['job']['Type'], jad['job'].get('job_group_id', 'unknown_group')) for jad in list(self.__job_admin_dicts.values())])

            if unique_type_groups:
                logger.info('scanning for done jobs for %s', unique_type_groups)

                for job_type, job_group_id in unique_type_groups:
                    for status in [JobFailed, JobProduced, JobRemoved]:
                        job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status, job_type=job_type, job_group_id=job_group_id)
                        for job_admin_dict in job_admin_dicts:
                            self.addNewJob(job_admin_dict, check_non_todo_dirs=False, add_old_jobs_from_disk=False)

                        if job_admin_dicts:
                            logger.info('added %d existing %s jobs for %s %s', len(job_admin_dicts), jobState2String(status), job_type, job_group_id)

            logger.info('finished scanning jobs in %s, found %s jobs', self.__jobs_dir, len(self.__job_admin_dicts))

    def addNewJob(self, job_admin_dict, check_non_todo_dirs=False, add_old_jobs_from_disk=False):
        with self.__lock:
            job_id = job_admin_dict['job']['JobId']
            job_group_id = job_admin_dict['job'].get('job_group_id')
            job_type = job_admin_dict['job']['Type']

            logger.info('adding new job %s in group %s %s with status %s',
                        job_id,
                        job_type,
                        job_group_id,
                        jobState2String(job_admin_dict.get('status', JobToDo)))

            if check_non_todo_dirs:
                # check if this job is already in memory
                matching_known_jads = []
                for status in [JobScheduled, JobProducing, JobFailed, JobProduced, JobRemoved, JobRetry]:
                    matching_known_jads_for_status = self.getJobAdminDicts(job_group_id=job_group_id, status=status)
                    matching_known_jads_for_status = [jad for jad in matching_known_jads_for_status if jad['job']['JobId'] == job_id]

                    if matching_known_jads_for_status:
                        matching_known_jads += matching_known_jads_for_status

                # if not in memory, check on disk
                for status in [JobScheduled, JobProducing, JobFailed, JobProduced, JobRemoved, JobRetry]:
                    matching_known_jads_for_status = self.getJobAdminDictsFromDisk(job_status=status,
                                                                                   job_type=job_type,
                                                                                   job_group_id=job_group_id,
                                                                                   job_id=job_id)

                    if matching_known_jads_for_status:
                        matching_known_jads += matching_known_jads_for_status

                # remove job from 'done' directories if present (this is a resubmitted job)
                # note: the two lines below are a reconstruction of a loop header lost in extraction
                done_jads = [jad for jad in matching_known_jads if jad.get('status') in [JobProduced, JobFailed, JobRemoved]]
                for done_jad in done_jads:
                    try:
                        logger.info('removing done job %s from %s because it is resubmitted', job_id, done_jad['path'])
                        os.remove(done_jad['path'])
                    except Exception as e:
                        logger.error('error while removing done job %s from %s: %s', job_id, done_jad['path'], e)

            self.__job_admin_dicts[job_id] = job_admin_dict

            if 'status' not in job_admin_dict:
                job_admin_dict['status'] = JobToDo

            if 'created_at' not in job_admin_dict:
                job_admin_dict['created_at'] = datetime.utcnow()
                job_admin_dict['updated_at'] = job_admin_dict['created_at']

            # store start/finish times per retry attempt in runs
            job_admin_dict['runs'] = {}

            # store the new job
            todo_dir = self.jobDir(job_admin_dict)

            # create the dir if it does not exist yet
            if not os.path.isdir(todo_dir):
                try:
                    os.makedirs(todo_dir)
                except OSError as e:
                    logger.error(e)

            if 'path' not in job_admin_dict or 'failed' not in job_admin_dict.get('path', ''):
                path = self.jobPath(job_admin_dict)
                job_admin_dict['path'] = path

                try:
                    if not os.path.exists(path):
                        logger.info('saving job %s on disk: %s', job_id, path)
                        with open(path, 'w') as file:
                            file.write(job_admin_dict['job_xml'])
                except Exception as e:
                    logger.error(e)
            else:
                job_dirname = os.path.dirname(job_admin_dict['path'])
                expected_dirname = self.jobDir(job_admin_dict)
                if job_dirname != expected_dirname:
                    # a new todo job should be located in the expected dir based on its status;
                    # it is not, so force it there
                    self.updateJobStatus(job_admin_dict['job']['JobId'], job_admin_dict['status'])

            if add_old_jobs_from_disk and job_group_id is not None:
                group_jads = self.getJobAdminDicts(job_group_id=job_group_id)
                group_jads = [jad for jad in group_jads if jad['job']['JobId'] != job_id]

                if not group_jads:
                    logger.info('%s is the first new job of group %s, scanning disk for \'old\' jobs of this group.', job_id, job_group_id)
                    jads_from_disk = self.getJobAdminDictsFromDisk(job_group_id=job_group_id, job_type=job_type)
                    for jad in jads_from_disk:
                        if jad['job']['JobId'] != job_id:
                            logger.info('(re)adding job %s with status %s from group %s from disk',
                                        jad['job']['JobId'],
                                        jobState2String(jad.get('status')),
                                        jad['job']['job_group_id'])
                            self.__job_admin_dicts[jad['job']['JobId']] = jad

    def updateJobStatus(self, job_id, new_status, lta_site=None, message=None):
        with self.__lock:
            job_admin_dict = self.__job_admin_dicts.get(job_id)

            if not job_admin_dict:
                logger.error('updateJobStatus: unknown job %s with new status %s', job_id, jobState2String(new_status))
                return

            try:
                # update the updated_at timestamp
                job_admin_dict['updated_at'] = datetime.utcnow()

                if new_status == JobProducing:
                    job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)] = {}
                    job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['started_at'] = datetime.utcnow()

                if new_status == JobProduced or new_status == JobTransferFailed:
                    job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['finished_at'] = datetime.utcnow()

                if lta_site:
                    job_admin_dict['lta_site'] = lta_site

                job_admin_dict['last_message'] = message

                current_status = job_admin_dict.get('status', JobToDo)

                if new_status == JobTransferFailed:
                    # special case for jobs which failed to transfer, which will be retried
                    current_retry_attempt = job_admin_dict.get('retry_attempt', 0)
                    next_retry_attempt = current_retry_attempt + 1

                    if next_retry_attempt < self.__max_num_retries:
                        if message and 'not on disk' in message.lower():
                            logger.info('job %s transfer failed because source data was not on disk, not retrying anymore', job_id)
                            new_status = JobFailed
                        elif message and 'invalid sip' in message.lower():
                            logger.info('job %s transfer failed because the SIP is invalid, not retrying anymore, please fix SIP and resubmit job to ingest.', job_id)
                            new_status = JobFailed
                        else:
                            new_status = JobRetry
                            job_admin_dict['retry_attempt'] = next_retry_attempt
                            job_admin_dict['job']['last_retry_attempt'] = next_retry_attempt == (self.__max_num_retries - 1)
                    else:
                        logger.info('job %s transfer failed %s times, not retrying anymore',
                                    job_id,
                                    job_admin_dict.get('retry_attempt', 0))
                        new_status = JobFailed

                if new_status != current_status:
                    # update the internal status
                    logger.info('updating job %s status from %s to %s%s',
                                job_id,
                                jobState2String(current_status),
                                jobState2String(new_status),
                                (' attempt #%d' % job_admin_dict.get('retry_attempt', 1))
                                if (new_status == JobRetry or current_status == JobRetry) else '')

                    job_admin_dict['status'] = new_status

                    # move the job file to its new status directory:
                    # determine the current and new paths
                    current_path = job_admin_dict.get('path', '')
                    new_path = self.jobPath(job_admin_dict)
                    new_dirname = os.path.dirname(new_path)

                    # create the dir if it does not exist yet
                    if not os.path.isdir(new_dirname):
                        try:
                            os.makedirs(new_dirname)
                        except OSError as e:
                            logger.error(e)

                    if new_path != current_path:
                        # do the actual file move
                        logger.debug('moving job file from %s to %s.', current_path, new_path)
                        shutil.move(current_path, new_path)
                        job_admin_dict['path'] = new_path

                        # nice cleanup of obsolete empty directories
                        try:
                            old_dirname = os.path.dirname(current_path)
                            if old_dirname.startswith(self.__jobs_dir) and len(os.listdir(old_dirname)) == 0:
                                if old_dirname not in self.jobStatusBaseDirs():
                                    logger.info('removing empty jobs directory: %s', old_dirname)
                                    os.rmdir(old_dirname)
                        except OSError as e:
                            logger.error(e)

                    if new_status == JobRemoved or new_status == JobFailed:
                        # send a notification.
                        # this is (also) picked up by the ingestmomadapter,
                        # which also sends a status update to MoM, including the last_message
                        if new_status == JobRemoved:
                            job_admin_dict['last_message'] = 'removed from queue'
                        self._sendNotification(job_admin_dict)

                    # finally, remove the job from the internal admin jobs dict if finished
                    if job_admin_dict['status'] in [JobProduced, JobFailed, JobRemoved]:
                        current_job_group_id = job_admin_dict['job'].get('job_group_id', 'unknown')
                        current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=current_job_group_id)

                        if len(current_group_jads) == 0:
                            logger.info('all jobs in group %s are done', current_job_group_id)
                            self.sendJobGroupFinishedMail(current_job_group_id)

                            current_group_done_jads = self.getDoneJobAdminDicts(job_group_id=current_job_group_id)
                            logger.info('removing %s jobs of group %s from job management server memory',
                                        len(current_group_done_jads),
                                        current_job_group_id)
                            for jad in current_group_done_jads:
                                del self.__job_admin_dicts[jad['job']['JobId']]
                        else:
                            logger.info('%s jobs to do in group %s', len(current_group_jads), current_job_group_id)
            except Exception as e:
                logger.exception(str(e))
                logger.error("updateJobStatus(job_id=%s, new_status=%s, lta_site=%s, message=%s) %s",
                             job_id,
                             jobState2String(new_status),
                             lta_site,
                             message,
                             e)
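
    # note on the retry flow above: a JobTransferFailed report bumps retry_attempt and puts the
    # job back to JobRetry until MAX_NR_OF_RETRIES is reached; 'not on disk' and 'invalid sip'
    # failures skip the retries and go straight to JobFailed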

    def _sendNotification(self, job_admin_dict):
        try:
            job = job_admin_dict['job']
            contentDict = {'job_id': job['JobId'],
                           'archive_id': job['ArchiveId'],
                           'project': job['Project'],
                           'type': job["Type"],
                           'dataproduct': job['DataProduct']}

            if 'ObservationId' in job:
                contentDict['otdb_id'] = job['ObservationId']

            if 'lta_site' in job_admin_dict:
                contentDict['lta_site'] = job_admin_dict['lta_site']

            if 'last_message' in job_admin_dict:
                contentDict['message'] = job_admin_dict['last_message']

            if 'job_group_id' in job_admin_dict:
                contentDict['export_id'] = job_admin_dict['job_group_id']

            status = jobState2String(job_admin_dict['status'])
            status = status[status.index('(') + 1:status.index(')')]

            msg = EventMessage(subject="%s.%s" % (INGEST_NOTIFICATION_PREFIX, status), content=contentDict)

            # remove the message from the queues when not picked up within 48 hours,
            # otherwise mom might endlessly reject messages if it cannot handle them
            msg.ttl = 48 * 3600

            logger.info('Sending notification %s: %s' % (status, str(contentDict).replace('\n', ' ')))
            self._tobus.send(msg)
        except Exception as e:
            logger.error(str(e))
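
    # the notification subject becomes '<INGEST_NOTIFICATION_PREFIX>.<status>', where <status> is
    # the short state name between parentheses in jobState2String's output
    # (jobState2String is defined in lofar.lta.ingest.common.job)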

    def removeExportJob(self, export_group_id):
        logger.info('removing export job %s', export_group_id)
        job_admin_dicts = self.getJobAdminDicts(job_group_id=export_group_id)

        if job_admin_dicts:
            for jad in job_admin_dicts:
                self.updateJobStatus(jad['job']['JobId'], JobRemoved)

    def getExportIds(self):
        with self.__lock:
            return sorted(list(set([jad['job'].get('job_group_id', 'unknown_group') for jad in list(self.__job_admin_dicts.values())])))

    def __putStalledJobsBackToToDo(self):
        if datetime.utcnow() - self.__last_putStalledJobsBackToToDo_timestamp < timedelta(seconds=60):
            return

        logger.debug("checking stalled jobs...")

        with self.__lock:
            scheduled_jads = self.getJobAdminDicts(status=JobScheduled)
            stalled_scheduled_jads = [jad for jad in scheduled_jads
                                      if datetime.utcnow() - jad['updated_at'] >= timedelta(seconds=30)]

            for jad in stalled_scheduled_jads:
                logger.info('putting stalled scheduled job %s back to ToDo because it was not picked up in time by a transferservice', jad['job']['JobId'])
                self.updateJobStatus(jad['job']['JobId'], JobToDo)

            producing_jads = self.getJobAdminDicts(status=JobProducing)
            stalled_producing_jads = [jad for jad in producing_jads
                                      if datetime.utcnow() - jad['updated_at'] >= timedelta(minutes=10)]

            for jad in stalled_producing_jads:
                logger.info('putting stalled producing job %s back to ToDo because it did not make any progress during the last 10 min', jad['job']['JobId'])
                self.updateJobStatus(jad['job']['JobId'], JobToDo)

        self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()
        logger.debug("checked stalled jobs")

    def getNextJobToRun(self):
        '''get the next job to run.
        examine all 'to_do' and 'retry' jobs;
        higher priority jobs always go first,
        equal priority jobs will return 'to_do' jobs over 'retry' jobs,
        'retry' jobs are sorted by the number of retry attempts.
        source load balancing: the more jobs transfer from a certain source host,
        the less likely it is that the next job will be for that source host as well
        '''
        # helper method to get the source host from the job's location
        def getSourceHost(job_admin_dict):
            try:
                host = job_admin_dict['job']['Location'].split(':')[0]
                if 'cep4' in host.lower() or 'cpu' in host.lower():
                    return 'localhost'
                return host
            except Exception:
                return 'localhost'

        running_jads = self.getJobAdminDicts(status=JobProducing) + self.getJobAdminDicts(status=JobScheduled)
        running_hosts = {}
        for jad in running_jads:
            host = getSourceHost(jad)
            running_hosts[host] = running_hosts.get(host, 0) + 1

        with self.__lock:
            def getNextJobByStatus(status, min_age=None, exclude_job_group_ids=[]):
                def jad_compare_func(jad_a, jad_b):
                    # sort on priority first
                    if jad_a['job'].get('priority', DEFAULT_JOB_PRIORITY) != jad_b['job'].get('priority', DEFAULT_JOB_PRIORITY):
                        return jad_b['job'].get('priority', DEFAULT_JOB_PRIORITY) - jad_a['job'].get('priority', DEFAULT_JOB_PRIORITY)

                    # equal priorities, so sort on the next criterion, the retry attempt
                    if jad_a['status'] == JobRetry and jad_b['status'] == JobRetry:
                        if jad_a.get('retry_attempt', 0) != jad_b.get('retry_attempt', 0):
                            return jad_b.get('retry_attempt', 0) - jad_a.get('retry_attempt', 0)

                    # equal retry_attempt, so sort on the next criterion:
                    # jobs for which the number of running jobs on their source host is lower go first
                    # (load balance the source hosts)
                    nrOfRunningJobsOnSourceHostA = running_hosts.get(getSourceHost(jad_a), 0)
                    nrOfRunningJobsOnSourceHostB = running_hosts.get(getSourceHost(jad_b), 0)
                    if nrOfRunningJobsOnSourceHostA != nrOfRunningJobsOnSourceHostB:
                        return nrOfRunningJobsOnSourceHostA - nrOfRunningJobsOnSourceHostB

                    # everything above equal? sort on the next criterion, the group_id
                    if jad_a.get('job_group_id', 0) < jad_b.get('job_group_id', 0):
                        return -1
                    if jad_a.get('job_group_id', 0) > jad_b.get('job_group_id', 0):
                        return 1

                    # everything above equal? sort on the next criterion, the updated_at timestamp;
                    # the least recently updated job goes first.
                    # if no updated_at timestamp is available, use 'now'
                    now = datetime.utcnow()
                    if jad_a.get('updated_at', now) < jad_b.get('updated_at', now):
                        return -1
                    if jad_a.get('updated_at', now) > jad_b.get('updated_at', now):
                        return 1

                    # everything else equal? then make it a FIFO queue by sorting on the created_at timestamp
                    if jad_a['created_at'] < jad_b['created_at']:
                        return -1
                    if jad_a['created_at'] > jad_b['created_at']:
                        return 1

                    # TODO: we can add a lot of sort criteria in the future.
                    # For now, stick with FIFO and retry_attempt, after priority and source_host_load_balancing
                    return 0

                job_admin_dicts = self.getJobAdminDicts(status=status)

                # filter out priority 0 jobs (which are paused)
                job_admin_dicts = [jad for jad in job_admin_dicts if jad['job'].get('priority', 0) > 0]

                if min_age:
                    now = datetime.utcnow()
                    job_admin_dicts = [jad for jad in job_admin_dicts if now - jad['updated_at'] >= min_age]

                if exclude_job_group_ids:
                    # filter out jads from exclude_job_group_ids
                    job_admin_dicts = [jad for jad in job_admin_dicts if 'job_group_id' not in jad['job'] or jad['job']['job_group_id'] not in exclude_job_group_ids]

                job_admin_dicts = sorted(job_admin_dicts, key=cmp_to_key(jad_compare_func))

                if job_admin_dicts:
                    logger.info('%s jobs with status %s waiting', len(job_admin_dicts), jobState2String(status))
                    return job_admin_dicts[0]
                return None

            # gather some average speed stats for the running groups
            running_job_group_avg_speeds = {}
            running_job_group_ids = set([jad['job']['job_group_id'] for jad in running_jads if 'job_group_id' in jad['job']])

            for job_group_id in running_job_group_ids:
                # get the last 10 finished jobs with an average_speed for this group to compute the average speed
                group_finished_jads = [jad for jad in self.getJobAdminDicts(job_group_id=job_group_id, status=[JobProduced]) if 'average_speed' in jad]
                group_finished_jads = sorted(group_finished_jads, key=lambda jad: jad['updated_at'])[-10:]
                if len(group_finished_jads) == 10:
                    running_job_group_avg_speeds[job_group_id] = sum(jad['average_speed'] for jad in group_finished_jads) / len(group_finished_jads)
                    logger.debug('average speed over last 10 transfers for group %s = %s', job_group_id, humanreadablesize(running_job_group_avg_speeds[job_group_id], 'Bps'))

            # which job_group_ids are slow?
            # this is most likely caused by transfers of small files, for which the overhead has a huge impact
            # on the overall average transfer speed.
            # these slow_running_job_group_ids are thus not making optimal use of the available bandwidth,
            # since we cannot start a huge number of parallel transfers to increase the total used average bandwidth
            slow_running_job_group_ids = [job_group_id for job_group_id, avg_speed in list(running_job_group_avg_speeds.items()) if avg_speed < 1.0e7]

            # randomize whether a slow job_group_id will produce the next job or not.
            # if not (and it is in the exclude_job_group_ids list), then the first available job from the rest of the groups is picked.
            # it is quite likely that the next group has larger files, and thus will make better use of the available bandwidth.
            # the actual transfer speed of the next group will be measured as well, and if slow, it will be excluded once in a while as well.
            # the higher the job_group's priority, the bigger its chance to be included, so high priority jobs will run faster overall.
            exclude_job_group_ids = [job_group_id for job_group_id in slow_running_job_group_ids
                                     if random() > self.getExportJobPriority(job_group_id) * 0.1111]

            # if there are job_group_ids to be excluded (for slow transfers) but there are no other group_ids to be exported,
            # then do not exclude anything
            if exclude_job_group_ids:
                all_group_ids = set(self.getExportIds())
                if set(exclude_job_group_ids) == all_group_ids:
                    exclude_job_group_ids = []
                else:
                    logger.info('excluding jobs from group(s) %s while determining the next job to run, because of their slow overall average speed', exclude_job_group_ids)
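
            # with the 0.1111 factor above, a slow group with priority p stays included with
            # probability p * 0.1111: a max-priority (9) group is practically never excluded
            # (9 * 0.1111 ≈ 1.0), while a priority-1 group is excluded roughly 8 out of 9 times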

            # get the next job to run, both for JobToDo and JobRetry
            next_to_do_jad = getNextJobByStatus(JobToDo, exclude_job_group_ids=exclude_job_group_ids)
            next_retry_jad = getNextJobByStatus(JobRetry, timedelta(minutes=15) if next_to_do_jad else None)

            # limit the number of running jobs per source host if not localhost
            if next_to_do_jad and getSourceHost(next_to_do_jad) != 'localhost':
                if running_hosts.get(getSourceHost(next_to_do_jad), 0) > 1:
                    next_to_do_jad = None

            # limit the number of running jobs per source host if not localhost
            if next_retry_jad and getSourceHost(next_retry_jad) != 'localhost':
                if running_hosts.get(getSourceHost(next_retry_jad), 0) > 1:
                    next_retry_jad = None

            if next_to_do_jad and next_retry_jad:
                # if next_retry_jad has a higher priority than next_to_do_jad, then return next_retry_jad
                if next_retry_jad['job'].get('priority', DEFAULT_JOB_PRIORITY) > next_to_do_jad['job'].get('priority', DEFAULT_JOB_PRIORITY):
                    return next_retry_jad

                # or if next_to_do_jad has a higher priority than next_retry_jad, then return next_to_do_jad
                if next_to_do_jad['job'].get('priority', DEFAULT_JOB_PRIORITY) > next_retry_jad['job'].get('priority', DEFAULT_JOB_PRIORITY):
                    return next_to_do_jad

                # or if next_retry_jad has already been waiting for over an hour, then return next_retry_jad
                if datetime.utcnow() - next_retry_jad['updated_at'] > timedelta(minutes=60):
                    return next_retry_jad

                # or if next_retry_jad is older than next_to_do_jad
                if next_retry_jad['updated_at'] > next_to_do_jad.get('updated_at', next_to_do_jad.get('created_at')):
                    return next_retry_jad

            if next_to_do_jad:
                # just return the next_to_do_jad
                return next_to_do_jad

            # in all other cases, return next_retry_jad (which might be None)
            return next_retry_jad

    def canProduceNextJob(self):
        # test if the managed_job_queue is empty enough, and if our administration agrees
        try:
            # note: the original threshold check was lost in extraction; assume we may produce
            # a next job as long as no already-scheduled job is waiting to be picked up
            scheduled_jads = self.getJobAdminDicts(status=JobScheduled)
            return len(scheduled_jads) == 0
        except Exception as e:
            logger.exception('canProduceNextJob: %s', e)
            if 'No active session' in str(e):
                logger.fatal('cannot connect to qpid broker. bailing out...')
                exit(1)  # when running under supervisord, it will be restarted and the qpid connection usually works again.

        return True

    def produceNextJobsIfPossible(self):
        start_producing_timestamp = datetime.utcnow()

        while self.canProduceNextJob() and datetime.utcnow() - start_producing_timestamp < timedelta(seconds=5):
            job_admin_dict = self.getNextJobToRun()
            if job_admin_dict:
                if os.path.exists(job_admin_dict.get('path')):
                    msg = CommandMessage(content=job_admin_dict.get('job_xml'), subject=DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT)
                    msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY)
                    self._tobus.send(msg)
                    logger.info('submitted job %s to exchange \'%s\' at %s', job_admin_dict['job']['JobId'], self._tobus.exchange, self._tobus.broker)
                    self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled)
                else:
                    job_id = job_admin_dict['job']['JobId']
                    logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path'))
                    del self.__job_admin_dicts[job_id]

                # do a little sleep to allow the ingesttransferserver consumers to pick up the submitted job,
                # so the queue is empty again and we can submit yet another job
                time.sleep(0.1)
            else:
                return

            # rate limit at 10 jobs/sec
            time.sleep(0.1)

    def onJobStarted(self, job_notification_dict):
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProducing,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobFinished(self, job_notification_dict):
        job_id = job_notification_dict.get('job_id')

        # file_type might have changed, to unspec for example
        if 'file_type' in job_notification_dict:
            with self.__lock:
                job_admin_dict = self.__job_admin_dicts.get(job_id)
                if job_admin_dict:
                    job_admin_dict['job']['file_type'] = job_notification_dict['file_type']

        try:
            if 'average_speed' in job_notification_dict:
                with self.__lock:
                    # keep track of the average transfer speed
                    job_admin_dict = self.__job_admin_dicts.get(job_id)
                    if job_admin_dict:
                        job_average_speed = float(job_notification_dict['average_speed'])
                        job_admin_dict['average_speed'] = job_average_speed
        except Exception as e:
            # just continue
            logger.exception(str(e))

        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProduced,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobTransferFailed(self, job_notification_dict):
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobTransferFailed,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobProgress(self, job_notification_dict):
        # touch the job;
        # producing jobs which are untouched for 10 minutes are put back to JobToDo (see __putStalledJobsBackToToDo)
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProducing,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    @staticmethod
    def getSubDirs(dir_path):
        dir_lists = [[os.path.join(root, dir) for dir in dirs if root == dir_path] for root, dirs, files in os.walk(dir_path) if dirs]
        if dir_lists:
            return reduce(lambda x, y: x + y, dir_lists)
        return []

    def getDoneJobAdminDicts(self, job_group_id=None):
        return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobFailed, JobProduced, JobRemoved])

    def getNotDoneJobAdminDicts(self, job_group_id=None):
        return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobToDo, JobScheduled, JobProducing, JobRetry])
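
    # 'status' accepts a single state or a collection of states, e.g.:
    #   self.getJobAdminDicts(status=JobToDo)
    #   self.getJobAdminDicts(job_group_id=42, status=[JobProduced, JobFailed])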
    def getJobAdminDicts(self, job_group_id=None, status=None):
        with self.__lock:
            jads = list(self.__job_admin_dicts.values())

            if job_group_id is not None:
                job_group_id = str(job_group_id)
                jads = [jad for jad in jads if str(jad['job'].get('job_group_id')) == job_group_id]

            if status is not None:
                if isinstance(status, int):
                    jads = [jad for jad in jads if jad['status'] == status]
                else:
                    statuses = set(status)
                    jads = [jad for jad in jads if jad['status'] in statuses]

            return jads

    def getStatusReportDict(self):
        with self.__lock:
            export_ids = self.getExportIds()
            logger.info('getStatusReportDict export_ids: %s', export_ids)

            result = {}

            for export_id in export_ids:
                try:
                    finished_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobProduced)
                    failed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobFailed)
                    removed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobRemoved)
                    done_group_jads = finished_group_jads + failed_group_jads + removed_group_jads
                    done_group_jobs = [jad['job'] for jad in done_group_jads]

                    current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=export_id)
                    current_group_jobs = [jad['job'] for jad in current_group_jads]

                    all_group_jads = current_group_jads + done_group_jads
                    all_group_jobs = current_group_jobs + done_group_jobs

                    priority = min([job.get('priority', DEFAULT_JOB_PRIORITY) for job in current_group_jobs]) if current_group_jobs else 4
                    submitters = list(set([job['Submitter'] for job in all_group_jobs if 'Submitter' in job]))
                    projects = list(set([job['Project'] for job in all_group_jobs if 'Project' in job]))
                    lta_sites = list(set([jad['lta_site'] for jad in all_group_jads if 'lta_site' in jad]))

                    job_run_events = {}
                    for jad in all_group_jads:
                        for run in list(jad['runs'].values()):
                            if 'started_at' in run:
                                started_timestamp = run['started_at']
                                if started_timestamp not in job_run_events:
                                    job_run_events[started_timestamp] = 0
                                job_run_events[started_timestamp] += 1
                            if 'finished_at' in run:
                                finished_timestamp = run['finished_at']
                                if finished_timestamp not in job_run_events:
                                    job_run_events[finished_timestamp] = 0
                                job_run_events[finished_timestamp] -= 1

                    all_run_timestamps = sorted(job_run_events.keys())
                    running_jobs_values = []
                    if all_run_timestamps:
                        prev_value = 0
                        for t in all_run_timestamps:
                            value = prev_value + job_run_events[t]
                            running_jobs_values.append(value)
                            prev_value = value

                    job_finished_events = {}
                    for jad in all_group_jads:
                        if jad['status'] == JobProduced or jad['status'] == JobFailed:
                            if jad['runs']:
                                final_run = max(jad['runs'].keys())
                                run = jad['runs'][final_run]
                                if 'started_at' in run and 'finished_at' in run:
                                    finished_timestamp = run['finished_at']
                                    if finished_timestamp not in job_finished_events:
                                        job_finished_events[finished_timestamp] = 0
                                    job_finished_events[finished_timestamp] += 1

                    finished_jobs_values = []
                    finished_timestamps = sorted(job_finished_events.keys())
                    for i, t in enumerate(finished_timestamps):
                        finished_jobs_values.append(i + 1)

                    result[export_id] = {'priority': priority,
                                         'submitters': submitters,
                                         'projects': projects,
                                         'lta_sites': lta_sites,
                                         'series': {'running_jobs': {'timestamps': all_run_timestamps, 'values': running_jobs_values},
                                                    'finished_jobs': {'timestamps': finished_timestamps, 'values': finished_jobs_values}},
                                         'jobs': {'running': len(self.getJobAdminDicts(job_group_id=export_id, status=JobProducing)),
                                                  'to_do': len(self.getJobAdminDicts(job_group_id=export_id, status=JobToDo)),
                                                  'scheduled': len(self.getJobAdminDicts(job_group_id=export_id, status=JobScheduled)),
                                                  'retry': len(self.getJobAdminDicts(job_group_id=export_id, status=JobRetry)),
                                                  'finished': len(finished_group_jads),
                                                  'failed': len(failed_group_jads)}}
                except Exception as e:
                    logger.error(e)

            return result
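
    # sketch of the returned report dict (hypothetical values):
    #   {'642': {'priority': 4, 'submitters': [...], 'projects': [...], 'lta_sites': [...],
    #            'series': {'running_jobs': {'timestamps': [...], 'values': [...]},
    #                       'finished_jobs': {'timestamps': [...], 'values': [...]}},
    #            'jobs': {'running': 2, 'to_do': 10, 'scheduled': 1, 'retry': 0, 'finished': 5, 'failed': 0}}}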

    def setExportJobPriority(self, export_id, priority):
        priority = max(0, min(9, int(priority)))

        with self.__lock:
            jads = self.getJobAdminDicts(job_group_id=export_id)

            logger.info('updating the priority of %s jobs of export %s to level %s', len(jads), export_id, priority)

            for jad in jads:
                try:
                    # update the local copy
                    jad['job']['priority'] = priority

                    # and persist it to disk
                    updatePriorityInJobFile(jad['path'], priority)
                except Exception as e:
                    logger.error(e)
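
    # priority is clamped to 0..9; setting priority 0 effectively pauses an export,
    # because getNextJobToRun filters out priority-0 jobs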

    def getExportJobPriority(self, export_id):
        with self.__lock:
            job_group_id = str(export_id)
            for jad in list(self.__job_admin_dicts.values()):
                if str(jad['job'].get('job_group_id')) == job_group_id:
                    if 'priority' in jad['job']:
                        return jad['job']['priority']
        return DEFAULT_JOB_PRIORITY

    def getReport(self, job_group_id):
        with self.__lock:
            # still running/waiting jobs
            current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=job_group_id)
            current_group_jobs = [jad['job'] for jad in current_group_jads]

            # done jobs
            finished_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced)
            finished_group_jobs = [jad['job'] for jad in finished_group_jads]
            failed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed)
            failed_group_jobs = [jad['job'] for jad in failed_group_jads]
            removed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved)
            removed_group_jobs = [jad['job'] for jad in removed_group_jads]
            done_group_jobs = finished_group_jobs + failed_group_jobs + removed_group_jobs
            all_group_jobs = current_group_jobs + done_group_jobs

            submitters = set([j['Submitter'] for j in all_group_jobs if 'Submitter' in j])
            projects = set([j['Project'] for j in all_group_jobs if 'Project' in j])

            report = ''
            header = """=== Report on ingest Export Job (%(id)s) ===
User(s): %(user)s
Project: %(project)s""" % {'id': job_group_id,
                           'user': ', '.join(submitters),
                           'project': ', '.join(projects)}

            report += header

            summary = """\n\n=== Summary ===
Total Files: %(total)i
- Failed: %(failed)i
- Success: %(done)i
- Interferometer: %(corr)i
- Beamformed: %(bf)i
- SkyImages: %(img)i
- Unspecified: %(unspec)i
- Pulsar Pipeline: %(pulp)i""" % {'total': len(all_group_jobs),