diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py index daf0fd828caa2e201cd09025476eaaca140a4db4..580e9b4fe96ae5152e56e7584d94ec87d419e965 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py @@ -520,8 +520,9 @@ class IngestJobManager: logger.info('removing export job %s', export_group_id) job_admin_dicts = self.getJobAdminDicts(job_group_id=export_group_id) - for jad in job_admin_dicts: - self.updateJobStatus(jad['job']['JobId'], JobRemoved) + if job_admin_dicts: + for jad in job_admin_dicts: + self.updateJobStatus(jad['job']['JobId'], JobRemoved) def getExportIds(self): with self.__lock: diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py index 33aacc78cf879ef6dd8c4742440f1fb6d14d3104..054618a079eb50d6d831e819d6a4d5273422ab20 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py @@ -20,6 +20,7 @@ # from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener +from lofar.lta.ingest.client.rpc import IngestRPC from lofar.lta.ingest.common.job import * from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_SUBJECTS from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_PREFIX @@ -47,6 +48,37 @@ except ImportError as ie: import logging logger = logging.getLogger() +class IngestBusListenerForMomAdapter(IngestBusListener): + def __init__(self, busname=DEFAULT_MOMINGESTADAPTER_NOTIFICATION_QUEUENAME, subjects=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, broker=None, momrpc=None, **kwargs): + self._busname = busname + self._broker = broker + self._momrpc = momrpc + + super(IngestBusListenerForMomAdapter, self).__init__(busname=busname, subjects=subjects, broker=broker, **kwargs) + + def _handleMessage(self, msg): + try: + super(IngestBusListenerForMomAdapter, self)._handleMessage(msg) + except Exception as e: + if msg and msg.content: + if self._momrpc and isinstance(msg.content, dict): + if msg.content.get('type','').lower() == 'mom': + export_id = msg.content.get('export_id') + + if export_id and export_id not in self._momrpc.getProjectDetails(export_id): + logger.warn('Export job %s cannot be found (anymore) in mom. Removing export job from ingest queue', export_id) + + with IngestRPC(broker=self._broker) as ingest_rpc: + ingest_rpc.removeExportJob(export_id) + return + + with ToBus(self._busname, broker=self._broker) as tobus: + new_msg = EventMessage(context=msg.subject, content=msg.content) + new_msg.ttl = 48*3600 #remove message from queue's when not picked up within 48 hours + logger.info('resubmitting unhandled message to back of queue %s: %s %s', self._busname, new_msg.subject, str(new_msg.content).replace('\n', ' ')) + tobus.send(new_msg) + + class IngestMomAdapter: def __init__(self, momClient, notification_queue_name=DEFAULT_MOMINGESTADAPTER_NOTIFICATION_QUEUENAME, @@ -63,7 +95,11 @@ class IngestMomAdapter: **kwargs): self.__momClient = momClient - self.__ingest_notification_listener = IngestBusListener(busname=notification_queue_name, subjects=notification_subjects, broker=broker, **kwargs) + if not mom_broker: + mom_broker = broker + self.__momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=mom_broker, timeout=180) + + self.__ingest_notification_listener = IngestBusListenerForMomAdapter(busname=notification_queue_name, subjects=notification_subjects, broker=broker, momrpc=self.__momrpc, **kwargs) self.__ingest_notification_listener.onJobStarted = self.onJobStarted self.__ingest_notification_listener.onJobFinished = self.onJobFinished self.__ingest_notification_listener.onJobFailed = self.onJobFailed @@ -74,10 +110,6 @@ class IngestMomAdapter: self.__job_queue = ToBus(job_queue_name, broker=broker) - if not mom_broker: - mom_broker = broker - self.__momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=mom_broker, timeout=180) - try: logger.info('Setting up MoM SOAP server on %s:%s', mom_xmlrpc_host, mom_xmlrpc_port) self._server = SOAPpy.SOAPServer((mom_xmlrpc_host, mom_xmlrpc_port))