diff --git a/LCS/Messaging/python/messaging/test/t_RPC.py b/LCS/Messaging/python/messaging/test/t_RPC.py index 0ab87557e5bde033357a88c7945e077d16dfe278..359de67f68e17e5f4cb3beba1a9070fb73763380 100644 --- a/LCS/Messaging/python/messaging/test/t_RPC.py +++ b/LCS/Messaging/python/messaging/test/t_RPC.py @@ -1,273 +1,70 @@ #!/usr/bin/env python3 """ Program to test the RPCClient and RPCService class of the Messaging package. -It defines 5 functions and first calls those functions directly to check -that the functions are OK. Next the same tests are done with the RPCClient and -RPCService classes in between. This should give the same results. """ -from time import sleep -from contextlib import ExitStack import logging - logger = logging.getLogger(__name__) -from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor -from lofar.messaging.rpc import RPCClient, RPCService, RPCClientContextManagerMixin - -class UserException(Exception): - "Always thrown in one of the functions" - pass -class InvalidArgType(Exception): - "Thrown when the input is wrong for one of the functions" - pass - -def ErrorFunc(input_value): - " Always thrown a predefined exception" - raise UserException("Intentional exception for testing") - -def ExceptionFunc(input_value): - "Generate a exception not caught by the function" - a = "aap" - b = a[23] - -def StringFunc(input_value): - "Convert the string to uppercase." - if not isinstance(input_value, str): - raise InvalidArgType("Input value must be of the type 'string'") - return input_value.upper() - -def TimeoutFunc(input_value): - """ - create a timeout by sleeping - """ - sleep(2) - -def ListFunc(input_value): - "Convert the list to uppercase." - if not isinstance(input_value, list): - raise InvalidArgType("Input value must be of the type 'list'") - result = [] - for item in input_value: - if isinstance(item, str): - result.append(item.upper()) - elif isinstance(item, list): - result.append(ListFunc(item)) - elif isinstance(item, dict): - result.append(DictFunc(item)) - else: - result.append(item) - return result - -def DictFunc(input_value): - "Convert the dict to uppercase." - if not isinstance(input_value, dict): - raise InvalidArgType("Input value must be of the type 'dict'") - result = {} - for key, value in list(input_value.items()): - if isinstance(value, str): - result[key] = str(value).upper() - elif isinstance(value, list): - result[key] = ListFunc(value) - elif isinstance(value, dict): - result[key] = DictFunc(value) - else: - result[key] = value - return result - - -def main(): - # First do basic test for the functions - # ErrorFunc - try: - result = ErrorFunc("aap noot mies") - except UserException as e: - pass - - # ExceptionFunc - try: - result = ExceptionFunc("aap noot mies") - except IndexError as e: - pass - - # StringFunc - try: - result = StringFunc(25) - except InvalidArgType as e: - pass - result = StringFunc("aap noot mies") - if result != "AAP NOOT MIES": - raise Exception("String function failed:{}".format(result)) - - # ListFunc - try: - result = ListFunc(25) - except InvalidArgType as e: - pass - result = ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}]) - if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]: - raise Exception("List function failed:{}".format(result)) - - # DictFunc - try: - result = DictFunc(25) - except InvalidArgType as e: - pass - result = DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]}) - if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}: - raise Exception("Dict function failed:{}".format(result)) - - logger.info("Functions tested outside RPCClient: All OK") - - with TemporaryExchange("TEST") as test_exchange: - - # Register functions as a service handler listening at busname and ServiceName - serv1 = Service("ErrorService", ErrorFunc, exchange=test_exchange.address, num_threads=2) - serv2 = Service("ExceptionService", ExceptionFunc, exchange=test_exchange.address, num_threads=2) - serv3 = Service("StringService", StringFunc, exchange=test_exchange.address, num_threads=2) - serv4 = Service("ListService", ListFunc, exchange=test_exchange.address, num_threads=2) - serv5 = Service("DictService", DictFunc, exchange=test_exchange.address, num_threads=2) - serv6 = Service("TimeoutService", TimeoutFunc, exchange=test_exchange.address, num_threads=2) - - # 'with' sets up the connection context and defines the scope of the service. - # also use each service inside a BusListenerJanitor context to auto-cleanup auto-generated listener queues - with BusListenerJanitor(serv1), BusListenerJanitor(serv2), \ - BusListenerJanitor(serv3), BusListenerJanitor(serv4), \ - BusListenerJanitor(serv5), BusListenerJanitor(serv6): - - # # Redo all tests but via through RPCClient - # # ErrorFunc - # with RPCClient("ErrorService", exchange=test_exchange.address) as rpc: - # try: - # result = rpc("aap noot mies") - # except RPCException as e: - # if not 'UserException' in str(e): - # raise - # - # # ExceptionFunc - # with RPCClient("ExceptionService", exchange=test_exchange.address) as rpc: - # try: - # result = rpc("aap noot mies") - # except RPCException as e: - # if not 'IndexError' in str(e): - # raise - # - # # StringFunc - # with RPCClient("StringService", exchange=test_exchange.address) as rpc: - # try: - # result = rpc([25]) - # except RPCException as e: - # if not 'InvalidArgType' in str(e): - # raise - # - # result = rpc("aap noot mies") - # if result[0] != "AAP NOOT MIES": - # raise Exception("String function failed:{}".format(result)) - # - # # ListFunc - # with RPCClient("ListService", exchange=test_exchange.address) as rpc: - # try: - # result = rpc("25") - # except RPCException as e: - # if not 'InvalidArgType' in str(e): - # raise - # result = rpc(["aap", 25, [1, 2], {'mies' : "meisje"}]) - # if result[0] != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]: - # raise Exception("List function failed:{}".format(result)) - # - # # DictFunc - # with RPCClient("DictService", exchange=test_exchange.address) as rpc: - # try: - # result = rpc([25]) - # except RPCException as e: - # if not 'InvalidArgType' in str(e): - # raise - # result = rpc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]}) - # if result[0] != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}: - # raise Exception("Dict function failed:{}".format(result)) - # - # # TimeoutFunc - # with RPCClient("TimeoutService", exchange=test_exchange.address, timeout=1) as rpc: - # try: - # result = rpc("some random string") - # raise Exception("TimeoutService did not timeout as expected...") - # except RPCTimeoutException as e: - # logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args) - - logger.info("Functions tested with RPCClient: All OK") - logger.info("************************************************") - - # let's do the same tests, but now with the rpc calls wrapped in a RPCWrapper - # define a RPCWrapper subclass wrapping the above service calls... - class MyRPC(RPCWrapper): - def ErrorFunc(self, input_value): - return self.rpc('ErrorService', input_value) - - def ExceptionFunc(self, input_value): - return self.rpc('ExceptionService', input_value) +import unittest +import uuid +from time import sleep - def StringFunc(self, input_value): - return self.rpc('StringService', input_value) +from lofar.messaging.messagebus import TemporaryExchange +from lofar.messaging.rpc import RPCClient, RPCService, RPCException, ServiceMessageHandler - def TimeoutFunc(self, input_value): - return self.rpc('TimeoutService', input_value) +TEST_SERVICE_NAME = "%s.%s" % (__name__, uuid.uuid4()) - def ListFunc(self, input_value): - return self.rpc('ListService', input_value) +class MyServiceMessageHandler(ServiceMessageHandler): + def __init__(self, my_arg1, my_arg2): + super().__init__() + self.my_arg1 = my_arg1 + self.my_arg2 = my_arg2 - def DictFunc(self, input_value): - return self.rpc('DictService', input_value) + def my_public_method1(self): + return self.my_arg1 + def my_public_method2(self, parameter1): + return (self.my_arg2, parameter1) - # and use the MyRPC RPCWrapper class for testing - with MyRPC(exchange=test_exchange.address, timeout=1) as my_rpc: - try: - result = my_rpc.ErrorFunc("aap noot mies") - except RPCException as e: - if not 'UserException' in str(e): - raise + def my_public_failing_method(self): + raise Exception("intentional test exception") - try: - result = my_rpc.ExceptionFunc("aap noot mies") - except RPCException as e: - if not 'IndexError' in str(e): - raise + def my_public_slow_method(self): + sleep(2) - try: - result = my_rpc.StringFunc([25]) - except RPCException as e: - if not 'InvalidArgType' in str(e): - raise +class TestRPC(unittest.TestCase): + def test_registered_service_methods(self): + handler = MyServiceMessageHandler("foo", "bar") + handler.register_public_handler_methods() + self.assertEqual(4, len(handler._subject_to_method_map)) + self.assertTrue('my_public_method1' in handler._subject_to_method_map) + self.assertTrue('my_public_method2' in handler._subject_to_method_map) + self.assertTrue('my_public_failing_method' in handler._subject_to_method_map) + self.assertTrue('my_public_slow_method' in handler._subject_to_method_map) - result = my_rpc.StringFunc("aap noot mies") - if result != "AAP NOOT MIES": - raise Exception("String function failed:{}".format(result)) + def test_rpc_client_to_service_call(self): + with TemporaryExchange(__name__) as tmp_exchange: + with RPCService(TEST_SERVICE_NAME, + handler_type=MyServiceMessageHandler, + handler_kwargs={'my_arg1': "foo", + 'my_arg2': "bar"}, + exchange=tmp_exchange.address, + num_threads=1) as service: + self.assertTrue(service.is_listening()) + self.assertTrue(service.is_running()) - try: - result = my_rpc.ListFunc("25") - except RPCException as e: - if not 'InvalidArgType' in str(e): - raise - result = my_rpc.ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}]) - if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]: - raise Exception("List function failed:{}".format(result)) + with RPCClient(service_name=TEST_SERVICE_NAME, exchange=tmp_exchange.address, timeout=1) as rpc_client: + self.assertEqual("foo", rpc_client.execute("my_public_method1")) + self.assertEqual(("bar", 42), rpc_client.execute("my_public_method2", 42)) - try: - result = my_rpc.DictFunc([25]) - except RPCException as e: - if not 'InvalidArgType' in str(e): - raise - result = my_rpc.DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]}) - if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}: - raise Exception("Dict function failed:{}".format(result)) + with self.assertRaises(RPCException): + rpc_client.execute("my_public_failing_method") - try: - result = my_rpc.TimeoutFunc("some random string") - raise Exception("TimeoutService did not timeout as expected...") - except RPCTimeoutException as e: - logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args) + with self.assertRaises(TimeoutError): + rpc_client.execute("my_public_slow_method") if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(process)d %(levelname)s %(message)s', level=logging.DEBUG) - main() + unittest.main()