diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py index f5cf306b6203110883c2c949bb3555af8bcfa273..95dca38e27b55405baf2802586bb3129c3ed615d 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py @@ -70,20 +70,17 @@ class IngestEventMessageHandlerForMomAdapter(IngestEventMessageHandler): try: # try 'normal' handling of msg, should result in normal calls to onJob* methods # but MoM (via the momrpc) is notorious in properly handling the messages.... - if super(IngestEventMessageHandlerForMomAdapter, self).handle_message(msg): - return True + super().handle_message(msg) except Exception as e: # ... so handle the exceptions... logger.warning(e) - # ... and try to deal with MoM's quirks. - if self._remove_unknown_export_job_if_needed(msg): - return True - - if self._resubmit_message_if_applicable(msg): - return True + # ... and try to deal with MoM's quirks. + if self._remove_unknown_export_job_if_needed(msg): + return True - return False + if self._resubmit_message_if_applicable(msg): + return True def onJobStarted(self, job_dict): self._update_mom_status_if_applicable(job_dict, JobProducing) @@ -208,34 +205,31 @@ class IngestEventMessageHandlerForMomAdapter(IngestEventMessageHandler): return False def _resubmit_message_if_applicable(self, msg: EventMessage) -> bool: - try: - if msg and msg.content: - retry_count = msg.content.get('retry_count', 0) - - if retry_count < min(MAX_NR_OF_RETRIES, 127): # mom can't handle more than 127 status updates... - retry_count += 1 - try: - retry_timestamp = msg.content.get('retry_timestamp', datetime.utcnow()) - ago = totalSeconds(datetime.utcnow() - retry_timestamp) - time.sleep(max(0, - 2 * retry_count - ago)) # wait longer between each retry, other messages can be processed in between - except Exception as e: - # just continue - logger.warning(e) - - # set/update retry values - msg.content['retry_count'] = retry_count - msg.content['retry_timestamp'] = datetime.utcnow() - - # resubmit - new_msg = EventMessage(subject=msg.subject, content=msg.content) - new_msg.ttl = 48 * 3600 # remove message from queue's when not picked up within 48 hours - logger.info('resubmitting unhandled message to back of queue %s: %s %s', self.exchange, new_msg.subject, - str(new_msg.content).replace('\n', ' ')) - self._tobus.send(new_msg) - return True - except Exception as e: - logger.error("_resubmit_message_if_applicable error: %s", e) + if msg and msg.content: + retry_count = msg.content.get('retry_count', 0) + + if retry_count < min(MAX_NR_OF_RETRIES, 127): # mom can't handle more than 127 status updates... + retry_count += 1 + try: + retry_timestamp = msg.content.get('retry_timestamp', datetime.utcnow()) + ago = totalSeconds(datetime.utcnow() - retry_timestamp) + time.sleep(max(0, + 2 * retry_count - ago)) # wait longer between each retry, other messages can be processed in between + except Exception as e: + # just continue + logger.warning(e) + + # set/update retry values + msg.content['retry_count'] = retry_count + msg.content['retry_timestamp'] = datetime.utcnow() + + # resubmit + new_msg = EventMessage(subject=msg.subject, content=msg.content) + new_msg.ttl = 48 * 3600 # remove message from queue's when not picked up within 48 hours + logger.info('resubmitting unhandled message to back of queue %s: %s %s', self.exchange, new_msg.subject, + str(new_msg.content).replace('\n', ' ')) + self._tobus.send(new_msg) + return True return False