From 7867e7c86efa144551d8cdd271bd0a9233216ba1 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Tue, 2 Apr 2019 11:48:18 +0000 Subject: [PATCH] SW-516: python3 fixes for ingestjobmanagementserver --- .../lib/ingestjobmanagementserver.py | 3 +- .../test/t_ingestjobmanagementserver.py | 792 +++++++++--------- 2 files changed, 378 insertions(+), 417 deletions(-) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py index 90e5abd8f07..f3253ec6371 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py @@ -43,6 +43,7 @@ import time from random import random from threading import RLock from datetime import datetime, timedelta +from functools import cmp_to_key import logging from functools import reduce @@ -676,7 +677,7 @@ class IngestJobManager: # filter out jad's from exclude_job_group_ids job_admin_dicts = [jad for jad in job_admin_dicts if 'job_group_id' not in jad['job'] or jad['job']['job_group_id'] not in exclude_job_group_ids] - job_admin_dicts = sorted(job_admin_dicts, cmp=jad_compare_func) + job_admin_dicts = sorted(job_admin_dicts, key=cmp_to_key(jad_compare_func)) if job_admin_dicts: logger.info('%s jobs with status %s waiting', len(job_admin_dicts), jobState2String(status)) return job_admin_dicts[0] diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py index 2b4424ca21d..fb3ebbeffcd 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py @@ -9,426 +9,386 @@ import shutil from threading import Thread import fnmatch 
import time -from qpid.messaging.exceptions import * -from lofar.messaging.messagebus import FromBus, ToBus +from lofar.messaging.messagebus import FromBus, ToBus, TemporaryQueue from lofar.messaging.messages import CommandMessage, EventMessage -try: - from qpid.messaging import Connection - from qpidtoollibs import BrokerAgent -except ImportError: - print('Cannot run test without qpid tools') - print('Please source qpid profile') - exit(3) testname = 't_ingestjobmanagementserver_%s' % uuid.uuid1() -#overwrite some defaults in the config to run this as an isolated test -import lofar.lta.ingest.common.config as cconfig -cconfig.DEFAULT_INGEST_NOTIFICATION_BUSNAME = '%s.%s' % (cconfig.DEFAULT_INGEST_NOTIFICATION_BUSNAME, testname) - -import lofar.lta.ingest.server.config as config -config.JOBS_DIR = os.path.join(tempfile.gettempdir(), testname, 'jobs') -config.FINISHED_NOTIFICATION_MAILING_LIST = '' -config.MAX_NR_OF_RETRIES = 3 -config.DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME = '%s.%s' % (config.DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME, testname) -config.DEFAULT_INGEST_JOBS_QUEUENAME = '%s.%s' % (config.DEFAULT_INGEST_JOBS_QUEUENAME, testname) -config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME = '%s.%s' % (config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME, testname) -config.DEFAULT_INGEST_BUSNAME = '%s.%s' % (config.DEFAULT_INGEST_BUSNAME, testname) - -from lofar.lta.ingest.common.job import * -from lofar.lta.ingest.client.ingestbuslistener import JobsMonitor - -connection = None -broker = None -manager = None -manager_thread = None -exit_code = 0 - -try: - from lofar.messaging import setQpidLogLevel - setQpidLogLevel(logging.INFO) - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - logger = logging.getLogger(__name__) - - # setup broker connection - connection = Connection.establish('127.0.0.1') - broker = BrokerAgent(connection) - - # add test service queues - logger.info('adding test exchange to broker: %s', 
config.DEFAULT_INGEST_BUSNAME) - broker.addExchange('topic', config.DEFAULT_INGEST_BUSNAME) - logger.info('adding test exchange to broker: %s', cconfig.DEFAULT_INGEST_NOTIFICATION_BUSNAME) - broker.addExchange('topic', cconfig.DEFAULT_INGEST_NOTIFICATION_BUSNAME) - logger.info('adding test queue to broker: %s', config.DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME) - broker.addQueue(config.DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME) - logger.info('adding test queue to broker: %s', config.DEFAULT_INGEST_JOBS_QUEUENAME) - broker.addQueue(config.DEFAULT_INGEST_JOBS_QUEUENAME) - logger.info('adding test queue to broker: %s', config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME) - broker.addQueue(config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME) - - from lofar.lta.ingest.server.ingestjobmanagementserver import IngestJobManager - - # create some 'to do' job files for group 999999999 - for i in range(3): - testfile_path = os.path.join(config.JOBS_DIR, 'to_do', 'testjob_%s.xml' % i) - logger.info('creating test jobfile: %s', testfile_path) - createJobXmlFile(testfile_path, 'test-project', 999999999, 888888888, 'L888888888_SB00%s_uv.MS'%i, 777777777+i, 'somehost:/path/to/dp') - time.sleep(0.25) - - # create some 'failed/done' job files for another group 666666666 - # these will not be transfered, but are just sitting there, and should not interfere (which is what we'll test) - for i in range(4): - testfile_path = os.path.join(config.JOBS_DIR, - 'failed' if i%2==0 else 'done', - 'MoM_666666666', - 'testjob_%s.xml' % i) - logger.info('creating test jobfile: %s', testfile_path) - createJobXmlFile(testfile_path, 'test-project', 666666666, 555555555, 'L888888888_SB00%s_uv.MS'%i, 444444444+i, 'somehost:/path/to/dp') - time.sleep(0.25) - - with FromBus(config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME) as test_consumer: - with ToBus(config.DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME) as test_notifier: - - def sendNotification(event, job_id, message=None, percentage_done=None): - content = { 
'job_id': job_id } - if message: - content['message'] = message - if percentage_done: - content['percentage_done'] = percentage_done - event_msg = EventMessage(context=config.DEFAULT_INGEST_NOTIFICATION_PREFIX + event, content=content) - logger.info('sending test event message on %s subject=%s content=%s', - test_notifier.address, event_msg.subject, event_msg.content) - test_notifier.send(event_msg) - - def receiveJobForTransfer(): - msg = test_consumer.receive(timeout=1) - - if msg and isinstance(msg, CommandMessage): - test_consumer.ack(msg) - job = parseJobXml(msg.content) - if job and job.get('JobId'): - logger.info("test consumer received job on queue: %s", job) - return job - return None - - def sendJobFileToManager(jobfile_path): - try: - with ToBus(address=config.DEFAULT_INGEST_JOBS_QUEUENAME) as bus: - with open(jobfile_path) as file: - file_content = file.read() - msg = CommandMessage(content=file_content) - bus.send(msg) - logger.info('submitted jobfile %s to queue %s', jobfile_path, config.DEFAULT_INGEST_JOBS_QUEUENAME) - except Exception as e: - logger.error('sendJobFileToManager error: %s', e) - - - # by starting the job manager, all job files in the non-finished dirs will be scanned and picked up. 
- manager = IngestJobManager() - manager_thread = Thread(target=manager.run) - manager_thread.daemon = True - manager_thread.start() - - with JobsMonitor() as monitor: - assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished before any job was started' - assert manager.nrOfJobs() == 3, 'expected 3 jobs in total before any job was started' - - #mimick receiving and transferring of jobs - #check the status of the manager for correctness - job1 = receiveJobForTransfer() - assert job1['JobId'] == 'A_999999999_777777777_L888888888_SB000_uv.MS', 'unexpected job %s' % job1['JobId'] - sendNotification('JobStarted', job1['JobId']) - assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job was started' - - sendNotification('JobProgress', job1['JobId'], percentage_done=25) - assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job made progress' - - #just finish normally - sendNotification('JobFinished', job1['JobId']) - - time.sleep(1.0) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - - - #2nd job will fail one transfer before completing - job2 = receiveJobForTransfer() - assert job2['JobId'] == 'A_999999999_777777778_L888888888_SB001_uv.MS', 'unexpected job %s' % job2['JobId'] - sendNotification('JobStarted', job2['JobId']) - - time.sleep(1.5) #TODO: 
should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[0] == 1' - - # let job2 fail - sendNotification('JobTransferFailed', job2['JobId'], message='something went wrong') - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - - #the 2nd job failed, so did not finish, and will be retried later - #the next received job should be the 3rd job - job3 = receiveJobForTransfer() - assert 
job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] - sendNotification('JobStarted', job3['JobId']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - - - #3rd job will fail all the time - sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong') - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 
'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - - - #receive again, 2nd and 3rd job are going to be retried - #this should be the 2nd job - job2 = receiveJobForTransfer() - assert job2['JobId'] == 'A_999999999_777777778_L888888888_SB001_uv.MS', 'unexpected job %s' % job2['JobId'] - sendNotification('JobStarted', job2['JobId']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #keep job2 running while we process job3 - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - - - 
#only 3rd job is unfinished, and job2 is running - job3 = receiveJobForTransfer() - assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] - sendNotification('JobStarted', job3['JobId']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - - #3rd job will fail again - sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong') - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job 
finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' - - - # in the mean time, finish job2 normally - sendNotification('JobFinished', job2['JobId']) - - #one job to go - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 2 == report['jobs']['finished'], 'expected 2 jobs finished' - assert 2 == len(report['series']['finished_jobs']['values']), 'expected 2 jobs in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 2 == report['series']['finished_jobs']['values'][1], 'expected finished jobs series[1] == 2' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == 
report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' - assert 0 == report['series']['running_jobs']['values'][9], 'expected running jobs series[9] == 0' - - - #still 3rd job is still unfinished, final retry - job3 = receiveJobForTransfer() - assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] - sendNotification('JobStarted', job3['JobId']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 2 == report['jobs']['finished'], 'expected 2 jobs finished' - assert 2 == len(report['series']['finished_jobs']['values']), 'expected 2 jobs in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 2 == report['series']['finished_jobs']['values'][1], 'expected finished jobs series[1] == 2' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == 
report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' - assert 0 == report['series']['running_jobs']['values'][9], 'expected running jobs series[9] == 0' - assert 1 == report['series']['running_jobs']['values'][10], 'expected running jobs series[10] == 1' - - #3rd job will fail again - sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong') - - #3rd job should have failed after 3 retries - #no more jobs to go - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished' - - #there should be no more reports, cause the job group 999999999 is finished as a whole - #and is removed from the manager at this point - reports = manager.getStatusReportDict() - assert 0 == len(reports), 'expected 0 reports' - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - - jobgroup_999999999_failed_dir = os.path.join(config.JOBS_DIR, 'failed', 'MoM_999999999') - failed_jobgroup_999999999_files = [os.path.join(jobgroup_999999999_failed_dir, f) for f in - os.listdir(jobgroup_999999999_failed_dir) - if fnmatch.fnmatch(f, '*_999999999_*.xml*')] - - assert 1 == len(failed_jobgroup_999999999_files), '1 and only 1 failed file expected for job_group 999999999' - for file in 
failed_jobgroup_999999999_files: - sendJobFileToManager(file) - - time.sleep(1.0) - - assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 jobs unfinished' - assert manager.nrOfJobs() == 3, 'expected 3 jobs' #1 to_do/scheduled, 2 done - assert len(manager.getJobAdminDicts(status=JobToDo) + manager.getJobAdminDicts(status=JobScheduled)) == 1, 'expected 1 todo/scheduled jobs' - assert len(manager.getJobAdminDicts(status=JobProduced)) == 2, 'expected 2 done jobs' - - # this time, start and finish job3 normally - job3 = receiveJobForTransfer() - assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] - sendNotification('JobStarted', job3['JobId']) - sendNotification('JobFinished', job3['JobId']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - - #there should be no more reports, cause the job group 999999999 is finished as a whole - #and is removed from the manager at this point - reports = manager.getStatusReportDict() - assert 0 == len(reports), 'expected 0 reports' - assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished' - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - - manager.quit() - manager_thread.join() - - # and run all tests - unittest.main() - -except ConnectError as ce: - logger.error(ce) - exit_code = 3 -except Exception as e: - logger.error(e) - exit_code = 1 -finally: - if manager: - manager.quit() - manager_thread.join() - - if os.path.exists(config.JOBS_DIR): - shutil.rmtree(config.JOBS_DIR) - - # cleanup test queues and exit - if broker: - logger.info('removing test exchange from broker: %s', config.DEFAULT_INGEST_BUSNAME) - broker.delExchange(config.DEFAULT_INGEST_BUSNAME) - logger.info('removing test exchange from broker: %s', config.DEFAULT_INGEST_NOTIFICATION_BUSNAME) - broker.delExchange(cconfig.DEFAULT_INGEST_NOTIFICATION_BUSNAME) - logger.info('removing 
test queue from broker: %s', config.DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME) - broker.delQueue(config.DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME, if_empty=False, if_unused=False) - logger.info('removing test queue from broker: %s', config.DEFAULT_INGEST_JOBS_QUEUENAME) - broker.delQueue(config.DEFAULT_INGEST_JOBS_QUEUENAME, if_empty=False, if_unused=False) - logger.info('removing test queue from broker: %s', config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME) - broker.delQueue(config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME, if_empty=False, if_unused=False) - if connection: - connection.close() +with TemporaryQueue(testname+"_ingest_notification_bus") as tmp_queue1, \ + TemporaryQueue(testname+"_ingest_bus") as tmp_queue2, \ + TemporaryQueue(testname + "_jobman_notification_bus") as tmp_queue3, \ + TemporaryQueue(testname + "_jobs_queue") as tmp_queue4, \ + TemporaryQueue(testname + "_jobs_for_transfer_queue") as tmp_queue5: + + #overwrite some defaults in the config to run this as an isolated test + import lofar.lta.ingest.common.config as cconfig + cconfig.DEFAULT_INGEST_NOTIFICATION_BUSNAME = tmp_queue1.address + + import lofar.lta.ingest.server.config as config + config.DEFAULT_INGEST_BUSNAME = tmp_queue2.address + config.DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME = tmp_queue3.address + config.DEFAULT_INGEST_JOBS_QUEUENAME = tmp_queue4.address + config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME = tmp_queue5.address + + config.JOBS_DIR = os.path.join(tempfile.gettempdir(), testname, 'jobs') + config.FINISHED_NOTIFICATION_MAILING_LIST = '' + config.MAX_NR_OF_RETRIES = 3 + + from lofar.lta.ingest.common.job import * + from lofar.lta.ingest.client.ingestbuslistener import JobsMonitor + + connection = None + broker = None + manager = None + manager_thread = None + exit_code = 0 + + try: + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + logger = logging.getLogger(__name__) + + from 
lofar.lta.ingest.server.ingestjobmanagementserver import IngestJobManager + + # create some 'to do' job files for group 999999999 + for i in range(3): + testfile_path = os.path.join(config.JOBS_DIR, 'to_do', 'testjob_%s.xml' % i) + logger.info('creating test jobfile: %s', testfile_path) + createJobXmlFile(testfile_path, 'test-project', 999999999, 888888888, 'L888888888_SB00%s_uv.MS'%i, 777777777+i, 'somehost:/path/to/dp') + time.sleep(0.25) # need to sleep so the files have different timestamps and are read from old to new + + # create some 'failed/done' job files for another group 666666666 + # these will not be transfered, but are just sitting there, and should not interfere (which is what we'll test) + for i in range(4): + testfile_path = os.path.join(config.JOBS_DIR, + 'failed' if i%2==0 else 'done', + 'MoM_666666666', + 'testjob_%s.xml' % i) + logger.info('creating test jobfile: %s', testfile_path) + createJobXmlFile(testfile_path, 'test-project', 666666666, 555555555, 'L888888888_SB00%s_uv.MS'%i, 444444444+i, 'somehost:/path/to/dp') + time.sleep(0.25) # need to sleep so the files have different timestamps and are read from old to new + + with FromBus(config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME) as test_consumer: + with ToBus(config.DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME) as test_notifier: + + def sendNotification(event, job_id, message=None, percentage_done=None): + content = { 'job_id': job_id } + if message: + content['message'] = message + if percentage_done: + content['percentage_done'] = percentage_done + event_msg = EventMessage(context=config.DEFAULT_INGEST_NOTIFICATION_PREFIX + event, content=content) + logger.info('sending test event message on %s subject=%s content=%s', + test_notifier.address, event_msg.subject, event_msg.content) + test_notifier.send(event_msg) + + def receiveJobForTransfer(): + msg = test_consumer.receive(timeout=1) + + if msg and isinstance(msg, CommandMessage): + test_consumer.ack(msg) + job = parseJobXml(msg.content) + 
if job and job.get('JobId'):
+                    logger.info("test consumer received job on queue: %s", job)
+                    return job
+            return None
+
+        def sendJobFileToManager(jobfile_path):
+            """Submit the content of the job file at jobfile_path to the ingest jobs queue.
+
+            The file content is sent as a CommandMessage to the queue
+            config.DEFAULT_INGEST_JOBS_QUEUENAME. Errors are logged, not raised.
+            """
+            try:
+                with ToBus(address=config.DEFAULT_INGEST_JOBS_QUEUENAME) as bus:
+                    # only keep the file open for as long as it takes to read it
+                    with open(jobfile_path) as jobfile:
+                        file_content = jobfile.read()
+                    bus.send(CommandMessage(content=file_content))
+                    logger.info('submitted jobfile %s to queue %s', jobfile_path, config.DEFAULT_INGEST_JOBS_QUEUENAME)
+            except Exception as e:
+                logger.error('sendJobFileToManager error: %s', e)
+
+        def receiveAndStartJob(expected_job_id):
+            """Receive the next job for transfer, check that it is the expected one,
+            notify that it started, and return the received job."""
+            job = receiveJobForTransfer()
+            assert job['JobId'] == expected_job_id, 'unexpected job %s' % job['JobId']
+            sendNotification('JobStarted', job['JobId'])
+            return job
+
+
+        # by starting the job manager, all job files in the non-finished dirs will be scanned and picked up.
+        manager = IngestJobManager()
+        manager_thread = Thread(target=manager.run)
+        manager_thread.daemon = True
+        manager_thread.start()
+
+        def assertStatusReport(nr_finished, finished_series, running_series, job_group_id='999999999'):
+            """Check the manager's status report of the given job group.
+
+            nr_finished:     expected value of report['jobs']['finished'].
+            finished_series: expected values of the 'finished_jobs' series;
+                             both the length and each value are checked.
+            running_series:  expected leading values of the 'running_jobs' series;
+                             only these indices are checked, not the series length.
+            """
+            report = manager.getStatusReportDict()[job_group_id]
+
+            plural = '' if nr_finished == 1 else 's'
+            assert nr_finished == report['jobs']['finished'], 'expected %d job%s finished' % (nr_finished, plural)
+
+            finished_values = report['series']['finished_jobs']['values']
+            plural = '' if len(finished_series) == 1 else 's'
+            assert len(finished_series) == len(finished_values), \
+                'expected %d job%s in finished jobs series' % (len(finished_series), plural)
+            for idx, expected in enumerate(finished_series):
+                assert expected == finished_values[idx], 'expected finished jobs series[%d] == %d' % (idx, expected)
+
+            running_values = report['series']['running_jobs']['values']
+            for idx, expected in enumerate(running_series):
+                assert expected == running_values[idx], 'expected running jobs series[%d] == %d' % (idx, expected)
+
+        # the three job ids of job group 999999999, in the order in which they are expected to be received
+        JOB1_ID = 'A_999999999_777777777_L888888888_SB000_uv.MS'
+        JOB2_ID = 'A_999999999_777777778_L888888888_SB001_uv.MS'
+        JOB3_ID = 'A_999999999_777777779_L888888888_SB002_uv.MS'
+
+        # TODO: the time.sleep calls below should not wait a fixed amount of time,
+        #       but poll for the expected output with a timeout
+        # NOTE(review): the block nesting was reconstructed from a whitespace-collapsed patch;
+        #               confirm that the final manager.quit()/join() belong inside this with-block.
+        with JobsMonitor() as monitor:
+            assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished before any job was started'
+            assert manager.nrOfJobs() == 3, 'expected 3 jobs in total before any job was started'
+
+            # mimic receiving and transferring of jobs
+            # check the status of the manager for correctness
+            job1 = receiveAndStartJob(JOB1_ID)
+            assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job was started'
+
+            sendNotification('JobProgress', job1['JobId'], percentage_done=25)
+            assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job made progress'
+
+            # just finish normally
+            sendNotification('JobFinished', job1['JobId'])
+
+            time.sleep(1.0)
+            assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished'
+            assertStatusReport(1, [1], [1, 0])
+
+
+            # 2nd job will fail one transfer before completing
+            job2 = receiveAndStartJob(JOB2_ID)
+
+            time.sleep(1.5)
+            assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished'
+            assertStatusReport(1, [1], [1, 0, 1])
+
+            # let job2 fail
+            sendNotification('JobTransferFailed', job2['JobId'], message='something went wrong')
+
+            time.sleep(1.5)
+            assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished'
+            assertStatusReport(1, [1], [1, 0, 1, 0])
+
+            # the 2nd job failed, so did not finish, and will be retried later
+            # the next received job should be the 3rd job
+            job3 = receiveAndStartJob(JOB3_ID)
+
+            time.sleep(1.5)
+            assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished'
+            assertStatusReport(1, [1], [1, 0, 1, 0, 1])
+
+
+            # 3rd job will fail all the time
+            sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong')
+
+            time.sleep(1.5)
+            assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished'
+            assertStatusReport(1, [1], [1, 0, 1, 0, 1, 0])
+
+
+            # receive again, 2nd and 3rd job are going to be retried
+            # this should be the 2nd job
+            job2 = receiveAndStartJob(JOB2_ID)
+
+            time.sleep(1.5)
+            assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished'
+
+            # keep job2 running while we process job3
+            assertStatusReport(1, [1], [1, 0, 1, 0, 1, 0, 1])
+
+
+            # only 3rd job is unfinished, and job2 is running
+            job3 = receiveAndStartJob(JOB3_ID)
+
+            time.sleep(1.5)
+            assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished'
+            assertStatusReport(1, [1], [1, 0, 1, 0, 1, 0, 1, 2])
+
+            # 3rd job will fail again
+            sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong')
+
+            time.sleep(1.5)
+            assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished'
+            assertStatusReport(1, [1], [1, 0, 1, 0, 1, 0, 1, 2, 1])
+
+
+            # in the meantime, finish job2 normally
+            sendNotification('JobFinished', job2['JobId'])
+
+            # one job to go
+            time.sleep(1.5)
+            assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished'
+            assertStatusReport(2, [1, 2], [1, 0, 1, 0, 1, 0, 1, 2, 1, 0])
+
+
+            # 3rd job is still unfinished, final retry
+            job3 = receiveAndStartJob(JOB3_ID)
+
+            time.sleep(1.5)
+            assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished'
+            assertStatusReport(2, [1, 2], [1, 0, 1, 0, 1, 0, 1, 2, 1, 0, 1])
+
+            # 3rd job will fail again
+            sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong')
+
+            # 3rd job should have failed after 3 retries
+            # no more jobs to go
+            time.sleep(1.5)
+            assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished'
+
+            # there should be no more reports, because the job group 999999999 is finished as a whole
+            # and is removed from the manager at this point
+            reports = manager.getStatusReportDict()
+            assert 0 == len(reports), 'expected 0 reports'
+            time.sleep(1.5)
+
+            # resubmit the job file that ended up in the 'failed' dir of the job group
+            jobgroup_999999999_failed_dir = os.path.join(config.JOBS_DIR, 'failed', 'MoM_999999999')
+            failed_jobgroup_999999999_files = [os.path.join(jobgroup_999999999_failed_dir, f) for f in
+                                               os.listdir(jobgroup_999999999_failed_dir)
+                                               if fnmatch.fnmatch(f, '*_999999999_*.xml*')]
+
+            assert 1 == len(failed_jobgroup_999999999_files), '1 and only 1 failed file expected for job_group 999999999'
+            for failed_file in failed_jobgroup_999999999_files:
+                sendJobFileToManager(failed_file)
+
+            time.sleep(1.0)
+
+            assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 jobs unfinished'
+            assert manager.nrOfJobs() == 3, 'expected 3 jobs' #1 to_do/scheduled, 2 done
+            assert len(manager.getJobAdminDicts(status=JobToDo) + manager.getJobAdminDicts(status=JobScheduled)) == 1, 'expected 1 todo/scheduled jobs'
+            assert len(manager.getJobAdminDicts(status=JobProduced)) == 2, 'expected 2 done jobs'
+
+            # this time, start and finish job3 normally
+            job3 = receiveAndStartJob(JOB3_ID)
+            sendNotification('JobFinished', job3['JobId'])
+
+            time.sleep(1.5)
+
+            # there should be no more reports, because the job group 999999999 is finished as a whole
+            # and is removed from the manager at this point
+            reports = manager.getStatusReportDict()
+            assert 0 == len(reports), 'expected 0 reports'
+            assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished'
+            time.sleep(1.5)
+
+            manager.quit()
+            manager_thread.join()
+
+    except Exception as e:
+        # log including the traceback: several asserts above share the same message,
+        # so the message alone does not tell which check failed
+        logger.exception(e)
+        exit_code = 1
+    finally:
+        if manager:
+            manager.quit()
+            manager_thread.join()
+
+        if os.path.exists(config.JOBS_DIR):
+            shutil.rmtree(config.JOBS_DIR)
 exit(exit_code)
-- 
GitLab