diff --git a/LCS/MessageBus/src/message.py b/LCS/MessageBus/src/message.py index 7a84a0daf9213237fce3ffc49e9a90fa4fe21025..e588884ed4ac8d9880032368ef9772c8d05d9701 100644 --- a/LCS/MessageBus/src/message.py +++ b/LCS/MessageBus/src/message.py @@ -19,9 +19,10 @@ try: import proton import proton.utils + import uuid MESSAGING_ENABLED = True except ImportError: - from . import noqpidfallback as messaging + from . import noqpidfallback as proton MESSAGING_ENABLED = False import xml.dom.minidom as xml @@ -67,7 +68,7 @@ def _uuid(): """ Return an UUID """ - return str(messaging.uuid4()) + return str(uuid.uuid4()) class MessageException(Exception): pass @@ -77,7 +78,7 @@ class XMLDoc(object): try: self.document = xml.parseString(content) except expat.ExpatError as e: - #print "Could not parse XML message content: ", e, qpidMsg.content + #print "Could not parse XML message content: ", e, qpidMsg.body raise MessageException(e) def content(self): @@ -137,7 +138,7 @@ class XMLDoc(object): for child in node.childNodes: if child.nodeType == child.TEXT_NODE: node.replaceChild(newchild, child) - break; + break else: node.appendChild(newchild) @@ -191,19 +192,19 @@ class MessageContent(object): # Try to encode '<', '&', '>' in the content payload, whenever possible. # Content header should not have these. For C++ MessageBus non-libxml++ # builds, skip encode if XML tags continue in <payload>. Hack ahead! - if qpidMsg.content is None: - qpidMsg.content = '' # avoid find() or replace() via escape() on None - plIdx = qpidMsg.content.find('<payload>') + if qpidMsg.body is None: + qpidMsg.body = '' # avoid find() or replace() via escape() on None + plIdx = qpidMsg.body.find('<payload>') if plIdx != -1: plIdx += len('<payload>') - plEndIdx = qpidMsg.content.rfind('</payload>', plIdx) + plEndIdx = qpidMsg.body.rfind('</payload>', plIdx) if plEndIdx != -1: - eqIdx = qpidMsg.content.find('=', plIdx, plEndIdx) # non-empty parset - if eqIdx != -1 and eqIdx < qpidMsg.content.find('<', plIdx, plEndIdx): - qpidMsg.content = qpidMsg.content[ : plIdx] + \ - escape(qpidMsg.content[plIdx : plEndIdx]) + \ - qpidMsg.content[plEndIdx : ] - self.document = XMLDoc(qpidMsg.content) # may raise MessageException + eqIdx = qpidMsg.body.find('=', plIdx, plEndIdx) # non-empty parset + if eqIdx != -1 and eqIdx < qpidMsg.body.find('<', plIdx, plEndIdx): + qpidMsg.body = qpidMsg.body[ : plIdx] + \ + escape(qpidMsg.body[plIdx : plEndIdx]) + \ + qpidMsg.body[plEndIdx : ] + self.document = XMLDoc(qpidMsg.body) # may raise MessageException def _add_property(self, name, element): def getter(self): @@ -247,8 +248,8 @@ class MessageContent(object): def qpidMsg(self): """ Construct a NEW QPID message. """ - msg = messaging.Message(content_type="text/plain", durable=True) - msg.content = self.content() + msg = proton.Message(content_type="text/plain", durable=True) + msg.body = self.content() return msg @@ -270,7 +271,7 @@ class Message(object): return MessageContent(qpidMsg=self._qpidMsg) def raw_content(self): - return self._qpidMsg.content + return self._qpidMsg.body def __repr__(self): msg = self.content() diff --git a/LCS/MessageBus/src/messagebus.py b/LCS/MessageBus/src/messagebus.py index 8a711e2641f2d4f114c8e0ba4ec8ea7bd836ddfa..fcf99f0ef1764c9f74b3dc60411a583a3fcc1575 100644 --- a/LCS/MessageBus/src/messagebus.py +++ b/LCS/MessageBus/src/messagebus.py @@ -19,10 +19,11 @@ # $Id$ try: - import qpid.messaging as messaging + import proton + import proton.utils MESSAGING_ENABLED = True except ImportError: - from . import noqpidfallback as messaging + from . import noqpidfallback as proton MESSAGING_ENABLED = False import os @@ -45,15 +46,14 @@ class BusException(Exception): class Session: def __init__(self, broker): self.closed = False - self.connection = messaging.Connection(broker) - self.connection.reconnect = True logger.info("[Bus] Connecting to broker %s", broker) try: - self.connection.open() + self.connection = proton.utils.BlockingConnection(broker) + self.connection.reconnect = True logger.info("[Bus] Connected to broker %s", broker) - self.session = self.connection.session() - except messaging.MessagingError as m: + #self.session = self.connection.session() + except proton.ProtonException as m: raise BusException(m) # NOTE: We cannot use: @@ -85,8 +85,8 @@ class Session: # We set a timeout to prevent freezing, which obviously leads # to data loss if the stall was legit. try: - self.connection.close(5.0) - except messaging.exceptions.Timeout as t: + self.connection.close() + except proton.Timeout as t: logger.error("[Bus] Could not close connection: %s", t) def __enter__(self): @@ -97,7 +97,7 @@ class Session: return False def address(self, queue, options): - return "%s%s; {%s}" % (self._queue_prefix(), queue, options) + return "%s%s" % (self._queue_prefix(), queue) # + ' ; {%s}' % options def _queue_prefix(self): lofarenv = os.environ.get("LOFARENV", "") @@ -118,8 +118,8 @@ class ToBus(Session): self.queue = queue try: - self.sender = self.session.sender(self.address(queue, options)) - except messaging.MessagingError as m: + self.sender = self.connection.create_sender(self.address(queue, options)) + except proton.ProtonException as m: raise BusException(m) def send(self, msg): @@ -134,7 +134,7 @@ class ToBus(Session): self.sender.send(msg) logger.info("[ToBus] Message sent to queue %s", self.queue) - except messaging.SessionError as m: + except proton.SessionError as m: raise BusException(m) class FromBus(Session): @@ -145,27 +145,24 @@ class FromBus(Session): def add_queue(self, queue, options=options): try: - receiver = self.session.receiver(self.address(queue, options)) - except messaging.MessagingError as m: + self.receiver = self.connection.create_receiver(self.address(queue, options)) + except proton.ProtonException as m: raise BusException(m) # Need capacity >=1 for 'self.session.next_receiver' to function across multiple queues - receiver.capacity = 1 + self.receiver.capacity = 1 def get(self, timeout=None): msg = None logger.info("[FromBus] Waiting for message") try: - receiver = self.session.next_receiver(timeout) - if receiver != None: - logger.info("[FromBus] Message available on queue %s", receiver.source) - msg = receiver.fetch() # receiver.get() is better, but requires qpid 0.31+ - if msg is None: - logger.error("[FromBus] Could not retrieve available message on queue %s", receiver.source) - else: - logger.info("[FromBus] Message received on queue %s", receiver.source) - except messaging.exceptions.Empty as e: + msg = self.receiver.receive(timeout) + if msg is None: + logger.error("[FromBus] Could not retrieve available message on queue %s", self.receiver.source) + else: + logger.info("[FromBus] Message received on queue %s", self.receiver.source) + except proton.Timeout as e: return None if msg is None: @@ -174,6 +171,6 @@ class FromBus(Session): return message.Message(qpidMsg=msg) def ack(self, msg): - self.session.acknowledge(msg.qpidMsg()) - logging.info("[FromBus] Message ACK'ed"); + self.receiver.acknowledge(msg.qpidMsg()) + logging.info("[FromBus] Message ACK'ed") diff --git a/LCS/MessageBus/src/noqpidfallback.py b/LCS/MessageBus/src/noqpidfallback.py index 36bcfb3c0b3e16d9ec05795c5f24137000ee2e7c..98d5c6353d6cd31daa769e831ed1715bcfbe7516 100644 --- a/LCS/MessageBus/src/noqpidfallback.py +++ b/LCS/MessageBus/src/noqpidfallback.py @@ -1,28 +1,22 @@ #!/usr/bin/env python import sys -print("QPID support NOT enabled! Will NOT connect to any broker, and messages will be lost!", file=sys.stderr) - -def uuid4(): - return "<uuid>" +print("QPID support NOT enabled! Will NOT connect to any broker, and messages will be lost!") """ Exceptions. """ -class MessagingError(Exception): +class ProtonException(Exception): pass class SessionError(Exception): pass -class exceptions: - class Timeout(Exception): - pass +class Timeout(Exception): + pass - class Empty(Exception): - pass """ Messages.