Skip to content
Snippets Groups Projects
Commit 911e2bf5 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #8888: Fix queue options in FromBus and ToBus, and force the use of...

Task #8888: Fix queue options in FromBus and ToBus, and force the use of topics when providing Services (to prevent bugs)
parent a693f73c
No related branches found
No related tags found
No related merge requests found
......@@ -115,6 +115,13 @@ class Service(AbstractBusListener):
address = self.busname+"/"+self.service_name if self.busname else self.service_name
kwargs["exclusive"] = True #set binding to exclusive for services
# Force the use of a topic in the bus options by setting
# options["node"]["type"] = "topic"
options = kwargs.get("options", {})
options.setdefault("node", {})
options["node"]["type"] = "topic"
kwargs["options"] = options
super(Service, self).__init__(address, broker, **kwargs)
def start_listening(self, numthreads=None):
......
......@@ -45,6 +45,19 @@ DEFAULT_BROKER_OPTIONS = {'reconnect': True}
DEFAULT_RECEIVER_CAPACITY = 1
DEFAULT_TIMEOUT = 5
# Construct address options string (address options object not supported well in Python)
def address_options_to_str(opt):
if isinstance(opt, dict):
return "{%s}" % (", ".join('%s: %s' % (k,address_options_to_str(v)) for (k,v) in opt.iteritems()))
elif isinstance(opt, list):
return "[%s]" % (", ".join(address_options_to_str(v) for v in opt))
elif isinstance(opt, int):
return '%s' % (opt,)
elif isinstance(opt, bool):
return '%s' % (opt,)
else:
return '"%s"' % (opt,)
class FromBus(object):
"""
......@@ -155,14 +168,20 @@ class FromBus(object):
"""
self._check_session()
options = options if options else self.options
options.setdefault("capacity", DEFAULT_RECEIVER_CAPACITY)
what = "receiver for source: %s (broker: %s, session: %s)" % \
(address, self.broker, self.session.name)
# Extract capacity (not supported in address string in Python, see COMMON_OPTS in qpid/messaging/driver.py)
capacity = options.pop("capacity", DEFAULT_RECEIVER_CAPACITY)
optstr = address_options_to_str(options)
what = "receiver for source: %s (broker: %s, session: %s, options: %s)" % \
(address, self.broker, self.session.name, optstr)
try:
self.session.receiver(address, **options)
self.session.receiver("%s; %s" % (address, optstr), capacity=capacity)
except qpid.messaging.MessagingError:
raise_exception(MessageBusError,
"[FromBus] Failed to create %s" % what)
"[FromBus] Failed to create %s" % (what,))
logger.info("[FromBus] Created %s", what)
def receive(self, timeout=DEFAULT_TIMEOUT):
......@@ -382,13 +401,17 @@ class ToBus(object):
:raise MessageBusError: if sender could not be created
"""
self._check_session()
what = "sender for target: %s (broker: %s, session: %s)" % (
address, self.broker, self.session.name)
optstr = address_options_to_str(options)
what = "sender for source: %s (broker: %s, session: %s, options: %s)" % \
(address, self.broker, self.session.name, optstr)
try:
self.session.sender(address, **options)
self.session.sender("%s; %s" % (address, optstr))
except qpid.messaging.MessagingError:
raise_exception(MessageBusError,
"[ToBus] Failed to create %s" % what)
"[ToBus] Failed to create %s" % (what,))
logger.info("[ToBus] Created %s", what)
def send(self, message, timeout=DEFAULT_TIMEOUT):
......@@ -440,9 +463,12 @@ class AbstractBusListener(object):
# Set appropriate flags for exclusive binding
if self.exclusive == True:
binding_key = address.split('/')[-1]
self.frombus_options["link"] = '''{name:"%s",
x-bindings:[{key: %s,
arguments: {"qpid.exclusive-binding":True}}]}''' % (str(uuid.uuid4()), binding_key)
self.frombus_options["link"] = { "name": str(uuid.uuid4()),
"x-bindings": [ { "key": binding_key,
"arguments": { "\"qpid.exclusive-binding\"": True }
}
]
}
# only add options if it is given as a dictionary
if isinstance(options,dict):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment