diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index ba1c9ebd582266ee8c1a85840f5358a75c1b6cec..30fc7d2bdfbebcbc386d672e7a830a5c86251e2b 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -102,7 +102,6 @@ class Service(AbstractBusListener): Initialize Service object with servicename (str) and servicehandler function. additional parameters: busname = <string> Name of the bus in case exchanges are used in stead of queues - options = <dict> Dictionary of options passed to QPID exclusive = <bool> Create an exclusive binding so no other services can consume duplicate messages (default: True) numthreads = <int> Number of parallel threads processing messages (default: 1) verbose = <bool> Output extra logging over stdout (default: False) @@ -119,13 +118,6 @@ class Service(AbstractBusListener): address = self.busname+"/"+self.service_name if self.busname else self.service_name kwargs["exclusive"] = True #set binding to exclusive for services - # Force the use of a topic in the bus options by setting - # options["node"]["type"] = "topic" - options = kwargs.get("options", {}) - options.setdefault("node", {}) - options["node"]["type"] = "topic" - kwargs["options"] = options - # if the service_handler wants to map the 2nd part of the subject to a method # then we need to listen to <servicename>.* servicename = self.service_name+'.*' if self.use_service_methods else self.service_name diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 6de1814a546fba619837a0be6be91119eca6dfa6..31094ab9c63068ee63c2b98354745e3b1eb9cb27 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -44,26 +44,11 @@ import re logger = logging.getLogger(__name__) # Default settings for often used parameters. -DEFAULT_ADDRESS_OPTIONS = {'create': 'always'} DEFAULT_BROKER = "localhost:5672" DEFAULT_BROKER_OPTIONS = {'reconnect': True} DEFAULT_RECEIVER_CAPACITY = 128 DEFAULT_TIMEOUT = 5 -# Construct address options string (address options object not supported well in Python) -def address_options_to_str(opt): - if isinstance(opt, dict): - return "{%s}" % (", ".join('%s: %s' % (k,address_options_to_str(v)) for (k,v) in opt.items())) - elif isinstance(opt, list): - return "[%s]" % (", ".join(address_options_to_str(v) for v in opt)) - elif isinstance(opt, int): - return '%s' % (opt,) - elif isinstance(opt, bool): - return '%s' % (opt,) - else: - return '"%s"' % (opt,) - - class FromBus(object): """ *** The following was true for the Py2 qpid library, not necessarily for Proton *** @@ -82,16 +67,14 @@ class FromBus(object): but that of __new__(). """ - def __init__(self, address, options=None, broker=None, broker_options=None): + def __init__(self, address, broker=None, broker_options=None): """ Initializer. :param address: valid Qpid address - :param options: valid Qpid address options, e.g. {'create': 'never'} :param broker: valid Qpid broker URL, e.g. "localhost:5672" :param broker_options: valid Qpid broker options, e.g. {'reconnect': True} """ self.address = address - self.options = options if options else DEFAULT_ADDRESS_OPTIONS self.broker = broker if broker else DEFAULT_BROKER self.broker_options = broker_options if broker_options else DEFAULT_BROKER_OPTIONS @@ -123,7 +106,7 @@ class FromBus(object): if (self.opened==0): # create sender try: - self._add_queue(self.address, self.options) + self._add_queue(self.address) except proton.ProtonException: self.__exit__(*sys.exc_info()) raise_exception(MessageBusError, "[FromBus] Receiver initialization failed") @@ -171,11 +154,10 @@ class FromBus(object): raise MessageBusError( "[FromBus] No active receiver (broker: %s)" % self.broker) - def _add_queue(self, address, options=None): + def _add_queue(self, address): """ Add a queue that you want to receive messages from. :param address: valid Qpid address - :param options: dict containing valid Qpid address options """ if address and '/' in address: @@ -184,18 +166,9 @@ class FromBus(object): subject=None logger.debug("[FromBus] Receiving from bus: %s with subject: %s" % (address, subject)) - options = options if options else self.options - - optstr = address_options_to_str(options) - - what = "receiver for source: %s (broker: %s, session: %s, options: %s)" % \ - (address, self.broker, 'unknown', optstr) + what = "receiver for source: %s (broker: %s, session: %s)" % (address, self.broker, 'unknown') try: - if options: - # todo: options=optstr) # "%s; %s" % (address, optstr), capacity=capacity) - logger.warning('[FromBus] Options are currently ignored since the switch to Proton!') - # helper class for filtering by subject class ProtonSubjectFilter(proton.reactor.Filter): def __init__(self, value): @@ -366,16 +339,14 @@ class ToBus(object): but that of __new__(). """ - def __init__(self, address, options=None, broker=None, broker_options=None): + def __init__(self, address, broker=None, broker_options=None): """ Initializer. :param address: valid Qpid address - :param options: valid Qpid address options, e.g. {'create': 'never'} :param broker: valid Qpid broker URL, e.g. "localhost:5672" :param broker_options: valid Qpid broker options, e.g. {'reconnect': True} """ self.address = address - self.options = options if options else DEFAULT_ADDRESS_OPTIONS self.broker = broker if broker else DEFAULT_BROKER self.broker_options = broker_options if broker_options else DEFAULT_BROKER_OPTIONS @@ -395,7 +366,7 @@ class ToBus(object): def open(self): if (self.opened==0): try: - self._add_queue(self.address, self.options) + self._add_queue(self.address) except proton.ProtonException: self.__exit__(*sys.exc_info()) raise_exception(MessageBusError, "[ToBus] Sender initialization failed") @@ -414,20 +385,6 @@ class ToBus(object): :raise MessageBusError: if any of the above actions failed. :return: self """ - """ - try: - self.connection.open() - logger.debug("[ToBus] Connected to broker: %s", self.broker) - self.session = self.connection.session() - logger.debug("[ToBus] Created session: %s", self.session.name) - self._add_queue(self.address, self.options) - except qpid.messaging.MessagingError: - self.__exit__(*sys.exc_info()) - raise_exception(MessageBusError, "[ToBus] Initialization failed") - except MessageBusError: - self.__exit__(*sys.exc_info()) - raise - """ self.open() return self @@ -482,11 +439,10 @@ class ToBus(object): # raise MessageBusError("[ToBus] %s (broker: %s, session %s)" % # (msg, self.broker, self.session)) - def _add_queue(self, address, options): + def _add_queue(self, address): """ Add a queue that you want to sends messages to. :param address: valid Qpid address - :param options: dict containing valid Qpid address options :raise MessageBusError: if sender could not be created """ @@ -495,17 +451,11 @@ class ToBus(object): else: self.subject = None - optstr = address_options_to_str(options) - - what = "sender for source: %s (broker: %s, session: %s, options: %s)" % \ - (address, self.broker, 'unknown', optstr) + what = "sender for source: %s (broker: %s, session: %s)" % (address, self.broker, 'unknown') try: if hasattr(self, 'sender') and self.sender is not None: raise_exception(MessageBusError, "[ToBus] More than one sender") - if options: - # todo: create sender with options -> "%s; %s" % (address, optstr)) - logger.warning('[FromBus] Options are currently ignored since the switch to Proton!') self.sender = self.connection.create_sender(address=address) except proton.ProtonException: raise_exception(MessageBusError, @@ -612,6 +562,9 @@ class TemporaryQueue(object): logger.info("Closed TemporaryQueue at %s", self.address) self.address = None + def __str__(self): + return "TemporaryQueue address=%s".format(self.address) + def create_frombus(self, subject=None): """ Factory method to create a FromBus instance which is connected to this TemporaryQueue @@ -640,7 +593,6 @@ class AbstractBusListener(object): Initialize AbstractBusListener object with address (str). :param address: valid Qpid address additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID exclusive= <bool> Create an exclusive binding so no other listeners can consume duplicate messages (default: False) numthreads= <int> Number of parallel threads processing messages (default: 1) verbose= <bool> Output extra logging over stdout (default: False) @@ -652,14 +604,14 @@ class AbstractBusListener(object): self.exclusive = kwargs.pop("exclusive", False) self._numthreads = kwargs.pop("numthreads", 1) self.verbose = kwargs.pop("verbose", False) - self.frombus_options = {"capacity": self._numthreads*20} - options = kwargs.pop("options", None) + self.frombus_options = {} if len(kwargs): raise AttributeError("Unexpected argument passed to AbstractBusListener constructor: %s", kwargs) # Set appropriate flags for exclusive binding if self.exclusive == True: + logger.warning("exclusive binding is not supported yet since our switch to proton") binding_key = address.split('/')[-1] self.frombus_options["link"] = { "name": str(uuid.uuid4()), "x-bindings": [ { "key": binding_key, @@ -668,11 +620,6 @@ class AbstractBusListener(object): ] } - # only add options if it is given as a dictionary - if isinstance(options,dict): - for key,val in options.items(): - self.frombus_options[key] = val - def _debug(self, txt): """ Internal use only. diff --git a/LCS/Messaging/python/messaging/messages.py b/LCS/Messaging/python/messaging/messages.py index 838b2882ee2fe2183b93eb4d41ed4e53c15e750c..ca6609bbb0facb65d47ea0631bcdc2763aafaa8f 100644 --- a/LCS/Messaging/python/messaging/messages.py +++ b/LCS/Messaging/python/messaging/messages.py @@ -329,7 +329,6 @@ class CommandMessage(LofarMessage): self.durable = True self.context=context self.recipients=recipients - self.subject='command', MESSAGE_FACTORY.register("EventMessage", EventMessage) diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index d1517d1ef7c71a8decbba9ee1b5b3b32086afcfe..90f16ece4aacd59428f8552f92c883d42e3198d7 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -303,7 +303,7 @@ with TemporaryQueue("t_messagebus") as test_queue: with ToBus(test_queue.address) as tobus: regexp = re.escape("[ToBus] More than one sender") with self.assertRaisesRegex(MessageBusError, regexp): - tobus._add_queue(test_queue.address, {}) + tobus._add_queue(test_queue.address) def test_send_invalid_message_raises(self): """