diff --git a/LCS/MessageBus/src/msgbus.py b/LCS/MessageBus/src/msgbus.py index 99cb69bd28fb6123b74829d9b9b4badb0c9892ef..932680d7658be2e017e1382eeada234480f94a57 100644 --- a/LCS/MessageBus/src/msgbus.py +++ b/LCS/MessageBus/src/msgbus.py @@ -28,6 +28,7 @@ import os import signal import logging import lofar.messagebus.message as message +import atexit # Candidate for a config file broker="127.0.0.1" @@ -43,6 +44,8 @@ class BusException(Exception): class Session: def __init__(self, broker): + self.closed = False + logger.info("[Bus] Connecting to broker %s", broker) self.connection = qpid.messaging.Connection(broker) self.connection.reconnect = True @@ -54,12 +57,30 @@ class Session: except qpid.messaging.MessagingError, m: raise BusException(m) - def __del__(self): + # NOTE: We cannuot use: + # __del__: its broken (does not always get called, destruction order is unpredictable) + # with: not supported in python 2.4, does not work well on arrays of objects + # weakref: dpes not guarantee to be called (depends on gc) + # + # Note that this atexit call will prevent self from being destructed until the end of the program, + # since a reference will be retained + atexit.register(self.close) + + def close(self): + if self.closed: + return + + self.closed = True + # NOTE: session.close() freezes under certain error conditions, # f.e. opening a receiver on a non-existing queue. # This seems to happen whenever a Python exception was thrown # by the qpid wrapper. # + # This especially happens if we would put this code in __del__. + # Note that we cannot use __enter__ and __exit__ either due to + # ccu001/ccu099 still carrying python 2.4. + # # See https://issues.apache.org/jira/browse/QPID-6402 # # We set a timeout to prevent freezing, which obviously leads @@ -69,6 +90,13 @@ class Session: except qpid.messaging.exceptions.Timeout, t: logger.error("[Bus] Could not close connection: %s", t) + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + return False + def address(self, queue, options): return "%s%s; {%s}" % (self._queue_prefix(), queue, options)