Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
t_RPC.py 10.62 KiB
#!/usr/bin/env python3
"""
Program to test the RPC and Service class of the Messaging package.
It defines 5 functions and first calls those functions directly to check
that the functions are OK. Next the same tests are done with the RPC and
Service classes in between. This should give the same results.
"""
from time import sleep
from contextlib import ExitStack

import logging

logger = logging.getLogger(__name__)

from lofar.messaging import TemporaryExchange, BusListenerJanitor
from lofar.messaging import Service, RPC, RPCWrapper, RPCException, RPCTimeoutException

class UserException(Exception):
    "Always thrown in one of the functions"
    pass
class InvalidArgType(Exception):
    "Thrown when the input is wrong for one of the functions"
    pass

def ErrorFunc(input_value):
    " Always thrown a predefined exception"
    raise UserException("Intentional exception for testing")

def ExceptionFunc(input_value):
    "Generate a exception not caught by the function"
    a = "aap"
    b = a[23]

def StringFunc(input_value):
    "Convert the string to uppercase."
    if not isinstance(input_value, str):
        raise InvalidArgType("Input value must be of the type 'string'")
    return input_value.upper()

def TimeoutFunc(input_value):
    """
    create a timeout by sleeping
    """
    sleep(2)

def ListFunc(input_value):
    "Convert the list to uppercase."
    if not isinstance(input_value, list):
        raise InvalidArgType("Input value must be of the type 'list'")
    result = []
    for item in input_value:
        if isinstance(item, str):
            result.append(item.upper())
        elif isinstance(item, list):
            result.append(ListFunc(item))
        elif isinstance(item, dict):
            result.append(DictFunc(item))
        else:
            result.append(item)
    return result

def DictFunc(input_value):
    "Convert the dict to uppercase."
    if not isinstance(input_value, dict):
        raise InvalidArgType("Input value must be of the type 'dict'")
    result = {}
    for key, value in list(input_value.items()):
        if isinstance(value, str):
            result[key] = str(value).upper()
        elif isinstance(value, list):
            result[key] = ListFunc(value)
        elif isinstance(value, dict):
            result[key] = DictFunc(value)
        else:
            result[key] = value
    return result


def main():
    # First do basic test for the functions
    # ErrorFunc
    try:
        result = ErrorFunc("aap noot mies")
    except UserException as e:
        pass

    # ExceptionFunc
    try:
        result = ExceptionFunc("aap noot mies")
    except IndexError as e:
        pass

    # StringFunc
    try:
        result = StringFunc(25)
    except InvalidArgType as e:
        pass
    result = StringFunc("aap noot mies")
    if result != "AAP NOOT MIES":
        raise Exception("String function failed:{}".format(result))

    # ListFunc
    try:
        result = ListFunc(25)
    except InvalidArgType as e:
        pass
    result = ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}])
    if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
        raise Exception("List function failed:{}".format(result))

    # DictFunc
    try:
        result = DictFunc(25)
    except InvalidArgType as e:
        pass
    result = DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
    if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
        raise Exception("Dict function failed:{}".format(result))

    logger.info("Functions tested outside RPC: All OK")

    with TemporaryExchange("TEST") as test_exchange:

        # Register functions as a service handler listening at busname and ServiceName
        serv1 = Service("ErrorService",     ErrorFunc,     busname=test_exchange.address, num_threads=2)
        serv2 = Service("ExceptionService", ExceptionFunc, busname=test_exchange.address, num_threads=2)
        serv3 = Service("StringService",    StringFunc,    busname=test_exchange.address, num_threads=2)
        serv4 = Service("ListService",      ListFunc,      busname=test_exchange.address, num_threads=2)
        serv5 = Service("DictService",      DictFunc,      busname=test_exchange.address, num_threads=2)
        serv6 = Service("TimeoutService",   TimeoutFunc,   busname=test_exchange.address, num_threads=2)

        # 'with' sets up the connection context and defines the scope of the service.
        # also use each service inside a BusListenerJanitor context to auto-cleanup auto-generated listener queues
        with BusListenerJanitor(serv1), BusListenerJanitor(serv2), \
             BusListenerJanitor(serv3), BusListenerJanitor(serv4), \
             BusListenerJanitor(serv5), BusListenerJanitor(serv6):

            # # Redo all tests but via through RPC
            # # ErrorFunc
            # with RPC("ErrorService", busname=test_exchange.address) as rpc:
            #     try:
            #         result = rpc("aap noot mies")
            #     except RPCException as e:
            #         if not 'UserException' in str(e):
            #             raise
            #
            # # ExceptionFunc
            # with RPC("ExceptionService", busname=test_exchange.address) as rpc:
            #     try:
            #         result = rpc("aap noot mies")
            #     except RPCException as e:
            #         if not 'IndexError' in str(e):
            #             raise
            #
            # # StringFunc
            # with RPC("StringService", busname=test_exchange.address) as rpc:
            #     try:
            #         result = rpc([25])
            #     except RPCException as e:
            #         if not 'InvalidArgType' in str(e):
            #             raise
            #
            #     result = rpc("aap noot mies")
            #     if result[0] != "AAP NOOT MIES":
            #         raise Exception("String function failed:{}".format(result))
            #
            # # ListFunc
            # with RPC("ListService", busname=test_exchange.address) as rpc:
            #     try:
            #         result = rpc("25")
            #     except RPCException as e:
            #         if not 'InvalidArgType' in str(e):
            #             raise
            #     result = rpc(["aap", 25, [1, 2], {'mies' : "meisje"}])
            #     if result[0] != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
            #         raise Exception("List function failed:{}".format(result))
            #
            # # DictFunc
            # with RPC("DictService", busname=test_exchange.address) as rpc:
            #     try:
            #         result = rpc([25])
            #     except RPCException as e:
            #         if not 'InvalidArgType' in str(e):
            #             raise
            #     result = rpc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
            #     if result[0] != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
            #         raise Exception("Dict function failed:{}".format(result))
            #
            # # TimeoutFunc
            # with RPC("TimeoutService", busname=test_exchange.address, timeout=1) as rpc:
            #     try:
            #         result = rpc("some random string")
            #         raise Exception("TimeoutService did not timeout as expected...")
            #     except RPCTimeoutException as e:
            #         logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args)

            logger.info("Functions tested with RPC: All OK")
            logger.info("************************************************")

            # let's do the same tests, but now with the rpc calls wrapped in a RPCWrapper
            # define a RPCWrapper subclass wrapping the above service calls...
            class MyRPC(RPCWrapper):
                def ErrorFunc(self, input_value):
                    return self.rpc('ErrorService', input_value)

                def ExceptionFunc(self, input_value):
                    return self.rpc('ExceptionService', input_value)

                def StringFunc(self, input_value):
                    return self.rpc('StringService', input_value)

                def TimeoutFunc(self, input_value):
                    return self.rpc('TimeoutService', input_value)

                def ListFunc(self, input_value):
                    return self.rpc('ListService', input_value)

                def DictFunc(self, input_value):
                    return self.rpc('DictService', input_value)


            # and use the MyRPC RPCWrapper class for testing
            with MyRPC(busname=test_exchange.address, timeout=1) as my_rpc:
                try:
                    result = my_rpc.ErrorFunc("aap noot mies")
                except RPCException as e:
                    if not 'UserException' in str(e):
                        raise

                try:
                    result = my_rpc.ExceptionFunc("aap noot mies")
                except RPCException as e:
                    if not 'IndexError' in str(e):
                        raise

                try:
                    result = my_rpc.StringFunc([25])
                except RPCException as e:
                    if not 'InvalidArgType' in str(e):
                        raise

                result = my_rpc.StringFunc("aap noot mies")
                if result != "AAP NOOT MIES":
                    raise Exception("String function failed:{}".format(result))

                try:
                    result = my_rpc.ListFunc("25")
                except RPCException as e:
                    if not 'InvalidArgType' in str(e):
                        raise
                result = my_rpc.ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}])
                if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
                    raise Exception("List function failed:{}".format(result))

                try:
                    result = my_rpc.DictFunc([25])
                except RPCException as e:
                    if not 'InvalidArgType' in str(e):
                        raise
                result = my_rpc.DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
                if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
                    raise Exception("Dict function failed:{}".format(result))

                try:
                    result = my_rpc.TimeoutFunc("some random string")
                    raise Exception("TimeoutService did not timeout as expected...")
                except RPCTimeoutException as e:
                    logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args)


if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s %(process)d %(levelname)s %(message)s', level=logging.DEBUG)
    main()