Skip to content
Snippets Groups Projects
Commit 2af42c0d authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-699: fixed test t_RPC based on new messaging and rpc modules

parent 74a1b349
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Program to test the RPCClient and RPCService class of the Messaging package. Program to test the RPCClient and RPCService class of the Messaging package.
It defines 5 functions and first calls those functions directly to check
that the functions are OK. Next the same tests are done with the RPCClient and
RPCService classes in between. This should give the same results.
""" """
from time import sleep
from contextlib import ExitStack
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor import unittest
from lofar.messaging.rpc import RPCClient, RPCService, RPCClientContextManagerMixin import uuid
from time import sleep
class UserException(Exception):
"Always thrown in one of the functions"
pass
class InvalidArgType(Exception):
"Thrown when the input is wrong for one of the functions"
pass
def ErrorFunc(input_value):
" Always thrown a predefined exception"
raise UserException("Intentional exception for testing")
def ExceptionFunc(input_value):
"Generate a exception not caught by the function"
a = "aap"
b = a[23]
def StringFunc(input_value):
"Convert the string to uppercase."
if not isinstance(input_value, str):
raise InvalidArgType("Input value must be of the type 'string'")
return input_value.upper()
def TimeoutFunc(input_value):
"""
create a timeout by sleeping
"""
sleep(2)
def ListFunc(input_value):
"Convert the list to uppercase."
if not isinstance(input_value, list):
raise InvalidArgType("Input value must be of the type 'list'")
result = []
for item in input_value:
if isinstance(item, str):
result.append(item.upper())
elif isinstance(item, list):
result.append(ListFunc(item))
elif isinstance(item, dict):
result.append(DictFunc(item))
else:
result.append(item)
return result
def DictFunc(input_value):
"Convert the dict to uppercase."
if not isinstance(input_value, dict):
raise InvalidArgType("Input value must be of the type 'dict'")
result = {}
for key, value in list(input_value.items()):
if isinstance(value, str):
result[key] = str(value).upper()
elif isinstance(value, list):
result[key] = ListFunc(value)
elif isinstance(value, dict):
result[key] = DictFunc(value)
else:
result[key] = value
return result
def main():
# First do basic test for the functions
# ErrorFunc
try:
result = ErrorFunc("aap noot mies")
except UserException as e:
pass
# ExceptionFunc
try:
result = ExceptionFunc("aap noot mies")
except IndexError as e:
pass
# StringFunc
try:
result = StringFunc(25)
except InvalidArgType as e:
pass
result = StringFunc("aap noot mies")
if result != "AAP NOOT MIES":
raise Exception("String function failed:{}".format(result))
# ListFunc
try:
result = ListFunc(25)
except InvalidArgType as e:
pass
result = ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}])
if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
raise Exception("List function failed:{}".format(result))
# DictFunc
try:
result = DictFunc(25)
except InvalidArgType as e:
pass
result = DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
raise Exception("Dict function failed:{}".format(result))
logger.info("Functions tested outside RPCClient: All OK")
with TemporaryExchange("TEST") as test_exchange:
# Register functions as a service handler listening at busname and ServiceName
serv1 = Service("ErrorService", ErrorFunc, exchange=test_exchange.address, num_threads=2)
serv2 = Service("ExceptionService", ExceptionFunc, exchange=test_exchange.address, num_threads=2)
serv3 = Service("StringService", StringFunc, exchange=test_exchange.address, num_threads=2)
serv4 = Service("ListService", ListFunc, exchange=test_exchange.address, num_threads=2)
serv5 = Service("DictService", DictFunc, exchange=test_exchange.address, num_threads=2)
serv6 = Service("TimeoutService", TimeoutFunc, exchange=test_exchange.address, num_threads=2)
# 'with' sets up the connection context and defines the scope of the service.
# also use each service inside a BusListenerJanitor context to auto-cleanup auto-generated listener queues
with BusListenerJanitor(serv1), BusListenerJanitor(serv2), \
BusListenerJanitor(serv3), BusListenerJanitor(serv4), \
BusListenerJanitor(serv5), BusListenerJanitor(serv6):
# # Redo all tests but via through RPCClient
# # ErrorFunc
# with RPCClient("ErrorService", exchange=test_exchange.address) as rpc:
# try:
# result = rpc("aap noot mies")
# except RPCException as e:
# if not 'UserException' in str(e):
# raise
#
# # ExceptionFunc
# with RPCClient("ExceptionService", exchange=test_exchange.address) as rpc:
# try:
# result = rpc("aap noot mies")
# except RPCException as e:
# if not 'IndexError' in str(e):
# raise
#
# # StringFunc
# with RPCClient("StringService", exchange=test_exchange.address) as rpc:
# try:
# result = rpc([25])
# except RPCException as e:
# if not 'InvalidArgType' in str(e):
# raise
#
# result = rpc("aap noot mies")
# if result[0] != "AAP NOOT MIES":
# raise Exception("String function failed:{}".format(result))
#
# # ListFunc
# with RPCClient("ListService", exchange=test_exchange.address) as rpc:
# try:
# result = rpc("25")
# except RPCException as e:
# if not 'InvalidArgType' in str(e):
# raise
# result = rpc(["aap", 25, [1, 2], {'mies' : "meisje"}])
# if result[0] != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
# raise Exception("List function failed:{}".format(result))
#
# # DictFunc
# with RPCClient("DictService", exchange=test_exchange.address) as rpc:
# try:
# result = rpc([25])
# except RPCException as e:
# if not 'InvalidArgType' in str(e):
# raise
# result = rpc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
# if result[0] != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
# raise Exception("Dict function failed:{}".format(result))
#
# # TimeoutFunc
# with RPCClient("TimeoutService", exchange=test_exchange.address, timeout=1) as rpc:
# try:
# result = rpc("some random string")
# raise Exception("TimeoutService did not timeout as expected...")
# except RPCTimeoutException as e:
# logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args)
logger.info("Functions tested with RPCClient: All OK")
logger.info("************************************************")
# let's do the same tests, but now with the rpc calls wrapped in a RPCWrapper
# define a RPCWrapper subclass wrapping the above service calls...
class MyRPC(RPCWrapper):
def ErrorFunc(self, input_value):
return self.rpc('ErrorService', input_value)
def ExceptionFunc(self, input_value):
return self.rpc('ExceptionService', input_value)
def StringFunc(self, input_value):
return self.rpc('StringService', input_value)
def TimeoutFunc(self, input_value):
return self.rpc('TimeoutService', input_value)
def ListFunc(self, input_value):
return self.rpc('ListService', input_value)
def DictFunc(self, input_value):
return self.rpc('DictService', input_value)
from lofar.messaging.messagebus import TemporaryExchange
from lofar.messaging.rpc import RPCClient, RPCService, RPCException, ServiceMessageHandler
# and use the MyRPC RPCWrapper class for testing TEST_SERVICE_NAME = "%s.%s" % (__name__, uuid.uuid4())
with MyRPC(exchange=test_exchange.address, timeout=1) as my_rpc:
try:
result = my_rpc.ErrorFunc("aap noot mies")
except RPCException as e:
if not 'UserException' in str(e):
raise
try: class MyServiceMessageHandler(ServiceMessageHandler):
result = my_rpc.ExceptionFunc("aap noot mies") def __init__(self, my_arg1, my_arg2):
except RPCException as e: super().__init__()
if not 'IndexError' in str(e): self.my_arg1 = my_arg1
raise self.my_arg2 = my_arg2
try: def my_public_method1(self):
result = my_rpc.StringFunc([25]) return self.my_arg1
except RPCException as e:
if not 'InvalidArgType' in str(e):
raise
result = my_rpc.StringFunc("aap noot mies") def my_public_method2(self, parameter1):
if result != "AAP NOOT MIES": return (self.my_arg2, parameter1)
raise Exception("String function failed:{}".format(result))
try: def my_public_failing_method(self):
result = my_rpc.ListFunc("25") raise Exception("intentional test exception")
except RPCException as e:
if not 'InvalidArgType' in str(e):
raise
result = my_rpc.ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}])
if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
raise Exception("List function failed:{}".format(result))
try: def my_public_slow_method(self):
result = my_rpc.DictFunc([25]) sleep(2)
except RPCException as e:
if not 'InvalidArgType' in str(e):
raise
result = my_rpc.DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
raise Exception("Dict function failed:{}".format(result))
try: class TestRPC(unittest.TestCase):
result = my_rpc.TimeoutFunc("some random string") def test_registered_service_methods(self):
raise Exception("TimeoutService did not timeout as expected...") handler = MyServiceMessageHandler("foo", "bar")
except RPCTimeoutException as e: handler.register_public_handler_methods()
logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args) self.assertEqual(4, len(handler._subject_to_method_map))
self.assertTrue('my_public_method1' in handler._subject_to_method_map)
self.assertTrue('my_public_method2' in handler._subject_to_method_map)
self.assertTrue('my_public_failing_method' in handler._subject_to_method_map)
self.assertTrue('my_public_slow_method' in handler._subject_to_method_map)
def test_rpc_client_to_service_call(self):
with TemporaryExchange(__name__) as tmp_exchange:
with RPCService(TEST_SERVICE_NAME,
handler_type=MyServiceMessageHandler,
handler_kwargs={'my_arg1': "foo",
'my_arg2': "bar"},
exchange=tmp_exchange.address,
num_threads=1) as service:
self.assertTrue(service.is_listening())
self.assertTrue(service.is_running())
with RPCClient(service_name=TEST_SERVICE_NAME, exchange=tmp_exchange.address, timeout=1) as rpc_client:
self.assertEqual("foo", rpc_client.execute("my_public_method1"))
self.assertEqual(("bar", 42), rpc_client.execute("my_public_method2", 42))
with self.assertRaises(RPCException):
rpc_client.execute("my_public_failing_method")
with self.assertRaises(TimeoutError):
rpc_client.execute("my_public_slow_method")
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(process)d %(levelname)s %(message)s', level=logging.DEBUG) logging.basicConfig(format='%(asctime)s %(process)d %(levelname)s %(message)s', level=logging.DEBUG)
main() unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment