diff --git a/LTA/LTAIngest/LTAIngestClient/bin/ingestaddjobstoqueue b/LTA/LTAIngest/LTAIngestClient/bin/ingestaddjobstoqueue index efe8e43c335eb09d114eff5dd0d57ba90a086507..d4b08332f39cb96efccdb53e8dc25aa6415cc1f4 100755 --- a/LTA/LTAIngest/LTAIngestClient/bin/ingestaddjobstoqueue +++ b/LTA/LTAIngest/LTAIngestClient/bin/ingestaddjobstoqueue @@ -5,12 +5,9 @@ import os.path import fnmatch import logging from optparse import OptionParser -from lofar.messaging import CommandMessage, ToBus -from lofar.lta.ingest.common.config import DEFAULT_BROKER -from lofar.lta.ingest.common.config import DEFAULT_INGEST_JOBS_QUEUENAME -from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME, DEFAULT_INGEST_NOTIFICATION_SUBJECTS +from lofar.messaging import CommandMessage, ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.lta.ingest.server.config import DEFAULT_INGEST_INCOMING_JOB_SUBJECT from lofar.lta.ingest.common.job import parseJobXml -from lofar.lta.ingest.client.ingestbuslistener import JobsMonitor logger = logging.getLogger(__name__) @@ -19,13 +16,8 @@ if __name__ == '__main__': # Check the invocation arguments parser = OptionParser("%prog [options] <path_to_jobfile.xml>/<dir_containing_jobfiles>", description='Send the given jobfile or all jobfiles in the given directory to the ingest job queue.') - parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') - parser.add_option("--ingest_job_queuename", dest="ingest_job_queuename", type="string", - default=DEFAULT_INGEST_JOBS_QUEUENAME, - help="Name of the job queue. [default: %default]") - parser.add_option('--ingest_notification_busname', dest='ingest_notification_busname', type='string', default=DEFAULT_INGEST_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default') - parser.add_option('--ingest_notification_subjects', dest='ingest_notification_subjects', type='string', default=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, help='Subject(s) to listen for on the ingest notification bus exchange on the qpid broker, default: %default') - parser.add_option('-m', '--monitor', dest='monitor', action='store_true', help='keep monitoring these jobs after adding them to the queue') + parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') + parser.add_option('-e', '--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the broker on which the ingest messages are published, default: %default') parser.add_option('-p', '--priority', dest='priority', type='int', default=None, help='set the priority of the job(s) to given priority (0=paused, 1=lowest ... 9=highest, None=use priority given in jobfile.). 
default=%default') (options, args) = parser.parse_args() @@ -39,11 +31,9 @@ if __name__ == '__main__': if options.priority is not None: options.priority = min(9, max(0, options.priority)) - added_jobs_monitor = JobsMonitor(busname=options.ingest_notification_busname, subjects=options.ingest_notification_subjects, broker=options.broker, listen_for_all_jobs=False) - def sendJobFiles(jobfile_paths): try: - with ToBus(address=options.ingest_job_queuename, broker=options.broker) as bus: + with ToBus(exchange=options.exchange, broker=options.broker) as bus: for jobfile_path in jobfile_paths: try: with open(jobfile_path) as file: @@ -54,11 +44,11 @@ if __name__ == '__main__': if options.priority is not None: job['priority'] = options.priority - msg = CommandMessage(content=file_content) + msg = CommandMessage(subject=DEFAULT_INGEST_INCOMING_JOB_SUBJECT, content=file_content) msg.priority=int(job.get('priority', 4)) bus.send(msg) - added_jobs_monitor.addJobToListenFor(job['JobId']) - logger.info('submitted job %s to queue %s at %s', job['JobId'], bus.address, bus.broker) + logger.info('submitted job %s to exchange %s with subject %s at broker %s', + job['JobId'], bus.exchange, msg.subject, bus.broker) except Exception as e: logger.error(e) except Exception as e: @@ -82,11 +72,6 @@ if __name__ == '__main__': else: logger.error('given path is not a file or directory: %s', path) exit(1) - - if options.monitor: - with added_jobs_monitor: - added_jobs_monitor.waitForAllJobsToFinish() - except Exception as e: logger.error(e) exit(1) diff --git a/LTA/LTAIngest/LTAIngestClient/bin/ingestreport b/LTA/LTAIngest/LTAIngestClient/bin/ingestreport index ef2119a796ea426166868b081059ce2197b80490..f5315dadf1cc7a68b057508a3304027c529aca01 100755 --- a/LTA/LTAIngest/LTAIngestClient/bin/ingestreport +++ b/LTA/LTAIngest/LTAIngestClient/bin/ingestreport @@ -1,11 +1,8 @@ #!/usr/bin/env python3 import logging -from lofar.lta.ingest.client.rpc import IngestRPC -from lofar.lta.ingest.common.config import DEFAULT_BROKER -from lofar.lta.ingest.server.config import DEFAULT_INGEST_BUSNAME, DEFAULT_INGEST_SERVICENAME +from lofar.lta.ingest.client.rpc import IngestRPC, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_INGEST_SERVICENAME -import sys import pprint from optparse import OptionParser @@ -17,15 +14,15 @@ def main(): description='''Print report of current status of the given export_group_id (or all if no id given)\n export_group_id is the mom_id of the export job''') parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') - parser.add_option('-b', '--busname', dest='busname', type='string', default=DEFAULT_INGEST_BUSNAME, help='Name of the bus exchange on the qpid broker, default: %s' % DEFAULT_INGEST_BUSNAME) - parser.add_option('-s', '--servicename', dest='servicename', type='string', default=DEFAULT_INGEST_SERVICENAME, help='Name for this service, default: %s' % DEFAULT_INGEST_SERVICENAME) + parser.add_option('-b', '--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the qpid broker, default: %default') + parser.add_option('-s', '--service_name', dest='service_name', type='string', default=DEFAULT_INGEST_SERVICENAME, help='Name for this service, default: %default') parser.add_option('-d', '--detailed', dest='detailed', action='store_true', help='get a detail (developers) report') (options, args) = parser.parse_args() logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', 
level=logging.INFO) - with IngestRPC(busname=options.busname, servicename=options.servicename, broker=options.broker) as rpc: + with IngestRPC(exchange=options.busname, service_name=options.service_name, broker=options.broker) as rpc: if options.detailed: pprint.pprint(rpc.getStatusReport()) else: diff --git a/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py b/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py index d7cfe7e8aa3d6c8020e1a91d605621a29dbc01c1..6d6a4f129cbe1c1dd9b74afdbe29d5286fb3de25 100644 --- a/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py +++ b/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py @@ -20,9 +20,11 @@ from lofar.common.util import humanreadablesize -from lofar.messaging.messagebus import AbstractBusListener -from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_PREFIX -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging.messagebus import BusListener, AbstractMessageHandler +from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX +from lofar.messaging.messagebus import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging.messages import EventMessage +from lofar.common.util import waitForInterrupt import time import sys @@ -30,45 +32,42 @@ import sys import logging logger = logging.getLogger() -class IngestBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, queue_name=None, broker=DEFAULT_BROKER): - """ - IngestBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. - Typical usage is to derive your own subclass from IngestBusListener and implement the specific on<SomeMessage> methods that you are interested in. - :param busname: valid Qpid address - :param broker: valid Qpid broker host - """ - self.subject_prefix = DEFAULT_INGEST_NOTIFICATION_PREFIX + "." - super(IngestBusListener, self).__init__(exchange_name=busname, routing_key=self.subject_prefix+"#", - queue_name=queue_name, broker=broker) - def start_listening(self): - super(IngestBusListener, self).start_listening() +class IngestEventMessageHandler(AbstractMessageHandler): + def handle_message(self, msg: EventMessage) -> bool: + if not isinstance(msg, EventMessage): + logger.warning("%s: Ignoring non-EventMessage: %s", self.__class__.__name__, msg) + return False - def _handleMessage(self, msg): try: - logger.debug("on%s: %s" % (msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' '))) + stripped_subject = msg.subject.replace("%s." 
% INGEST_NOTIFICATION_PREFIX, '') + + self._log_job_notification(stripped_subject, msg.content) - if msg.subject == '%sJobStarted' % self.subject_prefix: + # map msg subject onto callback method + if stripped_subject == 'JobStarted': self.onJobStarted(msg.content) - elif msg.subject == '%sJobFinished' % self.subject_prefix: + elif stripped_subject == 'JobFinished': self.onJobFinished(msg.content) - elif msg.subject == '%sJobFailed' % self.subject_prefix: + elif stripped_subject == 'JobFailed': self.onJobFailed(msg.content) - elif msg.subject == '%sJobProgress' % self.subject_prefix: + elif stripped_subject == 'JobProgress': self.onJobProgress(msg.content) - elif msg.subject == '%sJobRemoved' % self.subject_prefix: + elif stripped_subject == 'JobRemoved': self.onJobRemoved(msg.content) - elif msg.subject == '%sJobTransferFailed' % self.subject_prefix: + elif stripped_subject == 'JobTransferFailed': self.onJobTransferFailed(msg.content) - elif msg.subject == '%sTaskProgress' % self.subject_prefix: + elif stripped_subject == 'TaskProgress': self.onTaskProgress(msg.content) - elif msg.subject == '%sTaskFinished' % self.subject_prefix: + elif stripped_subject == 'TaskFinished': self.onTaskFinished(msg.content) - elif msg.subject == '%sTransferServiceStatus' % self.subject_prefix: + elif stripped_subject == 'TransferServiceStatus': self.onTransferServiceStatus(msg.content) else: logger.error("IngestBusListener.handleMessage: unknown subject: %s", msg.subject) + return False + + return True except Exception as e: logger.error("IngestBusListener.handleMessage: %s", e) raise @@ -118,19 +117,21 @@ class IngestBusListener(AbstractBusListener): :param status_dict: dictionary with the status''' pass - def _logJobNotification(self, status, job_dict, level=logging.INFO): + def _log_job_notification(self, status: str, job_dict: dict): try: - msg = 'job ' + msg = '' - if status == 'progress': - msg += 'is transferring. ' + if status == 'JobProgress': + msg += 'job is transferring. ' + elif status == 'TransferServiceStatus': + msg += 'TransferServiceStatus: ' else: - msg += 'status changed to %s. ' % (status,) + msg += 'job status changed to %s. 
' % (status,) - msg += 'project: %s export_id: %s type: %s server: %s'% (job_dict.get('project'), - job_dict.get('export_id'), - job_dict.get('type'), - job_dict.get('ingest_server')) + msg += 'project: %s export_id: %s type: %s server: %s' % (job_dict.get('project'), + job_dict.get('export_id'), + job_dict.get('type'), + job_dict.get('ingest_server')) if job_dict.get('archive_id'): msg += ' archive_id: %s' % job_dict.get('archive_id') @@ -144,7 +145,7 @@ class IngestBusListener(AbstractBusListener): msg += ' otdb_id: %s' % job_dict.get('otdb_id') if job_dict.get('percentage_done') != None: - msg += ' progress: %s%%' % (round(10.0*float(job_dict.get('percentage_done')))/10.0) + msg += ' progress: %s%%' % (round(10.0 * float(job_dict.get('percentage_done'))) / 10.0) if job_dict.get('total_bytes_transfered') != None: msg += ' transferred: %s' % humanreadablesize(job_dict.get('total_bytes_transfered')) @@ -158,111 +159,48 @@ class IngestBusListener(AbstractBusListener): if job_dict.get('message'): msg += ' message: %s' % job_dict.get('message') - logger.log(level, msg) + logger.log(logging.WARNING if 'Failed' in status else logging.INFO, msg) except Exception as e: logger.error(e) -class JobsMonitor(IngestBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, queue_name=None, broker=DEFAULT_BROKER, listen_for_all_jobs=True): - super(JobsMonitor, self).__init__(busname=busname, queue_name=queue_name, broker=broker) - self.__monitored_jobs = set() - self.__listen_for_all_jobs = listen_for_all_jobs - - def addJobToListenFor(self, job_id): - self.__monitored_jobs.add(job_id) - - def __isJobInAddedJobs(self, job_dict): - return job_dict.get('job_id') in self.__monitored_jobs - - def onJobStarted(self, job_dict): - if self.__listen_for_all_jobs or self.__isJobInAddedJobs(job_dict): - self._logJobNotification('started ', job_dict); - - def onJobFinished(self, job_dict): - if self.__listen_for_all_jobs or self.__isJobInAddedJobs(job_dict): - self._logJobNotification('finished', job_dict); - try: - self.__monitored_jobs.remove(job_dict.get('job_id')) - logger.info('%s jobs to go', len(self.__monitored_jobs)) - except KeyError: - pass - - def onJobFailed(self, job_dict): - if self.__listen_for_all_jobs or self.__isJobInAddedJobs(job_dict): - self._logJobNotification('failed ', job_dict, level=logging.WARN); - try: - self.__monitored_jobs.remove(job_dict.get('job_id')) - logger.info('%s jobs to go', len(self.__monitored_jobs)) - except KeyError: - pass - - def onJobRemoved(self, job_dict): - if self.__listen_for_all_jobs or self.__isJobInAddedJobs(job_dict): - self._logJobNotification('removed ', job_dict); - try: - self.__monitored_jobs.remove(job_dict.get('job_id')) - logger.info('%s jobs to go', len(self.__monitored_jobs)) - except KeyError: - pass - - def onJobTransferFailed(self, job_dict): - if self.__listen_for_all_jobs or self.__isJobInAddedJobs(job_dict): - self._logJobNotification('tranfer failed ', job_dict, level=logging.WARN); - - def onJobProgress(self, job_dict): - if self.__listen_for_all_jobs or self.__isJobInAddedJobs(job_dict): - self._logJobNotification('progress', job_dict); - - def onTaskProgress(self, task_dict): - if self.__listen_for_all_jobs or self.__isJobInAddedJobs(task_dict): - self._logJobNotification('task progress', task_dict); - - def onTaskFinished(self, task_dict): - if self.__listen_for_all_jobs or self.__isJobInAddedJobs(task_dict): - self._logJobNotification('task finised', task_dict); - - def onTransferServiceStatus(self, status_dict): - logger.info('%s 
tranferservice status: %s', status_dict.get('ingest_server', ''), status_dict.get('message')) - - def waitForAllJobsToFinish(self): - if self.__monitored_jobs: - logger.info('Listening for %s jobs', len(self.__monitored_jobs)) - - logger.info('You can press Ctrl-C any time to stop listening. The jobs will continue running in the background.') +class IngestEventMesssageBusListener(BusListener): + def __init__(self, + handler_type: IngestEventMessageHandler.__class__ = IngestEventMessageHandler, + handler_kwargs: dict = None, + exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, + num_threads: int=1): + """ + IngestEventMesssageBusListener listens on the lofar notification message bus and lets its IngestEventMessageHandler call the (empty) on<SomeMessage> methods when such a message is received. + Typical usage is to derive your own handler class from IngestEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in. + :param handler_type: an IngestEventMessageHandler subclass which is instantiated to handle the received messages + :param handler_kwargs: kwargs which are passed to the handler_type constructor + :param exchange: name of the exchange on which the ingest notifications are published + :param broker: valid message broker host + :param num_threads: number of parallel handler threads + """ + if not issubclass(handler_type, IngestEventMessageHandler): + raise TypeError("handler_type should be an IngestEventMessageHandler subclass") - while True: - try: - if len(self.__monitored_jobs) == 0 and not self.__listen_for_all_jobs: - logger.info('All submitted jobs are done.') - break - time.sleep(1) - except KeyboardInterrupt: - raise + super(IngestEventMesssageBusListener, self).__init__(handler_type=handler_type, + handler_kwargs=handler_kwargs, + exchange=exchange, routing_key="%s.#" % INGEST_NOTIFICATION_PREFIX, + broker=broker, + num_threads=num_threads) def main(): - from lofar.common.util import waitForInterrupt - from lofar.messaging import setQpidLogLevel from optparse import OptionParser # Check the invocation arguments parser = OptionParser('%prog [options]', description='run the ingest job monitor') - parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') - parser.add_option('--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on which the ingest notifications are published, default: %default') + parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default') + parser.add_option('--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME, help='Name of the exchange on which the ingest notifications are published, default: %default') (options, args) = parser.parse_args() - setQpidLogLevel(logging.INFO) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO, stream=sys.stdout) - with JobsMonitor(busname=options.busname, - broker=options.broker, - listen_for_all_jobs=True) as monitor: - monitor.waitForAllJobsToFinish() + with IngestEventMesssageBusListener(exchange=options.exchange, broker=options.broker): + waitForInterrupt() if __name__ == '__main__': main() - -__all__ = ["IngestBusListener", "JobsMonitor"] diff --git a/LTA/LTAIngest/LTAIngestClient/lib/rpc.py b/LTA/LTAIngest/LTAIngestClient/lib/rpc.py index 15e96394d38710b5771040b6dd8bbffd19d7d2d0..826d04c15fb1bee33da15e25bdfd968a609a4d92 100644 --- a/LTA/LTAIngest/LTAIngestClient/lib/rpc.py +++ b/LTA/LTAIngest/LTAIngestClient/lib/rpc.py @@ -1,34 +1,34 @@ #!/usr/bin/env python3 import logging -from lofar.messaging.RPC import RPC, RPCException, RPCWrapper -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import RPC, DEFAULT_BROKER, DEFAULT_BUSNAME from 
lofar.lta.ingest.server.config import DEFAULT_INGEST_SERVICENAME logger = logging.getLogger(__name__) -class IngestRPC(RPCWrapper): - def __init__(self, busname=DEFAULT_BUSNAME, - broker=DEFAULT_BROKER): - super(IngestRPC, self).__init__(busname=busname, servicename=DEFAULT_INGEST_SERVICENAME, broker=broker, timeout=18000) +class IngestRPC(RPC): + def __init__(self, service_name=DEFAULT_INGEST_SERVICENAME, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + super(IngestRPC, self).__init__(service_name=service_name, + exchange=exchange, + broker=broker, timeout=5*60) def removeExportJob(self, export_group_id): - return self.rpc('RemoveExportJob', export_group_id=export_group_id) + return self.execute('RemoveExportJob', export_group_id=export_group_id) def setExportJobPriority(self, export_group_id, priority): - return self.rpc('SetExportJobPriority', export_id=export_group_id, priority=priority) + return self.execute('SetExportJobPriority', export_id=export_group_id, priority=priority) def getStatusReport(self): - return self.rpc('GetStatusReport') + return self.execute('GetStatusReport') def getJobStatus(self, job_id): - return self.rpc('GetJobStatus', job_id=job_id)[job_id] + return self.execute('GetJobStatus', job_id=job_id)[job_id] def getReport(self, export_group_id): - return self.rpc('GetReport', job_group_id=export_group_id) + return self.execute('GetReport', job_group_id=export_group_id) def getExportIds(self): - return self.rpc('GetExportIds') + return self.execute('GetExportIds') if __name__ == '__main__': logging.basicConfig() diff --git a/LTA/LTAIngest/LTAIngestCommon/config.py b/LTA/LTAIngest/LTAIngestCommon/config.py index c9863e473e3601cd32367fe27e80bf2caac789da..ea382baf57224550aad88d189040385c88b1213f 100644 --- a/LTA/LTAIngest/LTAIngestCommon/config.py +++ b/LTA/LTAIngest/LTAIngestCommon/config.py @@ -1,18 +1,11 @@ from lofar.messaging import adaptNameToEnvironment -DEFAULT_INGEST_SERVICENAME = 'IngestService' - -DEFAULT_INGEST_JOBS_QUEUENAME = adaptNameToEnvironment('lofar.lta.ingest.jobs') -DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME = adaptNameToEnvironment('lofar.lta.ingest.jobs.for_transfer') DEFAULT_INGEST_PREFIX = 'LTA.Ingest' -DEFAULT_INGEST_NOTIFICATION_PREFIX = DEFAULT_INGEST_PREFIX + ".notification" +INGEST_NOTIFICATION_PREFIX = DEFAULT_INGEST_PREFIX + ".notification" +DEFAULT_INGEST_SERVICENAME = DEFAULT_INGEST_PREFIX + ".service" def hostnameToIp(hostname): - if 'lexar001' in hostname: - return '10.178.1.1' - if 'lexar002' in hostname: - return '10.178.1.2' if 'lexar003' in hostname: return '10.178.1.3' if 'lexar004' in hostname: diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py index 6d216db676c360cf27313143cfbd7a03a40a5f9a..560eaef0259f0c5bfcf4b9204fdf0f0a7e632f0d 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py @@ -19,16 +19,13 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. 
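For reference, the refactored IngestRPC above is now a thin RPC subclass whose methods map one-to-one onto service methods (registered further below in ingestjobmanagementserver.py). A minimal usage sketch, assuming a broker is reachable at the DEFAULT_BROKER address; the export id 123456 is made up:

from lofar.lta.ingest.client.rpc import IngestRPC

# a minimal sketch, using the defaults (DEFAULT_INGEST_SERVICENAME,
# DEFAULT_BUSNAME, DEFAULT_BROKER); 123456 is a hypothetical export_group_id
with IngestRPC() as rpc:                  # opens the RPC connection on __enter__
    print(rpc.getStatusReport())          # executes the 'GetStatusReport' service method
    rpc.setExportJobPriority(123456, 9)   # 9 = highest priority, 0 = paused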
# -from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener +from lofar.lta.ingest.client.ingestbuslistener import IngestEventMessageHandler, IngestEventMesssageBusListener from lofar.lta.ingest.common.job import * from lofar.messaging.config import DEFAULT_BUSNAME, DEFAULT_BROKER -from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_PREFIX +from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX from lofar.lta.ingest.server.config import DEFAULT_INGEST_SERVICENAME, DEFAULT_INGEST_INCOMING_JOB_SUBJECT, DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT -from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, FINISHED_NOTIFICATION_BCC_MAILING_LIST, DEFAULT_JOB_PRIORITY, MAX_USED_BANDWITH_TO_START_NEW_JOBS -from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_QUEUENAME, DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME -from lofar.messaging import CommandMessage, EventMessage, FromBus, ToBus -from lofar.messaging import create_queue, create_binding -from lofar.messaging.Service import Service, MessageHandlerInterface +from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, FINISHED_NOTIFICATION_BCC_MAILING_LIST, DEFAULT_JOB_PRIORITY +from lofar.messaging import LofarMessage, CommandMessage, EventMessage, ToBus, Service, ServiceMessageHandler, AbstractMessageHandler, BusListener from lofar.common.util import humanreadablesize from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC @@ -48,58 +45,17 @@ from functools import reduce logger = logging.getLogger() -class IngestServiceMessageHandler(MessageHandlerInterface): - def __init__(self, job_manager, **kwargs): - super(IngestServiceMessageHandler, self).__init__(**kwargs) - - self._job_manager = job_manager - self.service2MethodMap = {'RemoveExportJob': self._job_manager.removeExportJob, - 'SetExportJobPriority': self._job_manager.setExportJobPriority, - 'GetStatusReport': self._job_manager.getStatusReportDict, - 'GetReport': self._job_manager.getReport, - 'GetExportIds': self._job_manager.getExportIds} - - class IngestJobManager: - def __init__(self, - busname=DEFAULT_BUSNAME, - incoming_job_queue_name=DEFAULT_INGEST_JOBS_QUEUENAME, - jobs_dir=JOBS_DIR, - max_num_retries=MAX_NR_OF_RETRIES, - broker=DEFAULT_BROKER, - **kwargs): - - self.__notification_listener = IngestBusListener(busname=busname, broker=broker) - self.__notification_listener.onJobStarted = self.onJobStarted - self.__notification_listener.onJobProgress = self.onJobProgress - self.__notification_listener.onJobTransferFailed = self.onJobTransferFailed - self.__notification_listener.onJobFinished = self.onJobFinished - + def __init__(self, exchange=DEFAULT_BUSNAME, jobs_dir=JOBS_DIR, max_num_retries=MAX_NR_OF_RETRIES, broker=DEFAULT_BROKER): self.__jobs_dir = jobs_dir self.__max_num_retries = max_num_retries self.__job_admin_dicts = {} self.__lock = RLock() self.__running = False - self.notification_prefix = DEFAULT_INGEST_NOTIFICATION_PREFIX - self.tobus = ToBus(address=busname, broker=broker) - - self.__momrpc = MoMQueryRPC(busname=busname, broker=broker, timeout=10) + self._tobus = ToBus(exchange=exchange, broker=broker) - self.service = Service(DEFAULT_INGEST_SERVICENAME, - IngestServiceMessageHandler, - busname=busname, - broker=broker, - use_service_methods=True, - num_threads=1, - handler_args={'job_manager': self}) - - # incoming jobs (from mom, eor, user ingest, etc) - # create it if not already existing - if 
create_queue(incoming_job_queue_name, durable=True, broker=broker): - create_binding(busname, incoming_job_queue_name, routing_key=DEFAULT_INGEST_INCOMING_JOB_SUBJECT, broker=broker) - - self.__incoming_job_queue = FromBus(incoming_job_queue_name, broker=broker) + self.__momrpc = MoMQueryRPC(exchange=exchange, broker=broker, timeout=10) self.__running_jobs_log_timestamp = datetime.utcnow() self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow() @@ -115,8 +71,8 @@ class IngestJobManager: logger.info('starting listening for new jobs and notifications') - # open queue connections... - with self.service, self.__incoming_job_queue, self.__notification_listener, self.tobus: + # open exchange connections... + with self._tobus: # ... and run the event loop, # produce jobs to managed job queue for ingest transfer services # receive new jobs @@ -126,33 +82,6 @@ class IngestJobManager: # produce next jobs self.produceNextJobsIfPossible() - # receive any jobs from mom/user_ingest/eor/etc - receive_start = datetime.utcnow() - msg = self.__incoming_job_queue.receive(timeout=1.0) - while msg: - logger.debug("received msg on job queue %s: %s", self.__incoming_job_queue.address, msg) - - if isinstance(msg, CommandMessage): - job = parseJobXml(msg.content) - if job and job.get('JobId'): - if msg.priority != None and msg.priority > job.get('priority', DEFAULT_JOB_PRIORITY): - job['priority'] = msg.priority - - logger.info("received job on queue %s: %s", self.__incoming_job_queue.address, job) - job_admin_dict = {'job': job, 'job_xml': msg.content} - self.addNewJob(job_admin_dict, check_done_dirs=True, add_old_jobs_from_disk=True) - else: - logger.warn("unexpected message type: %s", msg) - - if datetime.utcnow() - receive_start > timedelta(seconds=1): - # break out of receiving while loop early, - # so we can also produce some jobs - # receive more in next iteration - break - - # see if there are any more jobs to receive and process them, else jump out of while loop - msg = self.__incoming_job_queue.receive(timeout=0.01) - # check if producing jobs are actually making progress # maybe the ingest transfer server was shut down in the middle of a transer? 
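The receive loop removed above is replaced by a dedicated BusListener with an IngestIncomingJobsHandler (defined further below); producers now submit jobs roughly as in this sketch, which mirrors ingestaddjobstoqueue and reuses the illustrative jobfile arguments from the main() of ingestmomadapter.py below:

from lofar.messaging import CommandMessage, ToBus, DEFAULT_BUSNAME
from lofar.lta.ingest.common.job import createJobXml
from lofar.lta.ingest.server.config import DEFAULT_INGEST_INCOMING_JOB_SUBJECT

# build a dummy jobfile (same illustrative arguments as in ingestmomadapter's main)
job_xml = createJobXml('project', 0, 1, 'dp_id', 2, '/tmp/path/to/dataproduct')

with ToBus(exchange=DEFAULT_BUSNAME) as bus:
    msg = CommandMessage(subject=DEFAULT_INGEST_INCOMING_JOB_SUBJECT, content=job_xml)
    msg.priority = 4  # the fallback priority the clients use when the jobfile carries none
    bus.send(msg)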
# the ingest transfer server is stateless, so it won't restart that job itself (by design) @@ -170,6 +99,7 @@ class IngestJobManager: else: logger.info('%s jobs are running', len(producing_jads)) + time.sleep(1) except KeyboardInterrupt: break except Exception as e: @@ -550,12 +480,12 @@ class IngestJobManager: status = jobState2String(job_admin_dict['status']) status = status[status.index('(') + 1:status.index(')')] - msg = EventMessage(subject="%s.%s" % (self.notification_prefix, status), content=contentDict) + msg = EventMessage(subject="%s.%s" % (INGEST_NOTIFICATION_PREFIX, status), content=contentDict) # remove message from queue's when not picked up within 48 hours, # otherwise mom might endlessly reject messages if it cannot handle them msg.ttl = 48 * 3600 logger.info('Sending notification %s: %s' % (status, str(contentDict).replace('\n', ' '))) - self.tobus.send(msg) + self._tobus.send(msg) except Exception as e: logger.error(str(e)) @@ -772,8 +702,8 @@ class IngestJobManager: if os.path.exists(job_admin_dict.get('path')): msg = CommandMessage(content=job_admin_dict.get('job_xml'), subject=DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT) msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY) - self.tobus.send(msg) - logger.info('submitted job %s to %s at %s', job_admin_dict['job']['JobId'], self.tobus.address, self.tobus.broker) + self._tobus.send(msg) + logger.info('submitted job %s to %s at %s', job_admin_dict['job']['JobId'], self._tobus.exchange, self._tobus.broker) self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled) else: job_id = job_admin_dict['job']['JobId'] @@ -791,14 +721,12 @@ class IngestJobManager: time.sleep(0.1) def onJobStarted(self, job_notification_dict): - self.__notification_listener._logJobNotification('started ', job_notification_dict) self.updateJobStatus(job_notification_dict.get('job_id'), JobProducing, job_notification_dict.get('lta_site'), job_notification_dict.get('message')) def onJobFinished(self, job_notification_dict): - self.__notification_listener._logJobNotification('finished', job_notification_dict) job_id = job_notification_dict.get('job_id') # file_type might have changed to unspec for example @@ -826,14 +754,12 @@ class IngestJobManager: job_notification_dict.get('message')) def onJobTransferFailed(self, job_notification_dict): - self.__notification_listener._logJobNotification('transfer failed ', job_notification_dict) self.updateJobStatus(job_notification_dict.get('job_id'), JobTransferFailed, job_notification_dict.get('lta_site'), job_notification_dict.get('message')) def onJobProgress(self, job_notification_dict): - self.__notification_listener._logJobNotification('progress', job_notification_dict, level=logging.DEBUG) # touch job # producing jobs which are untouched for 5min are put back to JobToDo self.updateJobStatus(job_notification_dict.get('job_id'), @@ -1191,6 +1117,43 @@ Total Files: %(total)i logger.warning('no email recipients for sending notification email for export job %s', job_group_id) +class IngestServiceMessageHandler(ServiceMessageHandler): + def __init__(self, job_manager: IngestJobManager): + super(IngestServiceMessageHandler, self).__init__() + # register some of the job_manager's methods for the service + self.register_service_method('RemoveExportJob', job_manager.removeExportJob) + self.register_service_method('SetExportJobPriority', job_manager.setExportJobPriority) + self.register_service_method('GetStatusReport', job_manager.getStatusReportDict) + 
self.register_service_method('GetReport', job_manager.getReport) + self.register_service_method('GetExportIds', job_manager.getExportIds) + + +class IngestIncomingJobsHandler(AbstractMessageHandler): + def __init__(self, job_manager: IngestJobManager): + self._job_manager = job_manager + super(IngestIncomingJobsHandler, self).__init__() + + def handle_message(self, msg: LofarMessage): + if not isinstance(msg, CommandMessage): + raise ValueError("%s: Ignoring non-CommandMessage: %s" % (self.__class__.__name__, msg)) + + job = parseJobXml(msg.content) + if job and job.get('JobId'): + if msg.priority != None and msg.priority > job.get('priority', DEFAULT_JOB_PRIORITY): + job['priority'] = msg.priority + + logger.info("received job from bus: %s", job) + job_admin_dict = {'job': job, 'job_xml': msg.content} + self._job_manager.addNewJob(job_admin_dict, check_done_dirs=True, add_old_jobs_from_disk=True) + +class IngestEventMessageHandlerForJobManager(IngestEventMessageHandler): + def __init__(self, job_manager: IngestJobManager): + self.onJobStarted = job_manager.onJobStarted + self.onJobProgress = job_manager.onJobProgress + self.onJobTransferFailed = job_manager.onJobTransferFailed + self.onJobFinished = job_manager.onJobFinished + super(IngestEventMessageHandlerForJobManager, self).__init__() + def main(): from optparse import OptionParser @@ -1207,16 +1170,6 @@ def main(): parser.add_option("--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus on the services listen. [default: %default]") - parser.add_option('--incoming_job_queue_name', - dest='incoming_job_queue_name', - type='string', - default=DEFAULT_INGEST_JOBS_QUEUENAME, - help='name of the incoming job queue (jobs comming from mom/useringest/etc), default: %default') - parser.add_option('--jobs_for_transfer_queue_name', - dest='jobs_for_transfer_queue_name', - type='string', - default=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME, - help='name of the managed job queue (which jobs are handled by the ingestservices), default: %default') (options, args) = parser.parse_args() logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', @@ -1226,16 +1179,25 @@ def main(): logger.info('Started IngestJobManager') logger.info('*****************************************') - manager = IngestJobManager(busname=options.busname, - incoming_job_queue_name=options.incoming_job_queue_name, - jobs_for_transfer_queue_name=options.jobs_for_transfer_queue_name, + manager = IngestJobManager(exchange=options.busname, jobs_dir=options.jobs_dir, max_num_retries=options.max_num_retries, broker=options.broker) - manager.run() + incoming_jobs_listener = BusListener(IngestIncomingJobsHandler, {'job_manager': manager}, + exchange=options.busname, routing_key="%s.#" % DEFAULT_INGEST_INCOMING_JOB_SUBJECT) + + ingest_event_listener = IngestEventMesssageBusListener(IngestEventMessageHandlerForJobManager, + {'job_manager': manager}, + exchange=options.busname) + + ingest_service = Service(DEFAULT_INGEST_SERVICENAME, IngestServiceMessageHandler, {'job_manager': manager}, + exchange=options.busname, broker=options.broker, num_threads=1) + + with incoming_jobs_listener, ingest_event_listener, ingest_service: + manager.run() if __name__ == '__main__': main() -__all__ = ["IngestJobManager"] +__all__ = ["IngestJobManager", "main"] diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py index 
f3d32c94168a19d2e80bab04e4d8f58cbf033470..683691e7d2fc2070fa16097108fff6b7f58ba50d 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py @@ -19,22 +19,24 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener +from lofar.lta.ingest.client.ingestbuslistener import IngestEventMessageHandler, IngestEventMesssageBusListener from lofar.lta.ingest.client.rpc import IngestRPC from lofar.lta.ingest.common.job import * -from lofar.lta.ingest.server.config import DEFAULT_INGEST_INCOMING_JOB_SUBJECT, DEFAULT_INGEST_NOTIFICATION_PREFIX +from lofar.lta.ingest.server.config import DEFAULT_INGEST_INCOMING_JOB_SUBJECT, INGEST_NOTIFICATION_PREFIX from lofar.lta.ingest.server.config import DEFAULT_MOM_XMLRPC_HOST, DEFAULT_MOM_XMLRPC_PORT from lofar.lta.ingest.server.config import MAX_NR_OF_RETRIES from lofar.lta.ingest.server.momclient import * -from lofar.messaging import CommandMessage, EventMessage, ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging.messagebus import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging.messages import CommandMessage, EventMessage from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.common.datetimeutils import totalSeconds +from lofar.common.dbcredentials import DBCredentials from lofar.common.util import waitForInterrupt -import sys -import time -from datetime import datetime, timedelta from threading import Thread +import time +from datetime import datetime +from typing import Union from http.server import HTTPServer import pysimplesoap as soap @@ -42,104 +44,222 @@ import logging logger = logging.getLogger() -class IngestBusListenerForMomAdapter(IngestBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, momrpc=None): - self._busname = busname - self._broker = broker - self._momrpc = momrpc +class IngestEventMessageHandlerForMomAdapter(IngestEventMessageHandler): + def __init__(self, mom_creds: DBCredentials, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + self.exchange = exchange + self.broker = broker + self._momrpc = MoMQueryRPC(exchange=exchange, broker=broker, timeout=180) + self._mom_client = MoMClient(mom_creds.user, mom_creds.password) + self._tobus = ToBus(exchange=exchange, broker=broker) self._removed_export_ids = set() # keep track of which export_id's were removed, so we don't have to remove them again - super(IngestBusListenerForMomAdapter, self).__init__(busname=busname, broker=broker) + super(IngestEventMessageHandlerForMomAdapter, self).__init__() + + def start_handling(self): + self._momrpc.open() + self._mom_client.login() + self._tobus.open() - def _handleMessage(self, msg): + def stop_handling(self): + self._momrpc.close() + self._mom_client.logout() + self._tobus.close() + + def handle_message(self, msg: EventMessage) -> bool: try: - super(IngestBusListenerForMomAdapter, self)._handleMessage(msg) + # try 'normal' handling of msg, should result in normal calls to onJob* methods + # but MoM (via the momrpc) is notoriously unreliable at handling these messages... + if super(IngestEventMessageHandlerForMomAdapter, self).handle_message(msg): + return True except Exception as e: - if msg and msg.content: - if self._momrpc and isinstance(msg.content, dict): - if msg.content.get('type','').lower() == 'mom': - export_id = msg.content.get('export_id') + # ... 
so handle the exceptions... + logger.warning(e) - if export_id is None: - job_id = msg.content.get('job_id') - export_id = int(job_id.split('_')[1]) + # ... and try to deal with MoM's quirks. + if self._remove_unknown_export_job_if_needed(msg): + return True - if export_id and export_id not in self._momrpc.getObjectDetails(export_id): - if export_id not in self._removed_export_ids: - logger.warn('Export job %s cannot be found (anymore) in mom. Removing export job from ingest queue', export_id) + if self._resubmit_message_if_applicable(msg): + return True - # keep track of which export_id's were removed, so we don't have to remove them again - # this keeps stuff flowing faster - self._removed_export_ids.add(export_id) + return False - with IngestRPC(broker=self._broker) as ingest_rpc: - ingest_rpc.removeExportJob(export_id) + def onJobStarted(self, job_dict): + self._update_mom_status_if_applicable(job_dict, JobProducing) - return + def onJobFinished(self, job_dict): + self._update_mom_status_if_applicable(job_dict, JobProduced) + self._checkTaskFullyIngested(job_dict) - with ToBus(self._busname, broker=self._broker) as tobus: - retry_count = msg.content.get('retry_count', 0) + def onJobFailed(self, job_dict): + self._update_mom_status_if_applicable(job_dict, JobFailed) - if retry_count < min(MAX_NR_OF_RETRIES, 127): #mom can't handle more than 127 status updates... - retry_count += 1 - try: - retry_timestamp = msg.content.get('retry_timestamp', datetime.utcnow()) - ago = totalSeconds(datetime.utcnow()-retry_timestamp) - time.sleep(max(0, 2*retry_count-ago)) #wait longer between each retry, other messages can be processed in between - except Exception as e: - #just continue - logger.warning(e) + def onJobRemoved(self, job_dict): + job_dict['message'] = 'removed from ingest queue before transfer' + self._update_mom_status_if_applicable(job_dict, JobFailed) - #set/update retry values - msg.content['retry_count'] = retry_count - msg.content['retry_timestamp'] = datetime.utcnow() + def _update_mom_status_if_applicable(self, job_dict, status): + if job_dict.get('type','').lower() == 'mom': + if not self._mom_client.setStatus(job_dict.get('job_id'), status, job_dict.get('message')): + raise Exception('Could not update status in MoM to %s for %s' % (jobState2String(status), job_dict.get('job_id'))) - #resubmit - new_msg = EventMessage(subject=msg.subject, content=msg.content) - new_msg.ttl = 48*3600 #remove message from queue's when not picked up within 48 hours - logger.info('resubmitting unhandled message to back of queue %s: %s %s', self._busname, new_msg.subject, str(new_msg.content).replace('\n', ' ')) - tobus.send(new_msg) + def _checkTaskFullyIngested(self, job_dict): + try: + if job_dict.get('type','').lower() != 'mom': + return + if 'otdb_id' not in job_dict: + return + otdb_id = int(job_dict['otdb_id']) + mom2id = self._momrpc.getMoMIdsForOTDBIds(otdb_id).get(otdb_id) -class IngestMomAdapter: - def __init__(self, momClient = None, - busname = DEFAULT_BUSNAME, - mom_xmlrpc_host=DEFAULT_MOM_XMLRPC_HOST, - mom_xmlrpc_port=DEFAULT_MOM_XMLRPC_PORT, - broker=None, - **kwargs): - self._opened = False + if mom2id is None: + return - self.__momClient = MoMClient() if momClient is None else momClient + job_dict['mom2id'] = mom2id - self.__momrpc = MoMQueryRPC(busname=busname, broker=broker, timeout=180) + dps = self._momrpc.getDataProducts(mom2id).get(mom2id) - self.__ingest_notification_listener = IngestBusListenerForMomAdapter(busname=busname, broker=broker, momrpc=self.__momrpc, **kwargs) - 
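Schematically, the onJob* overrides above implement the following event-to-MoM-status mapping; the dict and its name are illustrative only, the state constants come from lofar.lta.ingest.common.job (star-imported above):

from lofar.lta.ingest.common.job import JobProducing, JobProduced, JobFailed

# illustrative summary of the mapping, not code from this change
EVENT_TO_MOM_STATUS = {'JobStarted': JobProducing,
                       'JobFinished': JobProduced,  # also triggers _checkTaskFullyIngested
                       'JobFailed': JobFailed,
                       'JobRemoved': JobFailed}     # with message 'removed from ingest queue before transfer'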
self.__ingest_notification_listener.onJobStarted = self.onJobStarted - self.__ingest_notification_listener.onJobFinished = self.onJobFinished - self.__ingest_notification_listener.onJobFailed = self.onJobFailed - self.__ingest_notification_listener.onJobRemoved = self.onJobRemoved + if len(dps) <= 0: + return - self.__notification_prefix = DEFAULT_INGEST_NOTIFICATION_PREFIX+"." - self.__tobus = ToBus(address=busname, broker=broker) + ingested_dps = [dp for dp in dps if dp['status'] == 'ingested'] + #reuse part of job_dict contents for new notification message + job_dict2 = {} + for key in ['job_id', 'export_id', 'project', 'type', 'ingest_server', 'otdb_id', 'lta_site']: + if key in job_dict: + job_dict2[key] = job_dict[key] + + if 'srm_url' in job_dict: + try: + # try to parse the srm_url and get the observation dir name from the dataproduct srmurl + # example url: srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/lc8_029/652884/L652884_SAP000_B000_P001_bf_e619e5da.tar + # should become: srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/lc8_029/652884 + srm_url = job_dict['srm_url'] + srm_dir_url = '/'.join(srm_url.split('/')[:-2]) + job_dict2['srm_url'] = srm_dir_url + except Exception as e: + logger.error("could not derive srm_dir_url from %s. error=%s", srm_url, e) + + message = 'Ingested %s/%s (%.1f%%) dataproducts for otdb_id %s mom2id %s' % (len(ingested_dps), + len(dps), + 100.0*len(ingested_dps)/len(dps), + otdb_id, + mom2id) + logger.info(message) + + job_dict2['message'] = message + job_dict2['percentage_done'] = 100.0*len(ingested_dps)/len(dps) + + self._send_notification('TaskProgress', job_dict2) + + if len(dps) == len(ingested_dps): + job_dict2['message'] = 'All dataproducts of task with otdb_id=%s mom2id=%s were ingested' % (otdb_id, mom2id) + self._send_notification('TaskFinished', job_dict2) + except Exception as e: + logger.error(str(e)) + return False + + return True + + def _send_notification(self, subject, content_dict): try: - url = 'http://%s:%s' % (mom_xmlrpc_host, mom_xmlrpc_port) - logger.info('Setting up MoM SOAP server on %s', url) - # self._server = SOAPpy.SOAPServer((mom_xmlrpc_host, mom_xmlrpc_port)) - # self._server.registerFunction(self.onXmlRPCJobReceived, 'urn:pipeline.export', 'new_job') - dispatcher = soap.server.SoapDispatcher(name="mom_xmlrpc", location=url, action=url, namespace="urn:pipeline.export", debug=True) - dispatcher.register_function('new_job', - self.onXmlRPCJobReceived, - args={'fileName':str , 'fileContent': str}, - returns={'new_job_result': bool}) - - self._server = HTTPServer((mom_xmlrpc_host, mom_xmlrpc_port), soap.server.SOAPHandler) - self._server.dispatcher = dispatcher - - except IOError as e: - logger.error('Could not listen on %s. 
%s', url, e) - sys.exit(-1) + msg = EventMessage(subject="%s.%s" % (INGEST_NOTIFICATION_PREFIX, subject), content=content_dict) + msg.ttl = 48*3600 #remove message from queue's when not picked up within 48 hours + logger.info('Sending notification %s: %s' % (subject, str(content_dict).replace('\n', ' '))) + self._tobus.send(msg) + except Exception as e: + logger.error(str(e)) + + def _remove_unknown_export_job_if_needed(self, msg: EventMessage) -> bool: + if msg and msg.content: + if self._momrpc and isinstance(msg.content, dict): + if msg.content.get('type', '').lower() == 'mom': + export_id = msg.content.get('export_id') + + if export_id is None: + job_id = msg.content.get('job_id') + export_id = int(job_id.split('_')[1]) + + if export_id and export_id not in self._momrpc.getObjectDetails(export_id): + if export_id not in self._removed_export_ids: + logger.warning( + 'Export job %s cannot be found (anymore) in mom. Removing export job from ingest queue', + export_id) + + # keep track of which export_id's were removed, so we don't have to remove them again + # this keeps stuff flowing faster + self._removed_export_ids.add(export_id) + + with IngestRPC(broker=self.broker) as ingest_rpc: + ingest_rpc.removeExportJob(export_id) + return True + return False + + def _resubmit_message_if_applicable(self, msg: EventMessage) -> bool: + try: + if msg and msg.content: + retry_count = msg.content.get('retry_count', 0) + + if retry_count < min(MAX_NR_OF_RETRIES, 127): # mom can't handle more than 127 status updates... + retry_count += 1 + try: + retry_timestamp = msg.content.get('retry_timestamp', datetime.utcnow()) + ago = totalSeconds(datetime.utcnow() - retry_timestamp) + time.sleep(max(0, + 2 * retry_count - ago)) # wait longer between each retry, other messages can be processed in between + except Exception as e: + # just continue + logger.warning(e) + + # set/update retry values + msg.content['retry_count'] = retry_count + msg.content['retry_timestamp'] = datetime.utcnow() + + # resubmit + new_msg = EventMessage(subject=msg.subject, content=msg.content) + new_msg.ttl = 48 * 3600 # remove message from queue's when not picked up within 48 hours + logger.info('resubmitting unhandled message to back of queue %s: %s %s', self.exchange, new_msg.subject, + str(new_msg.content).replace('\n', ' ')) + self._tobus.send(new_msg) + return True + except Exception as e: + logger.error("_resubmit_message_if_applicable error: %s", e) + + return False + +class IngestBusListenerForMomAdapter(IngestEventMesssageBusListener): + def __init__(self, mom_creds: DBCredentials, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + super(IngestBusListenerForMomAdapter, self).__init__(handler_type=IngestEventMessageHandlerForMomAdapter, + handler_kwargs={'mom_creds': mom_creds, + 'exchange': exchange, + 'broker': broker}, + exchange=exchange, + broker=broker) + +class MoMXMLRPCHandler: + def __init__(self, + exchange: str = DEFAULT_BUSNAME, + broker: str =DEFAULT_BROKER, + mom_xmlrpc_host: str =DEFAULT_MOM_XMLRPC_HOST, + mom_xmlrpc_port: Union[str, int] =DEFAULT_MOM_XMLRPC_PORT): + self._tobus = ToBus(exchange=exchange, broker=broker) + + url = 'http://%s:%s' % (mom_xmlrpc_host, mom_xmlrpc_port) + logger.info('Setting up MoM SOAP server on %s', url) + dispatcher = soap.server.SoapDispatcher(name="mom_xmlrpc", + location=url, + action=url, + namespace="urn:pipeline.export") + dispatcher.register_function('new_job', + self.onXmlRPCJobReceived, + args={'fileName':str , 'fileContent': str}, + returns={'new_job_result': bool}) + 
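For reference, the 'new_job' endpoint registered above can be exercised with pysimplesoap's client, just as the main() further below does; 'handler' stands for an opened MoMXMLRPCHandler instance and 'job_xml' for a jobfile string, both assumptions of this sketch:

from pysimplesoap.client import SoapClient

# a client-side sketch, mirroring what MoM does via xml-rpc;
# handler and job_xml are assumed to exist (see the lead-in above)
soap_client = SoapClient(location=handler.server_url(), namespace="urn:pipeline.export")
soap_client.new_job(fileName='my_job_file.xml', fileContent=job_xml)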
+ self._server = HTTPServer((mom_xmlrpc_host, mom_xmlrpc_port), soap.server.SOAPHandler) + self._server.dispatcher = dispatcher + def server_address(self): """get the xml-rpc server address as host,port tuple""" @@ -157,53 +277,28 @@ class IngestMomAdapter: self.close() def open(self): - if self._opened: + if self._tobus.is_connected(): return - self.__tobus.open() - - self.__momClient.login() + self._tobus.open() #run the soap server in a seperate thread - logger.info('Starting MoM SOAP server on %s:%s', self._server.server_address[0], self._server.server_address[1]) + logger.info('Starting MoM SOAP server on %s', self.server_url()) self._server_thread = Thread(target=self._server.serve_forever) self._server_thread.daemon = True self._server_thread.start() - self._opened = True - def close(self): + if not self._tobus.is_connected(): + return + # shutdown soap server and wait for its thread to complete - logger.info('Shutting down MoM SOAP server on %s:%s', self._server.server_address[0], self._server.server_address[1]) + logger.info('Shutting down MoM SOAP server on %s', self.server_url()) self._server.shutdown() self._server_thread.join() logger.info('MoM SOAP server stopped') - self.__momClient.logout() - - self.__tobus.close() - self._opened = False - - def run(self): - with self: - #run event loop forever until KeyboardInterrupt - running = True - while running: - #open __ingest_notification_listener, closes on leaving scope - with self.__ingest_notification_listener: - #run for 15 min, doing nothing except listing for event and KeyboardInterrupt - for i in range(15*60): - try: - time.sleep(1) - except KeyboardInterrupt: - logger.info("Ctrl-C pressed..") - running = False - break - #15min loop ended, or KeyboardInterrupt - #leave scope of __ingest_notification_listener - #so it reconnects in next iteration of while loop - #and it can (re)process any messages which were not ack'ed - #because mom might have been unreachable + self._tobus.close() def onXmlRPCJobReceived(self, fileName, fileContent): try: @@ -213,17 +308,18 @@ class IngestMomAdapter: if job: logger.info("Received job on MoM SOAP server: %s", job) - logger.debug('submitting job %s to queue %s at %s', job['JobId'], self.__tobus.address, self.__tobus.broker) msg = CommandMessage(content=fileContent, subject=DEFAULT_INGEST_INCOMING_JOB_SUBJECT) + logger.debug('submitting job %s to exchange %s with subject %s at broker %s', + job['JobId'], self._tobus.exchange, msg.subject, self._tobus.broker) try: msg.priority = int(job.get('priority', 4)) except Exception as e: logger.error("Cannot set priority in job message: %s", e) - self.__tobus.send(msg) + self._tobus.send(msg) logger.info('submitted job %s with subject=%s to %s at %s', job['JobId'], msg.subject, - self.__tobus.address, self.__tobus.broker) + self._tobus.exchange, self._tobus.broker) else: logger.info("Could not parse message as job: %s", fileContent) except Exception as e: @@ -232,108 +328,38 @@ class IngestMomAdapter: return True - def _update_mom_status_if_applicable(self, job_dict, status): - if job_dict.get('type','').lower() == 'mom': - if not self.__momClient.setStatus(job_dict.get('job_id'), status, job_dict.get('message')): - raise Exception('Could not update status in MoM to %s for %s' % (jobState2String(status), job_dict.get('job_id'))) - - def onJobStarted(self, job_dict): - self.__ingest_notification_listener._logJobNotification('started ', job_dict); - self._update_mom_status_if_applicable(job_dict, JobProducing) - - def onJobFinished(self, job_dict): - 
self.__ingest_notification_listener._logJobNotification('finished', job_dict); - self._update_mom_status_if_applicable(job_dict, JobProduced) - self.__checkTaskFullyIngested(job_dict) - - def onJobFailed(self, job_dict): - self.__ingest_notification_listener._logJobNotification('failed ', job_dict, level=logging.WARNING); - self._update_mom_status_if_applicable(job_dict, JobFailed) - - def onJobRemoved(self, job_dict): - self.__ingest_notification_listener._logJobNotification('removed ', job_dict); - - job_dict['message'] = 'removed from ingest queue before transfer' - self._update_mom_status_if_applicable(job_dict, JobFailed) - - def __checkTaskFullyIngested(self, job_dict): - try: - if job_dict.get('type','').lower() != 'mom': - return - if 'otdb_id' not in job_dict: - return - - otdb_id = int(job_dict['otdb_id']) - mom2id = self.__momrpc.getMoMIdsForOTDBIds(otdb_id).get(otdb_id) - - if mom2id is None: - return - - job_dict['mom2id'] = mom2id - - dps = self.__momrpc.getDataProducts(mom2id).get(mom2id) - - if len(dps) <= 0: - return - - ingested_dps = [dp for dp in dps if dp['status'] == 'ingested'] - - #reuse part of job_dict contents for new notification message - job_dict2 = {} - for key in ['job_id', 'export_id', 'project', 'type', 'ingest_server', 'otdb_id', 'lta_site']: - if key in job_dict: - job_dict2[key] = job_dict[key] - - if 'srm_url' in job_dict: - try: - # try to parse the srm_url and get the observation dir name from the dataproduct srmurl - # example url: srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/lc8_029/652884/L652884_SAP000_B000_P001_bf_e619e5da.tar - # should become: srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/lc8_029/652884 - srm_url = job_dict['srm_url'] - srm_dir_url = '/'.join(srm_url.split('/')[:-2]) - job_dict2['srm_url'] = srm_dir_url - except Exception as e: - logger.error("could not derive srm_dir_url from %s. 
error=%s", srm_url, e) - - message = 'Ingested %s/%s (%.1f%%) dataproducts for otdb_id %s mom2id %s' % (len(ingested_dps), - len(dps), - 100.0*len(ingested_dps)/len(dps), - otdb_id, - mom2id) - logger.info(message) +class IngestMomAdapter: + def __init__(self, mom_creds: DBCredentials, + exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, + mom_xmlrpc_host: str = DEFAULT_MOM_XMLRPC_HOST, + mom_xmlrpc_port: Union[str, int] = DEFAULT_MOM_XMLRPC_PORT): + self.ingest2mom_adapter = IngestBusListenerForMomAdapter(mom_creds, exchange, broker) + self.mom2ingest_adapter = MoMXMLRPCHandler(exchange, broker, mom_xmlrpc_host, mom_xmlrpc_port) - job_dict2['message'] = message - job_dict2['percentage_done'] = 100.0*len(ingested_dps)/len(dps) + def open(self): + self.ingest2mom_adapter.start_listening() + self.mom2ingest_adapter.open() - self.__sendNotification('TaskProgress', job_dict2) + def close(self): + self.ingest2mom_adapter.stop_listening() + self.mom2ingest_adapter.close() - if len(dps) == len(ingested_dps): - job_dict2['message'] = 'All dataproducts of task with otdb_id=%s mom2id=%s were ingested' % (otdb_id, mom2id) - self.__sendNotification('TaskFinished', job_dict2) - except Exception as e: - logger.error(str(e)) + def __enter__(self): + self.open() + return self - def __sendNotification(self, subject, content_dict): - try: - msg = EventMessage(subject=self.__notification_prefix + subject, content=content_dict) - msg.ttl = 48*3600 #remove message from queue's when not picked up within 48 hours - logger.info('Sending notification %s: %s' % (subject, str(content_dict).replace('\n', ' '))) - self.__tobus.send(msg) - except Exception as e: - logger.error(str(e)) + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() def main(): - from lofar.common.util import waitForInterrupt - from lofar.messaging import setQpidLogLevel - from lofar.common import dbcredentials from optparse import OptionParser # Check the invocation arguments parser = OptionParser('%prog [options]', description='run ingest mom adapter, which receives jobs from MoM, and updates ingest statuses to MoM') parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') - parser.add_option('--busname', dest='busname', type='string', + parser.add_option('--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME, help='Name of the bus on which the services listen, default: %default') parser.add_option("-m", "--mom_credentials", dest="mom_credentials", @@ -350,29 +376,34 @@ def main(): help="port on which the xmlrpc server listens for (mom) jobs [default=%default]") (options, args) = parser.parse_args() - setQpidLogLevel(logging.INFO) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - momcreds = dbcredentials.DBCredentials().get(options.mom_credentials) - logger.info("Using username \'%s\' for MoM web client" % momcreds.user) + mom_creds = DBCredentials().get(options.mom_credentials) + logger.info("Using username \'%s\' for MoM web client" % mom_creds.user) + + logger.info('*****************************************') logger.info('Starting IngestMomAdapter...') + logger.info('*****************************************') + + with IngestMomAdapter(mom_creds, options.exchange, options.broker, options.host, options.port) as adapter: + # create a job... 
+ job_xml = createJobXml('project', 0, 1, 'dp_id', 2, '/tmp/path/to/dataproduct') + + from pysimplesoap.client import SoapClient + + # submit the job like MoM would via xml-rpc + soap_client = SoapClient(location=adapter.mom2ingest_adapter.server_url(), namespace="urn:pipeline.export") + soap_client.new_job(fileName='my_job_file.xml', fileContent=job_xml) + + waitForInterrupt() + - with MoMClient(momcreds.user, momcreds.password) as momClient: - adapter = IngestMomAdapter(momClient=momClient, - busname=options.busname, - mom_xmlrpc_host=options.host, - mom_xmlrpc_port=options.port, - broker=options.broker) - logger.info('*****************************************') - logger.info('Started IngestMomAdapter') - logger.info('*****************************************') - adapter.run() logger.info('Stopped IngestMomAdapter') if __name__ == '__main__': main() -__all__ = ["IngestMomAdapter"] + diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py index 567f69545e8f73cbf580620b77f4458398d50c1d..c0f723a7c63c25bad66a6609f03d32d242665e8f 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py @@ -17,43 +17,25 @@ logger = logging.getLogger(__name__) from lofar.messaging.messagebus import FromBus, ToBus, TemporaryExchange, TemporaryQueue from lofar.messaging.messages import CommandMessage, EventMessage -import lofar.messaging.config as msg_config import lofar.lta.ingest.server.config as ingest_config testname = 'TEST_INGESTJOBMANAGEMENTSERVER_%s' % uuid.uuid1().hex[:6] -with TemporaryExchange(testname+"_bus") as tmp_bus, \ - TemporaryQueue(testname + "_incoming_jobs_queue", - exchange_name=tmp_bus.address, - routing_key=ingest_config.DEFAULT_INGEST_INCOMING_JOB_SUBJECT) as tmp_incoming_jobs_queue, \ - TemporaryQueue(testname + "_jobs_for_transfer_queue", - exchange_name=tmp_bus.address, - routing_key=ingest_config.DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT) as tmp_jobs_for_transfer_queue: +with TemporaryExchange(testname+"_bus") as tmp_bus: logger.info(tmp_bus.address) - # time.sleep(5) - - #overwrite some defaults in the config to run this as an isolated test - msg_config.DEFAULT_BROKER = "localhost" - msg_config.DEFAULT_BUSNAME = tmp_bus.address - ingest_config.DEFAULT_INGEST_JOBS_QUEUENAME = tmp_incoming_jobs_queue.address - ingest_config.DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME = tmp_jobs_for_transfer_queue.address ingest_config.JOBS_DIR = os.path.join(tempfile.gettempdir(), testname, 'jobs') ingest_config.FINISHED_NOTIFICATION_MAILING_LIST = '' ingest_config.MAX_NR_OF_RETRIES = 3 + from lofar.lta.ingest.server.ingestjobmanagementserver import IngestJobManager from lofar.lta.ingest.common.job import * - from lofar.lta.ingest.client.ingestbuslistener import JobsMonitor - connection = None - broker = None manager = None manager_thread = None exit_code = 0 try: - from lofar.lta.ingest.server.ingestjobmanagementserver import IngestJobManager - # create some 'to do' job files for group 999999999 for i in range(3): testfile_path = os.path.join(ingest_config.JOBS_DIR, 'to_do', 'testjob_%s.xml' % i) @@ -83,7 +65,7 @@ with TemporaryExchange(testname+"_bus") as tmp_bus, \ content['percentage_done'] = percentage_done if export_id: content['export_id'] = export_id - event_msg = EventMessage(subject="%s.%s" % 
(ingest_config.DEFAULT_INGEST_NOTIFICATION_PREFIX, event), + event_msg = EventMessage(subject="%s.%s" % (ingest_config.INGEST_NOTIFICATION_PREFIX, event), content=content) logger.info('sending test event message on %s subject=%s content=%s', test_notifier.address, event_msg.subject, event_msg.content) @@ -117,280 +99,275 @@ with TemporaryExchange(testname+"_bus") as tmp_bus, \ manager_thread.daemon = True manager_thread.start() - # setup a special tmp queue for the JobsMonitor to listen for ingest-job-notifications - # do not bind it to the exchange here in the test, but allow the JobsMonitor constructor to create the - # binding to the test exchange. - with TemporaryQueue(testname + "_notifications_for_test_jobsmonitor") as notification_queue: - with JobsMonitor(busname=tmp_bus.address, queue_name=notification_queue.address) as monitor: - assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished before any job was started' - assert manager.nrOfJobs() == 3, 'expected 3 jobs in total before any job was started' - - #mimick receiving and transferring of jobs - #check the status of the manager for correctness - job1 = receiveJobForTransfer() - logger.info("jobs: %s", job1) - - assert job1['JobId'] == 'A_999999999_777777777_L888888888_SB000_uv.MS', 'unexpected job %s' % job1['JobId'] - sendNotification('JobStarted', job1['JobId'], export_id=job1['job_group_id']) - assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job was started' - - sendNotification('JobProgress', job1['JobId'], percentage_done=25, export_id=job1['job_group_id']) - assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job made progress' - - #just finish normally - sendNotification('JobFinished', job1['JobId'], export_id=job1['job_group_id']) - - time.sleep(1.0) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()[999999999] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - - - #2nd job will fail one transfer before completing - job2 = receiveJobForTransfer() - assert job2['JobId'] == 'A_999999999_777777778_L888888888_SB001_uv.MS', 'unexpected job %s' % job2['JobId'] - sendNotification('JobStarted', job2['JobId'], export_id=job2['job_group_id']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()[999999999] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == 
report['series']['running_jobs']['values'][2], 'expected running jobs series[0] == 1' - - # let job2 fail - sendNotification('JobTransferFailed', job2['JobId'], message='something went wrong', export_id=job2['job_group_id']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()[999999999] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - - #the 2nd job failed, so did not finish, and will be retried later - #the next received job should be the 3rd job - job3 = receiveJobForTransfer() - assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] - sendNotification('JobStarted', job3['JobId'], export_id=job3['job_group_id']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()[999999999] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - - - #3rd job will fail all the time - sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()[999999999] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == 
report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - - - #receive again, 2nd and 3rd job are going to be retried - #this should be the 2nd job - job2 = receiveJobForTransfer() - assert job2['JobId'] == 'A_999999999_777777778_L888888888_SB001_uv.MS', 'unexpected job %s' % job2['JobId'] - sendNotification('JobStarted', job2['JobId'], export_id=job2['job_group_id']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #keep job2 running while we process job3 - #check report - report = manager.getStatusReportDict()[999999999] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - - - #only 3rd job is unfinished, and job2 is running - job3 = receiveJobForTransfer() - assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] - sendNotification('JobStarted', job3['JobId'], export_id=job3['job_group_id']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()[999999999] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - - #3rd job will fail again - sendNotification('JobTransferFailed', 
job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' - - #check report - report = manager.getStatusReportDict()[999999999] - assert 1 == report['jobs']['finished'], 'expected 1 job finished' - assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' - - - # in the mean time, finish job2 normally - sendNotification('JobFinished', job2['JobId'], export_id=job2['job_group_id']) - - #one job to go - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished' - - #check report - report = manager.getStatusReportDict()[999999999] - assert 2 == report['jobs']['finished'], 'expected 2 jobs finished' - assert 2 == len(report['series']['finished_jobs']['values']), 'expected 2 jobs in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 2 == report['series']['finished_jobs']['values'][1], 'expected finished jobs series[1] == 2' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' - assert 0 == report['series']['running_jobs']['values'][9], 'expected running jobs series[9] == 0' - - - #still 3rd job is still unfinished, final retry - job3 = receiveJobForTransfer() - assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] - sendNotification('JobStarted', job3['JobId'], 
export_id=job3['job_group_id']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished' - - #check report - report = manager.getStatusReportDict()[999999999] - assert 2 == report['jobs']['finished'], 'expected 2 jobs finished' - assert 2 == len(report['series']['finished_jobs']['values']), 'expected 2 jobs in finished jobs series' - assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' - assert 2 == report['series']['finished_jobs']['values'][1], 'expected finished jobs series[1] == 2' - assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' - assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' - assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' - assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' - assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' - assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' - assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' - assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' - assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' - assert 0 == report['series']['running_jobs']['values'][9], 'expected running jobs series[9] == 0' - assert 1 == report['series']['running_jobs']['values'][10], 'expected running jobs series[10] == 1' - - #3rd job will fail again - sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) - - #3rd job should have failed after 3 retries - #no more jobs to go - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished' - - #there should be no more reports, cause the job group 999999999 is finished as a whole - #and is removed from the manager at this point - reports = manager.getStatusReportDict() - assert 0 == len(reports), 'expected 0 reports' - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - - jobgroup_999999999_failed_dir = os.path.join(ingest_config.JOBS_DIR, 'failed', 'MoM_999999999') - failed_jobgroup_999999999_files = [os.path.join(jobgroup_999999999_failed_dir, f) for f in - os.listdir(jobgroup_999999999_failed_dir) - if fnmatch.fnmatch(f, '*_999999999_*.xml*')] - - assert 1 == len(failed_jobgroup_999999999_files), '1 and only 1 failed file expected for job_group 999999999' - for file in failed_jobgroup_999999999_files: - sendJobFileToManager(file) - - time.sleep(1.0) - - assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 jobs unfinished' - assert manager.nrOfJobs() == 3, 'expected 3 jobs' #1 to_do/scheduled, 2 done - assert len(manager.getJobAdminDicts(status=JobToDo) + manager.getJobAdminDicts(status=JobScheduled)) == 1, 'expected 1 todo/scheduled jobs' - assert len(manager.getJobAdminDicts(status=JobProduced)) == 2, 'expected 2 done jobs' - - # this time, start and finish job3 normally - job3 = receiveJobForTransfer() - assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % 
job3['JobId'] - sendNotification('JobStarted', job3['JobId'], export_id=job3['job_group_id']) - sendNotification('JobFinished', job3['JobId'], export_id=job3['job_group_id']) - - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - - #there should be no more reports, cause the job group 999999999 is finished as a whole - #and is removed from the manager at this point - reports = manager.getStatusReportDict() - assert 0 == len(reports), 'expected 0 reports' - assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished' - time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout - - manager.quit() - manager_thread.join() + assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished before any job was started' + assert manager.nrOfJobs() == 3, 'expected 3 jobs in total before any job was started' + + #mimick receiving and transferring of jobs + #check the status of the manager for correctness + job1 = receiveJobForTransfer() + logger.info("jobs: %s", job1) + + assert job1['JobId'] == 'A_999999999_777777777_L888888888_SB000_uv.MS', 'unexpected job %s' % job1['JobId'] + sendNotification('JobStarted', job1['JobId'], export_id=job1['job_group_id']) + assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job was started' + + sendNotification('JobProgress', job1['JobId'], percentage_done=25, export_id=job1['job_group_id']) + assert manager.nrOfUnfinishedJobs() == 3, 'expected 3 jobs unfinished after 1st job made progress' + + #just finish normally + sendNotification('JobFinished', job1['JobId'], export_id=job1['job_group_id']) + + time.sleep(1.0) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + + + #2nd job will fail one transfer before completing + job2 = receiveJobForTransfer() + assert job2['JobId'] == 'A_999999999_777777778_L888888888_SB001_uv.MS', 'unexpected job %s' % job2['JobId'] + sendNotification('JobStarted', job2['JobId'], export_id=job2['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[0] == 1' + + # let job2 fail + 
sendNotification('JobTransferFailed', job2['JobId'], message='something went wrong', export_id=job2['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + + #the 2nd job failed, so did not finish, and will be retried later + #the next received job should be the 3rd job + job3 = receiveJobForTransfer() + assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] + sendNotification('JobStarted', job3['JobId'], export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + + + #3rd job will fail all the time + sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == 
report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + + + #receive again, 2nd and 3rd job are going to be retried + #this should be the 2nd job + job2 = receiveJobForTransfer() + assert job2['JobId'] == 'A_999999999_777777778_L888888888_SB001_uv.MS', 'unexpected job %s' % job2['JobId'] + sendNotification('JobStarted', job2['JobId'], export_id=job2['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #keep job2 running while we process job3 + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' + + + #only 3rd job is unfinished, and job2 is running + job3 = receiveJobForTransfer() + assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] + sendNotification('JobStarted', job3['JobId'], export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' + assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' + + #3rd job will fail again + sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: 
should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 2, 'expected 2 jobs unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 1 == report['jobs']['finished'], 'expected 1 job finished' + assert 1 == len(report['series']['finished_jobs']['values']), 'expected 1 job in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' + assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' + assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' + + + # in the mean time, finish job2 normally + sendNotification('JobFinished', job2['JobId'], export_id=job2['job_group_id']) + + #one job to go + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 2 == report['jobs']['finished'], 'expected 2 jobs finished' + assert 2 == len(report['series']['finished_jobs']['values']), 'expected 2 jobs in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 2 == report['series']['finished_jobs']['values'][1], 'expected finished jobs series[1] == 2' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' + assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' + assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' + assert 0 == report['series']['running_jobs']['values'][9], 'expected running jobs series[9] == 0' + + + #still 3rd job is still unfinished, final retry + job3 = receiveJobForTransfer() + assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] + sendNotification('JobStarted', job3['JobId'], export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a 
timeout + assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 job unfinished' + + #check report + report = manager.getStatusReportDict()[999999999] + assert 2 == report['jobs']['finished'], 'expected 2 jobs finished' + assert 2 == len(report['series']['finished_jobs']['values']), 'expected 2 jobs in finished jobs series' + assert 1 == report['series']['finished_jobs']['values'][0], 'expected finished jobs series[0] == 1' + assert 2 == report['series']['finished_jobs']['values'][1], 'expected finished jobs series[1] == 2' + assert 1 == report['series']['running_jobs']['values'][0], 'expected running jobs series[0] == 1' + assert 0 == report['series']['running_jobs']['values'][1], 'expected running jobs series[1] == 0' + assert 1 == report['series']['running_jobs']['values'][2], 'expected running jobs series[2] == 1' + assert 0 == report['series']['running_jobs']['values'][3], 'expected running jobs series[3] == 0' + assert 1 == report['series']['running_jobs']['values'][4], 'expected running jobs series[4] == 1' + assert 0 == report['series']['running_jobs']['values'][5], 'expected running jobs series[5] == 0' + assert 1 == report['series']['running_jobs']['values'][6], 'expected running jobs series[6] == 1' + assert 2 == report['series']['running_jobs']['values'][7], 'expected running jobs series[7] == 2' + assert 1 == report['series']['running_jobs']['values'][8], 'expected running jobs series[8] == 1' + assert 0 == report['series']['running_jobs']['values'][9], 'expected running jobs series[9] == 0' + assert 1 == report['series']['running_jobs']['values'][10], 'expected running jobs series[10] == 1' + + #3rd job will fail again + sendNotification('JobTransferFailed', job3['JobId'], message='something went wrong', export_id=job3['job_group_id']) + + #3rd job should have failed after 3 retries + #no more jobs to go + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished' + + #there should be no more reports, cause the job group 999999999 is finished as a whole + #and is removed from the manager at this point + reports = manager.getStatusReportDict() + assert 0 == len(reports), 'expected 0 reports' + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + + jobgroup_999999999_failed_dir = os.path.join(ingest_config.JOBS_DIR, 'failed', 'MoM_999999999') + failed_jobgroup_999999999_files = [os.path.join(jobgroup_999999999_failed_dir, f) for f in + os.listdir(jobgroup_999999999_failed_dir) + if fnmatch.fnmatch(f, '*_999999999_*.xml*')] + + assert 1 == len(failed_jobgroup_999999999_files), '1 and only 1 failed file expected for job_group 999999999' + for file in failed_jobgroup_999999999_files: + sendJobFileToManager(file) + + time.sleep(1.0) + + assert manager.nrOfUnfinishedJobs() == 1, 'expected 1 jobs unfinished' + assert manager.nrOfJobs() == 3, 'expected 3 jobs' #1 to_do/scheduled, 2 done + assert len(manager.getJobAdminDicts(status=JobToDo) + manager.getJobAdminDicts(status=JobScheduled)) == 1, 'expected 1 todo/scheduled jobs' + assert len(manager.getJobAdminDicts(status=JobProduced)) == 2, 'expected 2 done jobs' + + # this time, start and finish job3 normally + job3 = receiveJobForTransfer() + assert job3['JobId'] == 'A_999999999_777777779_L888888888_SB002_uv.MS', 'unexpected job %s' % job3['JobId'] + sendNotification('JobStarted', job3['JobId'], export_id=job3['job_group_id']) + sendNotification('JobFinished', 
job3['JobId'], export_id=job3['job_group_id']) + + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + + #there should be no more reports, cause the job group 999999999 is finished as a whole + #and is removed from the manager at this point + reports = manager.getStatusReportDict() + assert 0 == len(reports), 'expected 0 reports' + assert manager.nrOfUnfinishedJobs() == 0, 'expected 0 jobs unfinished' + time.sleep(1.5) #TODO: should not wait fixed amount of time, but poll for expected output with a timeout + + manager.quit() + manager_thread.join() except Exception as e: logger.exception(e) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.py index 0c06d8ff260dc04330c65f96128d53deb94aa263..0a36fb8a6a435b14e62c1075aa3370410ece23f4 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.py @@ -1,19 +1,21 @@ #!/usr/bin/env python3 -import uuid import unittest from unittest import mock from random import randint from pysimplesoap.client import SoapClient +from time import sleep import logging logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) logger = logging.getLogger(__name__) -from lofar.messaging.messagebus import TemporaryQueue, TemporaryExchange -from lofar.lta.ingest.server.ingestmomadapter import IngestMomAdapter -from lofar.lta.ingest.server.config import DEFAULT_INGEST_INCOMING_JOB_SUBJECT +from lofar.messaging.messagebus import TemporaryExchange, TemporaryQueue, BusListenerJanitor +from lofar.messaging.messages import EventMessage from lofar.lta.ingest.common.job import createJobXml +from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX + +from lofar.common.dbcredentials import Credentials class TestIngestMoMAdapter(unittest.TestCase): def setUp(self): @@ -21,45 +23,68 @@ class TestIngestMoMAdapter(unittest.TestCase): self.tmp_exchange.open() self.addCleanup(self.tmp_exchange.close) - self.tmp_job_queue = TemporaryQueue("t_ingestmomadapter_lofar.lta.ingest.jobs", - exchange_name=self.tmp_exchange.address, - routing_key=DEFAULT_INGEST_INCOMING_JOB_SUBJECT) - self.tmp_job_queue.open() - self.addCleanup(self.tmp_job_queue.close) - momclient_patcher = mock.patch('lofar.lta.ingest.server.momclient.MoMClient') - self.addCleanup(momclient_patcher.stop) - self.momclient_mock = momclient_patcher.start() + def test_onXmlRPCJobReceived_no_soap(self): + """test the handler routine onXmlRPCJobReceived to check if a job_xml is converted to a message and send on the correct bus""" + from lofar.lta.ingest.server.ingestmomadapter import MoMXMLRPCHandler - self.adapter = IngestMomAdapter(busname=self.tmp_exchange.address, - mom_xmlrpc_host='localhost', - mom_xmlrpc_port=randint(2345, 4567), # pick random port to reduce chance of clashes - momClient = self.momclient_mock) + with MoMXMLRPCHandler(exchange=self.tmp_exchange.address, + mom_xmlrpc_host='localhost', + mom_xmlrpc_port=randint(2345, 4567)) as handler: - # Note: at this moment we only test the xml-rpc soap receiver. 
- # TODO: add more tests + # create a tmp job receiver queue + with TemporaryQueue(exchange=self.tmp_exchange.address) as tmp_job_queue: + with tmp_job_queue.create_frombus() as job_receiver: - def test_onXmlRPCJobReceived_no_soap(self): - """test the handler routine onXmlRPCJobReceived to check if a job_xml is converted to a message and send on the correct bus""" - with self.adapter: - job_xml = createJobXml('project', 0, 1, 'dp_id', 2, '/tmp/path/to/dataproduct') - self.adapter.onXmlRPCJobReceived('my_job_file.xml', job_xml) + # create a job... + job_xml = createJobXml('project', 0, 1, 'dp_id', 2, '/tmp/path/to/dataproduct') - with self.tmp_job_queue.create_frombus() as job_receiver: - job_msg = job_receiver.receive() - self.assertEqual(job_xml, job_msg.content) + # and let it be handled by the handler (as if it was received via xml-rpc) + handler.onXmlRPCJobReceived('my_job_file.xml', job_xml) + + # there should now be a job on the tmp_job_queue + # receive and check it. + job_msg = job_receiver.receive() + self.assertEqual(job_xml, job_msg.content) def test_mom_soap_to_job_queue(self): """assuming test_onXmlRPCJobReceived_no_soap passes, test the correct behaviour when called via soap xml-rpc""" - with self.adapter: - job_xml = createJobXml('project', 0, 1, 'dp_id', 2, '/tmp/path/to/dataproduct') - soap_client = SoapClient(location=self.adapter.server_url(), namespace="urn:pipeline.export") - soap_client.new_job(fileName='my_job_file.xml', fileContent=job_xml) + from lofar.lta.ingest.server.ingestmomadapter import MoMXMLRPCHandler + with MoMXMLRPCHandler(exchange=self.tmp_exchange.address, + mom_xmlrpc_host='localhost', + mom_xmlrpc_port=randint(2345, 4567)) as handler: + # create a tmp job receiver queue + with TemporaryQueue(exchange=self.tmp_exchange.address) as tmp_job_queue: + with tmp_job_queue.create_frombus() as job_receiver: + # create a job... + job_xml = createJobXml('project', 0, 1, 'dp_id', 2, '/tmp/path/to/dataproduct') + + # submit the job like MoM would via xml-rpc + soap_client = SoapClient(location=handler.server_url(), namespace="urn:pipeline.export") + soap_client.new_job(fileName='my_job_file.xml', fileContent=job_xml) + + # there should now be a job on the tmp_job_queue + # receive and check it. + job_msg = job_receiver.receive() + self.assertEqual(job_xml, job_msg.content) + + def test_ingest_bus_listener_for_mom_adapter(self): + with mock.patch('lofar.lta.ingest.server.momclient.MoMClient', autospec=True) as momclient_patcher: + + # import IngestBusListenerForMomAdapter here, because otherwise the MoMClient won't be mocked + from lofar.lta.ingest.server.ingestmomadapter import IngestBusListenerForMomAdapter - with self.tmp_job_queue.create_frombus() as job_receiver: - job_msg = job_receiver.receive() - self.assertEqual(job_xml, job_msg.content) + # start the IngestBusListenerForMomAdapter in a BusListenerJanitor so the auto-generated queues are auto deleted. 
+ with BusListenerJanitor(IngestBusListenerForMomAdapter(Credentials(), self.tmp_exchange.address)) as listener: + with self.tmp_exchange.create_tobus() as tobus: + tobus.send(EventMessage(subject="%s.JobStarted" % INGEST_NOTIFICATION_PREFIX, + content={'type': 'MoM', + 'job_id': 9876543321, + 'message': "The job started!"})) + sleep(1) + momclient_patcher.login.assert_called() + momclient_patcher.logout.assert_called() if __name__ == '__main__': diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestServerCommon/config.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestServerCommon/config.py index aae8a6e2f0f6bd3a2bd993778744317ec693e09e..8f151700f8a9916e08ec557fc004136ecd03d885 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestServerCommon/config.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestServerCommon/config.py @@ -13,7 +13,7 @@ DEFAULT_MOM_XMLRPC_HOST=hostnameToIp('lexar003.lexar.control.lofar' if isProduct 'localhost') DEFAULT_MOM_XMLRPC_PORT=2010 if isProductionEnvironment() else 2009 -MOM_BASE_URL = 'https://lcs029.control.lofar:8443/' if isProductionEnvironment() else 'http://lofartest.control.lofar:8080/' +MOM_BASE_URL = 'https://lcs029.control.lofar:8443/'# if isProductionEnvironment() else 'http://lofartest.control.lofar:8080/' LTA_BASE_URL = 'https://%s:%s@lta-ingest.lofar.eu:9443/' if isProductionEnvironment() else 'https://%s:%s@lta-ingest-test.lofar.eu:19443/' diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py index e1df976e99c324940d567ddbc4a96629a098a400..73998a8d46a76092c14612386450782cb568b122 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py @@ -46,7 +46,6 @@ class IngestPipeline(): def __init__(self, job, momClient, ltaClient, exchange=DEFAULT_BUSNAME, - notification_prefix=INGEST_NOTIFICATION_PREFIX, broker=DEFAULT_BROKER, user=getpass.getuser(), globus_timeout=GLOBUS_TIMEOUT, @@ -66,7 +65,6 @@ class IngestPipeline(): self.minimal_SIP = minimal_SIP - self.notification_prefix = notification_prefix self.event_bus = ToBus(exchange, broker=broker) self.Project = job['Project'] @@ -77,8 +75,8 @@ class IngestPipeline(): self.ObsId = int(job['ObservationId']) self.ExportID = job['ExportID'] self.Type = job["Type"] - self.HostLocation = job['Location'].split(':')[0] - self.Location = job['Location'].split(':')[1] + self.HostLocation = job['Location'].partition(':')[0] + self.Location = job['Location'].partition(':')[1] self.ticket = '' self.FileSize = '0' self.MD5Checksum = '' @@ -382,7 +380,7 @@ class IngestPipeline(): for k,v in list(kwargs.items()): contentDict[k] = v - msg = EventMessage(subject="%s.%s" % (self.notification_prefix, subject), content=contentDict) + msg = EventMessage(subject="%s.%s" % (INGEST_NOTIFICATION_PREFIX, subject), content=contentDict) msg.ttl = 48*3600 #remove message from queue's when not picked up within 48 hours logger.info('Sending notification %s: %s' % (subject, str(contentDict).replace('\n', ' '))) self.event_bus.send(msg) @@ -479,7 +477,6 @@ def main(): description='Run the ingestpipeline on a single jobfile.') parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') parser.add_option('--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the qpid broker on which the ingest 
notifications are published, default: %default') - parser.add_option("--ingest_notification_prefix", dest="ingest_notification_prefix", type="string", default=INGEST_NOTIFICATION_PREFIX, help="The prefix for all notifications of this publisher, [default: %default]") parser.add_option("-u", "--user", dest="user", type="string", default=getpass.getuser(), help="username for to login on <host>, [default: %default]") parser.add_option('-s', '--minimal-SIP', dest='minimal_SIP', action='store_true', help='create and upload a minimal SIP to the LTA catalogue when the normal SIP is not accepted.') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') @@ -515,7 +512,6 @@ def main(): jobPipeline = IngestPipeline(job, momClient, ltaClient, busname=options.busname, - notification_prefix=options.ingest_notification_prefix, broker=options.broker, user=options.user, globus_timeout=options.globus_timeout, diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py index 3b3fcda732530cc9107823e5be6337a16c084bd3..f9694ae324d3462d8a662463551d66a7d5323fab 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py @@ -30,16 +30,13 @@ import socket import getpass import pprint from threading import Thread, Lock -from lofar.messaging.messagebus import FromBus, ToBus -from lofar.messaging.messagebus import create_queue, create_binding, create_exchange -from lofar.messaging.messages import * -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME, BusListener, AbstractMessageHandler +from lofar.messaging import LofarMessage, CommandMessage, EventMessage from lofar.common import isProductionEnvironment from lofar.common import dbcredentials from lofar.common.datetimeutils import totalSeconds from lofar.common.util import humanreadablesize -from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT, DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME -from lofar.lta.ingest.server.config import DEFAULT_INGEST_NOTIFICATION_PREFIX +from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT, INGEST_NOTIFICATION_PREFIX from lofar.lta.ingest.server.config import MAX_NR_OF_JOBS, MAX_USED_BANDWITH_TO_START_NEW_JOBS, NET_IF_TO_MONITOR from lofar.lta.ingest.server.config import TRANSFER_TIMEOUT from lofar.lta.ingest.common.job import * @@ -66,34 +63,19 @@ def _getBytesSent(): class IngestTransferServer: def __init__(self, - busname = DEFAULT_BUSNAME, - job_queuename = DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME, + exchange = DEFAULT_BUSNAME, mom_credentials = None, lta_credentials = None, user = None, broker = None, max_nr_of_parallel_jobs = MAX_NR_OF_JOBS): - - self.broker = broker - self.job_queuename = job_queuename - - # create the exchange if not present - create_exchange(busname, durable=True, broker=broker) - - # incoming jobs (from mom, eor, user ingest, etc) - # create it if not already existing - if create_queue(job_queuename, durable=True, broker=broker): - create_binding(busname, job_queuename, routing_key=DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT, broker=broker) - self.user = user if not self.user: self.user = getpass.getuser() self.mom_credentials = mom_credentials self.lta_credentials = lta_credentials - self.busname = 
busname - self.notification_prefix = DEFAULT_INGEST_NOTIFICATION_PREFIX - self.event_bus = ToBus(address=busname, broker = broker) + self.event_bus = ToBus(exchange=exchange, broker = broker) self.max_nr_of_parallel_jobs = max_nr_of_parallel_jobs self.__running_jobs = {} @@ -104,32 +86,33 @@ class IngestTransferServer: self.__prev_used_bandwidth = 0.0 self.__running_jobs_log_timestamp = datetime.utcnow() - def __start_job(self, job_dict): + def start_job(self, job_dict): + if not self.__enoughResourcesAvailable(): + raise ResourceWarning("Not enough resources available to start new job: %s" % job_dict) + job_id = job_dict['JobId'] if job_id in self.__running_jobs: - logger.warning('job %s is already running. Discarding this new job copy, and keeping the current one running...', job_id) - return + raise ValueError('job %s is already running. Discarding this new job copy, and keeping the current one running...' % job_id) def threaded_pipeline_func(job): logger.info('starting job %s in the background', job_id) ltaClient = LTAClient(self.lta_credentials.user, self.lta_credentials.password) with MoMClient(self.mom_credentials.user, self.mom_credentials.password) as momClient: jobPipeline = IngestPipeline(job, momClient, ltaClient, - busname = self.busname, - notification_prefix = self.notification_prefix, - broker = self.broker, + exchange = self.event_bus.exchange, + broker = self.event_bus.broker, user = self.user) with self.__lock: self.__running_jobs[job_id]['pipeline'] = jobPipeline jobPipeline.run() - thread = Thread(target = threaded_pipeline_func, args = [job_dict]) - thread.daemon = True with self.__lock: + thread = Thread(target = threaded_pipeline_func, args = [job_dict]) + thread.daemon = True self.__running_jobs[job_id] = { 'thread':thread } - thread.start() + thread.start() def __clearFinishedJobs(self): try: @@ -269,53 +252,12 @@ class IngestTransferServer: return True def run(self): - log_recource_warning = True - with FromBus(address = self.job_queuename, broker = self.broker) as job_frombus, self.event_bus as _1: + with self.event_bus: while True: try: - try: - if self.__enoughResourcesAvailable(): - logger.info("enough resources available to start new jobs. 
waiting for new job on %s", job_frombus.address) - msg = job_frombus.receive(timeout = 10) - if msg: - logger.info("received msg on job queue: %s", msg) - - if isinstance(msg, CommandMessage): - job_dict = parseJobXml(msg.content) - logger.info("received job: %s", job_dict) - - self.__start_job(job_dict) - - # allow 1 new recource_warning to be logged - self.__log_recource_warning = True - else: - logger.warning("unexpected message type: %s", msg) - else: - # wait for resource to become available - time.sleep(5) - except KeyboardInterrupt: - break - except Exception as e: - logger.error(e) - self.__clearFinishedJobs() try: - # sleep a little - # so jobs have a little time to start consuming resources - # this limits the numer of jobs that can be started to 1000 starts per minute - # it does not limit the total number of parallel jobs - # that is limited dynamically by __enoughResourcesAvailable - # and by the hard limit self.max_nr_of_parallel_jobs - time.sleep(0.1) - - # if already running at high bandwith usages, - # we can sleep a little extra depending on how close we are to the MAX_USED_BANDWITH_TO_START_NEW_JOBS - if self.__prev_used_bandwidth > 0.5 * MAX_USED_BANDWITH_TO_START_NEW_JOBS: - time.sleep(0.5) - if self.__prev_used_bandwidth > 0.85 * MAX_USED_BANDWITH_TO_START_NEW_JOBS: - time.sleep(1.0) - if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds = 2): with self.__lock: starting_threads = [job_thread_dict['thread'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' not in job_thread_dict] @@ -338,34 +280,11 @@ class IngestTransferServer: logger.info(status_log_line) self.__running_jobs_log_timestamp = datetime.utcnow() - msg = EventMessage(subject = self.notification_prefix + '.TransferServiceStatus', + msg = EventMessage(subject = "%s.%s" % (INGEST_NOTIFICATION_PREFIX, 'TransferServiceStatus'), content = { 'ingest_server': socket.gethostname(), 'message' : status_log_line }) msg.ttl = 3600 # remove message from queue's when not picked up within 1 hours self.event_bus.send(msg) - - # HACK: for some unknown reason sometimes the ingesttransferserver does not pick up new jobs... - # When there are no running jobs, and the jobmanager reports that there are jobs to do, - # then do a hard exit, so supervisor restarts this ingesttransferserver automatically - # which solves the problem. - # Yes, it's ugly, but for now it works - if len(self.__running_jobs) == 0: - with IngestRPC() as ingest_rpc: - report = ingest_rpc.getStatusReport() - num_unfinished_jobs = sum((x.get('to_do', 0) + - x.get('scheduled', 0) + - x.get('retry', 0)) for x in report.values()) - if num_unfinished_jobs > 0: - # try to reconnect first... - if not job_frombus.reconnect(num_retries=5, retry_wait_time=10): - # no success, so exit, relying on supervisor-triggerred restart - time.sleep(10) # sleep a little first, so the exit/restart cycle isn't spinning fast. 
- logger.warning("Forcing a hard restart because there are jobs to do, but this ingesttransferserver is not picking them up...") - exit(1) - - - except KeyboardInterrupt: - break except Exception as e: logger.error(e) @@ -374,6 +293,28 @@ class IngestTransferServer: except Exception as e: logger.error(e) +class IngestJobsForTransferHandler(AbstractMessageHandler): + def __init__(self, transfer_server: IngestTransferServer): + self._transfer_server = transfer_server + super(IngestJobsForTransferHandler, self).__init__() + + def handle_message(self, msg: LofarMessage): + if not isinstance(msg, CommandMessage): + raise ValueError("%s: Ignoring non-CommandMessage: %s" % (self.__class__.__name__, msg)) + + job = parseJobXml(msg.content) + if job and job.get('JobId'): + logger.info("received job from bus: %s", job) + self._transfer_server.start_job(job) + + # sleep a little + # so jobs have a little time to start consuming resources + # this limits the numer of jobs that can be started to 1000 starts per minute + # it does not limit the total number of parallel jobs + # that is limited dynamically by __enoughResourcesAvailable + # and by the hard limit self.max_nr_of_parallel_jobs + time.sleep(0.1) + def main(): # make sure we run in UTC timezone @@ -381,22 +322,17 @@ def main(): os.environ['TZ'] = 'UTC' from optparse import OptionParser - from lofar.messaging import setQpidLogLevel - from lofar.common.util import waitForInterrupt # Check the invocation arguments parser = OptionParser("%prog [options]", description = 'runs the ingest transfer server which picks up as many jobs as it can handle from the given --ingest_job_queuename and tranfers the dataproducts to the LTA, updates the LTA catalogue, and updates MoM') - parser.add_option('-q', '--broker', dest = 'broker', type = 'string', + parser.add_option('-b', '--broker', dest = 'broker', type = 'string', default = DEFAULT_BROKER, help = 'Address of the qpid broker, default: %default') - parser.add_option("--ingest_job_queuename", dest = "ingest_job_queuename", type = "string", - default = DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME, - help = "Name of the job queue. [default: %default]") parser.add_option("-p", "--max_nr_of_parallel_jobs", dest = "max_nr_of_parallel_jobs", type = "int", default = MAX_NR_OF_JOBS, help = "Name of the job queue. 
[default: %default]") - parser.add_option('--busname', dest = 'busname', type = 'string', default = DEFAULT_BUSNAME, help = 'Name of the bus exchange on the qpid broker on which the ingest notifications are published, default: %default') + parser.add_option('-e', '--exchange', dest = 'exchange', type = 'string', default = DEFAULT_BUSNAME, help = 'Name of the common bus exchange on the broker, default: %default') parser.add_option("-u", "--user", dest = "user", type = "string", default = getpass.getuser(), help = "username for to login on data source host, [default: %default]") parser.add_option("-l", "--lta_credentials", dest = "lta_credentials", type = "string", default = 'LTA' if isProductionEnvironment() else 'LTA_test', @@ -407,7 +343,6 @@ def main(): parser.add_option('-V', '--verbose', dest = 'verbose', action = 'store_true', help = 'verbose logging') (options, args) = parser.parse_args() - setQpidLogLevel(logging.INFO) logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.DEBUG if options.verbose else logging.INFO) @@ -423,13 +358,17 @@ def main(): ltacreds = dbcredentials.DBCredentials().get(options.lta_credentials) momcreds = dbcredentials.DBCredentials().get(options.mom_credentials) - server = IngestTransferServer(job_queuename = options.ingest_job_queuename, - busname = options.busname, - broker = options.broker, - mom_credentials = momcreds, - lta_credentials = ltacreds, - max_nr_of_parallel_jobs = options.max_nr_of_parallel_jobs) - server.run() + transfer_server = IngestTransferServer(exchange = options.exchange, + broker = options.broker, + mom_credentials = momcreds, + lta_credentials = ltacreds, + max_nr_of_parallel_jobs = options.max_nr_of_parallel_jobs) + + incoming_jobs_listener = BusListener(IngestJobsForTransferHandler, {'transfer_server': transfer_server}, + exchange=options.exchange, routing_key="%s.#" % DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT) + + with incoming_jobs_listener: + transfer_server.run() if __name__ == '__main__': main()