diff --git a/LCS/MessageBus/src/message.py b/LCS/MessageBus/src/message.py index 4b31779a59e6737e7aa7b2ed38e28f9787b8f506..4746d3238de8f9d919f362f5c87659ef6f0b5ad5 100644 --- a/LCS/MessageBus/src/message.py +++ b/LCS/MessageBus/src/message.py @@ -18,6 +18,7 @@ import qpid.messaging import xml.dom.minidom as xml +import xml.parsets.expat as expat import datetime LOFAR_MSG_TEMPLATE = """ @@ -58,28 +59,42 @@ def _uuid(): """ return str(qpid.messaging.uuid4()) +class MessageException(Exception): + pass + class Message(object): def __init__(self, from_="", forUser="", summary="", protocol="", protocolVersion="", momid="", sasid="", qpidMsg=None): - self.document = xml.parseString(LOFAR_MSG_TEMPLATE) for name, element in self._property_list().iteritems(): self._add_property(name, element) - # Set properties provided by constructor - self.system = "LOFAR" - self.headerVersion = "1.0.0" - self.protocol = protocol - self.protocolVersion = protocolVersion - self.from_ = from_ - self.forUser = forUser - self.summary = summary - self.uuid = _uuid() - self.timestamp = _timestamp() - self.momid = momid - self.sasid = sasid - - if qpidMsg is not None: - self.setQpidMsg(qpidMsg) + if qpidMsg is None: + self.document = xml.parseString(LOFAR_MSG_TEMPLATE) + self.qpidMsg = qpid.messaging.Message(content_type="text/plain", durable=True) + + # Set properties provided by constructor + self.system = "LOFAR" + self.headerVersion = "1.0.0" + self.protocol = protocol + self.protocolVersion = protocolVersion + self.from_ = from_ + self.forUser = forUser + self.summary = summary + self.uuid = _uuid() + self.timestamp = _timestamp() + self.momid = momid + self.sasid = sasid + else: + self.qpidMsg = qpidMsg + + # Set properties by provided qpidMsg + try: + # Replace literal << in the content, which is occasionally inserted by the C++ + # code as part of the Parset ("Observation.Clock=<<Clock200") + self.document = xml.parseString(qpidMsg.content.replace("<<","<<")) + except expat.ExpatError, e: + print "Could not parse XML message content: ", e, msg.content + raise MessageException(e) def _add_property(self, name, element): def getter(self): @@ -127,17 +142,10 @@ class Message(object): "payload : %s\n" % (self.payload,) ) - def qpidMsg(self): - """ Return a QPID message with the configured content. """ - - msg = qpid.messaging.Message(content=self.document.toxml(), content_type="text/plain", durable=True) - - return msg - - def setQpidMsg(self, msg): - """ Use a QPID message to set the content. """ + def generate_message(self): + """ Construct the QPID message content. """ - self.document = xml.parseString(msg.getContent()) + self.qpidMsg.content = self.document.toxml() """ XML support functions. See also lofarpipe/support/xmllogging.py. """ diff --git a/LCS/MessageBus/src/msgbus.py b/LCS/MessageBus/src/msgbus.py index 64795acdc236dfcf6b20ea5eae8fb897c19bff98..dede39f3c6ae8381e5b156679b17d075f2bd3991 100644 --- a/LCS/MessageBus/src/msgbus.py +++ b/LCS/MessageBus/src/msgbus.py @@ -62,8 +62,11 @@ class ToBus(Session): self.connection.close() def send(self, msg): + # (Re)generate the QPID message body + msg.generate_message() + try: - self.sender.send(msg.qpidMsg()) + self.sender.send(msg.qpidMsg) except qpid.messaging.SessionError, m: raise BusException(m) @@ -97,5 +100,5 @@ class FromBus(Session): return Message(qpidMsg=msg) def ack(self, msg): - self.session.acknowledge(msg.qpidMsg()) + self.session.acknowledge(msg.qpidMsg)