From 78caa8e694c48d70f2becb7d38d2e354a068d4a0 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Tue, 11 Jun 2019 13:12:08 +0000 Subject: [PATCH] SW-699: fixed test. use handlers in a context in the run method --- .../lib/ingestjobmanagementserver.py | 38 +++---- .../test/t_ingestjobmanagementserver.py | 107 +++++++++--------- 2 files changed, 74 insertions(+), 71 deletions(-) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py index 0036c248a9e..cfb5585ccdc 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py @@ -25,7 +25,7 @@ from lofar.messaging.config import DEFAULT_BUSNAME, DEFAULT_BROKER from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX from lofar.lta.ingest.server.config import DEFAULT_INGEST_SERVICENAME, DEFAULT_INGEST_INCOMING_JOB_SUBJECT, DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, FINISHED_NOTIFICATION_BCC_MAILING_LIST, DEFAULT_JOB_PRIORITY -from lofar.messaging import LofarMessage, CommandMessage, EventMessage, ToBus, Service, ServiceMessageHandler, AbstractMessageHandler, BusListener +from lofar.messaging import LofarMessage, CommandMessage, EventMessage, ToBus, RPCService, ServiceMessageHandler, AbstractMessageHandler, BusListener from lofar.common.util import humanreadablesize from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC @@ -69,8 +69,19 @@ class IngestJobManager: logger.info('starting listening for new jobs and notifications') + incoming_jobs_listener = BusListener(IngestIncomingJobsHandler, {'job_manager': self}, + exchange=self._tobus.exchange, broker=self._tobus.broker, + routing_key="%s.#" % DEFAULT_INGEST_INCOMING_JOB_SUBJECT) + + ingest_event_listener = IngestEventMesssageBusListener(IngestEventMessageHandlerForJobManager, + {'job_manager': self}, + exchange=self._tobus.exchange, broker=self._tobus.broker) + + ingest_service = RPCService(DEFAULT_INGEST_SERVICENAME, IngestServiceMessageHandler, {'job_manager': self}, + exchange=self._tobus.exchange, broker=self._tobus.broker, num_threads=1) + # open exchange connections... - with self._tobus: + with incoming_jobs_listener, ingest_event_listener, ingest_service, self._tobus: # ... and run the event loop, # produce jobs to managed job queue for ingest transfer services # receive new jobs @@ -97,7 +108,7 @@ class IngestJobManager: else: logger.info('%s jobs are running', len(producing_jads)) - time.sleep(10) + time.sleep(2) except KeyboardInterrupt: break except Exception as e: @@ -598,7 +609,7 @@ class IngestJobManager: # filter out jad's from exclude_job_group_ids job_admin_dicts = [jad for jad in job_admin_dicts if 'job_group_id' not in jad['job'] or jad['job']['job_group_id'] not in exclude_job_group_ids] - job_admin_dicts = sorted(job_admin_dicts, cmp_to_key=jad_compare_func) + job_admin_dicts = sorted(job_admin_dicts, key=cmp_to_key(jad_compare_func)) if job_admin_dicts: logger.info('%s jobs with status %s waiting', len(job_admin_dicts), jobState2String(status)) return job_admin_dicts[0] @@ -1054,9 +1065,8 @@ Total Files: %(total)i done_group_mom_jobs = [job for job in done_group_jobs if job.get('Type', '').lower() == 'mom'] mom_export_ids = set([int(job['JobId'].split('_')[1]) for job in done_group_mom_jobs if 'JobId' in job]) - with MoMQueryRPC(exchange=self._tobus.exchange, broker=self._tobus.broker, timeout=10) as momrpc: - for i in range(10): - mom_objects_details = momrpc.getObjectDetails(mom_export_ids) + with MoMQueryRPC.create(exchange=self._tobus.exchange, broker=self._tobus.broker) as momrpc: + mom_objects_details = momrpc.getObjectDetails(mom_export_ids) project_mom2ids = set(obj_details.get('project_mom2id') for obj_details in mom_objects_details.values()) project_mom2ids = [x for x in project_mom2ids if x is not None] @@ -1177,19 +1187,7 @@ def main(): jobs_dir=options.jobs_dir, max_num_retries=options.max_num_retries, broker=options.broker) - - incoming_jobs_listener = BusListener(IngestIncomingJobsHandler, {'job_manager': manager}, - exchange=options.exchange, routing_key="%s.#" % DEFAULT_INGEST_INCOMING_JOB_SUBJECT) - - ingest_event_listener = IngestEventMesssageBusListener(IngestEventMessageHandlerForJobManager, - {'job_manager': manager}, - exchange=options.exchange) - - ingest_service = Service(DEFAULT_INGEST_SERVICENAME, IngestServiceMessageHandler, {'job_manager': manager}, - exchange=options.exchange, broker=options.broker, num_threads=1) - - with incoming_jobs_listener, ingest_event_listener, ingest_service: - manager.run() + manager.run() if __name__ == '__main__': main() diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py index c0f723a7c63..8e8b328f275 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py @@ -14,8 +14,9 @@ import logging logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) -from lofar.messaging.messagebus import FromBus, ToBus, TemporaryExchange, TemporaryQueue +from lofar.messaging.messagebus import TemporaryExchange, TemporaryQueue from lofar.messaging.messages import CommandMessage, EventMessage +from lofar.messaging.messagelogger import MessageLogger import lofar.lta.ingest.server.config as ingest_config @@ -24,38 +25,40 @@ testname = 'TEST_INGESTJOBMANAGEMENTSERVER_%s' % uuid.uuid1().hex[:6] with TemporaryExchange(testname+"_bus") as tmp_bus: logger.info(tmp_bus.address) - ingest_config.JOBS_DIR = os.path.join(tempfile.gettempdir(), testname, 'jobs') - ingest_config.FINISHED_NOTIFICATION_MAILING_LIST = '' - ingest_config.MAX_NR_OF_RETRIES = 3 - - from lofar.lta.ingest.server.ingestjobmanagementserver import IngestJobManager - from lofar.lta.ingest.common.job import * - - manager = None - manager_thread = None - exit_code = 0 - - try: - # create some 'to do' job files for group 999999999 - for i in range(3): - testfile_path = os.path.join(ingest_config.JOBS_DIR, 'to_do', 'testjob_%s.xml' % i) - logger.info('creating test jobfile: %s', testfile_path) - createJobXmlFile(testfile_path, 'test-project', 999999999, 888888888, 'L888888888_SB00%s_uv.MS'%i, 777777777+i, 'somehost:/path/to/dp') - time.sleep(0.1) # need to sleep so the files have different timestamps and are read from old to new - - # create some 'failed/done' job files for another group 666666666 - # these will not be transfered, but are just sitting there, and should not interfere (which is what we'll test) - for i in range(4): - testfile_path = os.path.join(ingest_config.JOBS_DIR, - 'failed' if i%2==0 else 'done', - 'MoM_666666666', - 'testjob_%s.xml' % i) - logger.info('creating test jobfile: %s', testfile_path) - createJobXmlFile(testfile_path, 'test-project', 666666666, 555555555, 'L888888888_SB00%s_uv.MS'%i, 444444444+i, 'somehost:/path/to/dp') - time.sleep(0.1) # need to sleep so the files have different timestamps and are read from old to new - - with FromBus(ingest_config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME) as test_consumer: - with ToBus(tmp_bus.address) as test_notifier: + with TemporaryQueue(testname, exchange=tmp_bus.address, routing_key="%s.#" % ingest_config.DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT) as tmp_job_queue, \ + MessageLogger(exchange=tmp_bus.address, remove_content_newlines=True): # use messagelogger to log what is sent over the bus for reference. + + ingest_config.JOBS_DIR = os.path.join(tempfile.gettempdir(), testname, 'jobs') + ingest_config.FINISHED_NOTIFICATION_MAILING_LIST = '' + ingest_config.MAX_NR_OF_RETRIES = 3 + + from lofar.lta.ingest.server.ingestjobmanagementserver import IngestJobManager + from lofar.lta.ingest.common.job import * + + manager = None + manager_thread = None + exit_code = 0 + + try: + # create some 'to do' job files for group 999999999 + for i in range(3): + testfile_path = os.path.join(ingest_config.JOBS_DIR, 'to_do', 'testjob_%s.xml' % i) + logger.info('creating test jobfile: %s', testfile_path) + createJobXmlFile(testfile_path, 'test-project', 999999999, 888888888, 'L888888888_SB00%s_uv.MS'%i, 777777777+i, 'somehost:/path/to/dp') + time.sleep(0.1) # need to sleep so the files have different timestamps and are read from old to new + + # create some 'failed/done' job files for another group 666666666 + # these will not be transfered, but are just sitting there, and should not interfere (which is what we'll test) + for i in range(4): + testfile_path = os.path.join(ingest_config.JOBS_DIR, + 'failed' if i%2==0 else 'done', + 'MoM_666666666', + 'testjob_%s.xml' % i) + logger.info('creating test jobfile: %s', testfile_path) + createJobXmlFile(testfile_path, 'test-project', 666666666, 555555555, 'L888888888_SB00%s_uv.MS'%i, 444444444+i, 'somehost:/path/to/dp') + time.sleep(0.1) # need to sleep so the files have different timestamps and are read from old to new + + with tmp_job_queue.create_frombus() as test_consumer, tmp_bus.create_tobus() as test_notifier: def sendNotification(event, job_id, message=None, percentage_done=None, export_id=None): content = { 'job_id': job_id } @@ -68,7 +71,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus: event_msg = EventMessage(subject="%s.%s" % (ingest_config.INGEST_NOTIFICATION_PREFIX, event), content=content) logger.info('sending test event message on %s subject=%s content=%s', - test_notifier.address, event_msg.subject, event_msg.content) + test_notifier.exchange, event_msg.subject, event_msg.content) test_notifier.send(event_msg) def receiveJobForTransfer(): @@ -88,13 +91,13 @@ with TemporaryExchange(testname+"_bus") as tmp_bus: file_content = file.read() msg = CommandMessage(content=file_content, subject=ingest_config.DEFAULT_INGEST_INCOMING_JOB_SUBJECT) bus.send(msg) - logger.info('submitted jobfile %s to queue %s', jobfile_path, bus.address) + logger.info('submitted jobfile %s to exchange %s', jobfile_path, bus.exchange) except Exception as e: logger.error('sendJobFileToManager error: %s', e) # by starting the job manager, all job files in the non-finished dirs will be scanned and picked up. - manager = IngestJobManager(busname=tmp_bus.address) + manager = IngestJobManager(exchange=tmp_bus.address) manager_thread = Thread(target=manager.run) manager_thread.daemon = True manager_thread.start() @@ -109,9 +112,11 @@ with TemporaryExchange(testname+"_bus") as tmp_bus: assert job1['JobId'] == 'A_999999999_777777777_L888888888_SB000_uv.MS', 'unexpected job %s' % job1['JobId'] sendNotification('JobStarted', job1['JobId'], export_id=job1['job_group_id']) + time.sleep(1.0) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job was started' sendNotification('JobProgress', job1['JobId'], percentage_done=25, export_id=job1['job_group_id']) + time.sleep(1.0) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job made progress' #just finish normally @@ -147,7 +152,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus: assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[0] == 1' # let job2 fail - sendNotification('JobTransferFailed', job2['JobId'], message='something went wrong', export_id=job2['job_group_id']) + sendNotification('JobTransferFailed', job2['JobId'], message='something went wrong (intentionally for this test)', export_id=job2['job_group_id']) time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' @@ -184,7 +189,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus: #3rd job will fail all the time - sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) + sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong (intentionally for this test)', export_id=job3['job_group_id']) time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' @@ -249,7 +254,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus: assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' #3rd job will fail again - sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) + sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong (intentionally for this test)', export_id=job3['job_group_id']) time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' @@ -322,7 +327,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus: assert 1 == report['series']['running_jobs']['values'][10], 'expected running jobs series[10] == 1' #3rd job will fail again - sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) + sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong (intentionally for this test)', export_id=job3['job_group_id']) #3rd job should have failed after 3 retries #no more jobs to go @@ -366,18 +371,18 @@ with TemporaryExchange(testname+"_bus") as tmp_bus: assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished' time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - manager.quit() - manager_thread.join() + manager.quit() + manager_thread.join() - except Exception as e: - logger.exception(e) - exit_code = 1 - finally: - if manager: - manager.quit() - manager_thread.join() + except Exception as e: + logger.exception(e) + exit_code = 1 + finally: + if manager: + manager.quit() + manager_thread.join() - if os.path.exists(ingest_config.JOBS_DIR): - shutil.rmtree(ingest_config.JOBS_DIR) + if os.path.exists(ingest_config.JOBS_DIR): + shutil.rmtree(ingest_config.JOBS_DIR) exit(exit_code) -- GitLab