Skip to content
Snippets Groups Projects
ingestjobmanagementserver.py 55.4 KiB
Newer Older
#!/usr/bin/env python

# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#

from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener
from lofar.lta.ingest.common.job import *
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME, DEFAULT_INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.server.config import DEFAULT_INGEST_BUSNAME, DEFAULT_INGEST_SERVICENAME
from lofar.lta.ingest.common.config import DEFAULT_BROKER
from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, DEFAULT_JOB_PRIORITY
from lofar.lta.ingest.server.config import DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME
from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_QUEUENAME, DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME
from lofar.messaging import CommandMessage, EventMessage, FromBus, ToBus
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import convertIntKeysToString

import os
import os.path
import fnmatch
import shutil
import time
from threading import RLock
from datetime import datetime, timedelta

import logging
logger = logging.getLogger()

class IngestServiceMessageHandler(MessageHandlerInterface):
    def __init__(self, job_manager, **kwargs):
        super(IngestServiceMessageHandler, self).__init__(**kwargs)

        self._job_manager = job_manager
        self.service2MethodMap = { 'RemoveExportJob': self._job_manager.removeExportJob,
                                   'SetExportJobPriority': self._job_manager.setExportJobPriority,
                                   'GetStatusReport': self._job_manager.getStatusReportDict,
                                   'GetReport': self._job_manager.getReport,
                                   'GetExportIds': self._job_manager.getExportIds }

class IngestJobManager:
    def __init__(self,
                 busname=DEFAULT_INGEST_BUSNAME,
                 servicename=DEFAULT_INGEST_SERVICENAME,
                 notification_busname=DEFAULT_INGEST_NOTIFICATION_BUSNAME,
                 notification_prefix=DEFAULT_INGEST_NOTIFICATION_PREFIX,
                 notification_listen_queue_name=DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME,
                 incoming_job_queue_name=DEFAULT_INGEST_JOBS_QUEUENAME,
                 jobs_for_transfer_queue_name=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
                 jobs_dir=JOBS_DIR,
                 max_num_retries=MAX_NR_OF_RETRIES,
                 broker=None,
                 **kwargs):

        self.__notification_listener = IngestBusListener(busname=notification_listen_queue_name, subjects=notification_prefix+'*', broker=broker)
        self.__notification_listener.onJobStarted = self.onJobStarted
        self.__notification_listener.onJobProgress = self.onJobProgress
        self.__notification_listener.onJobTransferFailed = self.onJobTransferFailed
        self.__notification_listener.onJobFinished = self.onJobFinished

        self.__jobs_dir = jobs_dir
        self.__max_num_retries = max_num_retries
        self.__job_admin_dicts = {}
        self.__lock = RLock()
        self.__running = False

        self.notification_prefix = notification_prefix
        self.event_bus           = ToBus(notification_busname, broker=broker)


        self.service = Service(servicename,
                               IngestServiceMessageHandler,
                               busname=busname,
                               broker=broker,
                               use_service_methods=True,
                               numthreads=1,
                               handler_args={'job_manager': self})

        #incoming jobs (from mom, eor, user ingest, etc)
        self.__incoming_job_queue = FromBus(incoming_job_queue_name, broker=broker)

        #queue into which this manager produces jobs, which are consumened for actual data transfer by the ingestservices
        self.__jobs_for_transfer_queue = ToBus(jobs_for_transfer_queue_name, broker=broker)

        # the peeker frombus is used to see if the managed job queue we submit jobs to is emptyied by consumers
        # we want this manager to produce jobs just in time, so we can actively shuffle jobs priorities etc.
        self.__jobs_for_transfer_queue_peeker = FromBus(jobs_for_transfer_queue_name, broker=broker)

        self.__running_jobs_log_timestamp = datetime.utcnow()
        self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()

    def quit(self):
        self.__running = False

    def run(self):
        self.__running = True

        # start with full jobs dir scan to retreive state from disk
        self.scanJobsdir()

        logger.info('starting listening for new jobs and notifications')

        #open queue connections...
        with self.service as _1, self.__incoming_job_queue as _2, self.__jobs_for_transfer_queue as _3, self.__notification_listener as _4, self.event_bus as _5:
            #... and run the event loop,
            #produce jobs to managed job queue for ingest transfer services
            #receive new jobs
            logger.info('starting to produce jobs')
            while self.__running:
                try:
                    self.produceNextJobsIfPossible()

                    #receive any jobs from mom/user_ingest/eor/etc
                    receive_start = datetime.utcnow()
                    msg = self.__incoming_job_queue.receive(timeout=0.01)
                    while msg:
                        logger.debug("received msg on job queue %s: %s", self.__incoming_job_queue.address, msg)
                        self.__incoming_job_queue.ack(msg)

                        if isinstance(msg, CommandMessage):
                            job = parseJobXml(msg.content)
                            if job and job.get('JobId'):
                                if msg.priority != None and msg.priority > job.get('priority', DEFAULT_JOB_PRIORITY):
                                    job['priority'] = msg.priority

                                logger.info("received job on queue %s: %s", self.__incoming_job_queue.address, job)
                                job_admin_dict = { 'job': job, 'job_xml': msg.content }
                                self.addNewJob(job_admin_dict, check_done_dirs=True)
                        else:
                            logger.warn("unexpected message type: %s", msg)

                        if datetime.utcnow() - receive_start > timedelta(seconds=10):
                            # break out of receiving while loop early,
                            # so we can also produce some jobs
                            # receive more in next iteration
                            break

                        #see if there are any more jobs to receive and process them, else jump out of while loop
                        msg = self.__incoming_job_queue.receive(timeout=0.01)

                    # check if producing jobs are actually making progress
                    # maybe the ingest transfer server was shut down in the middle of a transer?
                    # the ingest transfer server is stateless, so it won't restart that job itself (by design)
                    # when transfering at very regular intervals progress updates are given
                    # so it is easy for us to detect here if the job is progressing or stalled (see onJobProgress)


                    #report on running jobs
                    if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds=10):
                        self.__running_jobs_log_timestamp = datetime.utcnow()
                        producing_jads = self.getJobAdminDicts(status=JobProducing)
                        if producing_jads:
                            if len(producing_jads) == 1:
                                logger.info('1 job is running')
                            else:
                                logger.info('%s jobs are running', len(producing_jads))

                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(e)
            logger.info('finished producing jobs')
        logger.info('finished listening for new jobs and notifications')

    def nrOfUnfinishedJobs(self):
        return len(self.getNotDoneJobAdminDicts())

    def getJobAdminDictsFromDisk(self, job_status=None, job_type=None, job_group_id=None, job_id=None):
        dir_path = self.jobDir(job_admin_dict=None, job_status=job_status, job_type=job_type, job_group_id=job_group_id)

        job_admin_dicts = []

        if not os.path.isdir(dir_path):
            return job_admin_dicts

        dir_paths = [dir_path]

        if job_status==JobRetry:
            dir_paths += IngestJobManager.getSubDirs(dir_path)

        for dp in dir_paths:
            try:
                if job_id:
                    logger.info('scanning %s for job file of %s', dp, job_id)
                else:
                    logger.info('scanning %s for job files', dp)

                pattern = ('*%s*.xml*' % job_id) if job_id else '*.xml*'
                xml_files = [os.path.join(dp, f) for f in os.listdir(dp) if fnmatch.fnmatch(f, pattern)]

                logger.debug('found %d xml files in %s', len(xml_files), dp)

                for path in xml_files:
                    try:
                        with open(path) as file:
                            file_content = file.read()
                            job = parseJobXml(file_content)
                            if job and job.get('JobId'):
                                job_admin_dict = { 'path': path,
                                                  'job': job,
                                                  'job_xml': file_content,
                                                  'created_at': datetime.fromtimestamp(os.lstat(path).st_ctime) }
                                job_admin_dicts.append(job_admin_dict)
                    except Exception as e:
                        logger.error(e)
            except Exception as e:
                logger.error(e)

        if job_status:
            for job_admin_dict in job_admin_dicts:
                job_admin_dict['status'] = job_status

        logger.info('read %d job files from %s', len(job_admin_dicts), dir_path)
        return job_admin_dicts

    def jobStatusBaseDir(self, jobstatus):
        if jobstatus == JobToDo:
            return os.path.join(self.__jobs_dir, 'to_do')
        if jobstatus == JobRetry:
            return os.path.join(self.__jobs_dir, 'retry')
        if jobstatus == JobFailed:
            return os.path.join(self.__jobs_dir, 'failed')
        if jobstatus == JobHold:
            return os.path.join(self.__jobs_dir, 'on_hold')
        if jobstatus == JobScheduled:
            return os.path.join(self.__jobs_dir, 'scheduled')
        if jobstatus == JobProducing:
            return os.path.join(self.__jobs_dir, 'producing')
        if jobstatus == JobProduced:
            return os.path.join(self.__jobs_dir, 'done')
        if jobstatus == JobRemoved:
            return os.path.join(self.__jobs_dir, 'removed')
        return os.path.join(self.__jobs_dir, 'unknown')

    def jobDir(self, job_admin_dict, job_status=None, job_type=None, job_group_id=None, retry_attempt=None):
        if job_admin_dict:
            return self.jobDir(job_admin_dict=None,
                               job_status=job_admin_dict['status'],
                               job_type=job_admin_dict['job'].get('Type', 'unknown'),
                               job_group_id=job_admin_dict['job'].get('job_group_id', 'unknown'),
                               retry_attempt=job_admin_dict.get('retry_attempt', 1))

        base_dir = self.jobStatusBaseDir(job_status)

        if job_status == JobRetry and retry_attempt != None:
            return os.path.join(base_dir, str(retry_attempt))
        elif job_status in [JobProduced, JobRemoved, JobFailed]:
            group_dir = '%s_%s' % (job_type, job_group_id)
            return os.path.join(base_dir, group_dir)

        return base_dir

    def jobPath(self, job_admin_dict):
        dir = self.jobDir(job_admin_dict)
        filename = 'j%s.xml' % job_admin_dict['job']['JobId']
        path = os.path.join(dir, filename)
        return path

    def scanJobsdir(self):
        with self.__lock:
            logger.info('scanning jobs dirs in %s', self.__jobs_dir)

            for status in [JobToDo, JobScheduled, JobProducing, JobRetry]:
                job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status)

                for job_admin_dict in job_admin_dicts:
                    self.addNewJob(job_admin_dict, check_done_dirs=False)

                logger.info('added %d existing %s jobs from disk', len(job_admin_dicts), jobState2String(status))

            #which (type, group_id) jobs were read
            #read the done jobs for these groups as well
            unique_type_groups = set([(jad['job']['Type'], jad['job'].get('job_group_id', 'unknown_group')) for jad in self.__job_admin_dicts.values()])

            if unique_type_groups:
                logger.info('scanning for done jobs for %s', unique_type_groups)

                for job_type, job_group_id in unique_type_groups:
                    for status in [JobFailed, JobProduced, JobRemoved]:
                        job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status, job_type=job_type, job_group_id=job_group_id)

                        for job_admin_dict in job_admin_dicts:
                            self.addNewJob(job_admin_dict, check_done_dirs=False)

                        logger.info('added %d existing %s jobs for %s %s', len(job_admin_dicts), jobState2String(status), job_type, job_group_id)

            logger.info('finished scanning jobs')

    def addNewJob(self, job_admin_dict, check_done_dirs=False):
        with self.__lock:
            job_id = job_admin_dict['job']['JobId']
            job_group_id = job_admin_dict['job'].get('job_group_id')
            job_type=job_admin_dict['job']['Type']
            logger.info('adding new job %s in group %s %s with status %s',
                        job_id,
                        job_type,
                        job_group_id,
                        jobState2String(job_admin_dict.get('status', JobToDo)))

            if check_done_dirs:
                #remove job from 'done' directories if present (this is a resubmitted job)
                for done_status in [JobFailed, JobProduced, JobRemoved]:
                    matching_done_jads = self.getJobAdminDictsFromDisk(job_status=done_status,
                                                                       job_type=job_type,
                                                                       job_group_id=job_group_id,
                                                                       job_id=job_id)

                    for done_jad in matching_done_jads:
                        try:
                            logger.info('removing done job %s from %s because it is resubmitted', job_id, done_jad['path'])
                            os.remove(done_jad['path'])
                        except Exception as e:
                            logger.error('error while removing done job %s from %s: %s', job_id, done_jad['path'], e)

            self.__job_admin_dicts[job_id] = job_admin_dict
            if 'status' not in job_admin_dict:
                job_admin_dict['status'] = JobToDo

            if 'created_at' not in job_admin_dict:
                job_admin_dict['created_at'] = datetime.utcnow()

            job_admin_dict['updated_at'] = job_admin_dict['created_at']

            #store start- finish times per try in runs
            job_admin_dict['runs'] = {}

            #store new job
            todo_dir = self.jobDir(job_admin_dict)

            #create dir dir if not exists
            if not os.path.isdir(todo_dir):
                try:
                    os.makedirs(todo_dir)
                except OSError as e:
                    logger.error(e)

            if 'path' not in job_admin_dict:
                path = self.jobPath(job_admin_dict)
                job_admin_dict['path'] = path
                try:
                    if not os.path.exists(path):
                        logger.info('saving job %s on disk: %s', job_id, path)
                        with open(path, 'w') as file:
                            file.write(job_admin_dict['job_xml'])
                except Exception as e:
                    logger.error(e)
            else:
                job_dirname = os.path.dirname(job_admin_dict['path'])
                expected_dirname = self.jobDir(job_admin_dict)
                if job_dirname != expected_dirname:
                    # a new todo job should be located in the expected dir based on its status,
                    # it is not, so force it there
                    self.updateJobStatus(job_admin_dict['job']['JobId'], job_admin_dict['status'])

    def updateJobStatus(self, job_id, new_status, lta_site=None, message=None):
        with self.__lock:
            job_admin_dict = self.__job_admin_dicts.get(job_id)

            if not job_admin_dict:
                logger.error('updateJobStatus: unknown job %s with new status %s', job_id, jobState2String(new_status))
                return

            try:
                #update updated_at timestamp
                job_admin_dict['updated_at'] = datetime.utcnow()

                if new_status == JobProducing:
                    job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)] = {}
                    job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['started_at'] = datetime.utcnow()

                if new_status == JobProduced or new_status == JobTransferFailed:
                    job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['finished_at'] = datetime.utcnow()

                if lta_site:
                    job_admin_dict['lta_site'] = lta_site

                job_admin_dict['last_message'] = message

                current_status = job_admin_dict.get('status', JobToDo)

                if new_status == JobTransferFailed:
                    #special case for jobs which failed to transer, which will be retried
                    current_retry_attempt = job_admin_dict.get('retry_attempt', 0)
                    next_retry_attempt = current_retry_attempt+1

                    if next_retry_attempt < self.__max_num_retries:
                        if message and 'not on disk' in message:
                            logger.info('job %s transfer failed because source data was not on disk, not retrying anymore',
                                        job_id)
                            new_status = JobFailed
                        else:
                            new_status = JobRetry
                            job_admin_dict['retry_attempt'] = next_retry_attempt
                            job_admin_dict['job']['last_retry_attempt'] = next_retry_attempt == (self.__max_num_retries-1)
                    else:
                        logger.info('job %s transfer failed %s times, not retrying anymore',
                                    job_id,
                                    job_admin_dict.get('retry_attempt', 0))
                        new_status = JobFailed

                if new_status != current_status:
                    #update the internal status
                    logger.info('updating job %s status from %s to %s%s',
                                job_id,
                                jobState2String(current_status),
                                jobState2String(new_status),
                                (' attempt #%d' % job_admin_dict.get('retry_attempt', 1))
                                 if (new_status == JobRetry or current_status == JobRetry) else '')
                    job_admin_dict['status'] = new_status

                #move the job file to its new status directory
                #determine current and new paths
                current_path = job_admin_dict.get('path', '')
                new_path = self.jobPath(job_admin_dict)
                new_dirname = os.path.dirname(new_path)

                #create dir dir if not exists
                if not os.path.isdir(new_dirname):
                    try:
                        os.makedirs(new_dirname)
                    except OSError as e:
                        logger.error(e)

                if new_path != current_path:
                    #do actual file move
                    logger.debug('moving job file from %s to %s.', current_path, new_path)
                    shutil.move(current_path, new_path)
                    job_admin_dict['path'] = new_path


                if new_status == JobRemoved or new_status == JobFailed:
                    #send notification
                    #this is (also) picked up by the ingestmomadapter
                    #which also sends a status update to MoM, including the last_message
                    if new_status == JobRemoved:
                        job_admin_dict['last_message'] = 'removed from queue'

                    self._sendNotification(job_admin_dict)

                #finally, remove job from interal admin jobs dict if finished
                if job_admin_dict['status'] in [JobProduced, JobFailed, JobRemoved]:
                    current_job_group_id = job_admin_dict['job'].get('job_group_id', 'unknown')
                    current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=current_job_group_id)

                    if len(current_group_jads) == 0:
                        logger.info('all jobs in group %s are done', current_job_group_id)
                        self.sendJobGroupFinishedMail(current_job_group_id)

                        current_group_done_jads = self.getDoneJobAdminDicts(job_group_id=current_job_group_id)
                        logger.info('removing %s jobs of group %s from job management server memory',
                                    len(current_group_done_jads),
                                    current_job_group_id)

                        for jad in current_group_done_jads:
                            del self.__job_admin_dicts[jad['job']['JobId']]
                    else:
                        logger.info('%s jobs to do in group %s', len(current_group_jads), current_job_group_id)
            except Exception as e:
                logger.error("updateJobStatus(job_id=%s, new_status=%s, lta_site=%s, message=%s) %s",
                             job_id,
                             jobState2String(new_status),
                             lta_site,
                             message,
                             e)


    def _sendNotification(self, job_admin_dict):
        try:
            job = job_admin_dict['job']
            contentDict = { 'job_id': job['JobId'],
                            'archive_id': job['ArchiveId'],
                            'project': job['Project'],
                            'type': job["Type"],
                            'dataproduct': job['DataProduct'] }

            if 'ObservationId' in job:
                contentDict['otdb_id'] = job['ObservationId']

            if 'lta_site' in job_admin_dict:
                contentDict['lta_site'] = job_admin_dict['lta_site']

            if 'last_message' in job_admin_dict:
                contentDict['message'] = job_admin_dict['last_message']

            if 'job_group_id' in job_admin_dict:
                contentDict['export_id'] = job_admin_dict['job_group_id']

            status = jobState2String(job_admin_dict['status'])
            status = status[status.index('(')+1:status.index(')')]

            msg = EventMessage(context=self.notification_prefix + status, content=contentDict)
            # remove message from queue's when not picked up within 48 hours,
            # otherwise mom might endlessly reject messages if it cannot handle them
            msg.ttl = 48*3600
            logger.info('Sending notification %s: %s' % (status, str(contentDict).replace('\n', ' ')))
            self.event_bus.send(msg)

        except Exception as e:
            logger.error(str(e))

    def removeExportJob(self, export_group_id):
        logger.info('removing export job %s', export_group_id)
        job_admin_dicts = self.getJobAdminDicts(job_group_id=export_group_id)

        if job_admin_dicts:
            for jad in job_admin_dicts:
                self.updateJobStatus(jad['job']['JobId'], JobRemoved)

    def getExportIds(self):
        with self.__lock:
            return sorted(list(set([jad['job'].get('job_group_id', 'unknown_group') for jad in self.__job_admin_dicts.values()])))

    def __putStalledJobsBackToToDo(self):
        if datetime.utcnow() - self.__last_putStalledJobsBackToToDo_timestamp < timedelta(minutes=1):
            return

            stalled_job_admin_dicts = [jad for jad in self.__job_admin_dicts.values()
                                       if (jad['status'] == JobProducing or jad['status'] == JobScheduled)
                                           and now - jad['updated_at'] >= threshold]

            for jad in stalled_job_admin_dicts:
                logger.info('putting job %s back to ToDo because it did not make any progress during the last 15min', jad['job']['JobId'])
                self.updateJobStatus(jad['job']['JobId'], JobToDo)

            self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()


    def getNextJobToRun(self):
        '''get the next job to run.
        examine all 'to_do' and 'retry' jobs
        higher priority jobs always go first
        equal priority jobs will return 'to_do' jobs over 'retry' jobs
        'retry' jobs are sorted by least amount of retry attempts
        source load balancing: the more jobs transfer from a certain source host,
                               the less likely it is a next job will be for that source host as well
        '''

        #helper method to get the source host from the job's location
        def getSourceHost(job_admin_dict):
            try:
                host = job_admin_dict['job']['Location'].split(':')[0]
                if 'cep4' in host.lower() or 'cpu' in host.lower():
                    return 'localhost'
                return host
            except:
                return 'localhost'

        running_jads = self.getJobAdminDicts(status=JobProducing) + self.getJobAdminDicts(status=JobScheduled)
        running_hosts = {}
        for jad in running_jads:
            host = getSourceHost(jad)
            running_hosts[host] = running_hosts.get(host, 0) + 1

        with self.__lock:
            def getNextJobByStatus(status, min_age=None):

                def jad_compare_func(jad_a, jad_b):
                    #sort on priority first
                    if jad_a['job'].get('priority', DEFAULT_JOB_PRIORITY) != jad_b['job'].get('priority', DEFAULT_JOB_PRIORITY):
                        return jad_b['job'].get('priority', DEFAULT_JOB_PRIORITY) - jad_a['job'].get('priority', DEFAULT_JOB_PRIORITY)

                    #equal priorities, so sort on next sort criterion, the retry attempt
                    if jad_a['status'] == JobRetry and jad_b['status'] == JobRetry:
                        if jad_a.get('retry_attempt', 0) != jad_b.get('retry_attempt', 0):
                            return jad_b.get('retry_attempt', 0) - jad_a.get('retry_attempt', 0)

                    # equal retry_attempt, so sort on next sort criterion,
                    # jobs for which the number of running jobs on that host is lower
                    # (load balance the source hosts)
                    nrOfRunningJobsOnSourceHostA = running_hosts.get(getSourceHost(jad_a), 0)
                    nrOfRunningJobsOnSourceHostB = running_hosts.get(getSourceHost(jad_b), 0)

                    if nrOfRunningJobsOnSourceHostA != nrOfRunningJobsOnSourceHostB:
                        return nrOfRunningJobsOnSourceHostA - nrOfRunningJobsOnSourceHostB

                    #equal number of jobs on source hosts, so sort on next sort criterion, the created_at timestamp (FIFO)
                    if jad_a['created_at'] < jad_b['created_at']:
                        return -1
                    if jad_a['created_at'] > jad_b['created_at']:
                        return 1

                    #TODO: we can add a lot of sort criteria in the future.
                    #For now, stick with FIFO and retry_attempt, after priority and source_host_load_balancing
                    return 0

                job_admin_dicts = self.getJobAdminDicts(status=status)

                # filter out priority 0 jobs (which are paused)
                job_admin_dicts = [jad for jad in job_admin_dicts if jad['job'].get('priority', 0) > 0]

                if min_age:
                    now = datetime.utcnow()
                    job_admin_dicts = [jad for jad in job_admin_dicts if now - jad['updated_at'] >= min_age]

                job_admin_dicts = sorted(job_admin_dicts, cmp=jad_compare_func)
                if job_admin_dicts:
                    logger.info('%s jobs with status %s waiting', len(job_admin_dicts), jobState2String(status))
                    return job_admin_dicts[0]
                return None

            #get the next job to run, both for JobToDo and JobRetry
            next_to_do_jad = getNextJobByStatus(JobToDo)
            next_retry_jad = getNextJobByStatus(JobRetry, timedelta(minutes=15) if next_to_do_jad else None)

            #limit the number of running jobs per source host if not localhost
            if next_to_do_jad and getSourceHost(next_to_do_jad) != 'localhost':
                if running_hosts.get(getSourceHost(next_to_do_jad), 0) > 1:
                    next_to_do_jad = None

            #limit the number of running jobs per source host if not localhost
            if next_retry_jad and getSourceHost(next_retry_jad) != 'localhost':
                if running_hosts.get(getSourceHost(next_retry_jad), 0) > 1:
                    next_retry_jad = None

            if next_to_do_jad and next_retry_jad:
                # if next_retry_jad has higher priority then next_to_do_jad, then return next_retry_jad
                if next_retry_jad['job'].get('priority', DEFAULT_JOB_PRIORITY) > next_to_do_jad['job'].get('priority', DEFAULT_JOB_PRIORITY):
                    return next_retry_jad

                # or if next_to_do_jad has higher priority then next_retry_jad, then return next_to_do_jad
                if next_to_do_jad['job'].get('priority', DEFAULT_JOB_PRIORITY) > next_retry_jad['job'].get('priority', DEFAULT_JOB_PRIORITY):
                    return next_to_do_jad

                # or if next_retry_jad is already waiting for over an hour, then return next_retry_jad
                if datetime.utcnow() - next_retry_jad['updated_at'] > timedelta(minutes=60):
                    return next_retry_jad

                # or if next_retry_jad is older than next_to_do_jad
                if next_retry_jad['updated_at'] > next_to_do_jad.get('updated_at', next_to_do_jad.get('created_at')):
                    return next_retry_jad

            if next_to_do_jad:
                # just return the next_to_do_jad
                return next_to_do_jad

            # in all other cases, return next_retry_jad (which might be None)
            return next_retry_jad

    def canProduceNextJob(self):
        # test if the managed_job_queue is empty
        try:
            with self.__jobs_for_transfer_queue_peeker:
                num_scheduled = self.__jobs_for_transfer_queue_peeker.nr_of_messages_in_queue(0.01)
                if num_scheduled == 0:
                    scheduled_jads = self.getJobAdminDicts(status=JobScheduled)
                    return len(scheduled_jads) < 10
                return False
        except Exception as e:
            logger.error('canProduceNextJob: %s', e)
        return True

    def produceNextJobsIfPossible(self):
        start_producing_timestamp = datetime.utcnow()
        while self.canProduceNextJob() and datetime.utcnow() - start_producing_timestamp < timedelta(seconds=5):
            job_admin_dict = self.getNextJobToRun()
            if job_admin_dict:
                if os.path.exists(job_admin_dict.get('path')):
                    msg = CommandMessage(content=job_admin_dict.get('job_xml'))
                    msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY)
                    self.__jobs_for_transfer_queue.send(msg)
                    logger.info('submitted job %s to queue %s at %s', job_admin_dict['job']['JobId'], self.__jobs_for_transfer_queue.address, self.__jobs_for_transfer_queue.broker)
                    self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled)
                    job_id = job_admin_dict['job']['JobId']
                    logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path'))
                    del self.__job_admin_dicts[job_id]

                #do a little sleep to allow the ingesttransferserver consumers to pick up the submitted job
                #so the queue is empty again
                #and we can submit yet another job
                time.sleep(0.1)
            else:
                return

    def onJobStarted(self, job_notification_dict):
        self.__notification_listener._logJobNotification('started ', job_notification_dict);
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProducing,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobFinished(self, job_notification_dict):
        self.__notification_listener._logJobNotification('finished', job_notification_dict);

        # file_type might have changed to unspec for example
        if 'file_type' in job_notification_dict:
            with self.__lock:
                job_admin_dict = self.__job_admin_dicts.get(job_id)
                if job_admin_dict:
                    job_admin_dict['job']['file_type'] = job_notification_dict['file_type']

        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProduced,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobTransferFailed(self, job_notification_dict):
        self.__notification_listener._logJobNotification('transfer failed ', job_notification_dict);
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobTransferFailed,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobProgress(self, job_notification_dict):
        self.__notification_listener._logJobNotification('progress', job_notification_dict, level=logging.DEBUG);
        #touch job
        #producing jobs which are untouched for 5min are put back to JobToDo
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProducing,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    @staticmethod
    def getSubDirs(dir_path):
        dir_lists = [[os.path.join(root,dir) for dir in dirs if root==dir_path] for root, dirs, files in os.walk(dir_path) if dirs]
        if dir_lists:
            return reduce(lambda x,y: x+y, dir_lists)
        return []

    def getDoneJobAdminDicts(self, job_group_id=None):
        return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobFailed, JobProduced, JobRemoved])

    def getNotDoneJobAdminDicts(self, job_group_id=None):
        return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobToDo, JobScheduled, JobRetry])

    def getJobAdminDicts(self, job_group_id=None, status=None):
        with self.__lock:
            jads = [jad for jad in self.__job_admin_dicts.values()]
                job_group_id = str(job_group_id)
                jads = [jad for jad in jads if str(jad['job'].get('job_group_id')) == job_group_id]

            if status != None:
                if isinstance(status, int):
                    jads = [jad for jad in jads if jad['status'] == status]
                else:
                    statuses = set(status)
                    jads = [jad for jad in jads if jad['status'] in statuses]

    def getStatusReportDict(self):
        with self.__lock:
            export_ids = self.getExportIds()
            logger.info('getStatusReportDict export_ids: %s', export_ids)

            result = {}
            for export_id in export_ids:
                try:
                    finished_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobProduced)
                    failed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobFailed)
                    removed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobRemoved)

                    done_group_jads = finished_group_jads + failed_group_jads + removed_group_jads
                    done_group_jobs = [jad['job'] for jad in done_group_jads]

                    current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=export_id)
                    current_group_jobs = [jad['job'] for jad in current_group_jads]
                    all_group_jads = current_group_jads + done_group_jads
                    all_group_jobs = current_group_jobs + done_group_jobs

                    priority = min([job.get('priority', DEFAULT_JOB_PRIORITY) for job in current_group_jobs]) if current_group_jobs else 4
                    submitters = list(set([job['Submitter'] for job in all_group_jobs if 'Submitter' in job]))
                    projects = list(set([job['Project'] for job in all_group_jobs if 'Project' in job]))
                    lta_sites = list(set([jad['lta_site'] for jad in all_group_jads if 'lta_site' in jad]))

                    job_run_events = {}
                    for jad in all_group_jads:
                        for run in jad['runs'].values():
                            if 'started_at' in run:
                                started_timestamp = run['started_at']

                                if started_timestamp not in job_run_events:
                                    job_run_events[started_timestamp] = 0

                                job_run_events[started_timestamp] += 1

                                if 'finished_at' in run:
                                    finished_timestamp = run['finished_at']

                                    if finished_timestamp not in job_run_events:
                                        job_run_events[finished_timestamp] = 0

                                    job_run_events[finished_timestamp] -= 1

                    all_run_timestamps = sorted(job_run_events.keys())
                    running_jobs_values = []
                    if all_run_timestamps:
                        prev_value = 0

                        for t in all_run_timestamps:
                            value = prev_value + job_run_events[t]
                            running_jobs_values.append(value)
                            prev_value = value

                    job_finised_events = {}
                    for jad in all_group_jads:
                        if jad['status'] == JobProduced or jad['status'] == JobFailed:
                            if jad['runs']:
                                final_run = max(jad['runs'].keys())

                                run = jad['runs'][final_run]
                                if 'started_at' in run and 'finished_at' in run:
                                    finished_timestamp = run['finished_at']

                                    if finished_timestamp not in job_finised_events:
                                        job_finised_events[finished_timestamp] = 0

                                    job_finised_events[finished_timestamp] += 1

                    finished_jobs_values = []
                    finished_timestamps = sorted(job_finised_events.keys())
                    for i, t in enumerate(finished_timestamps):
                        finished_jobs_values.append(i + 1)

                    result[export_id] = { 'priority' : priority,
                                          'submitters' : submitters,
                                          'projects' : projects,
                                          'lta_sites' : lta_sites,
                                          'series': { 'running_jobs': { 'timestamps': all_run_timestamps, 'values': running_jobs_values },
                                                      'finished_jobs': { 'timestamps': finished_timestamps, 'values': finished_jobs_values }
                                                    },
                                          'jobs': { 'running': len(self.getJobAdminDicts(job_group_id=export_id, status=JobProducing)),
                                                    'to_do': len(self.getJobAdminDicts(job_group_id=export_id, status=JobToDo)),
                                                    'scheduled': len(self.getJobAdminDicts(job_group_id=export_id, status=JobScheduled)),
                                                    'retry': len(self.getJobAdminDicts(job_group_id=export_id, status=JobRetry)),
                                                    'finished': len(finished_group_jads),
                                                    'failed': len(failed_group_jads) } }
                except Exception as e:
                    logger.error(e)

            return convertIntKeysToString(result)

    def setExportJobPriority(self, export_id, priority):
        priority = max(0, min(9, int(priority)))
        with self.__lock:
            jads = self.getJobAdminDicts(job_group_id=export_id)

            logger.info('updating the priority of %s jobs of export %s to level %s', len(jads), export_id, priority)

            for jad in jads:
                try:
                    #update local copy
                    jad['job']['priority'] = priority
                    #persist to disk
                    updatePriorityInJobFile(jad['path'], priority)
                except Exception as e:
                    logger.error(e)

    def getReport(self, job_group_id):
        with self.__lock:
            #still running/waiting jobs
            current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=job_group_id)
            current_group_jobs = [jad['job'] for jad in current_group_jads]

            #done jobs
            finished_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced)
            finished_group_jobs = [jad['job'] for jad in finished_group_jads]
            failed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed)
            failed_group_jobs = [jad['job'] for jad in failed_group_jads]
            removed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved)
            removed_group_jobs = [jad['job'] for jad in removed_group_jads]

            done_group_jobs = finished_group_jobs + failed_group_jobs + removed_group_jobs
            all_group_jobs = current_group_jobs + done_group_jobs

            submitters = set([j['Submitter'] for j in all_group_jobs if 'Submitter' in j])
            projects = set([j['Project'] for j in all_group_jobs if 'Project' in j])

            report = ''

            header = """=== Report on ingest Export Job (%(id)s) ===
User(s): %(user)s
Project: %(project)s""" % { 'id': job_group_id,
                            'user': ', '.join(submitters),
                            'project': ', '.join(projects)}

            report += header

            summary = """\n\n=== Summary ===
Total Files: %(total)i
  - Failed: %(failed)i
  - Success: %(done)i
    - Interferometer: %(corr)i
    - Beamformed: %(bf)i
    - SkyImages: %(img)i
    - Unspecified: %(unspec)i
    - Pulsar Pipeline: %(pulp)i""" % {'total': len(all_group_jobs),
                                      'done': len(finished_group_jobs),
                                      'corr': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_CORRELATED]),
                                      'bf': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_BEAMFORMED]),
                                      'img': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_IMAGE]),
                                      'unspec': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_UNSPECIFIED]),
                                      'pulp': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_PULP]),
                                      'failed': len(failed_group_jobs)}

            # TODO: generate lta link
            #try:
                #import mechanize
                #import json
                #browser = mechanize.Browser()
                #browser.set_handle_robots(False)
                #browser.addheaders = [('User-agent', 'Firefox')]

                #obs_ids = sorted(list(set(job.get('ObservationId', -1) for job in all_group_jobs)))

                #for obs_id in obs_ids:
                    #response = browser.open('http://scu001.control.lofar:7412/rest/tasks/otdb/%s' % obs_id)

                    #if response.code == 200:
                        #task = json.loads(response.read())

            #except Exception as e:
                #logger.error(e)

            if removed_group_jobs:
                summary += '''\n\nTotal Removed before transfer: %s''' % (len(removed_group_jobs),)

            report += summary

            def file_listing_per_obs(jads, full_listing=False, dp_status_remark=''):
                jobs = [jad['job'] for jad in jads]
                obs_ids = sorted(list(set(job.get('ObservationId', -1) for job in jobs)))
                obs_jads_dict = {obs_id:[] for obs_id in obs_ids}

                for jad in jads:
                    obs_jads_dict[jad['job'].get('ObservationId', -1)].append(jad)

                listing = ''
                for obs_id in obs_ids:
                    obs_jads = obs_jads_dict[obs_id]
                    listing += 'otdb_id: %s - #dataproducts: %s\n' % (obs_id, len(obs_jads))

                if full_listing:
                    for obs_id in obs_ids:
                        obs_jads = obs_jads_dict[obs_id]
                        obs_jads = sorted(obs_jads, key=lambda jad: jad['job'].get('DataProduct'))
                        listing += '\notdb_id: %s - %s dataproducts listing\n' % (obs_id, dp_status_remark)

                        for jad in obs_jads:
                            listing += 'dataproduct: %s - archive_id: %s - location: %s' % (jad['job']['DataProduct'],
                                                                                            jad['job']['ArchiveId'],
                                                                                            jad['job'].get('Location'))

                            if jad.get('last_message'):
                                listing += ' - message: %s' % (jad['last_message'])
                            listing += '\n'

                return listing

            if current_group_jads:
                report += "\n\n==== Scheduled/Running files: =====\n"
                report += file_listing_per_obs(current_group_jads, False, 'scheduled/running')

            if finished_group_jads:
                report += "\n\n==== Finished files: =====\n"
                report += file_listing_per_obs(finished_group_jads, False, 'finished')

            if failed_group_jads:
                report += "\n\n==== Failed files: =====\n"
                report += file_listing_per_obs(failed_group_jads, True, 'failed')

            if removed_group_jads:
                report += "\n\n==== Removed jobs before transfer: =====\n"
                report += file_listing_per_obs(removed_group_jads, False, 'removed')

            return report

    def sendJobGroupFinishedMail(self, job_group_id):
        report = self.getReport(job_group_id)
        logger.info(report)

        mailing_list = list(FINISHED_NOTIFICATION_MAILING_LIST)

        finished_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced)
        failed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed)
        removed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved)
        unfinished_group_jads = failed_group_jads + removed_group_jads