Skip to content
Snippets Groups Projects
Commit af5d36d4 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #7336: Keep a cached QPID message to return the same message on multiple requests

parent 49d7cf01
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,7 @@
import qpid.messaging
import xml.dom.minidom as xml
import xml.parsets.expat as expat
import datetime
LOFAR_MSG_TEMPLATE = """
......@@ -58,28 +59,42 @@ def _uuid():
"""
return str(qpid.messaging.uuid4())
class MessageException(Exception):
pass
class Message(object):
def __init__(self, from_="", forUser="", summary="", protocol="", protocolVersion="", momid="", sasid="", qpidMsg=None):
self.document = xml.parseString(LOFAR_MSG_TEMPLATE)
for name, element in self._property_list().iteritems():
self._add_property(name, element)
# Set properties provided by constructor
self.system = "LOFAR"
self.headerVersion = "1.0.0"
self.protocol = protocol
self.protocolVersion = protocolVersion
self.from_ = from_
self.forUser = forUser
self.summary = summary
self.uuid = _uuid()
self.timestamp = _timestamp()
self.momid = momid
self.sasid = sasid
if qpidMsg is not None:
self.setQpidMsg(qpidMsg)
if qpidMsg is None:
self.document = xml.parseString(LOFAR_MSG_TEMPLATE)
self.qpidMsg = qpid.messaging.Message(content_type="text/plain", durable=True)
# Set properties provided by constructor
self.system = "LOFAR"
self.headerVersion = "1.0.0"
self.protocol = protocol
self.protocolVersion = protocolVersion
self.from_ = from_
self.forUser = forUser
self.summary = summary
self.uuid = _uuid()
self.timestamp = _timestamp()
self.momid = momid
self.sasid = sasid
else:
self.qpidMsg = qpidMsg
# Set properties by provided qpidMsg
try:
# Replace literal << in the content, which is occasionally inserted by the C++
# code as part of the Parset ("Observation.Clock=<<Clock200")
self.document = xml.parseString(qpidMsg.content.replace("<<","&lt;&lt;"))
except expat.ExpatError, e:
print "Could not parse XML message content: ", e, msg.content
raise MessageException(e)
def _add_property(self, name, element):
def getter(self):
......@@ -127,17 +142,10 @@ class Message(object):
"payload : %s\n" % (self.payload,)
)
def qpidMsg(self):
""" Return a QPID message with the configured content. """
msg = qpid.messaging.Message(content=self.document.toxml(), content_type="text/plain", durable=True)
return msg
def setQpidMsg(self, msg):
""" Use a QPID message to set the content. """
def generate_message(self):
""" Construct the QPID message content. """
self.document = xml.parseString(msg.getContent())
self.qpidMsg.content = self.document.toxml()
""" XML support functions. See also lofarpipe/support/xmllogging.py. """
......
......@@ -62,8 +62,11 @@ class ToBus(Session):
self.connection.close()
def send(self, msg):
# (Re)generate the QPID message body
msg.generate_message()
try:
self.sender.send(msg.qpidMsg())
self.sender.send(msg.qpidMsg)
except qpid.messaging.SessionError, m:
raise BusException(m)
......@@ -97,5 +100,5 @@ class FromBus(Session):
return Message(qpidMsg=msg)
def ack(self, msg):
self.session.acknowledge(msg.qpidMsg())
self.session.acknowledge(msg.qpidMsg)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment