Skip to content
Snippets Groups Projects
Commit cdd7cace authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-516: fixed (de)desrialization of datetimes and large strings in messages...

SW-516: fixed (de)desrialization of datetimes and large strings in messages now that we're using python3 and proton
parent 4f33536b
No related branches found
No related tags found
No related merge requests found
......@@ -28,8 +28,8 @@ Provide an easy way exchange messages on the message bus.
from lofar.messaging.exceptions import MessageBusError, MessageFactoryError
from lofar.messaging.messages import to_qpid_message, MESSAGE_FACTORY
from lofar.common.util import raise_exception
from lofar.common.util import convertStringValuesToBuffer, convertBufferValuesToString
from lofar.common.util import raise_exception, is_iterable
from lofar.common.datetimeutils import to_milliseconds_since_unix_epoch, from_milliseconds_since_unix_epoch
import proton
import proton.utils
......@@ -38,6 +38,7 @@ import logging
import sys
import uuid
import threading
from datetime import datetime
from copy import deepcopy
import re
......@@ -213,25 +214,23 @@ class FromBus(object):
raise_exception(MessageBusError,
"[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e))
try:
if isinstance(msg.body, dict):
#qpid cannot handle strings longer than 64k within dicts
#so each string was converted to a buffer which qpid can fit in 2^32-1 bytes
#and now we convert it back on this end
msg.body = convertBufferValuesToString(msg.body)
except MessageFactoryError:
# convert proton.timestamps back to datetimes
msg.body = _convert_timestamps_to_datetimes(msg.body)
except Exception as e:
self.reject(msg)
raise_exception(MessageBusError, "[FromBus] Message rejected")
raise_exception(MessageBusError, "[FromBus] Message rejected. Error=%s".format(e))
logger.debug("[FromBus] Message received on: %s subject: %s" % (self.address, msg.subject))
if logDebugMessages:
logger.debug("[FromBus] %s" % msg)
try:
amsg = MESSAGE_FACTORY.create(msg)
except MessageFactoryError:
except MessageFactoryError as mfe:
self.reject(msg)
raise_exception(MessageBusError, "[FromBus] Message rejected")
raise_exception(MessageBusError, "[FromBus] Message rejected. Error=%s".format(mfe))
self.ack(msg)
return amsg
......@@ -472,29 +471,23 @@ class ToBus(object):
sender = self._get_sender()
qmsg = to_qpid_message(message)
if isinstance(qmsg.body, dict):
#qpid cannot handle strings longer than 64k within dicts
#so convert each string to a buffer which qpid can fit in 2^32-1 bytes
#convert it back on the other end
# --- JK, Python3 change:
# We used to have a deep copy of the message before altering the strings, but we can't do that any more.
# I commented it out. Why was it even required? I don't see any side effects from that?
# In Py3, deepcopy raises: (TypeError: object.__new__(SwigPyObject) is not safe, use SwigPyObject.__new__())
# ---
# make copy of qmsg first, because we are modifying the contents, and we don't want any side effects
# qmsg = deepcopy(qmsg)
qmsg.body = convertStringValuesToBuffer(qmsg.body, 65535)
# convert datetimes in (nested) dicts/lists to proton.timestamps,
# convert them back on the other end
# make copy of qmsg.body first, because we are modifying the contents, and we don't want any side effects
qmsg_body_original = deepcopy(qmsg.body)
qmsg.body = _convert_datetimes_to_timestamps(qmsg.body)
logger.debug("[ToBus] Sending message to: %s (%s)", self.address, qmsg)
try:
if hasattr(self, 'subject') and self.subject:
qmsg.subject = self.subject
sender.send(qmsg, timeout=timeout)
except proton.ProtonException:
raise_exception(MessageBusError,
"[ToBus] Failed to send message to: %s" %
sender.target)
except proton.ProtonException as pe:
raise_exception(MessageBusError, "[ToBus] Failed to send message to: %s error=%s" % (sender.target, pe))
finally:
# restore the original body (in case it was modified)
qmsg.body = qmsg_body_original
logger.debug("[ToBus] Message sent to: %s subject: %s" % (self.address, qmsg.subject))
......@@ -544,11 +537,11 @@ class TemporaryQueue(object):
Open/create the temporary queue.
It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call.
"""
logger.info("Creating TemporaryQueue...")
logger.debug("Creating TemporaryQueue...")
connection = proton.utils.BlockingConnection(self.broker)
self._dynamic_receiver = connection.create_receiver(address=None, dynamic=True, name=self.name)
self.address = self._dynamic_receiver.link.remote_source.address
logger.info("Created TemporaryQueue at %s", self.address)
logger.debug("Created TemporaryQueue at %s", self.address)
def close(self):
"""
......@@ -559,7 +552,7 @@ class TemporaryQueue(object):
self._dynamic_receiver.close()
self._dynamic_receiver.connection.close()
self._dynamic_receiver = None
logger.info("Closed TemporaryQueue at %s", self.address)
logger.debug("Closed TemporaryQueue at %s", self.address)
self.address = None
def __str__(self):
......@@ -787,5 +780,43 @@ class AbstractBusListener(object):
except Exception as e:
logger.error("finalize_loop() failed with %s", e)
def _convert_datetimes_to_timestamps(thing):
"""recursively convert python datetimes to proton timestamps"""
if isinstance(thing, dict):
return { k: _convert_datetimes_to_timestamps(v) if is_iterable(v) else
proton.timestamp(to_milliseconds_since_unix_epoch(v)) if isinstance(v, datetime) else
v
for k, v in thing.items()}
if isinstance(thing, list):
return [ _convert_datetimes_to_timestamps(v) if is_iterable(v) else
proton.timestamp(to_milliseconds_since_unix_epoch(v)) if isinstance(v, datetime) else
v
for v in thing]
if isinstance(thing, datetime):
return proton.timestamp(to_milliseconds_since_unix_epoch(thing))
return thing
def _convert_timestamps_to_datetimes(thing):
"""recursively convert proton timestamps to python datetimes"""
if isinstance(thing, dict):
return { k: _convert_timestamps_to_datetimes(v) if is_iterable(v) else
from_milliseconds_since_unix_epoch(v) if isinstance(v, proton.timestamp) else
v
for k, v in thing.items()}
if isinstance(thing, list):
return [ _convert_timestamps_to_datetimes(v) if is_iterable(v) else
from_milliseconds_since_unix_epoch(v) if isinstance(v, proton.timestamp) else
v
for v in thing ]
if isinstance(thing, proton.timestamp):
return from_milliseconds_since_unix_epoch(thing)
return thing
__all__ = ["FromBus", "ToBus", "TemporaryQueue", "AbstractBusListener"]
This diff is collapsed.
......@@ -100,3 +100,44 @@ def from_modified_julian_date_in_seconds(modified_julian_date_secs):
:return: datetime, the timestamp as python datetime
'''
return MDJ_EPOCH + timedelta(seconds=modified_julian_date_secs)
def to_seconds_since_unix_epoch(timestamp):
'''
computes the (fractional) number of seconds since the unix epoch for a python datetime.timestamp
:param timestamp: datetime a python datetime timestamp (in UTC)
:return: double, the (fractional) number of seconds since the unix epoch
'''
return totalSeconds(timestamp - datetime.utcfromtimestamp(0))
def to_milliseconds_since_unix_epoch(timestamp):
'''
computes the (fractional) number of milliseconds since the unix epoch for a python datetime.timestamp
:param timestamp: datetime a python datetime timestamp
:return: double, the (fractional) number of milliseconds since the unix epoch
'''
return 1000.0 * to_seconds_since_unix_epoch(timestamp)
def from_seconds_since_unix_epoch(nr_of_seconds_since_epoch):
'''
computes a python datetime.timestamp given the (fractional) number of seconds since the unix epoch
:param double or int, the (fractional) number of seconds since the unix epoch
:return: timestamp: datetime a python datetime timestamp (in UTC)
'''
return datetime.utcfromtimestamp(nr_of_seconds_since_epoch)
def from_milliseconds_since_unix_epoch(nr_of_milliseconds_since_epoch):
'''
computes a python datetime.timestamp given the (fractional) number of milliseconds since the unix epoch
:param double or int, the (fractional) number of milliseconds since the unix epoch
:return: timestamp: datetime a python datetime timestamp (in UTC)
'''
return from_seconds_since_unix_epoch(nr_of_milliseconds_since_epoch/1000.0)
def round_to_millisecond_precision(timestamp):
"""
returns the given timestamp rounded to the nearest millisecond
:param timestamp: datetime a python datetime timestamp
:return: the given timestamp rounded to the nearest millisecond
"""
diff_to_rounded_millisecond = timestamp.microsecond - 1000*round(timestamp.microsecond/1000)
return timestamp - timedelta(microseconds=diff_to_rounded_millisecond)
\ No newline at end of file
......@@ -139,29 +139,18 @@ def humanreadablesize(num, suffix='B', base=1000):
def convertIntKeysToString(dct):
'''recursively convert all int keys in a dict to string'''
#python2.7 using dict comprehension
#return {str(k): convertIntKeysToString(v) if isinstance(v, dict) else v for k,v in dct.items()}
#python2.6 using dict constructor and list comprehension
return dict((str(k), convertIntKeysToString(v) if isinstance(v, dict) else v) for k,v in list(dct.items()))
return {str(k): convertIntKeysToString(v) if isinstance(v, dict) else v for k,v in dct.items()}
def convertStringDigitKeysToInt(dct):
'''recursively convert all string keys which are a digit in a dict to int'''
#python2.7 using dict comprehension
#return {int(k) if isinstance(k, basestring) and k.isdigit() else k : convertStringDigitKeysToInt(v) if isinstance(v, dict) else v for k,v in dct.items()}
#python2.6 using dict constructor and list comprehension
return dict((int(k) if isinstance(k, str) and k.isdigit() else k, convertStringDigitKeysToInt(v) if isinstance(v, dict) else v) for k,v in list(dct.items()))
def convertBufferValuesToString(dct):
'''recursively convert all string values in the dict to buffer'''
return dict( (k, convertBufferValuesToString(v) if isinstance(v, dict) else str(v.tobytes(), encoding='utf8') if isinstance(v, memoryview) else v) for k,v in list(dct.items()))
def convertStringValuesToBuffer(dct, max_string_length=65535):
'''recursively convert all string values in the dict to buffer'''
# Note: After the conversion to Python3, I had to change from buffer to memoryview, and since Python3 strings don't implement the buffer interface, also convert to bytes.
return dict( (k, convertStringValuesToBuffer(v, max_string_length) if isinstance(v, dict) else (memoryview(bytes(v, 'utf8')) if (isinstance(v, str) and len(v) > max_string_length) else v)) for k,v in list(dct.items()))
return {int(k) if isinstance(k, str) and k.isdigit() else k : convertStringDigitKeysToInt(v) if isinstance(v, dict) else v for k,v in dct.items()}
def to_csv_string(values):
return ','.join(str(x) for x in values)
def is_iterable(thing):
try:
iter(thing)
return True
except TypeError:
return False
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment