Skip to content
Snippets Groups Projects
Commit 78caa8e6 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-699: fixed test. use handlers in a context in the run method

parent b4fad5b3
No related branches found
No related tags found
No related merge requests found
......@@ -25,7 +25,7 @@ from lofar.messaging.config import DEFAULT_BUSNAME, DEFAULT_BROKER
from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.server.config import DEFAULT_INGEST_SERVICENAME, DEFAULT_INGEST_INCOMING_JOB_SUBJECT, DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT
from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, FINISHED_NOTIFICATION_BCC_MAILING_LIST, DEFAULT_JOB_PRIORITY
from lofar.messaging import LofarMessage, CommandMessage, EventMessage, ToBus, Service, ServiceMessageHandler, AbstractMessageHandler, BusListener
from lofar.messaging import LofarMessage, CommandMessage, EventMessage, ToBus, RPCService, ServiceMessageHandler, AbstractMessageHandler, BusListener
from lofar.common.util import humanreadablesize
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
......@@ -69,8 +69,19 @@ class IngestJobManager:
logger.info('starting listening for new jobs and notifications')
incoming_jobs_listener = BusListener(IngestIncomingJobsHandler, {'job_manager': self},
exchange=self._tobus.exchange, broker=self._tobus.broker,
routing_key="%s.#" % DEFAULT_INGEST_INCOMING_JOB_SUBJECT)
ingest_event_listener = IngestEventMesssageBusListener(IngestEventMessageHandlerForJobManager,
{'job_manager': self},
exchange=self._tobus.exchange, broker=self._tobus.broker)
ingest_service = RPCService(DEFAULT_INGEST_SERVICENAME, IngestServiceMessageHandler, {'job_manager': self},
exchange=self._tobus.exchange, broker=self._tobus.broker, num_threads=1)
# open exchange connections...
with self._tobus:
with incoming_jobs_listener, ingest_event_listener, ingest_service, self._tobus:
# ... and run the event loop,
# produce jobs to managed job queue for ingest transfer services
# receive new jobs
......@@ -97,7 +108,7 @@ class IngestJobManager:
else:
logger.info('%s jobs are running', len(producing_jads))
time.sleep(10)
time.sleep(2)
except KeyboardInterrupt:
break
except Exception as e:
......@@ -598,7 +609,7 @@ class IngestJobManager:
# filter out jad's from exclude_job_group_ids
job_admin_dicts = [jad for jad in job_admin_dicts if 'job_group_id' not in jad['job'] or jad['job']['job_group_id'] not in exclude_job_group_ids]
job_admin_dicts = sorted(job_admin_dicts, cmp_to_key=jad_compare_func)
job_admin_dicts = sorted(job_admin_dicts, key=cmp_to_key(jad_compare_func))
if job_admin_dicts:
logger.info('%s jobs with status %s waiting', len(job_admin_dicts), jobState2String(status))
return job_admin_dicts[0]
......@@ -1054,9 +1065,8 @@ Total Files: %(total)i
done_group_mom_jobs = [job for job in done_group_jobs if job.get('Type', '').lower() == 'mom']
mom_export_ids = set([int(job['JobId'].split('_')[1]) for job in done_group_mom_jobs if 'JobId' in job])
with MoMQueryRPC(exchange=self._tobus.exchange, broker=self._tobus.broker, timeout=10) as momrpc:
for i in range(10):
mom_objects_details = momrpc.getObjectDetails(mom_export_ids)
with MoMQueryRPC.create(exchange=self._tobus.exchange, broker=self._tobus.broker) as momrpc:
mom_objects_details = momrpc.getObjectDetails(mom_export_ids)
project_mom2ids = set(obj_details.get('project_mom2id') for obj_details in mom_objects_details.values())
project_mom2ids = [x for x in project_mom2ids if x is not None]
......@@ -1177,19 +1187,7 @@ def main():
jobs_dir=options.jobs_dir,
max_num_retries=options.max_num_retries,
broker=options.broker)
incoming_jobs_listener = BusListener(IngestIncomingJobsHandler, {'job_manager': manager},
exchange=options.exchange, routing_key="%s.#" % DEFAULT_INGEST_INCOMING_JOB_SUBJECT)
ingest_event_listener = IngestEventMesssageBusListener(IngestEventMessageHandlerForJobManager,
{'job_manager': manager},
exchange=options.exchange)
ingest_service = Service(DEFAULT_INGEST_SERVICENAME, IngestServiceMessageHandler, {'job_manager': manager},
exchange=options.exchange, broker=options.broker, num_threads=1)
with incoming_jobs_listener, ingest_event_listener, ingest_service:
manager.run()
manager.run()
if __name__ == '__main__':
main()
......
......@@ -14,8 +14,9 @@ import logging
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
from lofar.messaging.messagebus import FromBus, ToBus, TemporaryExchange, TemporaryQueue
from lofar.messaging.messagebus import TemporaryExchange, TemporaryQueue
from lofar.messaging.messages import CommandMessage, EventMessage
from lofar.messaging.messagelogger import MessageLogger
import lofar.lta.ingest.server.config as ingest_config
......@@ -24,38 +25,40 @@ testname = 'TEST_INGESTJOBMANAGEMENTSERVER_%s' % uuid.uuid1().hex[:6]
with TemporaryExchange(testname+"_bus") as tmp_bus:
logger.info(tmp_bus.address)
ingest_config.JOBS_DIR = os.path.join(tempfile.gettempdir(), testname, 'jobs')
ingest_config.FINISHED_NOTIFICATION_MAILING_LIST = ''
ingest_config.MAX_NR_OF_RETRIES = 3
from lofar.lta.ingest.server.ingestjobmanagementserver import IngestJobManager
from lofar.lta.ingest.common.job import *
manager = None
manager_thread = None
exit_code = 0
try:
# create some 'to do' job files for group 999999999
for i in range(3):
testfile_path = os.path.join(ingest_config.JOBS_DIR, 'to_do', 'testjob_%s.xml' % i)
logger.info('creating test jobfile: %s', testfile_path)
createJobXmlFile(testfile_path, 'test-project', 999999999, 888888888, 'L888888888_SB00%s_uv.MS'%i, 777777777+i, 'somehost:/path/to/dp')
time.sleep(0.1) # need to sleep so the files have different timestamps and are read from old to new
# create some 'failed/done' job files for another group 666666666
# these will not be transfered, but are just sitting there, and should not interfere (which is what we'll test)
for i in range(4):
testfile_path = os.path.join(ingest_config.JOBS_DIR,
'failed' if i%2==0 else 'done',
'MoM_666666666',
'testjob_%s.xml' % i)
logger.info('creating test jobfile: %s', testfile_path)
createJobXmlFile(testfile_path, 'test-project', 666666666, 555555555, 'L888888888_SB00%s_uv.MS'%i, 444444444+i, 'somehost:/path/to/dp')
time.sleep(0.1) # need to sleep so the files have different timestamps and are read from old to new
with FromBus(ingest_config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME) as test_consumer:
with ToBus(tmp_bus.address) as test_notifier:
with TemporaryQueue(testname, exchange=tmp_bus.address, routing_key="%s.#" % ingest_config.DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT) as tmp_job_queue, \
MessageLogger(exchange=tmp_bus.address, remove_content_newlines=True): # use messagelogger to log what is sent over the bus for reference.
ingest_config.JOBS_DIR = os.path.join(tempfile.gettempdir(), testname, 'jobs')
ingest_config.FINISHED_NOTIFICATION_MAILING_LIST = ''
ingest_config.MAX_NR_OF_RETRIES = 3
from lofar.lta.ingest.server.ingestjobmanagementserver import IngestJobManager
from lofar.lta.ingest.common.job import *
manager = None
manager_thread = None
exit_code = 0
try:
# create some 'to do' job files for group 999999999
for i in range(3):
testfile_path = os.path.join(ingest_config.JOBS_DIR, 'to_do', 'testjob_%s.xml' % i)
logger.info('creating test jobfile: %s', testfile_path)
createJobXmlFile(testfile_path, 'test-project', 999999999, 888888888, 'L888888888_SB00%s_uv.MS'%i, 777777777+i, 'somehost:/path/to/dp')
time.sleep(0.1) # need to sleep so the files have different timestamps and are read from old to new
# create some 'failed/done' job files for another group 666666666
# these will not be transfered, but are just sitting there, and should not interfere (which is what we'll test)
for i in range(4):
testfile_path = os.path.join(ingest_config.JOBS_DIR,
'failed' if i%2==0 else 'done',
'MoM_666666666',
'testjob_%s.xml' % i)
logger.info('creating test jobfile: %s', testfile_path)
createJobXmlFile(testfile_path, 'test-project', 666666666, 555555555, 'L888888888_SB00%s_uv.MS'%i, 444444444+i, 'somehost:/path/to/dp')
time.sleep(0.1) # need to sleep so the files have different timestamps and are read from old to new
with tmp_job_queue.create_frombus() as test_consumer, tmp_bus.create_tobus() as test_notifier:
def sendNotification(event, job_id, message=None, percentage_done=None, export_id=None):
content = { 'job_id': job_id }
......@@ -68,7 +71,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus:
event_msg = EventMessage(subject="%s.%s" % (ingest_config.INGEST_NOTIFICATION_PREFIX, event),
content=content)
logger.info('sending test event message on %s subject=%s content=%s',
test_notifier.address, event_msg.subject, event_msg.content)
test_notifier.exchange, event_msg.subject, event_msg.content)
test_notifier.send(event_msg)
def receiveJobForTransfer():
......@@ -88,13 +91,13 @@ with TemporaryExchange(testname+"_bus") as tmp_bus:
file_content = file.read()
msg = CommandMessage(content=file_content, subject=ingest_config.DEFAULT_INGEST_INCOMING_JOB_SUBJECT)
bus.send(msg)
logger.info('submitted jobfile %s to queue %s', jobfile_path, bus.address)
logger.info('submitted jobfile %s to exchange %s', jobfile_path, bus.exchange)
except Exception as e:
logger.error('sendJobFileToManager error: %s', e)
# by starting the job manager, all job files in the non-finished dirs will be scanned and picked up.
manager = IngestJobManager(busname=tmp_bus.address)
manager = IngestJobManager(exchange=tmp_bus.address)
manager_thread = Thread(target=manager.run)
manager_thread.daemon = True
manager_thread.start()
......@@ -109,9 +112,11 @@ with TemporaryExchange(testname+"_bus") as tmp_bus:
assert job1['JobId'] == 'A_999999999_777777777_L888888888_SB000_uv.MS', 'unexpected job %s' % job1['JobId']
sendNotification('JobStarted', job1['JobId'], export_id=job1['job_group_id'])
time.sleep(1.0) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout
assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job was started'
sendNotification('JobProgress', job1['JobId'], percentage_done=25, export_id=job1['job_group_id'])
time.sleep(1.0) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout
assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job made progress'
#just finish normally
......@@ -147,7 +152,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus:
assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[0] == 1'
# let job2 fail
sendNotification('JobTransferFailed', job2['JobId'], message='something went wrong', export_id=job2['job_group_id'])
sendNotification('JobTransferFailed', job2['JobId'], message='something went wrong (intentionally for this test)', export_id=job2['job_group_id'])
time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout
assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished'
......@@ -184,7 +189,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus:
#3rd job will fail all the time
sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id'])
sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong (intentionally for this test)', export_id=job3['job_group_id'])
time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout
assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished'
......@@ -249,7 +254,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus:
assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2'
#3rd job will fail again
sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id'])
sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong (intentionally for this test)', export_id=job3['job_group_id'])
time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout
assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished'
......@@ -322,7 +327,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus:
assert 1 == report['series']['running_jobs']['values'][10], 'expected running jobs series[10] == 1'
#3rd job will fail again
sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id'])
sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong (intentionally for this test)', export_id=job3['job_group_id'])
#3rd job should have failed after 3 retries
#no more jobs to go
......@@ -366,18 +371,18 @@ with TemporaryExchange(testname+"_bus") as tmp_bus:
assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished'
time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout
manager.quit()
manager_thread.join()
manager.quit()
manager_thread.join()
except Exception as e:
logger.exception(e)
exit_code = 1
finally:
if manager:
manager.quit()
manager_thread.join()
except Exception as e:
logger.exception(e)
exit_code = 1
finally:
if manager:
manager.quit()
manager_thread.join()
if os.path.exists(ingest_config.JOBS_DIR):
shutil.rmtree(ingest_config.JOBS_DIR)
if os.path.exists(ingest_config.JOBS_DIR):
shutil.rmtree(ingest_config.JOBS_DIR)
exit(exit_code)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment