Skip to content
Snippets Groups Projects
Commit 1abae87e authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-699: proper use of exceptions in handling message

parent b631a2f9
No related branches found
No related tags found
No related merge requests found
......@@ -70,20 +70,17 @@ class IngestEventMessageHandlerForMomAdapter(IngestEventMessageHandler):
try:
# try 'normal' handling of msg, should result in normal calls to onJob* methods
# but MoM (via the momrpc) is notorious in properly handling the messages....
if super(IngestEventMessageHandlerForMomAdapter, self).handle_message(msg):
return True
super().handle_message(msg)
except Exception as e:
# ... so handle the exceptions...
logger.warning(e)
# ... and try to deal with MoM's quirks.
if self._remove_unknown_export_job_if_needed(msg):
return True
if self._resubmit_message_if_applicable(msg):
return True
# ... and try to deal with MoM's quirks.
if self._remove_unknown_export_job_if_needed(msg):
return True
return False
if self._resubmit_message_if_applicable(msg):
return True
def onJobStarted(self, job_dict):
self._update_mom_status_if_applicable(job_dict, JobProducing)
......@@ -208,34 +205,31 @@ class IngestEventMessageHandlerForMomAdapter(IngestEventMessageHandler):
return False
def _resubmit_message_if_applicable(self, msg: EventMessage) -> bool:
try:
if msg and msg.content:
retry_count = msg.content.get('retry_count', 0)
if retry_count < min(MAX_NR_OF_RETRIES, 127): # mom can't handle more than 127 status updates...
retry_count += 1
try:
retry_timestamp = msg.content.get('retry_timestamp', datetime.utcnow())
ago = totalSeconds(datetime.utcnow() - retry_timestamp)
time.sleep(max(0,
2 * retry_count - ago)) # wait longer between each retry, other messages can be processed in between
except Exception as e:
# just continue
logger.warning(e)
# set/update retry values
msg.content['retry_count'] = retry_count
msg.content['retry_timestamp'] = datetime.utcnow()
# resubmit
new_msg = EventMessage(subject=msg.subject, content=msg.content)
new_msg.ttl = 48 * 3600 # remove message from queue's when not picked up within 48 hours
logger.info('resubmitting unhandled message to back of queue %s: %s %s', self.exchange, new_msg.subject,
str(new_msg.content).replace('\n', ' '))
self._tobus.send(new_msg)
return True
except Exception as e:
logger.error("_resubmit_message_if_applicable error: %s", e)
if msg and msg.content:
retry_count = msg.content.get('retry_count', 0)
if retry_count < min(MAX_NR_OF_RETRIES, 127): # mom can't handle more than 127 status updates...
retry_count += 1
try:
retry_timestamp = msg.content.get('retry_timestamp', datetime.utcnow())
ago = totalSeconds(datetime.utcnow() - retry_timestamp)
time.sleep(max(0,
2 * retry_count - ago)) # wait longer between each retry, other messages can be processed in between
except Exception as e:
# just continue
logger.warning(e)
# set/update retry values
msg.content['retry_count'] = retry_count
msg.content['retry_timestamp'] = datetime.utcnow()
# resubmit
new_msg = EventMessage(subject=msg.subject, content=msg.content)
new_msg.ttl = 48 * 3600 # remove message from queue's when not picked up within 48 hours
logger.info('resubmitting unhandled message to back of queue %s: %s %s', self.exchange, new_msg.subject,
str(new_msg.content).replace('\n', ' '))
self._tobus.send(new_msg)
return True
return False
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment