From af5d36d4f35bb3d0e60bfff67cdeb48e6592ccd3 Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Fri, 13 Feb 2015 13:07:38 +0000 Subject: [PATCH] Task #7336: Keep a cached QPID message to return the same message on multiple requests --- LCS/MessageBus/src/message.py | 60 ++++++++++++++++++++--------------- LCS/MessageBus/src/msgbus.py | 7 ++-- 2 files changed, 39 insertions(+), 28 deletions(-) diff --git a/LCS/MessageBus/src/message.py b/LCS/MessageBus/src/message.py index 4b31779a59e..4746d3238de 100644 --- a/LCS/MessageBus/src/message.py +++ b/LCS/MessageBus/src/message.py @@ -18,6 +18,7 @@ import qpid.messaging import xml.dom.minidom as xml +import xml.parsets.expat as expat import datetime LOFAR_MSG_TEMPLATE = """ @@ -58,28 +59,42 @@ def _uuid(): """ return str(qpid.messaging.uuid4()) +class MessageException(Exception): + pass + class Message(object): def __init__(self, from_="", forUser="", summary="", protocol="", protocolVersion="", momid="", sasid="", qpidMsg=None): - self.document = xml.parseString(LOFAR_MSG_TEMPLATE) for name, element in self._property_list().iteritems(): self._add_property(name, element) - # Set properties provided by constructor - self.system = "LOFAR" - self.headerVersion = "1.0.0" - self.protocol = protocol - self.protocolVersion = protocolVersion - self.from_ = from_ - self.forUser = forUser - self.summary = summary - self.uuid = _uuid() - self.timestamp = _timestamp() - self.momid = momid - self.sasid = sasid - - if qpidMsg is not None: - self.setQpidMsg(qpidMsg) + if qpidMsg is None: + self.document = xml.parseString(LOFAR_MSG_TEMPLATE) + self.qpidMsg = qpid.messaging.Message(content_type="text/plain", durable=True) + + # Set properties provided by constructor + self.system = "LOFAR" + self.headerVersion = "1.0.0" + self.protocol = protocol + self.protocolVersion = protocolVersion + self.from_ = from_ + self.forUser = forUser + self.summary = summary + self.uuid = _uuid() + self.timestamp = _timestamp() + self.momid = momid + self.sasid = sasid + else: + self.qpidMsg = qpidMsg + + # Set properties by provided qpidMsg + try: + # Replace literal << in the content, which is occasionally inserted by the C++ + # code as part of the Parset ("Observation.Clock=<<Clock200") + self.document = xml.parseString(qpidMsg.content.replace("<<","<<")) + except expat.ExpatError, e: + print "Could not parse XML message content: ", e, msg.content + raise MessageException(e) def _add_property(self, name, element): def getter(self): @@ -127,17 +142,10 @@ class Message(object): "payload : %s\n" % (self.payload,) ) - def qpidMsg(self): - """ Return a QPID message with the configured content. """ - - msg = qpid.messaging.Message(content=self.document.toxml(), content_type="text/plain", durable=True) - - return msg - - def setQpidMsg(self, msg): - """ Use a QPID message to set the content. """ + def generate_message(self): + """ Construct the QPID message content. """ - self.document = xml.parseString(msg.getContent()) + self.qpidMsg.content = self.document.toxml() """ XML support functions. See also lofarpipe/support/xmllogging.py. """ diff --git a/LCS/MessageBus/src/msgbus.py b/LCS/MessageBus/src/msgbus.py index 64795acdc23..dede39f3c6a 100644 --- a/LCS/MessageBus/src/msgbus.py +++ b/LCS/MessageBus/src/msgbus.py @@ -62,8 +62,11 @@ class ToBus(Session): self.connection.close() def send(self, msg): + # (Re)generate the QPID message body + msg.generate_message() + try: - self.sender.send(msg.qpidMsg()) + self.sender.send(msg.qpidMsg) except qpid.messaging.SessionError, m: raise BusException(m) @@ -97,5 +100,5 @@ class FromBus(Session): return Message(qpidMsg=msg) def ack(self, msg): - self.session.acknowledge(msg.qpidMsg()) + self.session.acknowledge(msg.qpidMsg) -- GitLab