#!/usr/bin/env python3
"""
Program to test the RPC and Service class of the Messaging package.

It defines six small functions and first calls them directly (all but the
sleeping TimeoutFunc) to check that the functions themselves are OK.
Next the same tests are done with the Service and RPCWrapper classes in
between. This should give the same results.
"""

from time import sleep
from contextlib import ExitStack
import logging

from lofar.messaging import TemporaryExchange, BusListenerJanitor
from lofar.messaging import Service, RPC, RPCWrapper, RPCException, RPCTimeoutException

logger = logging.getLogger(__name__)


class UserException(Exception):
    """Always raised by ErrorFunc."""
    pass


class InvalidArgType(Exception):
    """Raised when the input has the wrong type for one of the functions."""
    pass


def ErrorFunc(input_value):
    """Always raise a predefined UserException, whatever the input is."""
    raise UserException("Intentional exception for testing")


def ExceptionFunc(input_value):
    """Trigger an IndexError that is not caught by the function itself."""
    a = "aap"
    # index 23 is deliberately far outside the 3-character string
    return a[23]


def StringFunc(input_value):
    """Convert the string to uppercase.

    Raises:
        InvalidArgType: when input_value is not a str.
    """
    if not isinstance(input_value, str):
        raise InvalidArgType("Input value must be of the type 'string'")
    return input_value.upper()


def TimeoutFunc(input_value):
    """Sleep for 2 seconds, so that a caller with a shorter timeout gives up."""
    sleep(2)


def _upper_value(value):
    """Uppercase one element: str directly, list/dict recursively, others as-is."""
    if isinstance(value, str):
        return value.upper()
    if isinstance(value, list):
        return ListFunc(value)
    if isinstance(value, dict):
        return DictFunc(value)
    return value


def ListFunc(input_value):
    """Convert all strings in the (possibly nested) list to uppercase.

    Returns:
        A new list; input_value itself is not modified.

    Raises:
        InvalidArgType: when input_value is not a list.
    """
    if not isinstance(input_value, list):
        raise InvalidArgType("Input value must be of the type 'list'")
    return [_upper_value(item) for item in input_value]


def DictFunc(input_value):
    """Convert all string values in the (possibly nested) dict to uppercase.

    Keys are left untouched.

    Returns:
        A new dict; input_value itself is not modified.

    Raises:
        InvalidArgType: when input_value is not a dict.
    """
    if not isinstance(input_value, dict):
        raise InvalidArgType("Input value must be of the type 'dict'")
    return {key: _upper_value(value) for key, value in input_value.items()}


def _expect_exception(func, input_value, exc_type, expected_text=None):
    """Call func(input_value) and insist that it raises exc_type.

    Args:
        func: callable under test, invoked with input_value as only argument.
        input_value: the single argument handed to func.
        exc_type: exception class that must be raised.
        expected_text: optional substring that must occur in str(exception);
            when it is missing the exception is re-raised unchanged.

    Returns:
        The caught exception instance.

    Raises:
        Exception: when func returns normally instead of raising, so that a
            missing error can no longer slip through the test unnoticed.
    """
    try:
        result = func(input_value)
    except exc_type as e:
        if expected_text is not None and expected_text not in str(e):
            raise
        return e
    raise Exception("{} did not raise {} as expected, it returned: {}".format(
        getattr(func, '__name__', func), exc_type.__name__, result))


class MyRPC(RPCWrapper):
    """RPCWrapper subclass exposing each test service as a plain method."""

    def ErrorFunc(self, input_value):
        return self.rpc('ErrorService', input_value)

    def ExceptionFunc(self, input_value):
        return self.rpc('ExceptionService', input_value)

    def StringFunc(self, input_value):
        return self.rpc('StringService', input_value)

    def TimeoutFunc(self, input_value):
        return self.rpc('TimeoutService', input_value)

    def ListFunc(self, input_value):
        return self.rpc('ListService', input_value)

    def DictFunc(self, input_value):
        return self.rpc('DictService', input_value)


def main():
    """Test the functions directly, then once more via services on a temporary exchange.

    Raises:
        Exception: when a function returns an unexpected result, or does not
            fail where a failure is expected.
    """
    # First do basic test for the functions

    # ErrorFunc
    _expect_exception(ErrorFunc, "aap noot mies", UserException)

    # ExceptionFunc
    _expect_exception(ExceptionFunc, "aap noot mies", IndexError)

    # StringFunc
    _expect_exception(StringFunc, 25, InvalidArgType)
    result = StringFunc("aap noot mies")
    if result != "AAP NOOT MIES":
        raise Exception("String function failed:{}".format(result))

    # ListFunc
    _expect_exception(ListFunc, 25, InvalidArgType)
    result = ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}])
    if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
        raise Exception("List function failed:{}".format(result))

    # DictFunc
    _expect_exception(DictFunc, 25, InvalidArgType)
    result = DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
    if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
        raise Exception("Dict function failed:{}".format(result))

    logger.info("Functions tested outside RPC: All OK")

    with TemporaryExchange("TEST") as test_exchange:

        # Register functions as a service handler listening at busname and ServiceName
        handlers = [("ErrorService", ErrorFunc),
                    ("ExceptionService", ExceptionFunc),
                    ("StringService", StringFunc),
                    ("ListService", ListFunc),
                    ("DictService", DictFunc),
                    ("TimeoutService", TimeoutFunc)]
        services = [Service(service_name, handler, busname=test_exchange.address, num_threads=2)
                    for service_name, handler in handlers]

        # The 'with' sets up the connection context and defines the scope of the services.
        # Each service is used inside a BusListenerJanitor context to auto-cleanup
        # the auto-generated listener queues. The ExitStack enters them in order
        # and leaves them in reverse order, just like one multi-item 'with' statement.
        with ExitStack() as janitors:
            for service in services:
                janitors.enter_context(BusListenerJanitor(service))

            # NOTE: the tests using the RPC class directly are currently disabled;
            # they are kept here for reference.
            #
            # # Redo all tests but now via RPC
            # # ErrorFunc
            # with RPC("ErrorService", busname=test_exchange.address) as rpc:
            #     try:
            #         result = rpc("aap noot mies")
            #     except RPCException as e:
            #         if not 'UserException' in str(e):
            #             raise
            #
            # # ExceptionFunc
            # with RPC("ExceptionService", busname=test_exchange.address) as rpc:
            #     try:
            #         result = rpc("aap noot mies")
            #     except RPCException as e:
            #         if not 'IndexError' in str(e):
            #             raise
            #
            # # StringFunc
            # with RPC("StringService", busname=test_exchange.address) as rpc:
            #     try:
            #         result = rpc([25])
            #     except RPCException as e:
            #         if not 'InvalidArgType' in str(e):
            #             raise
            #
            #     result = rpc("aap noot mies")
            #     if result[0] != "AAP NOOT MIES":
            #         raise Exception("String function failed:{}".format(result))
            #
            # # ListFunc
            # with RPC("ListService", busname=test_exchange.address) as rpc:
            #     try:
            #         result = rpc("25")
            #     except RPCException as e:
            #         if not 'InvalidArgType' in str(e):
            #             raise
            #     result = rpc(["aap", 25, [1, 2], {'mies' : "meisje"}])
            #     if result[0] != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
            #         raise Exception("List function failed:{}".format(result))
            #
            # # DictFunc
            # with RPC("DictService", busname=test_exchange.address) as rpc:
            #     try:
            #         result = rpc([25])
            #     except RPCException as e:
            #         if not 'InvalidArgType' in str(e):
            #             raise
            #     result = rpc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
            #     if result[0] != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
            #         raise Exception("Dict function failed:{}".format(result))
            #
            # # TimeoutFunc
            # with RPC("TimeoutService", busname=test_exchange.address, timeout=1) as rpc:
            #     try:
            #         result = rpc("some random string")
            #         raise Exception("TimeoutService did not timeout as expected...")
            #     except RPCTimeoutException as e:
            #         logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args)

            logger.info("Functions tested with RPC: All OK")
            logger.info("************************************************")

            # Do the same tests, but now with the rpc calls wrapped in the
            # MyRPC RPCWrapper subclass. Errors raised in a service handler are
            # expected to arrive as an RPCException mentioning the original
            # exception's class name.
            with MyRPC(busname=test_exchange.address, timeout=1) as my_rpc:
                _expect_exception(my_rpc.ErrorFunc, "aap noot mies", RPCException, 'UserException')

                _expect_exception(my_rpc.ExceptionFunc, "aap noot mies", RPCException, 'IndexError')

                _expect_exception(my_rpc.StringFunc, [25], RPCException, 'InvalidArgType')
                result = my_rpc.StringFunc("aap noot mies")
                if result != "AAP NOOT MIES":
                    raise Exception("String function failed:{}".format(result))

                _expect_exception(my_rpc.ListFunc, "25", RPCException, 'InvalidArgType')
                result = my_rpc.ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}])
                if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
                    raise Exception("List function failed:{}".format(result))

                _expect_exception(my_rpc.DictFunc, [25], RPCException, 'InvalidArgType')
                result = my_rpc.DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
                if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
                    raise Exception("Dict function failed:{}".format(result))

                # TimeoutFunc sleeps 2 seconds, which is longer than the 1 second rpc timeout
                try:
                    my_rpc.TimeoutFunc("some random string")
                except RPCTimeoutException as e:
                    logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args)
                else:
                    raise Exception("TimeoutService did not timeout as expected...")


if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s %(process)d %(levelname)s %(message)s', level=logging.DEBUG)
    main()