diff --git a/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py b/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py index 03b4f3a1b665230b724f320a9b2769a1c2153dcf..d7cfe7e8aa3d6c8020e1a91d605621a29dbc01c1 100644 --- a/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py +++ b/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py @@ -31,7 +31,7 @@ import logging logger = logging.getLogger() class IngestBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + def __init__(self, busname=DEFAULT_BUSNAME, queue_name=None, broker=DEFAULT_BROKER): """ IngestBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from IngestBusListener and implement the specific on<SomeMessage> methods that you are interested in. @@ -39,8 +39,11 @@ class IngestBusListener(AbstractBusListener): :param broker: valid Qpid broker host """ self.subject_prefix = DEFAULT_INGEST_NOTIFICATION_PREFIX + "." - super(IngestBusListener, self).__init__(exchange_name=busname, routing_key=self.subject_prefix+"#", broker=broker) + super(IngestBusListener, self).__init__(exchange_name=busname, routing_key=self.subject_prefix+"#", + queue_name=queue_name, broker=broker) + def start_listening(self): + super(IngestBusListener, self).start_listening() def _handleMessage(self, msg): try: @@ -161,8 +164,8 @@ class IngestBusListener(AbstractBusListener): class JobsMonitor(IngestBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, listen_for_all_jobs=True): - super(JobsMonitor, self).__init__(busname=busname, broker=broker) + def __init__(self, busname=DEFAULT_BUSNAME, queue_name=None, broker=DEFAULT_BROKER, listen_for_all_jobs=True): + super(JobsMonitor, self).__init__(busname=busname, queue_name=queue_name, broker=broker) self.__monitored_jobs = set() self.__listen_for_all_jobs = listen_for_all_jobs diff --git a/LTA/LTAIngest/LTAIngestServer/CMakeLists.txt b/LTA/LTAIngest/LTAIngestServer/CMakeLists.txt index 3563f05792813e6d40a329894cac07221103a4fa..6347ce851a4d000f9e7154f79993cdd9b59bc639 100644 --- a/LTA/LTAIngest/LTAIngestServer/CMakeLists.txt +++ b/LTA/LTAIngest/LTAIngestServer/CMakeLists.txt @@ -3,4 +3,4 @@ lofar_add_package(LTAIngestAdminServer) lofar_add_package(LTAIngestTransferServer) lofar_add_package(LTAIngestWebServer) -lofar_package(LTAIngestServer 2.0 DEPENDS LTAIngestAdminServer LTAIngestTransferServer) +lofar_package(LTAIngestServer 2.0 DEPENDS LTAIngestAdminServer LTAIngestTransferServer MoMQueryService) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py index 5d1effd1a2e1500063578790726804f60e42fee0..6d216db676c360cf27313143cfbd7a03a40a5f9a 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py @@ -29,7 +29,7 @@ from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_QUEUENAME, DEFAUL from lofar.messaging import CommandMessage, EventMessage, FromBus, ToBus from lofar.messaging import create_queue, create_binding from lofar.messaging.Service import Service, MessageHandlerInterface -from lofar.common.util import convertIntKeysToString, humanreadablesize +from lofar.common.util import humanreadablesize from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC import os @@ -961,7 +961,7 @@ class IngestJobManager: except Exception as e: logger.error(e) - return convertIntKeysToString(result) + return result def setExportJobPriority(self, export_id, priority): priority = max(0, min(9, int(priority))) @@ -1133,7 +1133,9 @@ Total Files: %(total)i # and add these to the extra_mail_addresses done_group_mom_jobs = [job for job in done_group_jobs if job.get('Type', '').lower() == 'mom'] mom_export_ids = set([int(job['JobId'].split('_')[1]) for job in done_group_mom_jobs if 'JobId' in job]) - project_mom2ids = set([self.__momrpc.getObjectDetails(mom_export_id).get(mom_export_id).get('project_mom2id') for mom_export_id in mom_export_ids]) + for i in range(10): + mom_objects_details = self.__momrpc.getObjectDetails(mom_export_ids) + project_mom2ids = set(obj_details.get('project_mom2id') for obj_details in mom_objects_details.values()) project_mom2ids = [x for x in project_mom2ids if x is not None] for project_mom2id in project_mom2ids: diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py index 38a515fe7d4035ec24ccb44e214daa3fb13c13af..567f69545e8f73cbf580620b77f4458398d50c1d 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py @@ -30,6 +30,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus, \ exchange_name=tmp_bus.address, routing_key=ingest_config.DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT) as tmp_jobs_for_transfer_queue: logger.info(tmp_bus.address) + # time.sleep(5) #overwrite some defaults in the config to run this as an isolated test msg_config.DEFAULT_BROKER = "localhost" @@ -74,12 +75,14 @@ with TemporaryExchange(testname+"_bus") as tmp_bus, \ with FromBus(ingest_config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME) as test_consumer: with ToBus(tmp_bus.address) as test_notifier: - def sendNotification(event, job_id, message=None, percentage_done=None): + def sendNotification(event, job_id, message=None, percentage_done=None, export_id=None): content = { 'job_id': job_id } if message: content['message'] = message if percentage_done: content['percentage_done'] = percentage_done + if export_id: + content['export_id'] = export_id event_msg = EventMessage(subject="%s.%s" % (ingest_config.DEFAULT_INGEST_NOTIFICATION_PREFIX, event), content=content) logger.info('sending test event message on %s subject=%s content=%s', @@ -92,7 +95,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus, \ if msg and isinstance(msg, CommandMessage): job = parseJobXml(msg.content) if job and job.get('JobId'): - logger.info("test consumer received job on queue: %s", job) + logger.info("test consumer (stub-ingesttransferservcer) received job on queue: %s", job) return job return None @@ -109,276 +112,282 @@ with TemporaryExchange(testname+"_bus") as tmp_bus, \ # by starting the job manager, all job files in the non-finished dirs will be scanned and picked up. - manager = IngestJobManager() + manager = IngestJobManager(busname=tmp_bus.address) manager_thread = Thread(target=manager.run) manager_thread.daemon = True manager_thread.start() - with JobsMonitor() as monitor: - assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished before any job was started' - assert manager.nrOfJobs() == 3, 'expected 3 jobs in total before any job was started' - - #mimick receiving and transferring of jobs - #check the status of the manager for correctness - job1 = receiveJobForTransfer() - assert job1['JobId'] == 'A_999999999_777777777_L888888888_SB000_uv.MS', 'unexpected job %s' % job1['JobId'] - sendNotification('JobStarted', job1['JobId']) - assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job was started' - - sendNotification('JobProgress', job1['JobId'], percentage_done=25) - assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job made progress' - - #just finish normally - sendNotification('JobFinished', job1['JobId']) - - time.sleep(1.0) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - - - #2nd job will fail one transfer before completing - job2 = receiveJobForTransfer() - assert job2['JobId'] == 'A_999999999_777777778_L888888888_SB001_uv.MS', 'unexpected job %s' % job2['JobId'] - sendNotification('JobStarted', job2['JobId']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[0] == 1' - - # let job2 fail - sendNotification('JobTransferFailed', job2['JobId'], message='something went wrong') - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - - #the 2nd job failed, so did not finish, and will be retried later - #the next received job should be the 3rd job - job3 = receiveJobForTransfer() - assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] - sendNotification('JobStarted', job3['JobId']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - - - #3rd job will fail all the time - sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong') - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - - - #receive again, 2nd and 3rd job are going to be retried - #this should be the 2nd job - job2 = receiveJobForTransfer() - assert job2['JobId'] == 'A_999999999_777777778_L888888888_SB001_uv.MS', 'unexpected job %s' % job2['JobId'] - sendNotification('JobStarted', job2['JobId']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #keep job2 running while we process job3 - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - - - #only 3rd job is unfinished, and job2 is running - job3 = receiveJobForTransfer() - assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] - sendNotification('JobStarted', job3['JobId']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - - #3rd job will fail again - sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong') - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' - - - # in the mean time, finish job2 normally - sendNotification('JobFinished', job2['JobId']) - - #one job to go - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 2 == report['jobs']['finished'], 'expected 2 jobs finished' - assert 2 == len(report['series']['finished_jobs']['values']), 'expected 2 jobs in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 2 == report['series']['finished_jobs']['values'][1], 'expected finished jobs series[1] == 2' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' - assert 0 == report['series']['running_jobs']['values'][9], 'expected running jobs series[9] == 0' - - - #still 3rd job is still unfinished, final retry - job3 = receiveJobForTransfer() - assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] - sendNotification('JobStarted', job3['JobId']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished' - - #check report - report = manager.getStatusReportDict()['999999999'] - assert 2 == report['jobs']['finished'], 'expected 2 jobs finished' - assert 2 == len(report['series']['finished_jobs']['values']), 'expected 2 jobs in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 2 == report['series']['finished_jobs']['values'][1], 'expected finished jobs series[1] == 2' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' - assert 0 == report['series']['running_jobs']['values'][9], 'expected running jobs series[9] == 0' - assert 1 == report['series']['running_jobs']['values'][10], 'expected running jobs series[10] == 1' - - #3rd job will fail again - sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong') - - #3rd job should have failed after 3 retries - #no more jobs to go - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished' - - #there should be no more reports, cause the job group 999999999 is finished as a whole - #and is removed from the manager at this point - reports = manager.getStatusReportDict() - assert 0 == len(reports), 'expected 0 reports' - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - - jobgroup_999999999_failed_dir = os.path.join(ingest_config.JOBS_DIR, 'failed', 'MoM_999999999') - failed_jobgroup_999999999_files = [os.path.join(jobgroup_999999999_failed_dir, f) for f in - os.listdir(jobgroup_999999999_failed_dir) - if fnmatch.fnmatch(f, '*_999999999_*.xml*')] - - assert 1 == len(failed_jobgroup_999999999_files), '1 and only 1 failed file expected for job_group 999999999' - for file in failed_jobgroup_999999999_files: - sendJobFileToManager(file) - - time.sleep(1.0) - - assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 jobs unfinished' - assert manager.nrOfJobs() == 3, 'expected 3 jobs' #1 to_do/scheduled, 2 done - assert len(manager.getJobAdminDicts(status=JobToDo) + manager.getJobAdminDicts(status=JobScheduled)) == 1, 'expected 1 todo/scheduled jobs' - assert len(manager.getJobAdminDicts(status=JobProduced)) == 2, 'expected 2 done jobs' - - # this time, start and finish job3 normally - job3 = receiveJobForTransfer() - assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] - sendNotification('JobStarted', job3['JobId']) - sendNotification('JobFinished', job3['JobId']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - - #there should be no more reports, cause the job group 999999999 is finished as a whole - #and is removed from the manager at this point - reports = manager.getStatusReportDict() - assert 0 == len(reports), 'expected 0 reports' - assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished' - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + # setup a special tmp queue for the JobsMonitor to listen for ingest-job-notifications + # do not bind it to the exchange here in the test, but allow the JobsMonitor constructor to create the + # binding to the test exchange. + with TemporaryQueue(testname + "_notifications_for_test_jobsmonitor") as notification_queue: + with JobsMonitor(busname=tmp_bus.address, queue_name=notification_queue.address) as monitor: + assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished before any job was started' + assert manager.nrOfJobs() == 3, 'expected 3 jobs in total before any job was started' + + #mimick receiving and transferring of jobs + #check the status of the manager for correctness + job1 = receiveJobForTransfer() + logger.info("jobs: %s", job1) + + assert job1['JobId'] == 'A_999999999_777777777_L888888888_SB000_uv.MS', 'unexpected job %s' % job1['JobId'] + sendNotification('JobStarted', job1['JobId'], export_id=job1['job_group_id']) + assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job was started' + + sendNotification('JobProgress', job1['JobId'], percentage_done=25, export_id=job1['job_group_id']) + assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job made progress' + + #just finish normally + sendNotification('JobFinished', job1['JobId'], export_id=job1['job_group_id']) + + time.sleep(1.0) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + + + #2nd job will fail one transfer before completing + job2 = receiveJobForTransfer() + assert job2['JobId'] == 'A_999999999_777777778_L888888888_SB001_uv.MS', 'unexpected job %s' % job2['JobId'] + sendNotification('JobStarted', job2['JobId'], export_id=job2['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[0] == 1' + + # let job2 fail + sendNotification('JobTransferFailed', job2['JobId'], message='something went wrong', export_id=job2['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + + #the 2nd job failed, so did not finish, and will be retried later + #the next received job should be the 3rd job + job3 = receiveJobForTransfer() + assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] + sendNotification('JobStarted', job3['JobId'], export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + + + #3rd job will fail all the time + sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + + + #receive again, 2nd and 3rd job are going to be retried + #this should be the 2nd job + job2 = receiveJobForTransfer() + assert job2['JobId'] == 'A_999999999_777777778_L888888888_SB001_uv.MS', 'unexpected job %s' % job2['JobId'] + sendNotification('JobStarted', job2['JobId'], export_id=job2['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #keep job2 running while we process job3 + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' + + + #only 3rd job is unfinished, and job2 is running + job3 = receiveJobForTransfer() + assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] + sendNotification('JobStarted', job3['JobId'], export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' + assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' + + #3rd job will fail again + sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' + assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' + assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' + + + # in the mean time, finish job2 normally + sendNotification('JobFinished', job2['JobId'], export_id=job2['job_group_id']) + + #one job to go + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 2 == report['jobs']['finished'], 'expected 2 jobs finished' + assert 2 == len(report['series']['finished_jobs']['values']), 'expected 2 jobs in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 2 == report['series']['finished_jobs']['values'][1], 'expected finished jobs series[1] == 2' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' + assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' + assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' + assert 0 == report['series']['running_jobs']['values'][9], 'expected running jobs series[9] == 0' + + + #still 3rd job is still unfinished, final retry + job3 = receiveJobForTransfer() + assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] + sendNotification('JobStarted', job3['JobId'], export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 2 == report['jobs']['finished'], 'expected 2 jobs finished' + assert 2 == len(report['series']['finished_jobs']['values']), 'expected 2 jobs in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 2 == report['series']['finished_jobs']['values'][1], 'expected finished jobs series[1] == 2' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' + assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' + assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' + assert 0 == report['series']['running_jobs']['values'][9], 'expected running jobs series[9] == 0' + assert 1 == report['series']['running_jobs']['values'][10], 'expected running jobs series[10] == 1' + + #3rd job will fail again + sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) + + #3rd job should have failed after 3 retries + #no more jobs to go + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished' + + #there should be no more reports, cause the job group 999999999 is finished as a whole + #and is removed from the manager at this point + reports = manager.getStatusReportDict() + assert 0 == len(reports), 'expected 0 reports' + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + + jobgroup_999999999_failed_dir = os.path.join(ingest_config.JOBS_DIR, 'failed', 'MoM_999999999') + failed_jobgroup_999999999_files = [os.path.join(jobgroup_999999999_failed_dir, f) for f in + os.listdir(jobgroup_999999999_failed_dir) + if fnmatch.fnmatch(f, '*_999999999_*.xml*')] + + assert 1 == len(failed_jobgroup_999999999_files), '1 and only 1 failed file expected for job_group 999999999' + for file in failed_jobgroup_999999999_files: + sendJobFileToManager(file) + + time.sleep(1.0) + + assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 jobs unfinished' + assert manager.nrOfJobs() == 3, 'expected 3 jobs' #1 to_do/scheduled, 2 done + assert len(manager.getJobAdminDicts(status=JobToDo) + manager.getJobAdminDicts(status=JobScheduled)) == 1, 'expected 1 todo/scheduled jobs' + assert len(manager.getJobAdminDicts(status=JobProduced)) == 2, 'expected 2 done jobs' + + # this time, start and finish job3 normally + job3 = receiveJobForTransfer() + assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] + sendNotification('JobStarted', job3['JobId'], export_id=job3['job_group_id']) + sendNotification('JobFinished', job3['JobId'], export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + + #there should be no more reports, cause the job group 999999999 is finished as a whole + #and is removed from the manager at this point + reports = manager.getStatusReportDict() + assert 0 == len(reports), 'expected 0 reports' + assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished' + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout manager.quit() manager_thread.join()