From bebe92b10c0d70d43c5be0606b1c717c2d19a4b5 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Mon, 6 Feb 2017 16:35:27 +0000 Subject: [PATCH] Task #10337: Qpid, cannot (de)serialize strings > 64k in a dict. We circumvent this in ToBus.send and FromBus.receive by converting long strings in a dict to a buffer and back. --- .gitattributes | 3 + LCS/Messaging/python/messaging/messagebus.py | 22 +++++++ .../python/messaging/test/t_messagebus.py | 11 ++-- LCS/PyCommon/test/CMakeLists.txt | 1 + LCS/PyCommon/test/t_util.py | 60 +++++++++++++++++++ LCS/PyCommon/test/t_util.run | 4 ++ LCS/PyCommon/test/t_util.sh | 2 + LCS/PyCommon/util.py | 8 +++ 8 files changed, 105 insertions(+), 6 deletions(-) create mode 100644 LCS/PyCommon/test/t_util.py create mode 100755 LCS/PyCommon/test/t_util.run create mode 100755 LCS/PyCommon/test/t_util.sh diff --git a/.gitattributes b/.gitattributes index 1c7c65e9820..fa5a341f4f9 100644 --- a/.gitattributes +++ b/.gitattributes @@ -2698,6 +2698,9 @@ LCS/PyCommon/test/python-coverage.sh eol=lf LCS/PyCommon/test/t_dbcredentials.run eol=lf LCS/PyCommon/test/t_dbcredentials.sh eol=lf LCS/PyCommon/test/t_methodtrigger.sh eol=lf +LCS/PyCommon/test/t_util.py -text +LCS/PyCommon/test/t_util.run -text +LCS/PyCommon/test/t_util.sh -text LCS/PyCommon/util.py -text LCS/Tools/src/checkcomp.py -text LCS/Tools/src/countalllines -text diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 2b034e092e5..d56ab7a6d7c 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -29,12 +29,14 @@ Provide an easy way exchange messages on the message bus. 
from lofar.messaging.exceptions import MessageBusError, MessageFactoryError from lofar.messaging.messages import to_qpid_message, MESSAGE_FACTORY from lofar.common.util import raise_exception +from lofar.common.util import convertStringValuesToBuffer, convertBufferValuesToString import qpid.messaging import logging import sys import uuid import threading +from copy import deepcopy logger = logging.getLogger(__name__) @@ -213,6 +215,17 @@ class FromBus(object): raise_exception(MessageBusError, "[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e)) + try: + if isinstance(msg.content, dict): + #qpid cannot handle strings longer than 64k within dicts + #so each string was converted to a buffer which qpid can fit in 2^32-1 bytes + #and now we convert it back on this end + msg.content = convertBufferValuesToString(msg.content) + except MessageFactoryError: + self.reject(msg) + raise_exception(MessageBusError, "[FromBus] Message rejected") + + logger.debug("[FromBus] Message received on: %s subject: %s" % (self.address, msg.subject)) if logDebugMessages: logger.debug("[FromBus] %s" % msg) @@ -446,6 +459,15 @@ class ToBus(object): """ sender = self._get_sender() qmsg = to_qpid_message(message) + + if isinstance(qmsg.content, dict): + #qpid cannot handle strings longer than 64k within dicts + #so convert each string to a buffer which qpid can fit in 2^32-1 bytes + #convert it back on the other end + #make copy of qmsg first, because we are modifying the contents, and we don't want any side effects + qmsg = deepcopy(qmsg) + qmsg.content = convertStringValuesToBuffer(qmsg.content, 65535) + logger.debug("[ToBus] Sending message to: %s (%s)", self.address, qmsg) try: sender.send(qmsg, timeout=timeout) diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index bd490c4edef..b7bc2ec0dea 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ 
b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -255,9 +255,8 @@ class SendReceiveMessage(unittest.TestCase): self.assertEqual( (send_msg.SystemName, send_msg.MessageId, send_msg.MessageType), (recv_msg.SystemName, recv_msg.MessageId, recv_msg.MessageType)) - self.assertEqual( - (send_msg.content, send_msg.content_type), - (recv_msg.content, recv_msg.content_type)) + self.assertEqual(send_msg.content, recv_msg.content) + self.assertEqual(send_msg.content_type, recv_msg.content_type) def test_sendrecv_event_message(self): """ @@ -291,10 +290,10 @@ class SendReceiveMessage(unittest.TestCase): """ Test send/receive of an RequestMessage, containing a dict with a large string value. Qpid, cannot (de)serialize strings > 64k in a dict + We circumvent this in ToBus.send and FromBus.receive by converting long strings in a dict to a buffer and back. """ - with self.assertRaises(Exception): - content = {"key1": "short message", "key2": "long message " + (2**17)*'a'} - self._test_sendrecv(RequestMessage(content, reply_to=QUEUE)) + content = {"key1": "short message", "key2": "long message " + (2**17)*'a'} + self._test_sendrecv(RequestMessage(content, reply_to=QUEUE)) if __name__ == '__main__': diff --git a/LCS/PyCommon/test/CMakeLists.txt b/LCS/PyCommon/test/CMakeLists.txt index 79c9b43bfa8..a9d316cf73a 100644 --- a/LCS/PyCommon/test/CMakeLists.txt +++ b/LCS/PyCommon/test/CMakeLists.txt @@ -8,3 +8,4 @@ file(COPY lofar_add_test(t_dbcredentials) lofar_add_test(t_methodtrigger) +lofar_add_test(t_util) diff --git a/LCS/PyCommon/test/t_util.py b/LCS/PyCommon/test/t_util.py new file mode 100644 index 00000000000..f388807a136 --- /dev/null +++ b/LCS/PyCommon/test/t_util.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python + +import unittest +import tempfile +from lofar.common.util import * + +def setUpModule(): + pass + +def tearDownModule(): + pass + +class TestUtils(unittest.TestCase): + def test_string_to_buffer_and_back(self): + original = 'Lorem ipsum dolor sit amet, consectetuer 
adipiscing elit.' + + d = { 'test-key' : original } + #print str(d) + self.assertTrue(isinstance(d['test-key'], basestring)) + + d2 = convertStringValuesToBuffer(d, 0) + print d2 + self.assertTrue(isinstance(d2['test-key'], buffer)) + + d3 = convertBufferValuesToString(d2) + print d3 + self.assertTrue(isinstance(d3['test-key'], basestring)) + self.assertEqual(original, d3['test-key']) + + #try conversion again but only for long strings + d2 = convertStringValuesToBuffer(d, 10000) + print d2 + #type should still be basestring (so no conversion happened) + self.assertTrue(isinstance(d2['test-key'], basestring)) + + d3 = convertBufferValuesToString(d2) + print d3 + #type should still be basestring (so no conversion back was needed) + self.assertTrue(isinstance(d3['test-key'], basestring)) + self.assertEqual(original, d3['test-key']) + + #try with nested dict + d4 = { 'outer': d } + + d2 = convertStringValuesToBuffer(d4, 0) + print d2 + self.assertTrue(isinstance(d2['outer']['test-key'], buffer)) + + d3 = convertBufferValuesToString(d2) + print d3 + self.assertTrue(isinstance(d3['outer']['test-key'], basestring)) + self.assertEqual(original, d3['outer']['test-key']) + +def main(argv): + unittest.main() + +if __name__ == "__main__": + # run all tests + import sys + main(sys.argv[1:]) diff --git a/LCS/PyCommon/test/t_util.run b/LCS/PyCommon/test/t_util.run new file mode 100755 index 00000000000..ba348910d65 --- /dev/null +++ b/LCS/PyCommon/test/t_util.run @@ -0,0 +1,4 @@ +#!/bin/bash +source python-coverage.sh + +python_coverage_test util t_util.py diff --git a/LCS/PyCommon/test/t_util.sh b/LCS/PyCommon/test/t_util.sh new file mode 100755 index 00000000000..02ec67618d5 --- /dev/null +++ b/LCS/PyCommon/test/t_util.sh @@ -0,0 +1,2 @@ +#!/bin/sh +./runctest.sh t_util diff --git a/LCS/PyCommon/util.py b/LCS/PyCommon/util.py index f6f0847ab25..813b22f091d 100644 --- a/LCS/PyCommon/util.py +++ b/LCS/PyCommon/util.py @@ -154,5 +154,13 @@ def convertStringDigitKeysToInt(dct): 
#python2.6 using dict constructor and list comprehension return dict((int(k) if isinstance(k, basestring) and k.isdigit() else k, convertStringDigitKeysToInt(v) if isinstance(v, dict) else v) for k,v in dct.items()) +def convertBufferValuesToString(dct): + '''recursively convert all buffer values in the dict to string''' + return dict( (k, convertBufferValuesToString(v) if isinstance(v, dict) else str(v) if isinstance(v, buffer) else v) for k,v in dct.items()) + +def convertStringValuesToBuffer(dct, max_string_length=65535): + '''recursively convert all string values longer than max_string_length in the dict to buffer''' + return dict( (k, convertStringValuesToBuffer(v, max_string_length) if isinstance(v, dict) else (buffer(v, 0, len(v)) if (isinstance(v, basestring) and len(v) > max_string_length) else v)) for k,v in dct.items()) + def to_csv_string(values): return ','.join(str(x) for x in values) -- GitLab