diff --git a/LCS/Messaging/python/messaging/RPC.py b/LCS/Messaging/python/messaging/RPC.py index bf662fc42942349fc8bdba78fb5b4da91d684057..c2c3d1763507fa7c5df6d79876c18928f97e6d43 100644 --- a/LCS/Messaging/python/messaging/RPC.py +++ b/LCS/Messaging/python/messaging/RPC.py @@ -79,6 +79,7 @@ class RPC(): """ def __init__(self, service_name: str, busname: str, timeout: int=DEFAULT_TIMEOUT, broker: str=DEFAULT_BROKER): + self._busname = busname self._service_name = service_name self._timeout = timeout self._broker = broker @@ -135,6 +136,8 @@ class RPC(): timeout) with TemporaryQueue(name_prefix=self._service_name + "-reply", + exchange_name=self._busname, + addressed_to_me_only=True, broker=self._broker) as tmp_reply_queue: with tmp_reply_queue.create_frombus() as reply_receiver: request_msg = RequestMessage(content=Content, diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index a53fe389b8f962303073785275cee52ad578ab3d..8386147771015c482c3ae0b8694976ddca0e8a9b 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -96,7 +96,7 @@ class Service(AbstractBusListener): def __init__(self, service_name: str, service_handler: MessageHandlerInterface, - exchange_name: str, + busname: str, num_threads: int = 1, broker: str = DEFAULT_BROKER, **kwargs): @@ -124,7 +124,7 @@ class Service(AbstractBusListener): if not isinstance(self.service_handler, MessageHandlerInterface): raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.") - super(Service, self).__init__(exchange_name=exchange_name, + super(Service, self).__init__(exchange_name=busname, routing_key="%s.#" % (self.service_name,), broker=broker, num_threads=num_threads) @@ -140,8 +140,8 @@ class Service(AbstractBusListener): # send the result to the RPC client try: - # send the msg to the reply_to_address directly (no subject_based_routing) - with ToBus(reply_to_address, broker=self.broker, subject_based_routing=False) as reply_to_sender: + # send the msg to the common exchange, and use reply_to_address as subject for routing it to the reply queue. + with ToBus(self.exchange_name, broker=self.broker) as reply_to_sender: reply_to_sender.send(reply_msg) except MessagingError as e: logger.error("Failed to send reply messgage to %s. Error: %s", reply_to_address, e) diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index caec8dad3ccc74bc4dce643433af395c1cbf15cd..5f03cfbb8a6006fb5e9c2d897ba7d0ddc56046bf 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -487,17 +487,17 @@ class ToBus(_AbstractBus): raise_exception(MessagingError, "[ToBus] Failed to send message to: %s error=%s" % (self.address, e)) -class TemporaryExchange(object): +class TemporaryExchange: """ A TemporaryExchange instance can be used to setup a dynamic temporary exchange which is closed and deleted automagically when leaving context. """ - def __init__(self, name, broker=DEFAULT_BROKER): + def __init__(self, name_prefix: str, broker: str=DEFAULT_BROKER): """ Create a TemporaryExchange instance with an optional name on the given broker. - :param name: name, which is part of the final address which also includes a uuid. - :param broker: the qpid broker to connect to. + :param name_prefix: prefix for the final address which also includes a uuid. + :param broker: the message broker to connect to. """ - self.name = name + self._name_prefix = name_prefix self.broker = broker self._tmp_exchange = None self.address = None @@ -522,7 +522,7 @@ class TemporaryExchange(object): It is advised to use the TemporaryExchange instance in a 'with' context, which guarantees the close call. """ # create an identifiable address based on the given name which is also (almost) unique, and readable. - self.address = "%s-tmp-exchange-%s" % (self.name, uuid.uuid4().hex[:8]) + self.address = "%s-tmp-exchange-%s" % (self._name_prefix, uuid.uuid4().hex[:8]) logger.debug("Creating TemporaryExchange at %s ...", self.address) create_exchange(name=self.address, broker=self.broker) logger.debug("Created TemporaryExchange at %s", self.address) @@ -550,7 +550,7 @@ class TemporaryExchange(object): """ return ToBus(broker=self.broker, address=self.address) -class TemporaryQueue(object): +class TemporaryQueue: """ A TemporaryQueue instance can be used to setup a dynamic temporary queue which is closed and deleted automagically when leaving context. Together with the factory methods create_frombus and/or create_tobus it gives us to following simple but often used use case: @@ -566,15 +566,29 @@ class TemporaryQueue(object): Alternative use cases with only a tobus or only a frombus on the tmp_queue are also possible. """ - def __init__(self, name_prefix=None, broker=DEFAULT_BROKER): + def __init__(self, name_prefix: str=None, exchange_name: str=None, + routing_key: str="#", + addressed_to_me_only: bool = False, + broker=DEFAULT_BROKER): """ Create a TemporaryQueue instance with an optional name on the given broker. - :param name_prefix: Optional name, which is part of the final address which also includes a uuid. - :param broker: the qpid broker to connect to. - """ - self.name_prefix = name_prefix + :param name_prefix: Optional prefix for the final address which also includes a uuid. + :param exchange_name: Optional exchange name to bind this queue to (with the given routing_key). + If the exchange does not exist it is created and deleted automagically. + :param routing_key: Optional routing_key for binding this queue to the given exchange_name. + If "#" (the default), then route all messages to this queue. + This routing_key can be overruled by addressed_to_me_only. + :param addressed_to_me_only: If True then apply the tmp-queue's address as binding routing key, + so only messages for this queue are routed to this queue. + This overrules the given routing_key parameter. + :param broker: the messaging broker to connect to. + """ + self._name_prefix = name_prefix self.broker = broker - self._tmp_queue = None + self._bound_exchange_name = exchange_name + self._routing_key = routing_key + self._addressed_to_me_only = addressed_to_me_only + self._created_exchange: bool = False self.address = None def __enter__(self): @@ -597,10 +611,32 @@ class TemporaryQueue(object): It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call. """ # create an identifiable address based on the given name which is also (almost) unique, and readable. - self.address = "%s-tmp-queue-%s" % (self.name_prefix, uuid.uuid4().hex[:8]) + self.address = "%s-tmp-queue-%s" % (self._name_prefix, uuid.uuid4().hex[:8]) logger.debug("Creating TemporaryQueue at %s ...", self.address) - create_bound_queue(exchange_name=self.address, queue_name=self.address, routing_key="#", broker=self.broker) - logger.debug("Created TemporaryQueue at %s", self.address) + + if not self._bound_exchange_name: + # if there is no exhange to bind to, + # then we create an exchange with the same name as the queue, + # and route all messages from the exchange to the queue. + # That's because messaging is designed to only publish messages to exchanges. + self._bound_exchange_name = self.address + + # create the tmp queue... + create_queue(self.address, broker=self.broker, durable=False) + + # create the exchange (if needed), and remember if we need to destoy it (if it was created) + self._created_exchange = create_exchange(self._bound_exchange_name, + broker=self.broker, durable=False) + + # and finally create the binding + # if no routing_key given, then use this tmp-queue's specific address as routing key + create_binding(exchange_name=self._bound_exchange_name, queue_name=self.address, + routing_key=self.address if self._addressed_to_me_only else self._routing_key , + broker=self.broker, durable=False) + + logger.debug("Created TemporaryQueue %s bound to exchange %s with routing_key %s", + self.address, self._bound_exchange_name, self._routing_key) + def close(self): """ @@ -613,14 +649,16 @@ class TemporaryQueue(object): except Exception as e: logger.error(e) try: - delete_exchange(self.address) + if self._created_exchange: + delete_exchange(self._bound_exchange_name) except Exception as e: logger.error(e) logger.debug("Closed TemporaryQueue at %s", self.address) self.address = None def __str__(self): - return "TemporaryQueue address=%s".format(self.address) + return "TemporaryQueue address=%s bound to exchange=%s with routing_key=%s" % ( + self.address, self._bound_exchange_name, self._routing_key) def create_frombus(self, subject=None): """ @@ -636,16 +674,16 @@ class TemporaryQueue(object): Factory method to create a ToBus instance which is connected to this TemporaryQueue :return: ToBus """ - return ToBus(broker=self.broker, address=self.address) + return ToBus(broker=self.broker, address=self._bound_exchange_name) -class AbstractBusListener(object): +class AbstractBusListener: """ AbstractBusListener class for handling messages which are received on a message bus. Typical usage is to derive from this class and implement the handle_message method with concrete logic. """ - def __init__(self, exchange_name, routing_key, num_threads=1, broker=DEFAULT_BROKER): + def __init__(self, exchange_name: str, routing_key: str, num_threads: int = 1, broker: str = DEFAULT_BROKER): """ .....TODO.... :param num_threads: the number of receiver/handler threads. Default=1, use higher number if it makes sense, @@ -665,9 +703,12 @@ class AbstractBusListener(object): # We intentionally do not remove the queue and binding upon closing this listener, so messages are stored on the broker # in the queue for this listener for later processing once the program and this listener restarts. # If you would like to have automatic cleanup of the created queue, then use the buslistener in a BusListenerJanitor's context. + sanitized_routing_key = self.routing_key.replace(".#", "").replace(".*", "").replace("#", "").replace("*", "") + if not sanitized_routing_key: + sanitized_routing_key = "all" self.queue_address = "%s.for.%s.%s" % (exchange_name, program_name(include_extension=False), - routing_key.replace(".#", "").replace(".*", "")) + sanitized_routing_key) create_bound_queue(exchange_name=exchange_name, queue_name=self.queue_address, routing_key=routing_key, broker=self.broker) def isRunning(self): diff --git a/LCS/Messaging/python/messaging/messages.py b/LCS/Messaging/python/messaging/messages.py index 94e20a3770b133fd9ed3464c4550ae9bd4ee6203..30e3a396656baec3982c085d08c16be73fea84b6 100644 --- a/LCS/Messaging/python/messaging/messages.py +++ b/LCS/Messaging/python/messaging/messages.py @@ -77,7 +77,7 @@ class LofarMessage(object): message properties and provide direct access to them. """ - def __init__(self, content=None, subject=None, priority=4, id=None): + def __init__(self, content=None, subject:str=None, priority:int=4, ttl:float=None, id=None): """Constructor. :param content: Content can either be a qpid.messaging.Message object, @@ -95,17 +95,23 @@ class LofarMessage(object): self.content = content self.subject = subject self.priority = priority + self.ttl = ttl self.id = uuid.uuid4() if id is None else uuid.UUID(id) def as_kombu_publish_kwargs(self): - return {'body':self.content, - 'priority': self.priority, - 'routing_key': self.subject, - 'headers': {'SystemName': 'LOFAR', - 'MessageId': str(self.id), - 'MessageType': self.__class__.__name__, - 'Subject': self.subject} - } + publish_kwargs = {'body':self.content, + 'priority': self.priority, + 'routing_key': self.subject, + 'headers': {'SystemName': 'LOFAR', + 'MessageId': str(self.id), + 'MessageType': self.__class__.__name__, + 'Subject': self.subject} + } + + if self.ttl: + publish_kwargs['expiration']: self.ttl + + return publish_kwargs def show(self): """ @@ -116,7 +122,7 @@ class LofarMessage(object): def __str__(self): content_str = str(self.content) - delimited_content_str = content_str if len(content_str) <= 32 else (content_str[:32] + "...") + delimited_content_str = content_str if len(content_str) <= 128 else (content_str[:128] + "...") return "%s subject=%s id=%s content=%s" % (self.__class__.__name__, self.subject, self.id, @@ -130,8 +136,8 @@ class EventMessage(LofarMessage): will be stored in a persistent queue for later delivery. """ - def __init__(self, content=None, subject=None, id=None): - super(EventMessage, self).__init__(content, subject=subject, id=id) + def __init__(self, content=None, subject:str=None, priority:int=4, ttl:float=None, id=None): + super(EventMessage, self).__init__(content, subject=subject, priority=priority, ttl=ttl, id=id) class CommandMessage(LofarMessage): """ @@ -141,8 +147,8 @@ class CommandMessage(LofarMessage): a persistent queue for later delivery. """ - def __init__(self, content=None, subject=None, id=None): - super(CommandMessage, self).__init__(content=content, subject=subject, id=id) + def __init__(self, content=None, subject:str=None, priority:int=4, ttl:float=None, id=None): + super(CommandMessage, self).__init__(content=content, subject=subject, priority=priority, ttl=ttl, id=id) class RequestMessage(LofarMessage): """ @@ -152,8 +158,8 @@ class RequestMessage(LofarMessage): """ #TODO: refactor args kwargs quirks - def __init__(self, content, reply_to, subject=None, id=None, has_args=False, has_kwargs=False): - super(RequestMessage, self).__init__(content=content, subject=subject, id=id) + def __init__(self, content, reply_to, subject:str=None, priority:int=4, ttl:float=None, id=None, has_args=False, has_kwargs=False): + super(RequestMessage, self).__init__(content=content, subject=subject, priority=priority, ttl=ttl, id=id) self.reply_to = reply_to self.has_args = has_args self.has_kwargs = has_kwargs @@ -170,8 +176,8 @@ class ReplyMessage(LofarMessage): message. These use topic exchanges and thus are routed by the 'subject' property """ - def __init__(self, content, status, subject=None, id=None, errmsg="", backtrace=None): - super(ReplyMessage, self).__init__(content=content, subject=subject, id=id) + def __init__(self, content, status, subject:str=None, priority:int=4, ttl:float=None, id=None, errmsg:str="", backtrace:str=None): + super(ReplyMessage, self).__init__(content=content, subject=subject, priority=priority, ttl=ttl, id=id) self.status = status self.errmsg = errmsg self.backtrace = backtrace diff --git a/LCS/Messaging/python/messaging/test/t_RPC.py b/LCS/Messaging/python/messaging/test/t_RPC.py index f6157086f6d41ecc151173c415424eaca3e887b9..77924e488eccdbe0648bcba1b63d463e5b718341 100644 --- a/LCS/Messaging/python/messaging/test/t_RPC.py +++ b/LCS/Messaging/python/messaging/test/t_RPC.py @@ -122,12 +122,12 @@ def main(): with TemporaryExchange("TEST") as test_exchange: # Register functions as a service handler listening at busname and ServiceName - serv1 = Service("ErrorService", ErrorFunc, exchange_name=test_exchange.address, num_threads=2) - serv2 = Service("ExceptionService", ExceptionFunc, exchange_name=test_exchange.address, num_threads=2) - serv3 = Service("StringService", StringFunc, exchange_name=test_exchange.address, num_threads=2) - serv4 = Service("ListService", ListFunc, exchange_name=test_exchange.address, num_threads=2) - serv5 = Service("DictService", DictFunc, exchange_name=test_exchange.address, num_threads=2) - serv6 = Service("TimeoutService", TimeoutFunc, exchange_name=test_exchange.address, num_threads=2) + serv1 = Service("ErrorService", ErrorFunc, busname=test_exchange.address, num_threads=2) + serv2 = Service("ExceptionService", ExceptionFunc, busname=test_exchange.address, num_threads=2) + serv3 = Service("StringService", StringFunc, busname=test_exchange.address, num_threads=2) + serv4 = Service("ListService", ListFunc, busname=test_exchange.address, num_threads=2) + serv5 = Service("DictService", DictFunc, busname=test_exchange.address, num_threads=2) + serv6 = Service("TimeoutService", TimeoutFunc, busname=test_exchange.address, num_threads=2) # 'with' sets up the connection context and defines the scope of the service. # also use each service inside a BusListenerJanitor context to auto-cleanup auto-generated listener queues diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index 9cf58522edefc90e9d7656a553bd5eb2d98c8683..e641d9013829444a790a5d37a7b7156bd3557e29 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -95,10 +95,25 @@ class TestCreateDeleteFunctions(unittest.TestCase): -class TestTemporaryQueue(unittest.TestCase): - """Test the TemporaryQueue class""" +class TestTemporaryExchangeAndQueue(unittest.TestCase): + """Test the TemporaryExchange and TemporaryQueue classes""" - def test_temporary_is_really_temporary(self): + def test_temporary_exchange_is_really_temporary(self): + """ + test if the temporary exchange is really removed after usage + """ + tmp_exchange_address = None + with TemporaryExchange("MyTestExchange") as tmp_exchange: + tmp_exchange_address = tmp_exchange.address + self.assertTrue("MyTestExchange" in tmp_exchange_address) + + # test if the temporary exchange has been deleted when leaving scope + # We should not be able to connect to it anymore + with self.assertRaisesRegex(MessagingError, '.*NOT_FOUND.*'): + with FromBus(tmp_exchange_address): + pass + + def test_temporary_queue_is_really_temporary(self): """ test if the temporary queue is really removed after usage """ @@ -113,23 +128,35 @@ class TestTemporaryQueue(unittest.TestCase): with FromBus(tmp_queue_address): pass - def test_send_receive_over_temporary_queue(self): + def test_send_receive_over_temporary_exchange_and_queue(self): """ - test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage + test the usage of the TemporaryExchange and TemporaryQueue in conjunction with normal ToBus and Frombus usage """ - with TemporaryQueue("MyTestQueue") as tmp_queue: - # create a normal To/FromBus on this tmp_queue - with tmp_queue.create_tobus() as tobus, tmp_queue.create_frombus() as frombus: - # send a message... - original_msg = EventMessage(content="foobar") - tobus.send(original_msg) + with TemporaryExchange("MyTestExchange") as tmp_exchange: + # create a normal ToBus on this tmp_exchange + with tmp_exchange.create_tobus() as tobus_on_exchange: + # create a TemporaryQueue, bound to the tmp_exchange + with TemporaryQueue("MyTestQueue", exchange_name=tmp_exchange.address) as tmp_queue: + # create a normal FromBus on this tmp_queue + with tmp_queue.create_frombus() as frombus: + # and let's see if the tmp_queue can also create a tobus which then points to the bound_exchange + with tmp_queue.create_tobus() as tobus_on_tmp_queue: + + self.assertEqual(tobus_on_exchange.address, tobus_on_tmp_queue.address) - # ...receive the message... - received_msg = frombus.receive() + # test sending a message to both "types" of tobuses. + for tobus in [tobus_on_exchange, tobus_on_tmp_queue]: + # send a message... + original_msg = EventMessage(content="foobar") + tobus.send(original_msg) - # and test if they are equal - self.assertEqual(original_msg.id, received_msg.id) - self.assertEqual(original_msg.content, received_msg.content) + # ...receive the message... + received_msg = frombus.receive() + self.assertIsNotNone(received_msg) + + # and test if they are equal + self.assertEqual(original_msg.id, received_msg.id) + self.assertEqual(original_msg.content, received_msg.content) def test_send_receive_over_temporary_queue_with_subject_filtering(self): """ @@ -169,6 +196,100 @@ class TestTemporaryQueue(unittest.TestCase): logger.info("received message: %s", received_msg) self.assertEqual(None, received_msg) + def test_send_receive_over_temporary_exchange_with_queue_with_subject_filtering(self): + """ + test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage with additional filtering on subject + """ + with TemporaryExchange("MyTestExchange") as tmp_exchange: + with tmp_exchange.create_tobus() as tobus: + with TemporaryQueue("MyTestQueue", exchange_name=tmp_exchange.address) as tmp_queue: + # create a normal To/FromBus on this tmp_queue + SUBJECT = "FooBarSubject" + SUBJECT2 = "FAKE_SUBJECT" + NUM_MESSAGES_TO_SEND = 3 + + # create a FromBus, which listens for/receives only the messages with the given SUBJECT + with tmp_queue.create_frombus(SUBJECT) as frombus: + for i in range(NUM_MESSAGES_TO_SEND): + # send a message... + original_msg = EventMessage(subject=SUBJECT, + content="test message %d with subject='%s'" % ( + i, SUBJECT)) + logger.info("Sending message: %s", original_msg) + tobus.send(original_msg) + + # ...receive the message... + received_msg = frombus.receive(timeout=1) + logger.info("received message: %s", received_msg) + + # and test if they are equal + self.assertEqual(original_msg.id, received_msg.id) + self.assertEqual(original_msg.content, received_msg.content) + self.assertEqual(original_msg.subject, received_msg.subject) + + # now send a message with a different subject... + original_msg = EventMessage(subject=SUBJECT2, content="foobar") + logger.info("Sending message: %s", original_msg) + tobus.send(original_msg) + + # ... and try to receive it (should yield None, because of the non-matching subject) + received_msg = frombus.receive(timeout=1) + logger.info("received message: %s", received_msg) + self.assertEqual(None, received_msg) + + def test_send_receive_over_temporary_exchange_with_multiple_bound_queues_with_subject_filtering( + self): + """ + test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage with additional filtering on subject + """ + with TemporaryExchange("MyTestExchange") as tmp_exchange: + with tmp_exchange.create_tobus() as tobus: + SUBJECT1 = "FooBarSubject" + SUBJECT2 = "FAKE_SUBJECT" + with TemporaryQueue("MyTestQueue", exchange_name=tmp_exchange.address, routing_key=SUBJECT1) as tmp_queue1, \ + TemporaryQueue("MyTestQueue", exchange_name=tmp_exchange.address, routing_key=SUBJECT2) as tmp_queue2: + # create a normal To/FromBus on this tmp_queue + NUM_MESSAGES_TO_SEND = 3 + + # create two FromBus'es, which listen for/receive only the messages with their routing_key + with tmp_queue1.create_frombus() as frombus1, tmp_queue2.create_frombus() as frombus2: + for i in range(NUM_MESSAGES_TO_SEND): + # send a message... + original_msg = EventMessage(subject=SUBJECT1, + content="test message %d with subject='%s'" % ( + i, SUBJECT1)) + logger.info("Sending message: %s", original_msg) + tobus.send(original_msg) + + # ...receive the message... + received_msg1 = frombus1.receive(timeout=1) + received_msg2 = frombus2.receive(timeout=1) + self.assertIsNotNone(received_msg1) + self.assertIsNone(received_msg2) + logger.info("received message: %s", received_msg1) + + # and test if they are equal + self.assertEqual(original_msg.id, received_msg1.id) + self.assertEqual(original_msg.content, received_msg1.content) + self.assertEqual(original_msg.subject, received_msg1.subject) + + # now send a message with a different subject... + original_msg = EventMessage(subject=SUBJECT2, content="foobar") + logger.info("Sending message: %s", original_msg) + tobus.send(original_msg) + + # ... and try to receive it + received_msg1 = frombus1.receive(timeout=1) + received_msg2 = frombus2.receive(timeout=1) + self.assertIsNone(received_msg1) + self.assertIsNotNone(received_msg2) + logger.info("received message: %s", received_msg2) + + # and test if they are equal + self.assertEqual(original_msg.id, received_msg2.id) + self.assertEqual(original_msg.content, received_msg2.content) + self.assertEqual(original_msg.subject, received_msg2.subject) + # ======== FromBus unit tests ======== # @@ -430,8 +551,6 @@ class PingPongTester(unittest.TestCase): with BusListenerJanitor(PingPongPlayer("Player1", "Player2", tmp_exchange.address, num_threads_per_player)) as player1: with BusListenerJanitor(PingPongPlayer("Player2", "Player1", tmp_exchange.address, num_threads_per_player)) as player2: start_timestamp = datetime.utcnow() - return - # first serve, referee throws a ping ball on the table in the direction of player1 with tmp_exchange.create_tobus() as referee: @@ -467,6 +586,6 @@ class PingPongTester(unittest.TestCase): num_threads_per_player, 2*NUM_TURNS/(datetime.utcnow() - start_timestamp).total_seconds()) if __name__ == '__main__': - logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.INFO) + logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.DEBUG) unittest.main() diff --git a/LCS/Messaging/python/messaging/test/t_service_message_handler.py b/LCS/Messaging/python/messaging/test/t_service_message_handler.py index 12e29899ce69e25755a2aed89c3fd2e86603b232..3ef87438297f0c005a58f0077de46a2d25c59f59 100644 --- a/LCS/Messaging/python/messaging/test/t_service_message_handler.py +++ b/LCS/Messaging/python/messaging/test/t_service_message_handler.py @@ -86,12 +86,12 @@ def main(): busname = tmp_exchange.address # Register StringFunc functions as a service handler listening at busname and ServiceName - serv1_plain = Service("String1Service", StringFunc, exchange_name=busname) - serv1_minimal_class = Service("String2Service", OnlyMessageHandling, exchange_name=busname, + serv1_plain = Service("String1Service", StringFunc, busname=busname) + serv1_minimal_class = Service("String2Service", OnlyMessageHandling, busname=busname, handler_args={"function" : StringFunc}) - serv1_full_class = Service("String3Service", FullMessageHandling, exchange_name=busname, + serv1_full_class = Service("String3Service", FullMessageHandling, busname=busname, handler_args={"function" : StringFunc}) - serv1_failing_class = Service("String4Service", FailingMessageHandling, exchange_name=busname, + serv1_failing_class = Service("String4Service", FailingMessageHandling, busname=busname, handler_args={"function" : StringFunc}) # 'with' sets up the connection context and defines the scope of the service. @@ -127,12 +127,12 @@ def main(): logger.info("******************************************************************************") # Register ErrorFunc function as a service handler listening at busname and ServiceName - serv2_plain = Service("Error1Service", ErrorFunc, exchange_name=busname) - serv2_minimal_class = Service("Error2Service", OnlyMessageHandling, exchange_name=busname, + serv2_plain = Service("Error1Service", ErrorFunc, busname=busname) + serv2_minimal_class = Service("Error2Service", OnlyMessageHandling, busname=busname, handler_args={"function" : ErrorFunc}) - serv2_full_class = Service("Error3Service", FullMessageHandling, exchange_name=busname, + serv2_full_class = Service("Error3Service", FullMessageHandling, busname=busname, handler_args={"function" : ErrorFunc}) - serv2_failing_class = Service("Error4Service", FailingMessageHandling, exchange_name=busname, + serv2_failing_class = Service("Error4Service", FailingMessageHandling, busname=busname, handler_args={"function" : ErrorFunc}) # 'with' sets up the connection context and defines the scope of the service. @@ -168,12 +168,12 @@ def main(): logger.info("******************************************************************************") # Register ExceptionFunc functions as a service handler listening at busname and ServiceName - serv3_plain = Service("Except1Service", ExceptionFunc, exchange_name=busname) - serv3_minimal_class = Service("Except2Service", OnlyMessageHandling, exchange_name=busname, + serv3_plain = Service("Except1Service", ExceptionFunc, busname=busname) + serv3_minimal_class = Service("Except2Service", OnlyMessageHandling, busname=busname, handler_args={"function" : ExceptionFunc}) - serv3_full_class = Service("Except3Service", FullMessageHandling, exchange_name=busname, + serv3_full_class = Service("Except3Service", FullMessageHandling, busname=busname, handler_args={"function" : ExceptionFunc}) - serv3_failing_class = Service("Except4Service", FailingMessageHandling, exchange_name=busname, + serv3_failing_class = Service("Except4Service", FailingMessageHandling, busname=busname, handler_args={"function" : ExceptionFunc}) # 'with' sets up the connection context and defines the scope of the service.