#!/usr/bin/env python

# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#

from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener
from lofar.lta.ingest.common.job import *
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME, DEFAULT_INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.server.config import DEFAULT_INGEST_BUSNAME, DEFAULT_INGEST_SERVICENAME
from lofar.lta.ingest.common.config import DEFAULT_BROKER
from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, FINISHED_NOTIFICATION_BCC_MAILING_LIST, DEFAULT_JOB_PRIORITY, MAX_USED_BANDWITH_TO_START_NEW_JOBS
from lofar.lta.ingest.server.config import DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME
from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_QUEUENAME, DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME
from lofar.messaging import CommandMessage, EventMessage, FromBus, ToBus
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import convertIntKeysToString, humanreadablesize
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.common import isProductionEnvironment

import os
import os.path
import fnmatch
import shutil
import time
from random import random
from threading import RLock
from datetime import datetime, timedelta

import logging
logger = logging.getLogger(__name__)


class IngestServiceMessageHandler(MessageHandlerInterface):
    def __init__(self, job_manager, **kwargs):
        super(IngestServiceMessageHandler, self).__init__(**kwargs)

        self._job_manager = job_manager
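        # map of incoming service request names to IngestJobManager methods;
        # the generic lofar.messaging Service dispatches requests via this map,
        # e.g. a 'GetStatusReport' request ends up calling getStatusReportDict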
        self.service2MethodMap = {'RemoveExportJob': self._job_manager.removeExportJob,
                                  'SetExportJobPriority': self._job_manager.setExportJobPriority,
                                  'GetStatusReport': self._job_manager.getStatusReportDict,
                                  'GetReport': self._job_manager.getReport,
                                  'GetExportIds': self._job_manager.getExportIds}


class IngestJobManager:
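    '''
    The IngestJobManager tracks ingest (export) jobs on disk and in memory:
    it receives new job xml files from the incoming job queue, stores them in a
    per-status directory tree under jobs_dir, produces them just-in-time onto the
    jobs-for-transfer queue for the ingest transfer services, and follows their
    progress via the started/progress/finished/failed notifications on the notification bus.
    '''
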
    def __init__(self,
                 busname=DEFAULT_INGEST_BUSNAME,
                 servicename=DEFAULT_INGEST_SERVICENAME,
                 notification_busname=DEFAULT_INGEST_NOTIFICATION_BUSNAME,
                 notification_prefix=DEFAULT_INGEST_NOTIFICATION_PREFIX,
                 notification_listen_queue_name=DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME,
                 incoming_job_queue_name=DEFAULT_INGEST_JOBS_QUEUENAME,
                 jobs_for_transfer_queue_name=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
                 jobs_dir=JOBS_DIR,
                 max_num_retries=MAX_NR_OF_RETRIES,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 mom_broker=None,
                 broker=DEFAULT_BROKER,
                 **kwargs):

        self.__notification_listener = IngestBusListener(busname=notification_listen_queue_name, subjects=notification_prefix+'*', broker=broker)
        self.__notification_listener.onJobStarted = self.onJobStarted
        self.__notification_listener.onJobProgress = self.onJobProgress
        self.__notification_listener.onJobTransferFailed = self.onJobTransferFailed
        self.__notification_listener.onJobFinished = self.onJobFinished

        self.__jobs_dir = jobs_dir
        self.__max_num_retries = max_num_retries
        self.__job_admin_dicts = {}
        self.__lock = RLock()
        self.__running = False

        self.notification_prefix = notification_prefix
        self.event_bus = ToBus(notification_busname, broker=broker)
        if not mom_broker:
            mom_broker = broker
        self.__momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=mom_broker, timeout=180)

        self.service = Service(servicename,
                               IngestServiceMessageHandler,
                               busname=busname,
                               broker=broker,
                               use_service_methods=True,
                               numthreads=1,
                               handler_args={'job_manager': self})

        # incoming jobs (from mom, eor, user ingest, etc)
        self.__incoming_job_queue = FromBus(incoming_job_queue_name, broker=broker)

        # queue into which this manager produces jobs, which are consumed for actual data transfer by the ingestservices
        self.__jobs_for_transfer_queue = ToBus(jobs_for_transfer_queue_name, broker=broker)

        # the peeker frombus is used to see if the managed job queue we submit jobs to is emptied by its consumers.
        # we want this manager to produce jobs just in time, so we can actively shuffle job priorities etc.
        self.__jobs_for_transfer_queue_peeker = FromBus(jobs_for_transfer_queue_name, broker=broker)

        self.__running_jobs_log_timestamp = datetime.utcnow()
        self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()

    def quit(self):
        self.__running = False

    def run(self):
        self.__running = True

        # start with full jobs dir scan to retrieve state from disk
        self.scanJobsdir()

        logger.info('starting listening for new jobs and notifications')

        with self.service as _1, self.__incoming_job_queue as _2, self.__jobs_for_transfer_queue as _3, self.__notification_listener as _4, self.event_bus as _5:
            # ... and run the event loop,
            # produce jobs to managed job queue for ingest transfer services
            # receive new jobs
            logger.info('starting to produce jobs')
            while self.__running:
                try:
                    # receive any jobs from mom/user_ingest/eor/etc
                    receive_start = datetime.utcnow()
                    msg = self.__incoming_job_queue.receive(timeout=1.0)
                    while msg:
                        logger.debug("received msg on job queue %s: %s", self.__incoming_job_queue.address, msg)
                        self.__incoming_job_queue.ack(msg)

                        if isinstance(msg, CommandMessage):
                            job = parseJobXml(msg.content)
                            if job and job.get('JobId'):
                                if msg.priority != None and msg.priority > job.get('priority', DEFAULT_JOB_PRIORITY):
                                    job['priority'] = msg.priority

                                logger.info("received job on queue %s: %s", self.__incoming_job_queue.address, job)
                                job_admin_dict = {'job': job, 'job_xml': msg.content}
                                self.addNewJob(job_admin_dict, check_done_dirs=True, add_old_jobs_from_disk=True)
                        else:
                            logger.warn("unexpected message type: %s", msg)

                        if datetime.utcnow() - receive_start > timedelta(seconds=1):
                            # break out of receiving while loop early,
                            # so we can also produce some jobs
                            # receive more in next iteration
                            break

                        # see if there are any more jobs to receive and process them, else jump out of while loop
                        msg = self.__incoming_job_queue.receive(timeout=0.01)

                    # try to produce the next job(s) onto the jobs-for-transfer queue,
                    # where the ingest transfer services will pick them up
                    self.produceNextJobsIfPossible()

                    # put any stalled scheduled/producing jobs back to to_do
                    self.__putStalledJobsBackToToDo()

                    # check if producing jobs are actually making progress
                    # maybe the ingest transfer server was shut down in the middle of a transfer?
                    # the ingest transfer server is stateless, so it won't restart that job itself (by design)
                    # while transferring, progress updates are given at very regular intervals,
                    # so it is easy for us to detect here whether a job is progressing or stalled (see onJobProgress)
                    if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds=30):
                        self.__running_jobs_log_timestamp = datetime.utcnow()
                        producing_jads = self.getJobAdminDicts(status=JobProducing)
                        if producing_jads:
                            if len(producing_jads) == 1:
                                logger.info('1 job is running')
                            else:
                                logger.info('%s jobs are running', len(producing_jads))

                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(e)
            logger.info('finished producing jobs')
        logger.info('finished listening for new jobs and notifications')

    def nrOfUnfinishedJobs(self):
        return len(self.getNotDoneJobAdminDicts())

    def nrOfJobs(self):
        return len(self.getJobAdminDicts())

    def getJobAdminDictsFromDisk(self, job_status=None, job_type=None, job_group_id=None, job_id=None):
        job_admin_dicts = []

        if job_status is None:
            for status in [JobToDo, JobRetry, JobFailed, JobHold, JobScheduled, JobProducing, JobProduced, JobRemoved]:
                job_admin_dicts += self.getJobAdminDictsFromDisk(job_status=status, job_type=job_type,
                                                                 job_group_id=job_group_id, job_id=job_id)
            return job_admin_dicts

        dir_path = self.jobDir(job_admin_dict=None, job_status=job_status, job_type=job_type, job_group_id=job_group_id)

        if not os.path.isdir(dir_path):
            return job_admin_dicts

        dir_paths = [dir_path]

        # the retry and done (failed/done/removed) dirs have subdirectories (per attempt or per group), so scan those as well
        dir_paths += IngestJobManager.getSubDirs(dir_path)

        for dp in dir_paths:
            try:
                msg = 'scanning %s for job files' % dp
                if job_type:
                    msg += ' job_type=%s' % job_type
                if job_group_id:
                    msg += ' job_group_id=%s' % job_group_id
                if job_status:
                    msg += ' job_status=%s' % jobState2String(job_status)
                logger.debug(msg)

                xml_files = [os.path.join(dp, f) for f in os.listdir(dp) if fnmatch.fnmatch(f, '*.xml')]

                logger.debug('found %d xml files in %s', len(xml_files), dp)

                for path in xml_files:
                    try:
                        with open(path) as file:
                            file_content = file.read()
                            job = parseJobXml(file_content)
                            if job:
                                if ((job_id and job_id == job.get('JobId')) or
                                    (job_id is None and job_group_id and job_group_id == job.get('job_group_id')) or
                                    (job_id is None and job_group_id is None)):
                                    job_admin_dict = {'path': path,
                                                      'job': job,
                                                      'job_xml': file_content,
                                                      'runs': {},
                                                      'created_at': datetime.fromtimestamp(os.lstat(path).st_ctime),
                                                      'updated_at': datetime.fromtimestamp(os.lstat(path).st_ctime)}

                                    if job_status is not None:
                                        job_admin_dict['status'] = job_status

                                    job_admin_dicts.append(job_admin_dict)
                    except Exception as e:
                        logger.error(e)
            except Exception as e:
                logger.error(e)

        logger.info('read %d job files from %s', len(job_admin_dicts), dir_path)
        return job_admin_dicts

    def jobStatusBaseDir(self, jobstatus):
        if jobstatus == JobToDo:
            return os.path.join(self.__jobs_dir, 'to_do')
        if jobstatus == JobRetry:
            return os.path.join(self.__jobs_dir, 'retry')
        if jobstatus == JobFailed:
            return os.path.join(self.__jobs_dir, 'failed')
        if jobstatus == JobHold:
            return os.path.join(self.__jobs_dir, 'on_hold')
        if jobstatus == JobScheduled:
            return os.path.join(self.__jobs_dir, 'scheduled')
        if jobstatus == JobProducing:
            return os.path.join(self.__jobs_dir, 'producing')
        if jobstatus == JobProduced:
            return os.path.join(self.__jobs_dir, 'done')
        if jobstatus == JobRemoved:
            return os.path.join(self.__jobs_dir, 'removed')
        return os.path.join(self.__jobs_dir, 'unknown')

    def jobDir(self, job_admin_dict, job_status=None, job_type=None, job_group_id=None, retry_attempt=None):
        if job_admin_dict:
            return self.jobDir(job_admin_dict=None,
                               job_status=job_admin_dict['status'],
                               job_type=job_admin_dict['job'].get('Type', 'unknown'),
                               job_group_id=job_admin_dict['job'].get('job_group_id', 'unknown'),
                               retry_attempt=job_admin_dict.get('retry_attempt', 1))

        base_dir = self.jobStatusBaseDir(job_status)

        if job_status == JobRetry and retry_attempt != None:
            return os.path.join(base_dir, str(retry_attempt))
        elif job_status in [JobProduced, JobRemoved, JobFailed]:
            group_dir = '%s_%s' % (job_type, job_group_id)
            return os.path.join(base_dir, group_dir)

        return base_dir

    def jobPath(self, job_admin_dict):
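        # illustrative example (hypothetical values): with jobs_dir='/local/ingest/jobs',
        # a job with JobId 'A_123' in retry attempt 2 ends up at '/local/ingest/jobs/retry/2/jA_123.xml'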
        dir = self.jobDir(job_admin_dict)
        filename = 'j%s.xml' % job_admin_dict['job']['JobId']
        path = os.path.join(dir, filename)
        return path

    def scanJobsdir(self):
        with self.__lock:
            logger.info('scanning jobs dirs in %s', self.__jobs_dir)

            for status in [JobToDo, JobScheduled, JobProducing, JobRetry]:
                job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status)

                for job_admin_dict in job_admin_dicts:
                    self.addNewJob(job_admin_dict, check_done_dirs=False, add_old_jobs_from_disk=False)

                logger.info('added %d existing %s jobs from disk', len(job_admin_dicts), jobState2String(status))

            # which (type, group_id) jobs were read
            # read the done jobs for these groups as well
            unique_type_groups = set([(jad['job']['Type'], jad['job'].get('job_group_id', 'unknown_group')) for jad in self.__job_admin_dicts.values()])

            if unique_type_groups:
                logger.info('scanning for done jobs for %s', unique_type_groups)

                for job_type, job_group_id in unique_type_groups:
                    for status in [JobFailed, JobProduced, JobRemoved]:
                        job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status, job_type=job_type, job_group_id=job_group_id)

                        for job_admin_dict in job_admin_dicts:
                            self.addNewJob(job_admin_dict, check_done_dirs=False, add_old_jobs_from_disk=False)

                        logger.info('added %d existing %s jobs for %s %s', len(job_admin_dicts), jobState2String(status), job_type, job_group_id)

            logger.info('finished scanning jobs')

    def addNewJob(self, job_admin_dict, check_done_dirs=False, add_old_jobs_from_disk=False):
        with self.__lock:
            job_id = job_admin_dict['job']['JobId']
            job_type = job_admin_dict['job'].get('Type', 'unknown')
            job_group_id = job_admin_dict['job'].get('job_group_id')
            logger.info('adding new job %s of type %s in group %s with status %s',
                        job_id,
                        job_type,
                        job_group_id,
                        jobState2String(job_admin_dict.get('status', JobToDo)))

            if check_done_dirs:
                # check if this job is already in memory with a done status
                matching_done_jads = []
                for status in [JobFailed, JobProduced, JobRemoved, JobRetry]:
                    matching_done_jads_for_status = self.getJobAdminDicts(job_group_id=job_group_id, status=status)
                    matching_done_jads_for_status = [jad for jad in matching_done_jads_for_status if jad['job']['JobId'] == job_id]

                    if matching_done_jads_for_status:
                        matching_done_jads += matching_done_jads_for_status

                if not matching_done_jads:
                    #if not in memory, check on disk
                    for status in [JobFailed, JobProduced, JobRemoved, JobRetry]:
                        matching_done_jads_for_status = self.getJobAdminDictsFromDisk(job_status=status,
                                                                                      job_type=job_type,
                                                                                      job_group_id=job_group_id,
                                                                                      job_id=job_id)

                        if matching_done_jads_for_status:
                            matching_done_jads += matching_done_jads_for_status

                # remove job from 'done' directories if present (this is a resubmitted job)
                for done_jad in matching_done_jads:
                    try:
                        logger.info('removing done job %s from %s because it is resubmitted', job_id, done_jad['path'])
                        os.remove(done_jad['path'])
                    except Exception as e:
                        logger.error('error while removing done job %s from %s: %s', job_id, done_jad['path'], e)

            self.__job_admin_dicts[job_id] = job_admin_dict
            if 'status' not in job_admin_dict:
                job_admin_dict['status'] = JobToDo

            if 'created_at' not in job_admin_dict:
                job_admin_dict['created_at'] = datetime.utcnow()

            job_admin_dict['updated_at'] = job_admin_dict['created_at']

            # store start/finish times per retry attempt in 'runs'
            if 'runs' not in job_admin_dict:
                job_admin_dict['runs'] = {}

            # make sure the directory for this job's status exists before saving the job file there
            todo_dir = self.jobDir(job_admin_dict)
            if not os.path.isdir(todo_dir):
                try:
                    os.makedirs(todo_dir)
                except OSError as e:
                    logger.error(e)
            if 'path' not in job_admin_dict or 'failed' not in job_admin_dict.get('path', ''):
                path = self.jobPath(job_admin_dict)
                job_admin_dict['path'] = path
                try:
                    if not os.path.exists(path):
                        logger.info('saving job %s on disk: %s', job_id, path)
                        with open(path, 'w') as file:
                            file.write(job_admin_dict['job_xml'])
                except Exception as e:
                    logger.error(e)
            else:
                job_dirname = os.path.dirname(job_admin_dict['path'])
                expected_dirname = self.jobDir(job_admin_dict)
                if job_dirname != expected_dirname:
                    # a new todo job should be located in the expected dir based on its status,
                    # it is not, so force it there
                    self.updateJobStatus(job_admin_dict['job']['JobId'], job_admin_dict['status'])

        if add_old_jobs_from_disk and job_group_id is not None:
            group_jads = self.getJobAdminDicts(job_group_id=job_group_id)
            group_jads = [jad for jad in group_jads if jad['job']['JobId'] != job_id]

            if not group_jads:
                logger.info('%s is the first new job of group %s, scanning disk for \'old\' jobs of this group.', job_id, job_group_id)
                jads_from_disk = self.getJobAdminDictsFromDisk(job_group_id=job_group_id, job_type=job_type)
                for jad in jads_from_disk:
                    if jad['job']['JobId'] != job_id:
                        logger.info('(re)adding job %s with status %s from group %s from disk',
                                    jad['job']['JobId'],
                                    jobState2String(jad.get('status')),
                                    jad['job']['job_group_id'])
                        self.__job_admin_dicts[jad['job']['JobId']] = jad

    def updateJobStatus(self, job_id, new_status, lta_site=None, message=None):
        with self.__lock:
            job_admin_dict = self.__job_admin_dicts.get(job_id)

            if not job_admin_dict:
                logger.error('updateJobStatus: unknown job %s with new status %s', job_id, jobState2String(new_status))
                return

            try:
                job_admin_dict['updated_at'] = datetime.utcnow()

                if new_status == JobProducing:
                    # a new (retry) run starts now
                    job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)] = {'started_at': datetime.utcnow()}

                if new_status == JobProduced or new_status == JobTransferFailed:
                    # mark the current run as finished (keeping its started_at, if any)
                    run = job_admin_dict['runs'].setdefault(job_admin_dict.get('retry_attempt', 0), {})
                    run['finished_at'] = datetime.utcnow()

                if lta_site:
                    job_admin_dict['lta_site'] = lta_site

                job_admin_dict['last_message'] = message

                current_status = job_admin_dict.get('status', JobToDo)

                if new_status == JobTransferFailed:
                    # special case for jobs which failed to transfer, which will be retried
                    current_retry_attempt = job_admin_dict.get('retry_attempt', 0)
                    next_retry_attempt = current_retry_attempt + 1

                    if next_retry_attempt < self.__max_num_retries:
                        if message and 'not on disk' in message.lower():
                            logger.info('job %s transfer failed because source data was not on disk, not retrying anymore', job_id)
                            new_status = JobFailed
                        elif message and 'invalid sip' in message.lower():
                            logger.info('job %s transfer failed because the SIP is invalid, not retrying anymore, please fix SIP and resubmit job to ingest.', job_id)
                            new_status = JobFailed
                        else:
                            new_status = JobRetry
                            job_admin_dict['retry_attempt'] = next_retry_attempt
                            job_admin_dict['job']['last_retry_attempt'] = next_retry_attempt == (self.__max_num_retries-1)
                    else:
                        logger.info('job %s transfer failed %s times, not retrying anymore',
                                    job_id,
                                    job_admin_dict.get('retry_attempt', 0))
                        new_status = JobFailed

                if new_status != current_status:
                    logger.info('updating job %s status from %s to %s%s',
                                job_id,
                                jobState2String(current_status),
                                jobState2String(new_status),
                                (' attempt #%d' % job_admin_dict.get('retry_attempt', 1))
                                if (new_status == JobRetry or current_status == JobRetry) else '')

                    job_admin_dict['status'] = new_status

                # move the job file to its new status directory
                # determine current and new paths
                current_path = job_admin_dict.get('path', '')
                new_path = self.jobPath(job_admin_dict)
                new_dirname = os.path.dirname(new_path)

                if not os.path.isdir(new_dirname):
                    try:
                        os.makedirs(new_dirname)
                    except OSError as e:
                        logger.error(e)

                if new_path != current_path:
                    logger.debug('moving job file from %s to %s.', current_path, new_path)
                    shutil.move(current_path, new_path)
                    job_admin_dict['path'] = new_path

                if new_status == JobRemoved or new_status == JobFailed:
                    # send notification
                    # this is (also) picked up by the ingestmomadapter
                    # which also sends a status update to MoM, including the last_message
                    if new_status == JobRemoved:
                        job_admin_dict['last_message'] = 'removed from queue'

                    self._sendNotification(job_admin_dict)

                # finally, remove job from interal admin jobs dict if finished
                if job_admin_dict['status'] in [JobProduced, JobFailed, JobRemoved]:
                    current_job_group_id = job_admin_dict['job'].get('job_group_id', 'unknown')
                    current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=current_job_group_id)

                    if len(current_group_jads) == 0:
                        logger.info('all jobs in group %s are done', current_job_group_id)
                        self.sendJobGroupFinishedMail(current_job_group_id)

                        current_group_done_jads = self.getDoneJobAdminDicts(job_group_id=current_job_group_id)
                        logger.info('removing %s jobs of group %s from job management server memory',
                                    len(current_group_done_jads),
                                    current_job_group_id)

                        for jad in current_group_done_jads:
                            del self.__job_admin_dicts[jad['job']['JobId']]
                    else:
                        logger.info('%s jobs to do in group %s', len(current_group_jads), current_job_group_id)
            except Exception as e:
                logger.error("updateJobStatus(job_id=%s, new_status=%s, lta_site=%s, message=%s) %s",
                             job_id,
                             jobState2String(new_status),
                             lta_site,
                             message,
                             e)

    def _sendNotification(self, job_admin_dict):
        try:
            job = job_admin_dict['job']
            contentDict = {'job_id': job['JobId'],
                           'archive_id': job['ArchiveId'],
                           'project': job['Project'],
                           'type': job["Type"],
                           'dataproduct': job['DataProduct']}

            if 'ObservationId' in job:
                contentDict['otdb_id'] = job['ObservationId']

            if 'lta_site' in job_admin_dict:
                contentDict['lta_site'] = job_admin_dict['lta_site']

            if 'last_message' in job_admin_dict:
                contentDict['message'] = job_admin_dict['last_message']

            if 'job_group_id' in job:
                contentDict['export_id'] = job['job_group_id']

            status = jobState2String(job_admin_dict['status'])
            status = status[status.index('(') + 1:status.index(')')]

            msg = EventMessage(context=self.notification_prefix + status, content=contentDict)
            # remove message from queue's when not picked up within 48 hours,
            # otherwise mom might endlessly reject messages if it cannot handle them
            logger.info('Sending notification %s: %s' % (status, str(contentDict).replace('\n', ' ')))
            self.event_bus.send(msg)

        except Exception as e:
            logger.error(str(e))

    def removeExportJob(self, export_group_id):
        logger.info('removing export job %s', export_group_id)
        job_admin_dicts = self.getJobAdminDicts(job_group_id=export_group_id)

        if job_admin_dicts:
            for jad in job_admin_dicts:
                self.updateJobStatus(jad['job']['JobId'], JobRemoved)

    def getExportIds(self):
        with self.__lock:
            return sorted(list(set([jad['job'].get('job_group_id', 'unknown_group') for jad in self.__job_admin_dicts.values()])))

    def __putStalledJobsBackToToDo(self):
        if datetime.utcnow() - self.__last_putStalledJobsBackToToDo_timestamp < timedelta(minutes=1):
            return

        with self.__lock:
            now = datetime.utcnow()
            stalled_job_admin_dicts = [jad for jad in self.__job_admin_dicts.values()
                                       if (jad['status'] == JobProducing or jad['status'] == JobScheduled)
                                       and now - jad['updated_at'] > timedelta(minutes=15)]

            for jad in stalled_job_admin_dicts:
                logger.info('putting job %s back to ToDo because it did not make any progress during the last 15min', jad['job']['JobId'])
                self.updateJobStatus(jad['job']['JobId'], JobToDo)

            self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()

    def getNextJobToRun(self):
        '''get the next job to run.
        examine all 'to_do' and 'retry' jobs
        higher priority jobs always go first
        equal priority jobs will return 'to_do' jobs over 'retry' jobs
        'retry' jobs are sorted by least amount of retry attempts
        source load balancing: the more jobs are being transferred from a certain source host,
                               the less likely it is that the next job will also be for that source host
        '''

        # helper method to get the source host from the job's location
        def getSourceHost(job_admin_dict):
            try:
                host = job_admin_dict['job']['Location'].split(':')[0]
                if 'cep4' in host.lower() or 'cpu' in host.lower():
                    return 'localhost'
                return host
            except:
                return 'localhost'

        running_jads = self.getJobAdminDicts(status=JobProducing) + self.getJobAdminDicts(status=JobScheduled)
        running_hosts = {}
        for jad in running_jads:
            host = getSourceHost(jad)
            running_hosts[host] = running_hosts.get(host, 0) + 1

        with self.__lock:
            def getNextJobByStatus(status, min_age=None, exclude_job_group_ids=[]):
                def jad_compare_func(jad_a, jad_b):
                    # sort on priority first: higher priority goes first
                    if jad_a['job'].get('priority', DEFAULT_JOB_PRIORITY) != jad_b['job'].get('priority', DEFAULT_JOB_PRIORITY):
                        return jad_b['job'].get('priority', DEFAULT_JOB_PRIORITY) - jad_a['job'].get('priority', DEFAULT_JOB_PRIORITY)

                    # equal priorities, so sort on next sort criterion, the retry attempt
                    if jad_a['status'] == JobRetry and jad_b['status'] == JobRetry:
                        if jad_a.get('retry_attempt', 0) != jad_b.get('retry_attempt', 0):
                            return jad_b.get('retry_attempt', 0) - jad_a.get('retry_attempt', 0)

                    # equal retry_attempt, so sort on next sort criterion,
                    # jobs for which the number of running jobs on that host is lower
                    # (load balance the source hosts)
                    nrOfRunningJobsOnSourceHostA = running_hosts.get(getSourceHost(jad_a), 0)
                    nrOfRunningJobsOnSourceHostB = running_hosts.get(getSourceHost(jad_b), 0)

                    if nrOfRunningJobsOnSourceHostA != nrOfRunningJobsOnSourceHostB:
                        return nrOfRunningJobsOnSourceHostA - nrOfRunningJobsOnSourceHostB

                    # equal number of jobs on source hosts, so sort on next sort criterion, the created_at timestamp (FIFO)
                    if jad_a['created_at'] < jad_b['created_at']:
                        return -1
                    if jad_a['created_at'] > jad_b['created_at']:
                        return 1

                    # TODO: we can add a lot of sort criteria in the future.
                    # For now, stick with FIFO and retry_attempt, after priority and source_host_load_balancing
                    return 0

                job_admin_dicts = self.getJobAdminDicts(status=status)

                # filter out priority 0 jobs (which are paused)
                job_admin_dicts = [jad for jad in job_admin_dicts if jad['job'].get('priority', 0) > 0]

                if min_age:
                    now = datetime.utcnow()
                    job_admin_dicts = [jad for jad in job_admin_dicts if now - jad['updated_at'] >= min_age]

                if exclude_job_group_ids:
                    # filter out jad's from exclude_job_group_ids
                    job_admin_dicts = [jad for jad in job_admin_dicts if 'job_group_id' not in jad['job'] or jad['job']['job_group_id'] not in exclude_job_group_ids]

                job_admin_dicts = sorted(job_admin_dicts, cmp=jad_compare_func)
                if job_admin_dicts:
                    logger.info('%s jobs with status %s waiting', len(job_admin_dicts), jobState2String(status))
                    return job_admin_dicts[0]
                return None

            # gather some average speed stats for running groups
            running_job_group_avg_speeds = {}
            running_job_group_ids = set([jad['job']['job_group_id'] for jad in running_jads if 'job_group_id' in jad['job']])
            for job_group_id in running_job_group_ids:
                # get last 10 finished jobs with an average_speed for this group to compute avg speed
                group_finished_jads = [jad for jad in self.getJobAdminDicts(job_group_id=job_group_id, status=[JobProduced]) if 'average_speed' in jad]
                group_finished_jads = sorted(group_finished_jads, key=lambda jad: jad['updated_at'])[-10:]
                if len(group_finished_jads) == 10:
                    running_job_group_avg_speeds[job_group_id] = sum(jad['average_speed'] for jad in group_finished_jads)/len(group_finished_jads)
                    logger.debug('average speed over last 10 transfers for group %s = %s', job_group_id, humanreadablesize(running_job_group_avg_speeds[job_group_id], 'Bps'))

            # which job_group_ids are slow?
            # this is most likely caused by transfers of small files, for which the overhead has a huge impact on the overall average transfer speed.
            # thus, these slow_running_job_group_ids are not making optimal use of the available bandwidth,
            # since we cannot start a huge number of parallel transfers to increase the total used average bandwidth.
            slow_running_job_group_ids = [job_group_id for job_group_id,avg_speed in running_job_group_avg_speeds.items() if avg_speed < 1.0e7]

            # randomize whether a slow job_group_id will produce the next job or not.
            # if not (and it is in the exclude_job_group_ids list), then the first available job from the rest of the groups is picked.
            # it is quite likely that the next group has larger files, and thus will make better use of the available bandwidth.
            # the actual transfer speed of the next group is measured as well, and if slow, it will be excluded once in a while too.
            # The higher the job_group's priority, the bigger its chance to be included, so high priority jobs will run faster overall.
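            # worked example (derived from the formula below): with the 0.1111 factor,
            # a slow group with priority 4 is kept (not excluded) with probability ~0.44 per selection round,
            # while a slow group with priority 9 is kept with probability ~1.0, i.e. effectively never excluded.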
            exclude_job_group_ids = [job_group_id for job_group_id in slow_running_job_group_ids
                                     if random() > self.getExportJobPriority(job_group_id) * 0.1111]

            # if there are job_group_ids to be excluded (for slow transfers) but there are no other group_ids to be exported,
            # then do not exclude anything
            if exclude_job_group_ids:
                all_group_ids = set(self.getExportIds())
                if set(exclude_job_group_ids) == all_group_ids:
                    exclude_job_group_ids = []
                else:
                    logger.info('excluding jobs from group(s) %s while determining the next job to run, because of their slow overall average speed', exclude_job_group_ids)

            # get the next job to run, both for JobToDo and JobRetry
            next_to_do_jad = getNextJobByStatus(JobToDo, exclude_job_group_ids=exclude_job_group_ids)
            next_retry_jad = getNextJobByStatus(JobRetry, timedelta(minutes=15) if next_to_do_jad else None)

            # limit the number of running jobs per source host if not localhost
            if next_to_do_jad and getSourceHost(next_to_do_jad) != 'localhost':
                if running_hosts.get(getSourceHost(next_to_do_jad), 0) > 1:
                    next_to_do_jad = None

            # limit the number of running jobs per source host if not localhost
            if next_retry_jad and getSourceHost(next_retry_jad) != 'localhost':
                if running_hosts.get(getSourceHost(next_retry_jad), 0) > 1:
                    next_retry_jad = None

            if next_to_do_jad and next_retry_jad:
                # if next_retry_jad has a higher priority than next_to_do_jad, then return next_retry_jad
                if next_retry_jad['job'].get('priority', DEFAULT_JOB_PRIORITY) > next_to_do_jad['job'].get('priority', DEFAULT_JOB_PRIORITY):
                    return next_retry_jad

                # or if next_to_do_jad has a higher priority than next_retry_jad, then return next_to_do_jad
                if next_to_do_jad['job'].get('priority', DEFAULT_JOB_PRIORITY) > next_retry_jad['job'].get('priority', DEFAULT_JOB_PRIORITY):
                    return next_to_do_jad

                # or if next_retry_jad is already waiting for over an hour, then return next_retry_jad
                if datetime.utcnow() - next_retry_jad['updated_at'] > timedelta(minutes=60):
                    return next_retry_jad

                # or if next_retry_jad is older than next_to_do_jad
                if next_retry_jad['updated_at'] > next_to_do_jad.get('updated_at', next_to_do_jad.get('created_at')):
                    return next_retry_jad

            if next_to_do_jad:
                # just return the next_to_do_jad
                return next_to_do_jad

            # in all other cases, return next_retry_jad (which might be None)
            return next_retry_jad

    def canProduceNextJob(self):
        # test if the managed_job_queue is empty enough, and if our administration agrees
        try:
            with self.__jobs_for_transfer_queue_peeker:
                num_scheduled = self.__jobs_for_transfer_queue_peeker.nr_of_messages_in_queue(0.01)
                if num_scheduled > 0:
                    # the jobs-for-transfer queue still holds a job which has not been picked up yet;
                    # double check against our own administration before producing more
                    scheduled_jads = self.getJobAdminDicts(status=JobScheduled)
                    if scheduled_jads:
                        return False
        except Exception as e:
            logger.error('canProduceNextJob: %s', e)
        return True

    def produceNextJobsIfPossible(self):
        start_producing_timestamp = datetime.utcnow()
        while self.canProduceNextJob() and datetime.utcnow() - start_producing_timestamp < timedelta(seconds=2):
            job_admin_dict = self.getNextJobToRun()
            if job_admin_dict:
                if os.path.exists(job_admin_dict.get('path')):
                    msg = CommandMessage(content=job_admin_dict.get('job_xml'))
                    msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY)
                    self.__jobs_for_transfer_queue.send(msg)
                    logger.info('submitted job %s to queue %s at %s', job_admin_dict['job']['JobId'], self.__jobs_for_transfer_queue.address, self.__jobs_for_transfer_queue.broker)
                    self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled)
                else:
                    job_id = job_admin_dict['job']['JobId']
                    logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path'))
                    del self.__job_admin_dicts[job_id]

                # do a little sleep to allow the ingesttransferserver consumers to pick up the submitted job,
                # so the queue is empty again and we can submit yet another job
                time.sleep(0.1)
            else:
                # no job available to run at this moment
                return

    def onJobStarted(self, job_notification_dict):
        self.__notification_listener._logJobNotification('started ', job_notification_dict);
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProducing,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobFinished(self, job_notification_dict):
        self.__notification_listener._logJobNotification('finished', job_notification_dict);
        job_id = job_notification_dict.get('job_id')

        # file_type might have changed to unspec for example
        if 'file_type' in job_notification_dict:
            with self.__lock:
                job_admin_dict = self.__job_admin_dicts.get(job_id)
                if job_admin_dict:
                    job_admin_dict['job']['file_type'] = job_notification_dict['file_type']

        try:
            if 'average_speed' in job_notification_dict:
                with self.__lock:
                    # keep track of average transfer speed
                    job_admin_dict = self.__job_admin_dicts.get(job_id)
                    if job_admin_dict:
                        job_average_speed = float(job_notification_dict['average_speed'])
                        job_admin_dict['average_speed'] = job_average_speed
        except Exception as e:
            #just continue
            logger.exception(str(e))

        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProduced,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobTransferFailed(self, job_notification_dict):
        self.__notification_listener._logJobNotification('transfer failed ', job_notification_dict);
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobTransferFailed,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobProgress(self, job_notification_dict):
        self.__notification_listener._logJobNotification('progress', job_notification_dict, level=logging.DEBUG);
        # touch job
        # producing jobs which are untouched for 5min are put back to JobToDo
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProducing,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    @staticmethod
    def getSubDirs(dir_path):
        dir_lists = [[os.path.join(root,dir) for dir in dirs if root==dir_path] for root, dirs, files in os.walk(dir_path) if dirs]
        if dir_lists:
            return reduce(lambda x, y: x + y, dir_lists)
        return []

    def getDoneJobAdminDicts(self, job_group_id=None):
        return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobFailed, JobProduced, JobRemoved])

    def getNotDoneJobAdminDicts(self, job_group_id=None):
        return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobToDo, JobScheduled, JobProducing, JobRetry])

    def getJobAdminDicts(self, job_group_id=None, status=None):
        with self.__lock:
            jads = [jad for jad in self.__job_admin_dicts.values()]

            if job_group_id is not None:
                job_group_id = str(job_group_id)
                jads = [jad for jad in jads if str(jad['job'].get('job_group_id')) == job_group_id]

            if status != None:
                if isinstance(status, int):
                    jads = [jad for jad in jads if jad['status'] == status]
                else:
                    statuses = set(status)
                    jads = [jad for jad in jads if jad['status'] in statuses]

            return jads

    def getStatusReportDict(self):
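        '''return a nested dict with a status overview per export (job group) id;
        illustrative shape: { <export_id>: { 'priority': ..., 'submitters': [...], 'projects': [...], 'lta_sites': [...],
                                             'series': { 'running_jobs': {...}, 'finished_jobs': {...} },
                                             'jobs': { 'running': ..., 'to_do': ..., 'scheduled': ..., 'retry': ..., 'finished': ..., 'failed': ... } } }'''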
        with self.__lock:
            export_ids = self.getExportIds()
            logger.info('getStatusReportDict export_ids: %s', export_ids)

            result = {}
            for export_id in export_ids:
                try:
                    finished_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobProduced)
                    failed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobFailed)
                    removed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobRemoved)

                    done_group_jads = finished_group_jads + failed_group_jads + removed_group_jads
                    done_group_jobs = [jad['job'] for jad in done_group_jads]

                    current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=export_id)
                    current_group_jobs = [jad['job'] for jad in current_group_jads]
                    all_group_jads = current_group_jads + done_group_jads
                    all_group_jobs = current_group_jobs + done_group_jobs

                    priority = min([job.get('priority', DEFAULT_JOB_PRIORITY) for job in current_group_jobs]) if current_group_jobs else 4
                    submitters = list(set([job['Submitter'] for job in all_group_jobs if 'Submitter' in job]))
                    projects = list(set([job['Project'] for job in all_group_jobs if 'Project' in job]))
                    lta_sites = list(set([jad['lta_site'] for jad in all_group_jads if 'lta_site' in jad]))

                    job_run_events = {}
                    for jad in all_group_jads:
                        for run in jad['runs'].values():
                            if 'started_at' in run:
                                started_timestamp = run['started_at']

                                if started_timestamp not in job_run_events:
                                    job_run_events[started_timestamp] = 0

                                job_run_events[started_timestamp] += 1

                                if 'finished_at' in run:
                                    finished_timestamp = run['finished_at']

                                    if finished_timestamp not in job_run_events:
                                        job_run_events[finished_timestamp] = 0

                                    job_run_events[finished_timestamp] -= 1

                    all_run_timestamps = sorted(job_run_events.keys())
                    running_jobs_values = []
                    if all_run_timestamps:
                        prev_value = 0

                        for t in all_run_timestamps:
                            value = prev_value + job_run_events[t]
                            running_jobs_values.append(value)
                            prev_value = value

                    job_finished_events = {}
                    for jad in all_group_jads:
                        if jad['status'] == JobProduced or jad['status'] == JobFailed:
                            if jad['runs']:
                                final_run = max(jad['runs'].keys())

                                run = jad['runs'][final_run]
                                if 'started_at' in run and 'finished_at' in run:
                                    finished_timestamp = run['finished_at']

                                    if finished_timestamp not in job_finished_events:
                                        job_finished_events[finished_timestamp] = 0

                                    job_finished_events[finished_timestamp] += 1

                    finished_jobs_values = []
                    finished_timestamps = sorted(job_finished_events.keys())
                    for i, t in enumerate(finished_timestamps):
                        finished_jobs_values.append(i + 1)

                    result[export_id] = {'priority': priority,
                                         'submitters': submitters,
                                         'projects': projects,
                                         'lta_sites': lta_sites,
                                         'series': {'running_jobs': {'timestamps': all_run_timestamps, 'values': running_jobs_values},
                                                    'finished_jobs': {'timestamps': finished_timestamps, 'values': finished_jobs_values}},
                                         'jobs': {'running': len(self.getJobAdminDicts(job_group_id=export_id, status=JobProducing)),
                                                  'to_do': len(self.getJobAdminDicts(job_group_id=export_id, status=JobToDo)),
                                                  'scheduled': len(self.getJobAdminDicts(job_group_id=export_id, status=JobScheduled)),
                                                  'retry': len(self.getJobAdminDicts(job_group_id=export_id, status=JobRetry)),
                                                  'finished': len(finished_group_jads),
                                                  'failed': len(failed_group_jads)}}
                except Exception as e:
                    logger.error(e)

            return convertIntKeysToString(result)

    def setExportJobPriority(self, export_id, priority):
        priority = max(0, min(9, int(priority)))
        with self.__lock:
            jads = self.getJobAdminDicts(job_group_id=export_id)

            logger.info('updating the priority of %s jobs of export %s to level %s', len(jads), export_id, priority)

            for jad in jads:
                try:
                    # keep the in-memory priority consistent with the job file on disk
                    jad['job']['priority'] = priority
                    updatePriorityInJobFile(jad['path'], priority)
                except Exception as e:
                    logger.error(e)

    def getExportJobPriority(self, export_id):
        with self.__lock:
            job_group_id = str(export_id)
            for jad in self.__job_admin_dicts.values():
                if str(jad['job'].get('job_group_id')) == job_group_id:
                    if 'priority' in jad['job']:
                        return jad['job']['priority']

        return DEFAULT_JOB_PRIORITY

    def getReport(self, job_group_id):
        with self.__lock:
            current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=job_group_id)
            current_group_jobs = [jad['job'] for jad in current_group_jads]