#!/usr/bin/env python

# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#

from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener
from lofar.lta.ingest.common.job import *
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME, DEFAULT_INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.server.config import DEFAULT_INGEST_BUSNAME, DEFAULT_INGEST_SERVICENAME
from lofar.lta.ingest.common.config import DEFAULT_BROKER
from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, DEFAULT_JOB_PRIORITY
from lofar.lta.ingest.server.config import DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME
from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_QUEUENAME, DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME
from lofar.messaging import CommandMessage, EventMessage, FromBus, ToBus
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import convertIntKeysToString

import os
import os.path
import fnmatch
import shutil
import time
from threading import RLock
from datetime import datetime, timedelta

import logging
logger = logging.getLogger()

class IngestServiceMessageHandler(MessageHandlerInterface):
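    '''maps the ingest service commands (by name) onto IngestJobManager methods'''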
    def __init__(self, job_manager, **kwargs):
        super(IngestServiceMessageHandler, self).__init__(**kwargs)

        self._job_manager = job_manager
        self.service2MethodMap = { 'RemoveExportJob': self._job_manager.removeExportJob,
                                   'SetExportJobPriority': self._job_manager.setExportJobPriority,
                                   'GetStatusReport': self._job_manager.getStatusReportDict,
                                   'GetReport': self._job_manager.getReport,
                                   'GetExportIds': self._job_manager.getExportIds }
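
        # via service2MethodMap, the lofar.messaging service machinery dispatches an incoming
        # command message by its method name, e.g. a 'GetStatusReport' request is handled
        # by self._job_manager.getStatusReportDict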

class IngestJobManager:
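    '''manages the ingest job queue: receives new jobs, persists each job's xml file on disk
    in a directory per status (to_do/scheduled/producing/retry/done/failed/...), and produces
    jobs just-in-time to the jobs-for-transfer queue, so priorities can be reshuffled until
    the very last moment. job status changes are driven by notifications from the transfer services.'''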
    def __init__(self,
                 busname=DEFAULT_INGEST_BUSNAME,
                 servicename=DEFAULT_INGEST_SERVICENAME,
                 notification_busname=DEFAULT_INGEST_NOTIFICATION_BUSNAME,
                 notification_prefix=DEFAULT_INGEST_NOTIFICATION_PREFIX,
                 notification_listen_queue_name=DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME,
                 incoming_job_queue_name=DEFAULT_INGEST_JOBS_QUEUENAME,
                 jobs_for_transfer_queue_name=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
                 jobs_dir=JOBS_DIR,
                 max_num_retries=MAX_NR_OF_RETRIES,
                 broker=None,
                 **kwargs):

        self.__notification_listener = IngestBusListener(busname=notification_listen_queue_name, subjects=notification_prefix+'*', broker=broker)
        self.__notification_listener.onJobStarted = self.onJobStarted
        self.__notification_listener.onJobProgress = self.onJobProgress
        self.__notification_listener.onJobTransferFailed = self.onJobTransferFailed
        self.__notification_listener.onJobFinished = self.onJobFinished

        self.__jobs_dir = jobs_dir
        self.__max_num_retries = max_num_retries
        self.__job_admin_dicts = {}
        self.__lock = RLock()
        self.__running = False

        self.notification_prefix = notification_prefix
        self.event_bus           = ToBus(notification_busname, broker=broker)


        self.service = Service(servicename,
                               IngestServiceMessageHandler,
                               busname=busname,
                               broker=broker,
                               use_service_methods=True,
                               numthreads=1,
                               handler_args={'job_manager': self})

        #incoming jobs (from mom, eor, user ingest, etc)
        self.__incoming_job_queue = FromBus(incoming_job_queue_name, broker=broker)

        #queue into which this manager produces jobs, which are consumed for actual data transfer by the ingestservices
        self.__jobs_for_transfer_queue = ToBus(jobs_for_transfer_queue_name, broker=broker)

        # the peeker frombus is used to see if the managed job queue we submit jobs to is emptied by its consumers.
        # we want this manager to produce jobs just in time, so we can actively shuffle job priorities etc.
        self.__jobs_for_transfer_queue_peeker = FromBus(jobs_for_transfer_queue_name, broker=broker)

        self.__running_jobs_log_timestamp = datetime.utcnow()

    def quit(self):
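        '''stop the run() event loop'''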
        self.__running = False

    def run(self):
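        '''the main event loop: restore state from disk, then keep producing jobs for the
        transfer services and processing incoming jobs and notifications until quit() is called'''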
        self.__running = True

        # start with a full jobs dir scan to retrieve state from disk
        self.scanJobsdir()

        logger.info('starting listening for new jobs and notifications')

        #open queue connections...
        with self.service as _1, self.__incoming_job_queue as _2, self.__jobs_for_transfer_queue as _3, self.__notification_listener as _4, self.event_bus as _5:
            #... and run the event loop,
            #produce jobs to managed job queue for ingest transfer services
            #receive new jobs
            logger.info('starting to produce jobs')
            while self.__running:
                try:
                    #produce next job
                    self.produceNextJobsIfPossible()

                    #receive any jobs from mom/user_ingest/eor/etc
                    receive_start = datetime.utcnow()
                    msg = self.__incoming_job_queue.receive(timeout=0.1)
                    while msg:
                        logger.debug("received msg on job queue %s: %s", self.__incoming_job_queue.address, msg)
                        self.__incoming_job_queue.ack(msg)

                        if isinstance(msg, CommandMessage):
                            job = parseJobXml(msg.content)
                            if job and job.get('JobId'):
                                if msg.priority is not None and msg.priority > job.get('priority', DEFAULT_JOB_PRIORITY):
                                    job['priority'] = msg.priority

                                logger.info("received job on queue %s: %s", self.__incoming_job_queue.address, job)
                                job_admin_dict = { 'job': job, 'job_xml': msg.content }
                                self.addNewJob(job_admin_dict, check_done_dirs=True)
                        else:
                            logger.warning("unexpected message type: %s", msg)

                        if datetime.utcnow() - receive_start > timedelta(seconds=10):
                            # break out of receiving while loop early,
                            # so we can also produce some jobs
                            # receive more in next iteration
                            break

                        #see if there are any more jobs to receive and process them, else jump out of while loop
                        msg = self.__incoming_job_queue.receive(timeout=0.1)

                    # check whether producing jobs are actually making progress
                    # maybe the ingest transfer server was shut down in the middle of a transfer?
                    # the ingest transfer server is stateless, so it won't restart that job itself (by design)
                    # while transferring, progress updates are given at very regular intervals,
                    # so it is easy for us to detect here if a job is progressing or stalled (see onJobProgress)
                    self.putStalledJobsBackToToDo()


                    #report on running jobs
                    if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds=10):
                        self.__running_jobs_log_timestamp = datetime.utcnow()
                        with self.__lock:
                            producing_jads = self.getJobAdminDicts(status=JobProducing)
                            if producing_jads:
                                if len(producing_jads) == 1:
                                    logger.info('1 job is running')
                                else:
                                    logger.info('%s jobs are running', len(producing_jads))

                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(e)
            logger.info('finished producing jobs')
        logger.info('finished listening for new jobs and notifications')

    def nrOfUnfinishedJobs(self):
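        '''number of managed jobs that are not yet done (to_do/scheduled/producing/retry)'''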
        return len(self.getNotDoneJobAdminDicts())

    def getJobAdminDictsFromDisk(self, job_status=None, job_type=None, job_group_id=None, job_id=None):
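        '''read the job xml files for the given status/type/group (and optionally a single job_id)
        from disk and parse them into job admin dicts'''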
        dir_path = self.jobDir(job_admin_dict=None, job_status=job_status, job_type=job_type, job_group_id=job_group_id)

        job_admin_dicts = []

        if not os.path.isdir(dir_path):
            return job_admin_dicts

        dir_paths = [dir_path]

        if job_status == JobRetry:
            dir_paths += IngestJobManager.getSubDirs(dir_path)

        for dp in dir_paths:
            try:
                if job_id:
                    logger.info('scanning %s for job file of %s', dp, job_id)
                else:
                    logger.info('scanning %s for job files', dp)

                pattern = ('*%s*.xml*' % job_id) if job_id else '*.xml*'
                xml_files = [os.path.join(dp, f) for f in os.listdir(dp) if fnmatch.fnmatch(f, pattern)]

                logger.debug('found %d xml files in %s', len(xml_files), dp)

                for path in xml_files:
                    try:
                        with open(path) as job_file:
                            file_content = job_file.read()
                            job = parseJobXml(file_content)
                            if job and job.get('JobId'):
                                job_admin_dict = { 'path': path,
                                                   'job': job,
                                                   'job_xml': file_content,
                                                   'created_at': datetime.utcfromtimestamp(os.lstat(path).st_ctime) }
                                job_admin_dicts.append(job_admin_dict)
                    except Exception as e:
                        logger.error(e)
            except Exception as e:
                logger.error(e)

        if job_status:
            for job_admin_dict in job_admin_dicts:
                job_admin_dict['status'] = job_status

        logger.info('read %d job files from %s', len(job_admin_dicts), dir_path)
        return job_admin_dicts

    def jobStatusBaseDir(self, jobstatus):
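        '''map a job status onto its base directory under the jobs dir'''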
        if jobstatus == JobToDo:
            return os.path.join(self.__jobs_dir, 'to_do')
        if jobstatus == JobRetry:
            return os.path.join(self.__jobs_dir, 'retry')
        if jobstatus == JobFailed:
            return os.path.join(self.__jobs_dir, 'failed')
        if jobstatus == JobHold:
            return os.path.join(self.__jobs_dir, 'on_hold')
        if jobstatus == JobScheduled:
            return os.path.join(self.__jobs_dir, 'scheduled')
        if jobstatus == JobProducing:
            return os.path.join(self.__jobs_dir, 'producing')
        if jobstatus == JobProduced:
            return os.path.join(self.__jobs_dir, 'done')
        if jobstatus == JobRemoved:
            return os.path.join(self.__jobs_dir, 'removed')
        return os.path.join(self.__jobs_dir, 'unknown')

    def jobDir(self, job_admin_dict, job_status=None, job_type=None, job_group_id=None, retry_attempt=None):
        if job_admin_dict:
            return self.jobDir(job_admin_dict=None,
                               job_status=job_admin_dict['status'],
                               job_type=job_admin_dict['job'].get('Type', 'unknown'),
                               job_group_id=job_admin_dict['job'].get('job_group_id', 'unknown'),
                               retry_attempt=job_admin_dict.get('retry_attempt', 1))

        base_dir = self.jobStatusBaseDir(job_status)

        if job_status == JobRetry and retry_attempt != None:
            return os.path.join(base_dir, str(retry_attempt))
        elif job_status in [JobProduced, JobRemoved, JobFailed]:
            group_dir = '%s_%s' % (job_type, job_group_id)
            return os.path.join(base_dir, group_dir)

        return base_dir

    def jobPath(self, job_admin_dict):
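        '''full path of the job's xml file, based on its current status directory'''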
        dir = self.jobDir(job_admin_dict)
        filename = 'j%s.xml' % job_admin_dict['job']['JobId']
        path = os.path.join(dir, filename)
        return path
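
    # the above results in an on-disk layout like this (illustrative job ids):
    #   <jobs_dir>/to_do/j12345.xml
    #   <jobs_dir>/retry/2/j12346.xml                      (2nd retry attempt)
    #   <jobs_dir>/done/<Type>_<job_group_id>/j12347.xml
    #   <jobs_dir>/failed/<Type>_<job_group_id>/j12348.xml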

    def scanJobsdir(self):
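        '''restore the job administration from disk: first all unfinished jobs
        (to_do/scheduled/producing/retry), then the done jobs of the same (type, group) pairs'''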
        with self.__lock:
            logger.info('scanning jobs dirs in %s', self.__jobs_dir)

            for status in [JobToDo, JobScheduled, JobProducing, JobRetry]:
                job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status)

                for job_admin_dict in job_admin_dicts:
                    self.addNewJob(job_admin_dict, check_done_dirs=False)

                logger.info('added %d existing %s jobs from disk', len(job_admin_dicts), jobState2String(status))

            #which (type, group_id) jobs were read
            #read the done jobs for these groups as well
            unique_type_groups = set([(jad['job']['Type'], jad['job'].get('job_group_id', 'unknown_group')) for jad in self.__job_admin_dicts.values()])

            if unique_type_groups:
                logger.info('scanning for done jobs for %s', unique_type_groups)

                for job_type, job_group_id in unique_type_groups:
                    for status in [JobFailed, JobProduced, JobRemoved]:
                        job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status, job_type=job_type, job_group_id=job_group_id)

                        for job_admin_dict in job_admin_dicts:
                            self.addNewJob(job_admin_dict, check_done_dirs=False)

                        logger.info('added %d existing %s jobs for %s %s', len(job_admin_dicts), jobState2String(status), job_type, job_group_id)

            logger.info('finished scanning jobs')

    def addNewJob(self, job_admin_dict, check_done_dirs=False):
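        '''add a (new or rescanned) job to the administration and persist it in its status directory.
        when check_done_dirs is True, any earlier done/failed/removed copy of a resubmitted job is removed first.'''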
        with self.__lock:
            job_id = job_admin_dict['job']['JobId']
            job_group_id = job_admin_dict['job'].get('job_group_id')
            job_type=job_admin_dict['job']['Type']
            logger.info('adding new job %s in group %s %s with status %s',
                        job_id,
                        job_type,
                        job_group_id,
                        jobState2String(job_admin_dict.get('status', JobToDo)))

            if check_done_dirs:
                #remove job from 'done' directories if present (this is a resubmitted job)
                for done_status in [JobFailed, JobProduced, JobRemoved]:
                    matching_done_jads = self.getJobAdminDictsFromDisk(job_status=done_status,
                                                                       job_type=job_type,
                                                                       job_group_id=job_group_id,
                                                                       job_id=job_id)

                    for done_jad in matching_done_jads:
                        try:
                            logger.info('removing done job %s from %s because it is resubmitted', job_id, done_jad['path'])
                            os.remove(done_jad['path'])
                        except Exception as e:
                            logger.error('error while removing done job %s from %s: %s', job_id, done_jad['path'], e)

            self.__job_admin_dicts[job_id] = job_admin_dict
            if 'status' not in job_admin_dict:
                job_admin_dict['status'] = JobToDo

            if 'created_at' not in job_admin_dict:
                job_admin_dict['created_at'] = datetime.utcnow()

            job_admin_dict['updated_at'] = job_admin_dict['created_at']

            #store start- finish times per try in runs
            job_admin_dict['runs'] = {}

            #store new job
            todo_dir = self.jobDir(job_admin_dict)

            #create the directory if it does not exist yet
            if not os.path.isdir(todo_dir):
                try:
                    os.makedirs(todo_dir)
                except OSError as e:
                    logger.error(e)

            if 'path' not in job_admin_dict:
                path = self.jobPath(job_admin_dict)
                job_admin_dict['path'] = path
                try:
                    if not os.path.exists(path):
                        logger.info('saving job %s on disk: %s', job_id, path)
                        with open(path, 'w') as job_file:
                            job_file.write(job_admin_dict['job_xml'])
                except Exception as e:
                    logger.error(e)
            else:
                job_dirname = os.path.dirname(job_admin_dict['path'])
                expected_dirname = self.jobDir(job_admin_dict)
                if job_dirname != expected_dirname:
                    # a new todo job should be located in the expected dir based on its status,
                    # it is not, so force it there
                    self.updateJobStatus(job_admin_dict['job']['JobId'], job_admin_dict['status'])

    def updateJobStatus(self, job_id, new_status, lta_site=None, message=None):
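        '''central state transition method: update a job's status (and bookkeeping), move its xml file
        to the matching status directory, send notifications for removed/failed jobs,
        and clean up finished job groups'''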
        with self.__lock:
            job_admin_dict = self.__job_admin_dicts.get(job_id)

            if not job_admin_dict:
                logger.error('updateJobStatus: unknown job %s with new status %s', job_id, jobState2String(new_status))
                return

            try:
                #update updated_at timestamp
                job_admin_dict['updated_at'] = datetime.utcnow()

                if new_status == JobProducing:
                    # record the start time once per (re)try;
                    # progress updates also report JobProducing, so don't overwrite an earlier start time
                    run = job_admin_dict['runs'].setdefault(job_admin_dict.get('retry_attempt', 0), {})
                    if 'started_at' not in run:
                        run['started_at'] = datetime.utcnow()

                if new_status == JobProduced or new_status == JobTransferFailed:
                    # guard with setdefault: after a manager restart there may be no recorded run yet
                    run = job_admin_dict['runs'].setdefault(job_admin_dict.get('retry_attempt', 0), {})
                    run['finished_at'] = datetime.utcnow()

                if lta_site:
                    job_admin_dict['lta_site'] = lta_site

                job_admin_dict['last_message'] = message

                current_status = job_admin_dict.get('status', JobToDo)

                if new_status == JobTransferFailed:
                    #special case for jobs which failed to transfer; these will be retried
                    current_retry_attempt = job_admin_dict.get('retry_attempt', 0)
                    next_retry_attempt = current_retry_attempt+1

                    if next_retry_attempt < self.__max_num_retries:
                        if message and 'not on disk' in message:
                            logger.info('job %s transfer failed because source data was not on disk, not retrying anymore',
                                        job_id)
                            new_status = JobFailed
                        else:
                            new_status = JobRetry
                            job_admin_dict['retry_attempt'] = next_retry_attempt
                            job_admin_dict['job']['last_retry_attempt'] = next_retry_attempt == (self.__max_num_retries-1)
                    else:
                        logger.info('job %s transfer failed %s times, not retrying anymore',
                                    job_id,
                                    job_admin_dict.get('retry_attempt', 0))
                        new_status = JobFailed

                if new_status != current_status:
                    #update the internal status
                    logger.info('updating job %s status from %s to %s%s',
                                job_id,
                                jobState2String(current_status),
                                jobState2String(new_status),
                                (' attempt #%d' % job_admin_dict.get('retry_attempt', 1))
                                 if (new_status == JobRetry or current_status == JobRetry) else '')
                    job_admin_dict['status'] = new_status

                #move the job file to its new status directory
                #determine current and new paths
                current_path = job_admin_dict.get('path', '')
                new_path = self.jobPath(job_admin_dict)
                new_dirname = os.path.dirname(new_path)

                #create the directory if it does not exist yet
                if not os.path.isdir(new_dirname):
                    try:
                        os.makedirs(new_dirname)
                    except OSError as e:
                        logger.error(e)

                if new_path != current_path:
                    #do actual file move
                    logger.debug('moving job file from %s to %s.', current_path, new_path)
                    shutil.move(current_path, new_path)
                    job_admin_dict['path'] = new_path


                if new_status == JobRemoved or new_status == JobFailed:
                    #send notification
                    #this is (also) picked up by the ingestmomadapter
                    #which also sends a status update to MoM, including the last_message
                    if new_status == JobRemoved:
                        job_admin_dict['last_message'] = 'removed from queue'

                    self._sendNotification(job_admin_dict)

                #finally, remove job from internal admin jobs dict if finished
                if job_admin_dict['status'] in [JobProduced, JobFailed, JobRemoved]:
                    current_job_group_id = job_admin_dict['job'].get('job_group_id', 'unknown')
                    current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=current_job_group_id)

                    if len(current_group_jads) == 0:
                        logger.info('all jobs in group %s are done', current_job_group_id)
                        self.sendJobGroupFinishedMail(current_job_group_id)

                        current_group_done_jads = self.getDoneJobAdminDicts(job_group_id=current_job_group_id)
                        logger.info('removing %s jobs of group %s from job management server memory',
                                    len(current_group_done_jads),
                                    current_job_group_id)

                        for jad in current_group_done_jads:
                            del self.__job_admin_dicts[jad['job']['JobId']]
                    else:
                        logger.info('%s jobs to do in group %s', len(current_group_jads), current_job_group_id)
            except Exception as e:
                logger.error("updateJobStatus(job_id=%s, new_status=%s, lta_site=%s, message=%s) %s",
                             job_id,
                             jobState2String(new_status),
                             lta_site,
                             message,
                             e)


    def _sendNotification(self, job_admin_dict):
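        '''send an event message with the job's current status onto the notification bus
        (picked up by e.g. the ingestmomadapter)'''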
        try:
            job = job_admin_dict['job']
            contentDict = { 'job_id': job['JobId'],
                            'archive_id': job['ArchiveId'],
                            'project': job['Project'],
                            'type': job["Type"],
                            'dataproduct': job['DataProduct'] }

            if 'ObservationId' in job:
                contentDict['otdb_id'] = job['ObservationId']

            if 'lta_site' in job_admin_dict:
                contentDict['lta_site'] = job_admin_dict['lta_site']

            if 'last_message' in job_admin_dict:
                contentDict['message'] = job_admin_dict['last_message']

            if 'job_group_id' in job:
                contentDict['export_id'] = job['job_group_id']

            status = jobState2String(job_admin_dict['status'])
            status = status[status.index('(')+1:status.index(')')]

            msg = EventMessage(context=self.notification_prefix + status, content=contentDict)
            # remove the message from the queues when it is not picked up within 48 hours,
            # otherwise mom might endlessly reject messages if it cannot handle them
            msg.ttl = 48*3600
            logger.info('Sending notification %s: %s', status, str(contentDict).replace('\n', ' '))
            self.event_bus.send(msg)

        except Exception as e:
            logger.error(str(e))

    def removeExportJob(self, export_group_id):
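        '''mark all jobs in the given export group as removed'''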
        logger.info('removing export job %s', export_group_id)
        job_admin_dicts = self.getJobAdminDicts(job_group_id=export_group_id)

        for jad in job_admin_dicts:
            self.updateJobStatus(jad['job']['JobId'], JobRemoved)

    def getExportIds(self):
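        '''sorted list of the unique job_group_id's of all managed jobs'''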
        with self.__lock:
            return sorted(list(set([jad['job'].get('job_group_id', 'unknown_group') for jad in self.__job_admin_dicts.values()])))

    def putStalledJobsBackToToDo(self):
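        '''put scheduled/producing jobs that have not been updated for 5 minutes back to to_do,
        so they will be produced (and thus transferred) again'''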
        with self.__lock:
            now = datetime.utcnow()
            threshold = timedelta(minutes=5)
            stalled_job_admin_dicts = [jad for jad in self.__job_admin_dicts.values()
                                       if (jad['status'] == JobProducing or jad['status'] == JobScheduled)
                                           and now - jad['updated_at'] >= threshold]

            for jad in stalled_job_admin_dicts:
                logger.info('putting job %s back to ToDo because it did not make any progress during the last 5min', jad['job']['JobId'])
                self.updateJobStatus(jad['job']['JobId'], JobToDo)

    def getNextJobToRun(self):
        '''get the next job to run.
        examine all 'to_do' and 'retry' jobs
        higher priority jobs always go first
        at equal priority, 'to_do' jobs are preferred over 'retry' jobs
        'retry' jobs are sorted on descending retry attempt, so jobs closest to their final attempt go first
        source load balancing: the more jobs are currently transferring from a certain source host,
                               the less likely it is that the next job will be for that source host as well
        '''
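
        # example (illustrative values): a priority-6 'to_do' job beats a priority-4 'retry' job;
        # between two priority-4 'retry' jobs, the one at retry attempt 3 goes first;
        # between two otherwise equal jobs, the one whose source host currently runs fewer transfers goes first;
        # finally, the oldest job (FIFO on created_at) goes first.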

        #helper method to get the source host from the job's location
        def getSourceHost(job_admin_dict):
            try:
                host = job_admin_dict['job']['Location'].split(':')[0]
                if 'cep4' in host.lower() or 'cpu' in host.lower():
                    return 'localhost'
                return host
            except Exception:
                return 'localhost'

        running_jads = self.getJobAdminDicts(status=JobProducing) + self.getJobAdminDicts(status=JobScheduled)
        running_hosts = {}
        for jad in running_jads:
            host = getSourceHost(jad)
            running_hosts[host] = running_hosts.get(host, 0) + 1

        with self.__lock:
            def getNextJobByStatus(status, min_age=None):

                def jad_compare_func(jad_a, jad_b):
                    #sort on priority first
                    if jad_a['job'].get('priority', DEFAULT_JOB_PRIORITY) != jad_b['job'].get('priority', DEFAULT_JOB_PRIORITY):
                        return jad_b['job'].get('priority', DEFAULT_JOB_PRIORITY) - jad_a['job'].get('priority', DEFAULT_JOB_PRIORITY)

                    #equal priorities, so sort on next sort criterion, the retry attempt
                    if jad_a['status'] == JobRetry and jad_b['status'] == JobRetry:
                        if jad_a.get('retry_attempt', 0) != jad_b.get('retry_attempt', 0):
                            return jad_b.get('retry_attempt', 0) - jad_a.get('retry_attempt', 0)

                    # equal retry_attempt, so sort on next sort criterion,
                    # jobs for which the number of running jobs on that host is lower
                    # (load balance the source hosts)
                    nrOfRunningJobsOnSourceHostA = running_hosts.get(getSourceHost(jad_a), 0)
                    nrOfRunningJobsOnSourceHostB = running_hosts.get(getSourceHost(jad_b), 0)

                    if nrOfRunningJobsOnSourceHostA != nrOfRunningJobsOnSourceHostB:
                        return nrOfRunningJobsOnSourceHostA - nrOfRunningJobsOnSourceHostB

                    #equal number of jobs on source hosts, so sort on next sort criterion, the created_at timestamp (FIFO)
                    if jad_a['created_at'] < jad_b['created_at']:
                        return -1
                    if jad_a['created_at'] > jad_b['created_at']:
                        return 1

                    #TODO: we can add a lot of sort criteria in the future.
                    #For now, stick with FIFO and retry_attempt, after priority and source_host_load_balancing
                    return 0

                job_admin_dicts = self.getJobAdminDicts(status=status)

                # filter out paused jobs (priority 0)
                job_admin_dicts = [jad for jad in job_admin_dicts if jad['job'].get('priority', DEFAULT_JOB_PRIORITY) > 0]

                if min_age:
                    now = datetime.utcnow()
                    job_admin_dicts = [jad for jad in job_admin_dicts if now - jad['updated_at'] >= min_age]

                job_admin_dicts = sorted(job_admin_dicts, cmp=jad_compare_func)
                if job_admin_dicts:
                    logger.info('%s jobs with status %s waiting', len(job_admin_dicts), jobState2String(status))
                    return job_admin_dicts[0]
                return None

            #get the next job to run, both for JobToDo and JobRetry
            next_to_do_jad = getNextJobByStatus(JobToDo)
            next_retry_jad = getNextJobByStatus(JobRetry, timedelta(minutes=15) if next_to_do_jad else None)

            #limit the number of running jobs per source host (max 5 concurrent) if not localhost
            if next_to_do_jad and getSourceHost(next_to_do_jad) != 'localhost':
                if running_hosts.get(getSourceHost(next_to_do_jad), 0) > 4:
                    next_to_do_jad = None

            #limit the number of running jobs per source host (max 5 concurrent) if not localhost
            if next_retry_jad and getSourceHost(next_retry_jad) != 'localhost':
                if running_hosts.get(getSourceHost(next_retry_jad), 0) > 4:
                    next_retry_jad = None

            if next_to_do_jad and next_retry_jad:
                # if next_retry_jad has higher priority than next_to_do_jad, then return next_retry_jad
                if next_retry_jad['job'].get('priority', DEFAULT_JOB_PRIORITY) > next_to_do_jad['job'].get('priority', DEFAULT_JOB_PRIORITY):
                    return next_retry_jad

                # or if next_to_do_jad has higher priority than next_retry_jad, then return next_to_do_jad
                if next_to_do_jad['job'].get('priority', DEFAULT_JOB_PRIORITY) > next_retry_jad['job'].get('priority', DEFAULT_JOB_PRIORITY):
                    return next_to_do_jad

                # or if next_retry_jad is already waiting for over an hour, then return next_retry_jad
                if datetime.utcnow() - next_retry_jad['updated_at'] > timedelta(minutes=60):
                    return next_retry_jad

                # or if next_retry_jad was updated more recently than next_to_do_jad
                if next_retry_jad['updated_at'] > next_to_do_jad.get('updated_at', next_to_do_jad.get('created_at')):
                    return next_retry_jad

            if next_to_do_jad:
                # just return the next_to_do_jad
                return next_to_do_jad

            # in all other cases, return next_retry_jad (which might be None)
            return next_retry_jad

    def canProduceNextJob(self):
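        '''a next job can be produced as soon as the jobs-for-transfer queue is empty
        (just-in-time production); if the queue depth cannot be determined, assume we can produce'''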
        # test if the managed_job_queue is empty
        try:
            with self.__jobs_for_transfer_queue_peeker:
                num_scheduled = self.__jobs_for_transfer_queue_peeker.nr_of_messages_in_queue(0.1)
                return (num_scheduled == 0)
        except Exception as e:
            logger.error('canProduceNextJob: %s', e)
        return True

    def produceNextJobsIfPossible(self):
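        '''keep submitting the next-best job to the jobs-for-transfer queue (for at most 5 seconds)
        as long as that queue is empty'''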
        start_producing_timestamp = datetime.utcnow()
        with self.__lock:
            while self.canProduceNextJob() and datetime.utcnow() - start_producing_timestamp < timedelta(seconds=5):
                job_admin_dict = self.getNextJobToRun()
                if job_admin_dict:
                    if os.path.exists(job_admin_dict.get('path')):
                        msg = CommandMessage(content=job_admin_dict.get('job_xml'))
                        msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY)
                        self.__jobs_for_transfer_queue.send(msg)
                        logger.info('submitted job %s to queue %s at %s', job_admin_dict['job']['JobId'], self.__jobs_for_transfer_queue.address, self.__jobs_for_transfer_queue.broker)
                        self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled)
                    else:
                        job_id = job_admin_dict['job']['JobId']
                        logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path'))
                        with self.__lock:
                            del self.__job_admin_dicts[job_id]

                    # sleep briefly to allow the ingesttransferserver consumers to pick up the submitted job,
                    # so the queue is empty again and we can submit yet another job
                    time.sleep(0.1)
                else:
                    return

    def onJobStarted(self, job_notification_dict):
        self.__notification_listener._logJobNotification('started ', job_notification_dict)
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProducing,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobFinished(self, job_notification_dict):
        self.__notification_listener._logJobNotification('finished', job_notification_dict)

        # file_type might have changed to unspec for example
        if 'file_type' in job_notification_dict:
            with self.__lock:
                job_admin_dict = self.__job_admin_dicts.get(job_notification_dict.get('job_id'))
                if job_admin_dict:
                    job_admin_dict['job']['file_type'] = job_notification_dict['file_type']

        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProduced,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobTransferFailed(self, job_notification_dict):
        self.__notification_listener._logJobNotification('transfer failed ', job_notification_dict)
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobTransferFailed,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    def onJobProgress(self, job_notification_dict):
        self.__notification_listener._logJobNotification('progress', job_notification_dict, level=logging.DEBUG)
        #touch job
        #producing jobs which are untouched for 5min are put back to JobToDo
        self.updateJobStatus(job_notification_dict.get('job_id'),
                             JobProducing,
                             job_notification_dict.get('lta_site'),
                             job_notification_dict.get('message'))

    @staticmethod
    def getSubDirs(dir_path):
        '''get a list of the full paths of the direct subdirectories of dir_path'''
        for root, dirs, files in os.walk(dir_path):
            # os.walk yields the top-level directory first; that is all we need here
            return [os.path.join(root, d) for d in dirs]
        return []

    def getDoneJobAdminDicts(self, job_group_id=None):
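        '''jobs that reached a final state: failed, produced or removed'''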
        with self.__lock:
            return (self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed) +
                    self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced) +
                    self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved))

    def getNotDoneJobAdminDicts(self, job_group_id=None):
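        '''jobs that are still pending: to_do, scheduled, producing or retry'''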
        with self.__lock:
            return (self.getJobAdminDicts(job_group_id=job_group_id, status=JobToDo) +
                    self.getJobAdminDicts(job_group_id=job_group_id, status=JobScheduled) +
                    self.getJobAdminDicts(job_group_id=job_group_id, status=JobProducing) +
                    self.getJobAdminDicts(job_group_id=job_group_id, status=JobRetry))

    def getJobAdminDicts(self, job_group_id=None, status=None):
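        '''the managed job admin dicts, optionally filtered by job_group_id and/or status'''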
        with self.__lock:
            if job_group_id is not None and status is not None:
                return [jad for jad in self.__job_admin_dicts.values()
                        if jad['status'] == status
                        and str(jad['job'].get('job_group_id')) == str(job_group_id)]

            if status is not None:
                return [jad for jad in self.__job_admin_dicts.values()
                        if jad['status'] == status]

            if job_group_id is not None:
                return [jad for jad in self.__job_admin_dicts.values()
                        if str(jad['job'].get('job_group_id')) == str(job_group_id)]

            return list(self.__job_admin_dicts.values())

    def getStatusReportDict(self):
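        '''per export group: priority, submitters, projects, lta sites, job counts per status,
        and time series of running/finished jobs for status reporting'''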
        with self.__lock:
            export_ids = self.getExportIds()
            logger.info('getStatusReportDict export_ids: %s', export_ids)

            result = {}
            for export_id in export_ids:
                try:
                    finished_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobProduced)
                    failed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobFailed)
                    removed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobRemoved)

                    done_group_jads = finished_group_jads + failed_group_jads + removed_group_jads
                    done_group_jobs = [jad['job'] for jad in done_group_jads]

                    current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=export_id)
                    current_group_jobs = [jad['job'] for jad in current_group_jads]
                    all_group_jads = current_group_jads + done_group_jads
                    all_group_jobs = current_group_jobs + done_group_jobs

                    priority = min([job.get('priority', DEFAULT_JOB_PRIORITY) for job in current_group_jobs]) if current_group_jobs else 4
                    submitters = list(set([job['Submitter'] for job in all_group_jobs if 'Submitter' in job]))
                    projects = list(set([job['Project'] for job in all_group_jobs if 'Project' in job]))
                    lta_sites = list(set([jad['lta_site'] for jad in all_group_jads if 'lta_site' in jad]))

                    job_run_events = {}
                    for jad in all_group_jads:
                        for run in jad['runs'].values():
                            if 'started_at' in run:
                                started_timestamp = run['started_at']

                                if started_timestamp not in job_run_events:
                                    job_run_events[started_timestamp] = 0

                                job_run_events[started_timestamp] += 1

                                if 'finished_at' in run:
                                    finished_timestamp = run['finished_at']

                                    if finished_timestamp not in job_run_events:
                                        job_run_events[finished_timestamp] = 0

                                    job_run_events[finished_timestamp] -= 1

                    all_run_timestamps = sorted(job_run_events.keys())
                    running_jobs_values = []
                    if all_run_timestamps:
                        prev_value = 0

                        for t in all_run_timestamps:
                            value = prev_value + job_run_events[t]
                            running_jobs_values.append(value)
                            prev_value = value

                    job_finished_events = {}
                    for jad in all_group_jads:
                        if jad['status'] == JobProduced or jad['status'] == JobFailed:
                            if jad['runs']:
                                final_run = max(jad['runs'].keys())

                                run = jad['runs'][final_run]
                                if 'started_at' in run and 'finished_at' in run:
                                    finished_timestamp = run['finished_at']

                                    if finished_timestamp not in job_finished_events:
                                        job_finished_events[finished_timestamp] = 0

                                    job_finished_events[finished_timestamp] += 1

                    # cumulative number of finished jobs over time
                    # (multiple jobs can share a finished_at timestamp, so sum the per-timestamp counts)
                    finished_jobs_values = []
                    finished_timestamps = sorted(job_finished_events.keys())
                    num_finished = 0
                    for t in finished_timestamps:
                        num_finished += job_finished_events[t]
                        finished_jobs_values.append(num_finished)

                    result[export_id] = { 'priority' : priority,
                                          'submitters' : submitters,
                                          'projects' : projects,
                                          'lta_sites' : lta_sites,
                                          'series': { 'running_jobs': { 'timestamps': all_run_timestamps, 'values': running_jobs_values },
                                                      'finished_jobs': { 'timestamps': finished_timestamps, 'values': finished_jobs_values }
                                                    },
                                          'jobs': { 'running': len(self.getJobAdminDicts(job_group_id=export_id, status=JobProducing)),
                                                    'to_do': len(self.getJobAdminDicts(job_group_id=export_id, status=JobToDo)),
                                                    'scheduled': len(self.getJobAdminDicts(job_group_id=export_id, status=JobScheduled)),
                                                    'retry': len(self.getJobAdminDicts(job_group_id=export_id, status=JobRetry)),
                                                    'finished': len(finished_group_jads),
                                                    'failed': len(failed_group_jads) } }
                except Exception as e:
                    logger.error(e)

            return convertIntKeysToString(result)

    def setExportJobPriority(self, export_id, priority):
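        '''set the priority (clamped to 0-9, where 0 means paused) of all jobs in the given
        export group, both in memory and in the job files on disk'''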
        priority = max(0, min(9, int(priority)))
        with self.__lock:
            jads = self.getJobAdminDicts(job_group_id=export_id)

            logger.info('updating the priority of %s jobs of export %s to level %s', len(jads), export_id, priority)

            for jad in jads:
                try:
                    #update local copy
                    jad['job']['priority'] = priority
                    #persist to disk
                    updatePriorityInJobFile(jad['path'], priority)
                except Exception as e:
                    logger.error(e)

    def getReport(self, job_group_id):
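        '''human readable report on a job group: summary counts per file type plus
        per-observation file listings of scheduled/finished/failed/removed jobs'''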
        with self.__lock:
            #still running/waiting jobs
            current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=job_group_id)
            current_group_jobs = [jad['job'] for jad in current_group_jads]

            #done jobs
            finished_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced)
            finished_group_jobs = [jad['job'] for jad in finished_group_jads]
            failed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed)
            failed_group_jobs = [jad['job'] for jad in failed_group_jads]
            removed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved)
            removed_group_jobs = [jad['job'] for jad in removed_group_jads]

            done_group_jobs = finished_group_jobs + failed_group_jobs + removed_group_jobs
            all_group_jobs = current_group_jobs + done_group_jobs

            submitters = set([j['Submitter'] for j in all_group_jobs if 'Submitter' in j])
            projects = set([j['Project'] for j in all_group_jobs if 'Project' in j])

            report = ''

            header = """=== Report on ingest Export Job (%(id)s) ===
User(s): %(user)s
Project: %(project)s""" % { 'id': job_group_id,
                            'user': ', '.join(submitters),
                            'project': ', '.join(projects)}

            report += header

            summary = """\n\n=== Summary ===
Total Files: %(total)i
  - Failed: %(failed)i
  - Success: %(done)i
    - Interferometer: %(corr)i
    - Beamformed: %(bf)i
    - SkyImages: %(img)i
    - Unspecified: %(unspec)i
    - Pulsar Pipeline: %(pulp)i""" % {'total': len(all_group_jobs),
                                      'done': len(finished_group_jobs),
                                      'corr': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_CORRELATED]),
                                      'bf': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_BEAMFORMED]),
                                      'img': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_IMAGE]),
                                      'unspec': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_UNSPECIFIED]),
                                      'pulp': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_PULP]),
                                      'failed': len(failed_group_jobs)}

            # TODO: generate lta link
            #try:
                #import mechanize
                #import json
                #browser = mechanize.Browser()
                #browser.set_handle_robots(False)
                #browser.addheaders = [('User-agent', 'Firefox')]

                #obs_ids = sorted(list(set(job.get('ObservationId', -1) for job in all_group_jobs)))

                #for obs_id in obs_ids:
                    #response = browser.open('http://scu001.control.lofar:7412/rest/tasks/otdb/%s' % obs_id)

                    #if response.code == 200:
                        #task = json.loads(response.read())

            #except Exception as e:
                #logger.error(e)

            if removed_group_jobs:
                summary += '''\n\nTotal Removed before transfer: %s''' % (len(removed_group_jobs),)

            report += summary

            def file_listing_per_obs(jads, full_listing=False, dp_status_remark=''):
                jobs = [jad['job'] for jad in jads]
                obs_ids = sorted(list(set(job.get('ObservationId', -1) for job in jobs)))
                obs_jads_dict = {obs_id:[] for obs_id in obs_ids}

                for jad in jads:
                    obs_jads_dict[jad['job'].get('ObservationId', -1)].append(jad)

                listing = ''
                for obs_id in obs_ids:
                    obs_jads = obs_jads_dict[obs_id]
                    listing += 'otdb_id: %s - #dataproducts: %s\n' % (obs_id, len(obs_jads))

                if full_listing:
                    for obs_id in obs_ids:
                        obs_jads = obs_jads_dict[obs_id]
                        obs_jads = sorted(obs_jads, key=lambda jad: jad['job'].get('DataProduct'))
                        listing += '\notdb_id: %s - %s dataproducts listing\n' % (obs_id, dp_status_remark)

                        for jad in obs_jads:
                            listing += 'dataproduct: %s - archive_id: %s - location: %s' % (jad['job']['DataProduct'],
                                                                                            jad['job']['ArchiveId'],
                                                                                            jad['job'].get('Location'))

                            if jad.get('last_message'):
                                listing += ' - message: %s' % (jad['last_message'])
                            listing += '\n'

                return listing

            if current_group_jads:
                report += "\n\n==== Scheduled/Running files: =====\n"
                report += file_listing_per_obs(current_group_jads, False, 'scheduled/running')

            if finished_group_jads:
                report += "\n\n==== Finished files: =====\n"
                report += file_listing_per_obs(finished_group_jads, False, 'finished')

            if failed_group_jads:
                report += "\n\n==== Failed files: =====\n"
                report += file_listing_per_obs(failed_group_jads, True, 'failed')

            if removed_group_jads:
                report += "\n\n==== Removed jobs before transfer: =====\n"
                report += file_listing_per_obs(removed_group_jads, False, 'removed')

            return report

    def sendJobGroupFinishedMail(self, job_group_id):
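        '''compose and mail the final report of a finished job group to the configured mailing list'''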
        report = self.getReport(job_group_id)
        logger.info(report)

        mailing_list = list(FINISHED_NOTIFICATION_MAILING_LIST)

        finished_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced)
        failed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed)
        removed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved)

        success = not (failed_group_jads or removed_group_jads)