diff --git a/.gitattributes b/.gitattributes index 1c7c65e98205284c4cc6c3e72496cdb43cf61707..fa5a341f4f9f6731636a4e03b3c0352f3a3573b4 100644 --- a/.gitattributes +++ b/.gitattributes @@ -2698,6 +2698,9 @@ LCS/PyCommon/test/python-coverage.sh eol=lf LCS/PyCommon/test/t_dbcredentials.run eol=lf LCS/PyCommon/test/t_dbcredentials.sh eol=lf LCS/PyCommon/test/t_methodtrigger.sh eol=lf +LCS/PyCommon/test/t_util.py -text +LCS/PyCommon/test/t_util.run -text +LCS/PyCommon/test/t_util.sh -text LCS/PyCommon/util.py -text LCS/Tools/src/checkcomp.py -text LCS/Tools/src/countalllines -text diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 2b034e092e50782082e44abb719afeaf1db98266..d56ab7a6d7c3b47f7e9154f4d8c13cdc1b739834 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -29,12 +29,14 @@ Provide an easy way exchange messages on the message bus. from lofar.messaging.exceptions import MessageBusError, MessageFactoryError from lofar.messaging.messages import to_qpid_message, MESSAGE_FACTORY from lofar.common.util import raise_exception +from lofar.common.util import convertStringValuesToBuffer, convertBufferValuesToString import qpid.messaging import logging import sys import uuid import threading +from copy import deepcopy logger = logging.getLogger(__name__) @@ -213,6 +215,17 @@ class FromBus(object): raise_exception(MessageBusError, "[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e)) + try: + if isinstance(msg.content, dict): + #qpid cannot handle strings longer than 64k within dicts + #so each string was converted to a buffer which qpid can fit in 2^32-1 bytes + #and now we convert it back on this end + msg.content = convertBufferValuesToString(msg.content) + except MessageFactoryError: + self.reject(msg) + raise_exception(MessageBusError, "[FromBus] Message rejected") + + logger.debug("[FromBus] Message received on: %s subject: %s" % (self.address, msg.subject)) if logDebugMessages: logger.debug("[FromBus] %s" % msg) @@ -446,6 +459,15 @@ class ToBus(object): """ sender = self._get_sender() qmsg = to_qpid_message(message) + + if isinstance(qmsg.content, dict): + #qpid cannot handle strings longer than 64k within dicts + #so convert each string to a buffer which qpid can fit in 2^32-1 bytes + #convert it back on the other end + #make copy of qmsg first, because we are modifying the contents, and we don't want any side effects + qmsg = deepcopy(qmsg) + qmsg.content = convertStringValuesToBuffer(qmsg.content, 65535) + logger.debug("[ToBus] Sending message to: %s (%s)", self.address, qmsg) try: sender.send(qmsg, timeout=timeout) diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index bd490c4edef899147364642190e9965c283494d9..b7bc2ec0dea5647eb79cae6aa9d56f510bafc598 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -255,9 +255,8 @@ class SendReceiveMessage(unittest.TestCase): self.assertEqual( (send_msg.SystemName, send_msg.MessageId, send_msg.MessageType), (recv_msg.SystemName, recv_msg.MessageId, recv_msg.MessageType)) - self.assertEqual( - (send_msg.content, send_msg.content_type), - (recv_msg.content, recv_msg.content_type)) + self.assertEqual(send_msg.content, recv_msg.content) + self.assertEqual(send_msg.content_type, recv_msg.content_type) def test_sendrecv_event_message(self): """ @@ -291,10 +290,10 @@ class SendReceiveMessage(unittest.TestCase): """ Test send/receive of an RequestMessage, containing a dict with a large string value. Qpid, cannot (de)serialize strings > 64k in a dict + We circumvent this in ToBus.send and FromBus.receive by converting long strings in a dict to a buffer and back. """ - with self.assertRaises(Exception): - content = {"key1": "short message", "key2": "long message " + (2**17)*'a'} - self._test_sendrecv(RequestMessage(content, reply_to=QUEUE)) + content = {"key1": "short message", "key2": "long message " + (2**17)*'a'} + self._test_sendrecv(RequestMessage(content, reply_to=QUEUE)) if __name__ == '__main__': diff --git a/LCS/PyCommon/test/CMakeLists.txt b/LCS/PyCommon/test/CMakeLists.txt index 79c9b43bfa8f22c32c1e1d4278b3ff298192a581..a9d316cf73ad82904eff010202ea2b7adc12c092 100644 --- a/LCS/PyCommon/test/CMakeLists.txt +++ b/LCS/PyCommon/test/CMakeLists.txt @@ -8,3 +8,4 @@ file(COPY lofar_add_test(t_dbcredentials) lofar_add_test(t_methodtrigger) +lofar_add_test(t_util) diff --git a/LCS/PyCommon/test/t_util.py b/LCS/PyCommon/test/t_util.py new file mode 100644 index 0000000000000000000000000000000000000000..f388807a136b4cae24a58bdb444a0e4a7c675b0e --- /dev/null +++ b/LCS/PyCommon/test/t_util.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python + +import unittest +import tempfile +from lofar.common.util import * + +def setUpModule(): + pass + +def tearDownModule(): + pass + +class TestUtils(unittest.TestCase): + def test_string_to_buffer_and_back(self): + original = 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit.' + + d = { 'test-key' : original } + #print str(d) + self.assertTrue(isinstance(d['test-key'], basestring)) + + d2 = convertStringValuesToBuffer(d, 0) + print d2 + self.assertTrue(isinstance(d2['test-key'], buffer)) + + d3 = convertBufferValuesToString(d2) + print d3 + self.assertTrue(isinstance(d3['test-key'], basestring)) + self.assertEqual(original, d3['test-key']) + + #try conversion again but only for long strings + d2 = convertStringValuesToBuffer(d, 10000) + print d2 + #type should still be basestring (so no conversion happened) + self.assertTrue(isinstance(d2['test-key'], basestring)) + + d3 = convertBufferValuesToString(d2) + print d3 + #type should still be basestring (so no conversion back was needed) + self.assertTrue(isinstance(d3['test-key'], basestring)) + self.assertEqual(original, d3['test-key']) + + #try with nested dict + d4 = { 'outer': d } + + d2 = convertStringValuesToBuffer(d4, 0) + print d2 + self.assertTrue(isinstance(d2['outer']['test-key'], buffer)) + + d3 = convertBufferValuesToString(d2) + print d3 + self.assertTrue(isinstance(d3['outer']['test-key'], basestring)) + self.assertEqual(original, d3['outer']['test-key']) + +def main(argv): + unittest.main() + +if __name__ == "__main__": + # run all tests + import sys + main(sys.argv[1:]) diff --git a/LCS/PyCommon/test/t_util.run b/LCS/PyCommon/test/t_util.run new file mode 100755 index 0000000000000000000000000000000000000000..ba348910d65e902f142c7f10c6488b557b0f6e77 --- /dev/null +++ b/LCS/PyCommon/test/t_util.run @@ -0,0 +1,4 @@ +#!/bin/bash +source python-coverage.sh + +python_coverage_test util t_util.py diff --git a/LCS/PyCommon/test/t_util.sh b/LCS/PyCommon/test/t_util.sh new file mode 100755 index 0000000000000000000000000000000000000000..02ec67618d566c80a229bb26eaebb7e9cc9d2b5b --- /dev/null +++ b/LCS/PyCommon/test/t_util.sh @@ -0,0 +1,2 @@ +#!/bin/sh +./runctest.sh t_util diff --git a/LCS/PyCommon/util.py b/LCS/PyCommon/util.py index f6f0847ab259ffda78fa23981ce16160e5b6585a..813b22f091d42fd944bb3c229d0b62846b009ae0 100644 --- a/LCS/PyCommon/util.py +++ b/LCS/PyCommon/util.py @@ -154,5 +154,13 @@ def convertStringDigitKeysToInt(dct): #python2.6 using dict constructor and list comprehension return dict((int(k) if isinstance(k, basestring) and k.isdigit() else k, convertStringDigitKeysToInt(v) if isinstance(v, dict) else v) for k,v in dct.items()) +def convertBufferValuesToString(dct): + '''recursively convert all string values in the dict to buffer''' + return dict( (k, convertBufferValuesToString(v) if isinstance(v, dict) else str(v) if isinstance(v, buffer) else v) for k,v in dct.items()) + +def convertStringValuesToBuffer(dct, max_string_length=65535): + '''recursively convert all string values in the dict to buffer''' + return dict( (k, convertStringValuesToBuffer(v, max_string_length) if isinstance(v, dict) else (buffer(v, 0, len(v)) if (isinstance(v, basestring) and len(v) > max_string_length) else v)) for k,v in dct.items()) + def to_csv_string(values): return ','.join(str(x) for x in values)