Skip to content
Snippets Groups Projects
Commit e0c86f35 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #5708: Destruct Bus objects atexit, to guarantee destruction in python...

Task #5708: Destruct Bus objects atexit, to guarantee destruction in python 2.4 without placing requirements on using the code
parent 9647722d
No related branches found
No related tags found
No related merge requests found
...@@ -28,6 +28,7 @@ import os ...@@ -28,6 +28,7 @@ import os
import signal import signal
import logging import logging
import lofar.messagebus.message as message import lofar.messagebus.message as message
import atexit
# Candidate for a config file # Candidate for a config file
broker="127.0.0.1" broker="127.0.0.1"
...@@ -43,6 +44,8 @@ class BusException(Exception): ...@@ -43,6 +44,8 @@ class BusException(Exception):
class Session: class Session:
def __init__(self, broker): def __init__(self, broker):
self.closed = False
logger.info("[Bus] Connecting to broker %s", broker) logger.info("[Bus] Connecting to broker %s", broker)
self.connection = qpid.messaging.Connection(broker) self.connection = qpid.messaging.Connection(broker)
self.connection.reconnect = True self.connection.reconnect = True
...@@ -54,12 +57,30 @@ class Session: ...@@ -54,12 +57,30 @@ class Session:
except qpid.messaging.MessagingError, m: except qpid.messaging.MessagingError, m:
raise BusException(m) raise BusException(m)
def __del__(self): # NOTE: We cannuot use:
# __del__: its broken (does not always get called, destruction order is unpredictable)
# with: not supported in python 2.4, does not work well on arrays of objects
# weakref: dpes not guarantee to be called (depends on gc)
#
# Note that this atexit call will prevent self from being destructed until the end of the program,
# since a reference will be retained
atexit.register(self.close)
def close(self):
if self.closed:
return
self.closed = True
# NOTE: session.close() freezes under certain error conditions, # NOTE: session.close() freezes under certain error conditions,
# f.e. opening a receiver on a non-existing queue. # f.e. opening a receiver on a non-existing queue.
# This seems to happen whenever a Python exception was thrown # This seems to happen whenever a Python exception was thrown
# by the qpid wrapper. # by the qpid wrapper.
# #
# This especially happens if we would put this code in __del__.
# Note that we cannot use __enter__ and __exit__ either due to
# ccu001/ccu099 still carrying python 2.4.
#
# See https://issues.apache.org/jira/browse/QPID-6402 # See https://issues.apache.org/jira/browse/QPID-6402
# #
# We set a timeout to prevent freezing, which obviously leads # We set a timeout to prevent freezing, which obviously leads
...@@ -69,6 +90,13 @@ class Session: ...@@ -69,6 +90,13 @@ class Session:
except qpid.messaging.exceptions.Timeout, t: except qpid.messaging.exceptions.Timeout, t:
logger.error("[Bus] Could not close connection: %s", t) logger.error("[Bus] Could not close connection: %s", t)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
return False
def address(self, queue, options): def address(self, queue, options):
return "%s%s; {%s}" % (self._queue_prefix(), queue, options) return "%s%s; {%s}" % (self._queue_prefix(), queue, options)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment