From 911e2bf56e39506b534bc107ea8a9ea5409df46f Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Tue, 26 Jan 2016 16:03:36 +0000 Subject: [PATCH] Task #8888: Fix queue options in FromBus and ToBus, and force the use of topics when providing Services (to prevent bugs) --- LCS/Messaging/python/messaging/Service.py | 7 +++ LCS/Messaging/python/messaging/messagebus.py | 50 +++++++++++++++----- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index 9812ccefff5..6d452c821eb 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -115,6 +115,13 @@ class Service(AbstractBusListener): address = self.busname+"/"+self.service_name if self.busname else self.service_name kwargs["exclusive"] = True #set binding to exclusive for services + # Force the use of a topic in the bus options by setting + # options["node"]["type"] = "topic" + options = kwargs.get("options", {}) + options.setdefault("node", {}) + options["node"]["type"] = "topic" + kwargs["options"] = options + super(Service, self).__init__(address, broker, **kwargs) def start_listening(self, numthreads=None): diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 9f31cdefedb..fcafd2caae1 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -45,6 +45,19 @@ DEFAULT_BROKER_OPTIONS = {'reconnect': True} DEFAULT_RECEIVER_CAPACITY = 1 DEFAULT_TIMEOUT = 5 +# Construct address options string (address options object not supported well in Python) +def address_options_to_str(opt): + if isinstance(opt, dict): + return "{%s}" % (", ".join('%s: %s' % (k,address_options_to_str(v)) for (k,v) in opt.iteritems())) + elif isinstance(opt, list): + return "[%s]" % (", ".join(address_options_to_str(v) for v in opt)) + elif isinstance(opt, int): + return '%s' % (opt,) + elif isinstance(opt, bool): + return '%s' % (opt,) + else: + return '"%s"' % (opt,) + class FromBus(object): """ @@ -155,14 +168,20 @@ class FromBus(object): """ self._check_session() options = options if options else self.options - options.setdefault("capacity", DEFAULT_RECEIVER_CAPACITY) - what = "receiver for source: %s (broker: %s, session: %s)" % \ - (address, self.broker, self.session.name) + + # Extract capacity (not supported in address string in Python, see COMMON_OPTS in qpid/messaging/driver.py) + capacity = options.pop("capacity", DEFAULT_RECEIVER_CAPACITY) + + optstr = address_options_to_str(options) + + what = "receiver for source: %s (broker: %s, session: %s, options: %s)" % \ + (address, self.broker, self.session.name, optstr) + try: - self.session.receiver(address, **options) + self.session.receiver("%s; %s" % (address, optstr), capacity=capacity) except qpid.messaging.MessagingError: raise_exception(MessageBusError, - "[FromBus] Failed to create %s" % what) + "[FromBus] Failed to create %s" % (what,)) logger.info("[FromBus] Created %s", what) def receive(self, timeout=DEFAULT_TIMEOUT): @@ -382,13 +401,17 @@ class ToBus(object): :raise MessageBusError: if sender could not be created """ self._check_session() - what = "sender for target: %s (broker: %s, session: %s)" % ( - address, self.broker, self.session.name) + + optstr = address_options_to_str(options) + + what = "sender for source: %s (broker: %s, session: %s, options: %s)" % \ + (address, self.broker, self.session.name, optstr) + try: - self.session.sender(address, **options) + self.session.sender("%s; %s" % (address, optstr)) except qpid.messaging.MessagingError: raise_exception(MessageBusError, - "[ToBus] Failed to create %s" % what) + "[ToBus] Failed to create %s" % (what,)) logger.info("[ToBus] Created %s", what) def send(self, message, timeout=DEFAULT_TIMEOUT): @@ -440,9 +463,12 @@ class AbstractBusListener(object): # Set appropriate flags for exclusive binding if self.exclusive == True: binding_key = address.split('/')[-1] - self.frombus_options["link"] = '''{name:"%s", - x-bindings:[{key: %s, - arguments: {"qpid.exclusive-binding":True}}]}''' % (str(uuid.uuid4()), binding_key) + self.frombus_options["link"] = { "name": str(uuid.uuid4()), + "x-bindings": [ { "key": binding_key, + "arguments": { "\"qpid.exclusive-binding\"": True } + } + ] + } # only add options if it is given as a dictionary if isinstance(options,dict): -- GitLab