-
Jorrit Schaap authored
SW-699: fixed some typos. Made TemporaryQueue bindable to and exchange. Route RPC-ReplyMessages via exchange to temporary reply queue
Jorrit Schaap authoredSW-699: fixed some typos. Made TemporaryQueue bindable to and exchange. Route RPC-ReplyMessages via exchange to temporary reply queue
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
t_RPC.py 10.62 KiB
#!/usr/bin/env python3
"""
Program to test the RPC and Service class of the Messaging package.
It defines 5 functions and first calls those functions directly to check
that the functions are OK. Next the same tests are done with the RPC and
Service classes in between. This should give the same results.
"""
from time import sleep
from contextlib import ExitStack
import logging
logger = logging.getLogger(__name__)
from lofar.messaging import TemporaryExchange, BusListenerJanitor
from lofar.messaging import Service, RPC, RPCWrapper, RPCException, RPCTimeoutException
class UserException(Exception):
"Always thrown in one of the functions"
pass
class InvalidArgType(Exception):
"Thrown when the input is wrong for one of the functions"
pass
def ErrorFunc(input_value):
" Always thrown a predefined exception"
raise UserException("Intentional exception for testing")
def ExceptionFunc(input_value):
"Generate a exception not caught by the function"
a = "aap"
b = a[23]
def StringFunc(input_value):
"Convert the string to uppercase."
if not isinstance(input_value, str):
raise InvalidArgType("Input value must be of the type 'string'")
return input_value.upper()
def TimeoutFunc(input_value):
"""
create a timeout by sleeping
"""
sleep(2)
def ListFunc(input_value):
"Convert the list to uppercase."
if not isinstance(input_value, list):
raise InvalidArgType("Input value must be of the type 'list'")
result = []
for item in input_value:
if isinstance(item, str):
result.append(item.upper())
elif isinstance(item, list):
result.append(ListFunc(item))
elif isinstance(item, dict):
result.append(DictFunc(item))
else:
result.append(item)
return result
def DictFunc(input_value):
"Convert the dict to uppercase."
if not isinstance(input_value, dict):
raise InvalidArgType("Input value must be of the type 'dict'")
result = {}
for key, value in list(input_value.items()):
if isinstance(value, str):
result[key] = str(value).upper()
elif isinstance(value, list):
result[key] = ListFunc(value)
elif isinstance(value, dict):
result[key] = DictFunc(value)
else:
result[key] = value
return result
def main():
# First do basic test for the functions
# ErrorFunc
try:
result = ErrorFunc("aap noot mies")
except UserException as e:
pass
# ExceptionFunc
try:
result = ExceptionFunc("aap noot mies")
except IndexError as e:
pass
# StringFunc
try:
result = StringFunc(25)
except InvalidArgType as e:
pass
result = StringFunc("aap noot mies")
if result != "AAP NOOT MIES":
raise Exception("String function failed:{}".format(result))
# ListFunc
try:
result = ListFunc(25)
except InvalidArgType as e:
pass
result = ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}])
if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
raise Exception("List function failed:{}".format(result))
# DictFunc
try:
result = DictFunc(25)
except InvalidArgType as e:
pass
result = DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
raise Exception("Dict function failed:{}".format(result))
logger.info("Functions tested outside RPC: All OK")
with TemporaryExchange("TEST") as test_exchange:
# Register functions as a service handler listening at busname and ServiceName
serv1 = Service("ErrorService", ErrorFunc, busname=test_exchange.address, num_threads=2)
serv2 = Service("ExceptionService", ExceptionFunc, busname=test_exchange.address, num_threads=2)
serv3 = Service("StringService", StringFunc, busname=test_exchange.address, num_threads=2)
serv4 = Service("ListService", ListFunc, busname=test_exchange.address, num_threads=2)
serv5 = Service("DictService", DictFunc, busname=test_exchange.address, num_threads=2)
serv6 = Service("TimeoutService", TimeoutFunc, busname=test_exchange.address, num_threads=2)
# 'with' sets up the connection context and defines the scope of the service.
# also use each service inside a BusListenerJanitor context to auto-cleanup auto-generated listener queues
with BusListenerJanitor(serv1), BusListenerJanitor(serv2), \
BusListenerJanitor(serv3), BusListenerJanitor(serv4), \
BusListenerJanitor(serv5), BusListenerJanitor(serv6):
# # Redo all tests but via through RPC
# # ErrorFunc
# with RPC("ErrorService", busname=test_exchange.address) as rpc:
# try:
# result = rpc("aap noot mies")
# except RPCException as e:
# if not 'UserException' in str(e):
# raise
#
# # ExceptionFunc
# with RPC("ExceptionService", busname=test_exchange.address) as rpc:
# try:
# result = rpc("aap noot mies")
# except RPCException as e:
# if not 'IndexError' in str(e):
# raise
#
# # StringFunc
# with RPC("StringService", busname=test_exchange.address) as rpc:
# try:
# result = rpc([25])
# except RPCException as e:
# if not 'InvalidArgType' in str(e):
# raise
#
# result = rpc("aap noot mies")
# if result[0] != "AAP NOOT MIES":
# raise Exception("String function failed:{}".format(result))
#
# # ListFunc
# with RPC("ListService", busname=test_exchange.address) as rpc:
# try:
# result = rpc("25")
# except RPCException as e:
# if not 'InvalidArgType' in str(e):
# raise
# result = rpc(["aap", 25, [1, 2], {'mies' : "meisje"}])
# if result[0] != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
# raise Exception("List function failed:{}".format(result))
#
# # DictFunc
# with RPC("DictService", busname=test_exchange.address) as rpc:
# try:
# result = rpc([25])
# except RPCException as e:
# if not 'InvalidArgType' in str(e):
# raise
# result = rpc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
# if result[0] != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
# raise Exception("Dict function failed:{}".format(result))
#
# # TimeoutFunc
# with RPC("TimeoutService", busname=test_exchange.address, timeout=1) as rpc:
# try:
# result = rpc("some random string")
# raise Exception("TimeoutService did not timeout as expected...")
# except RPCTimeoutException as e:
# logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args)
logger.info("Functions tested with RPC: All OK")
logger.info("************************************************")
# let's do the same tests, but now with the rpc calls wrapped in a RPCWrapper
# define a RPCWrapper subclass wrapping the above service calls...
class MyRPC(RPCWrapper):
def ErrorFunc(self, input_value):
return self.rpc('ErrorService', input_value)
def ExceptionFunc(self, input_value):
return self.rpc('ExceptionService', input_value)
def StringFunc(self, input_value):
return self.rpc('StringService', input_value)
def TimeoutFunc(self, input_value):
return self.rpc('TimeoutService', input_value)
def ListFunc(self, input_value):
return self.rpc('ListService', input_value)
def DictFunc(self, input_value):
return self.rpc('DictService', input_value)
# and use the MyRPC RPCWrapper class for testing
with MyRPC(busname=test_exchange.address, timeout=1) as my_rpc:
try:
result = my_rpc.ErrorFunc("aap noot mies")
except RPCException as e:
if not 'UserException' in str(e):
raise
try:
result = my_rpc.ExceptionFunc("aap noot mies")
except RPCException as e:
if not 'IndexError' in str(e):
raise
try:
result = my_rpc.StringFunc([25])
except RPCException as e:
if not 'InvalidArgType' in str(e):
raise
result = my_rpc.StringFunc("aap noot mies")
if result != "AAP NOOT MIES":
raise Exception("String function failed:{}".format(result))
try:
result = my_rpc.ListFunc("25")
except RPCException as e:
if not 'InvalidArgType' in str(e):
raise
result = my_rpc.ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}])
if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
raise Exception("List function failed:{}".format(result))
try:
result = my_rpc.DictFunc([25])
except RPCException as e:
if not 'InvalidArgType' in str(e):
raise
result = my_rpc.DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
raise Exception("Dict function failed:{}".format(result))
try:
result = my_rpc.TimeoutFunc("some random string")
raise Exception("TimeoutService did not timeout as expected...")
except RPCTimeoutException as e:
logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args)
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(process)d %(levelname)s %(message)s', level=logging.DEBUG)
main()