Skip to content
Snippets Groups Projects
Commit cc429b99 authored by Jörn Künsemöller's avatar Jörn Künsemöller
Browse files

Task SW-516: Switch from qpid.messaging library to Apache Qpid Proton so we...

Task SW-516: Switch from qpid.messaging library to Apache Qpid Proton so we can use the message bus with Python 3
parent be1b38ac
Branches
Tags
No related merge requests found
...@@ -19,9 +19,10 @@ ...@@ -19,9 +19,10 @@
try: try:
import proton import proton
import proton.utils import proton.utils
import uuid
MESSAGING_ENABLED = True MESSAGING_ENABLED = True
except ImportError: except ImportError:
from . import noqpidfallback as messaging from . import noqpidfallback as proton
MESSAGING_ENABLED = False MESSAGING_ENABLED = False
import xml.dom.minidom as xml import xml.dom.minidom as xml
...@@ -67,7 +68,7 @@ def _uuid(): ...@@ -67,7 +68,7 @@ def _uuid():
""" """
Return an UUID Return an UUID
""" """
return str(messaging.uuid4()) return str(uuid.uuid4())
class MessageException(Exception): class MessageException(Exception):
pass pass
...@@ -77,7 +78,7 @@ class XMLDoc(object): ...@@ -77,7 +78,7 @@ class XMLDoc(object):
try: try:
self.document = xml.parseString(content) self.document = xml.parseString(content)
except expat.ExpatError as e: except expat.ExpatError as e:
#print "Could not parse XML message content: ", e, qpidMsg.content #print "Could not parse XML message content: ", e, qpidMsg.body
raise MessageException(e) raise MessageException(e)
def content(self): def content(self):
...@@ -137,7 +138,7 @@ class XMLDoc(object): ...@@ -137,7 +138,7 @@ class XMLDoc(object):
for child in node.childNodes: for child in node.childNodes:
if child.nodeType == child.TEXT_NODE: if child.nodeType == child.TEXT_NODE:
node.replaceChild(newchild, child) node.replaceChild(newchild, child)
break; break
else: else:
node.appendChild(newchild) node.appendChild(newchild)
...@@ -191,19 +192,19 @@ class MessageContent(object): ...@@ -191,19 +192,19 @@ class MessageContent(object):
# Try to encode '<', '&', '>' in the content payload, whenever possible. # Try to encode '<', '&', '>' in the content payload, whenever possible.
# Content header should not have these. For C++ MessageBus non-libxml++ # Content header should not have these. For C++ MessageBus non-libxml++
# builds, skip encode if XML tags continue in <payload>. Hack ahead! # builds, skip encode if XML tags continue in <payload>. Hack ahead!
if qpidMsg.content is None: if qpidMsg.body is None:
qpidMsg.content = '' # avoid find() or replace() via escape() on None qpidMsg.body = '' # avoid find() or replace() via escape() on None
plIdx = qpidMsg.content.find('<payload>') plIdx = qpidMsg.body.find('<payload>')
if plIdx != -1: if plIdx != -1:
plIdx += len('<payload>') plIdx += len('<payload>')
plEndIdx = qpidMsg.content.rfind('</payload>', plIdx) plEndIdx = qpidMsg.body.rfind('</payload>', plIdx)
if plEndIdx != -1: if plEndIdx != -1:
eqIdx = qpidMsg.content.find('=', plIdx, plEndIdx) # non-empty parset eqIdx = qpidMsg.body.find('=', plIdx, plEndIdx) # non-empty parset
if eqIdx != -1 and eqIdx < qpidMsg.content.find('<', plIdx, plEndIdx): if eqIdx != -1 and eqIdx < qpidMsg.body.find('<', plIdx, plEndIdx):
qpidMsg.content = qpidMsg.content[ : plIdx] + \ qpidMsg.body = qpidMsg.body[ : plIdx] + \
escape(qpidMsg.content[plIdx : plEndIdx]) + \ escape(qpidMsg.body[plIdx : plEndIdx]) + \
qpidMsg.content[plEndIdx : ] qpidMsg.body[plEndIdx : ]
self.document = XMLDoc(qpidMsg.content) # may raise MessageException self.document = XMLDoc(qpidMsg.body) # may raise MessageException
def _add_property(self, name, element): def _add_property(self, name, element):
def getter(self): def getter(self):
...@@ -247,8 +248,8 @@ class MessageContent(object): ...@@ -247,8 +248,8 @@ class MessageContent(object):
def qpidMsg(self): def qpidMsg(self):
""" Construct a NEW QPID message. """ """ Construct a NEW QPID message. """
msg = messaging.Message(content_type="text/plain", durable=True) msg = proton.Message(content_type="text/plain", durable=True)
msg.content = self.content() msg.body = self.content()
return msg return msg
...@@ -270,7 +271,7 @@ class Message(object): ...@@ -270,7 +271,7 @@ class Message(object):
return MessageContent(qpidMsg=self._qpidMsg) return MessageContent(qpidMsg=self._qpidMsg)
def raw_content(self): def raw_content(self):
return self._qpidMsg.content return self._qpidMsg.body
def __repr__(self): def __repr__(self):
msg = self.content() msg = self.content()
......
...@@ -19,10 +19,11 @@ ...@@ -19,10 +19,11 @@
# $Id$ # $Id$
try: try:
import qpid.messaging as messaging import proton
import proton.utils
MESSAGING_ENABLED = True MESSAGING_ENABLED = True
except ImportError: except ImportError:
from . import noqpidfallback as messaging from . import noqpidfallback as proton
MESSAGING_ENABLED = False MESSAGING_ENABLED = False
import os import os
...@@ -45,15 +46,14 @@ class BusException(Exception): ...@@ -45,15 +46,14 @@ class BusException(Exception):
class Session: class Session:
def __init__(self, broker): def __init__(self, broker):
self.closed = False self.closed = False
self.connection = messaging.Connection(broker)
self.connection.reconnect = True
logger.info("[Bus] Connecting to broker %s", broker) logger.info("[Bus] Connecting to broker %s", broker)
try: try:
self.connection.open() self.connection = proton.utils.BlockingConnection(broker)
self.connection.reconnect = True
logger.info("[Bus] Connected to broker %s", broker) logger.info("[Bus] Connected to broker %s", broker)
self.session = self.connection.session() #self.session = self.connection.session()
except messaging.MessagingError as m: except proton.ProtonException as m:
raise BusException(m) raise BusException(m)
# NOTE: We cannot use: # NOTE: We cannot use:
...@@ -85,8 +85,8 @@ class Session: ...@@ -85,8 +85,8 @@ class Session:
# We set a timeout to prevent freezing, which obviously leads # We set a timeout to prevent freezing, which obviously leads
# to data loss if the stall was legit. # to data loss if the stall was legit.
try: try:
self.connection.close(5.0) self.connection.close()
except messaging.exceptions.Timeout as t: except proton.Timeout as t:
logger.error("[Bus] Could not close connection: %s", t) logger.error("[Bus] Could not close connection: %s", t)
def __enter__(self): def __enter__(self):
...@@ -97,7 +97,7 @@ class Session: ...@@ -97,7 +97,7 @@ class Session:
return False return False
def address(self, queue, options): def address(self, queue, options):
return "%s%s; {%s}" % (self._queue_prefix(), queue, options) return "%s%s" % (self._queue_prefix(), queue) # + ' ; {%s}' % options
def _queue_prefix(self): def _queue_prefix(self):
lofarenv = os.environ.get("LOFARENV", "") lofarenv = os.environ.get("LOFARENV", "")
...@@ -118,8 +118,8 @@ class ToBus(Session): ...@@ -118,8 +118,8 @@ class ToBus(Session):
self.queue = queue self.queue = queue
try: try:
self.sender = self.session.sender(self.address(queue, options)) self.sender = self.connection.create_sender(self.address(queue, options))
except messaging.MessagingError as m: except proton.ProtonException as m:
raise BusException(m) raise BusException(m)
def send(self, msg): def send(self, msg):
...@@ -134,7 +134,7 @@ class ToBus(Session): ...@@ -134,7 +134,7 @@ class ToBus(Session):
self.sender.send(msg) self.sender.send(msg)
logger.info("[ToBus] Message sent to queue %s", self.queue) logger.info("[ToBus] Message sent to queue %s", self.queue)
except messaging.SessionError as m: except proton.SessionError as m:
raise BusException(m) raise BusException(m)
class FromBus(Session): class FromBus(Session):
...@@ -145,27 +145,24 @@ class FromBus(Session): ...@@ -145,27 +145,24 @@ class FromBus(Session):
def add_queue(self, queue, options=options): def add_queue(self, queue, options=options):
try: try:
receiver = self.session.receiver(self.address(queue, options)) self.receiver = self.connection.create_receiver(self.address(queue, options))
except messaging.MessagingError as m: except proton.ProtonException as m:
raise BusException(m) raise BusException(m)
# Need capacity >=1 for 'self.session.next_receiver' to function across multiple queues # Need capacity >=1 for 'self.session.next_receiver' to function across multiple queues
receiver.capacity = 1 self.receiver.capacity = 1
def get(self, timeout=None): def get(self, timeout=None):
msg = None msg = None
logger.info("[FromBus] Waiting for message") logger.info("[FromBus] Waiting for message")
try: try:
receiver = self.session.next_receiver(timeout) msg = self.receiver.receive(timeout)
if receiver != None: if msg is None:
logger.info("[FromBus] Message available on queue %s", receiver.source) logger.error("[FromBus] Could not retrieve available message on queue %s", self.receiver.source)
msg = receiver.fetch() # receiver.get() is better, but requires qpid 0.31+ else:
if msg is None: logger.info("[FromBus] Message received on queue %s", self.receiver.source)
logger.error("[FromBus] Could not retrieve available message on queue %s", receiver.source) except proton.Timeout as e:
else:
logger.info("[FromBus] Message received on queue %s", receiver.source)
except messaging.exceptions.Empty as e:
return None return None
if msg is None: if msg is None:
...@@ -174,6 +171,6 @@ class FromBus(Session): ...@@ -174,6 +171,6 @@ class FromBus(Session):
return message.Message(qpidMsg=msg) return message.Message(qpidMsg=msg)
def ack(self, msg): def ack(self, msg):
self.session.acknowledge(msg.qpidMsg()) self.receiver.acknowledge(msg.qpidMsg())
logging.info("[FromBus] Message ACK'ed"); logging.info("[FromBus] Message ACK'ed")
#!/usr/bin/env python #!/usr/bin/env python
import sys import sys
print("QPID support NOT enabled! Will NOT connect to any broker, and messages will be lost!", file=sys.stderr) print("QPID support NOT enabled! Will NOT connect to any broker, and messages will be lost!")
def uuid4():
return "<uuid>"
""" """
Exceptions. Exceptions.
""" """
class MessagingError(Exception): class ProtonException(Exception):
pass pass
class SessionError(Exception): class SessionError(Exception):
pass pass
class exceptions: class Timeout(Exception):
class Timeout(Exception): pass
pass
class Empty(Exception):
pass
""" """
Messages. Messages.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment