Skip to content
Snippets Groups Projects
Commit bebe92b1 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #10337: Qpid, cannot (de)serialize strings > 64k in a dict. We circumvent...

Task #10337: Qpid, cannot (de)serialize strings > 64k in a dict. We circumvent this in ToBus.send and FromBus.receive by converting long strings in a dict to a buffer and back.
parent 7739d9cb
No related branches found
No related tags found
No related merge requests found
...@@ -2698,6 +2698,9 @@ LCS/PyCommon/test/python-coverage.sh eol=lf ...@@ -2698,6 +2698,9 @@ LCS/PyCommon/test/python-coverage.sh eol=lf
LCS/PyCommon/test/t_dbcredentials.run eol=lf LCS/PyCommon/test/t_dbcredentials.run eol=lf
LCS/PyCommon/test/t_dbcredentials.sh eol=lf LCS/PyCommon/test/t_dbcredentials.sh eol=lf
LCS/PyCommon/test/t_methodtrigger.sh eol=lf LCS/PyCommon/test/t_methodtrigger.sh eol=lf
LCS/PyCommon/test/t_util.py -text
LCS/PyCommon/test/t_util.run -text
LCS/PyCommon/test/t_util.sh -text
LCS/PyCommon/util.py -text LCS/PyCommon/util.py -text
LCS/Tools/src/checkcomp.py -text LCS/Tools/src/checkcomp.py -text
LCS/Tools/src/countalllines -text LCS/Tools/src/countalllines -text
......
...@@ -29,12 +29,14 @@ Provide an easy way exchange messages on the message bus. ...@@ -29,12 +29,14 @@ Provide an easy way exchange messages on the message bus.
from lofar.messaging.exceptions import MessageBusError, MessageFactoryError from lofar.messaging.exceptions import MessageBusError, MessageFactoryError
from lofar.messaging.messages import to_qpid_message, MESSAGE_FACTORY from lofar.messaging.messages import to_qpid_message, MESSAGE_FACTORY
from lofar.common.util import raise_exception from lofar.common.util import raise_exception
from lofar.common.util import convertStringValuesToBuffer, convertBufferValuesToString
import qpid.messaging import qpid.messaging
import logging import logging
import sys import sys
import uuid import uuid
import threading import threading
from copy import deepcopy
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -213,6 +215,17 @@ class FromBus(object): ...@@ -213,6 +215,17 @@ class FromBus(object):
raise_exception(MessageBusError, raise_exception(MessageBusError,
"[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e)) "[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e))
try:
if isinstance(msg.content, dict):
#qpid cannot handle strings longer than 64k within dicts
#so each string was converted to a buffer which qpid can fit in 2^32-1 bytes
#and now we convert it back on this end
msg.content = convertBufferValuesToString(msg.content)
except MessageFactoryError:
self.reject(msg)
raise_exception(MessageBusError, "[FromBus] Message rejected")
logger.debug("[FromBus] Message received on: %s subject: %s" % (self.address, msg.subject)) logger.debug("[FromBus] Message received on: %s subject: %s" % (self.address, msg.subject))
if logDebugMessages: if logDebugMessages:
logger.debug("[FromBus] %s" % msg) logger.debug("[FromBus] %s" % msg)
...@@ -446,6 +459,15 @@ class ToBus(object): ...@@ -446,6 +459,15 @@ class ToBus(object):
""" """
sender = self._get_sender() sender = self._get_sender()
qmsg = to_qpid_message(message) qmsg = to_qpid_message(message)
if isinstance(qmsg.content, dict):
#qpid cannot handle strings longer than 64k within dicts
#so convert each string to a buffer which qpid can fit in 2^32-1 bytes
#convert it back on the other end
#make copy of qmsg first, because we are modifying the contents, and we don't want any side effects
qmsg = deepcopy(qmsg)
qmsg.content = convertStringValuesToBuffer(qmsg.content, 65535)
logger.debug("[ToBus] Sending message to: %s (%s)", self.address, qmsg) logger.debug("[ToBus] Sending message to: %s (%s)", self.address, qmsg)
try: try:
sender.send(qmsg, timeout=timeout) sender.send(qmsg, timeout=timeout)
......
...@@ -255,9 +255,8 @@ class SendReceiveMessage(unittest.TestCase): ...@@ -255,9 +255,8 @@ class SendReceiveMessage(unittest.TestCase):
self.assertEqual( self.assertEqual(
(send_msg.SystemName, send_msg.MessageId, send_msg.MessageType), (send_msg.SystemName, send_msg.MessageId, send_msg.MessageType),
(recv_msg.SystemName, recv_msg.MessageId, recv_msg.MessageType)) (recv_msg.SystemName, recv_msg.MessageId, recv_msg.MessageType))
self.assertEqual( self.assertEqual(send_msg.content, recv_msg.content)
(send_msg.content, send_msg.content_type), self.assertEqual(send_msg.content_type, recv_msg.content_type)
(recv_msg.content, recv_msg.content_type))
def test_sendrecv_event_message(self): def test_sendrecv_event_message(self):
""" """
...@@ -291,8 +290,8 @@ class SendReceiveMessage(unittest.TestCase): ...@@ -291,8 +290,8 @@ class SendReceiveMessage(unittest.TestCase):
""" """
Test send/receive of an RequestMessage, containing a dict with a large string value. Test send/receive of an RequestMessage, containing a dict with a large string value.
Qpid, cannot (de)serialize strings > 64k in a dict Qpid, cannot (de)serialize strings > 64k in a dict
We circumvent this in ToBus.send and FromBus.receive by converting long strings in a dict to a buffer and back.
""" """
with self.assertRaises(Exception):
content = {"key1": "short message", "key2": "long message " + (2**17)*'a'} content = {"key1": "short message", "key2": "long message " + (2**17)*'a'}
self._test_sendrecv(RequestMessage(content, reply_to=QUEUE)) self._test_sendrecv(RequestMessage(content, reply_to=QUEUE))
......
...@@ -8,3 +8,4 @@ file(COPY ...@@ -8,3 +8,4 @@ file(COPY
lofar_add_test(t_dbcredentials) lofar_add_test(t_dbcredentials)
lofar_add_test(t_methodtrigger) lofar_add_test(t_methodtrigger)
lofar_add_test(t_util)
#!/usr/bin/env python
import unittest
import tempfile
from lofar.common.util import *
def setUpModule():
pass
def tearDownModule():
pass
class TestUtils(unittest.TestCase):
def test_string_to_buffer_and_back(self):
original = 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit.'
d = { 'test-key' : original }
#print str(d)
self.assertTrue(isinstance(d['test-key'], basestring))
d2 = convertStringValuesToBuffer(d, 0)
print d2
self.assertTrue(isinstance(d2['test-key'], buffer))
d3 = convertBufferValuesToString(d2)
print d3
self.assertTrue(isinstance(d3['test-key'], basestring))
self.assertEqual(original, d3['test-key'])
#try conversion again but only for long strings
d2 = convertStringValuesToBuffer(d, 10000)
print d2
#type should still be basestring (so no conversion happened)
self.assertTrue(isinstance(d2['test-key'], basestring))
d3 = convertBufferValuesToString(d2)
print d3
#type should still be basestring (so no conversion back was needed)
self.assertTrue(isinstance(d3['test-key'], basestring))
self.assertEqual(original, d3['test-key'])
#try with nested dict
d4 = { 'outer': d }
d2 = convertStringValuesToBuffer(d4, 0)
print d2
self.assertTrue(isinstance(d2['outer']['test-key'], buffer))
d3 = convertBufferValuesToString(d2)
print d3
self.assertTrue(isinstance(d3['outer']['test-key'], basestring))
self.assertEqual(original, d3['outer']['test-key'])
def main(argv):
unittest.main()
if __name__ == "__main__":
# run all tests
import sys
main(sys.argv[1:])
#!/bin/bash
source python-coverage.sh
python_coverage_test util t_util.py
#!/bin/sh
./runctest.sh t_util
...@@ -154,5 +154,13 @@ def convertStringDigitKeysToInt(dct): ...@@ -154,5 +154,13 @@ def convertStringDigitKeysToInt(dct):
#python2.6 using dict constructor and list comprehension #python2.6 using dict constructor and list comprehension
return dict((int(k) if isinstance(k, basestring) and k.isdigit() else k, convertStringDigitKeysToInt(v) if isinstance(v, dict) else v) for k,v in dct.items()) return dict((int(k) if isinstance(k, basestring) and k.isdigit() else k, convertStringDigitKeysToInt(v) if isinstance(v, dict) else v) for k,v in dct.items())
def convertBufferValuesToString(dct):
'''recursively convert all string values in the dict to buffer'''
return dict( (k, convertBufferValuesToString(v) if isinstance(v, dict) else str(v) if isinstance(v, buffer) else v) for k,v in dct.items())
def convertStringValuesToBuffer(dct, max_string_length=65535):
'''recursively convert all string values in the dict to buffer'''
return dict( (k, convertStringValuesToBuffer(v, max_string_length) if isinstance(v, dict) else (buffer(v, 0, len(v)) if (isinstance(v, basestring) and len(v) > max_string_length) else v)) for k,v in dct.items())
def to_csv_string(values): def to_csv_string(values):
return ','.join(str(x) for x in values) return ','.join(str(x) for x in values)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment