Skip to content
Snippets Groups Projects
Commit 176ecee6 authored by Ruud Overeem's avatar Ruud Overeem
Browse files

Task #8571: Pylint improvements.

parent bdded0a6
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
import time, sys
"""
Program to test the RPC and Service class of the Messaging package.
It defines 5 functions and first calls those functions directly to check
that the functions are OK. Next the same tests are done with the RPC and
Service classes in between. This should give the same results.
"""
import sys
from lofar.messaging import Service, RPC
class UserException(Exception):
"Always thrown in one of the functions"
pass
class InvalidArgType(Exception):
"Thrown when the input is wrong for one of the functions"
pass
# create several function:
def ErrorFunc(inputVal):
def ErrorFunc(input_value):
" Always thrown a predefined exception"
raise UserException("Exception thrown by the user")
def ExceptionFunc(inputVal):
def ExceptionFunc(input_value):
"Generate a exception not caught by the function"
a = "aap"
b = a[23]
def StringFunc(inputVal):
def StringFunc(input_value):
"Convert the string to uppercase."
if not isinstance(inputVal, str) and not isinstance(inputVal, unicode):
if not isinstance(input_value, str) and not isinstance(input_value, unicode):
raise InvalidArgType("Input value must be of the type 'string'")
return inputVal.upper();
return input_value.upper()
def ListFunc(inputVal):
def ListFunc(input_value):
"Convert the list to uppercase."
if not isinstance(inputVal, list):
if not isinstance(input_value, list):
raise InvalidArgType("Input value must be of the type 'list'")
result = []
for item in inputVal:
for item in input_value:
if isinstance(item, str) or isinstance(item, unicode):
result.append(item.upper())
elif isinstance(item, list):
......@@ -39,12 +47,12 @@ def ListFunc(inputVal):
result.append(item)
return result
def DictFunc(inputVal):
def DictFunc(input_value):
"Convert the dict to uppercase."
if not isinstance(inputVal, dict):
if not isinstance(input_value, dict):
raise InvalidArgType("Input value must be of the type 'dict'")
result = {}
for key,value in inputVal.items():
for key, value in input_value.items():
if isinstance(value, str) or isinstance(value, unicode):
result[key] = str(value).upper()
elif isinstance(value, list):
......@@ -83,8 +91,8 @@ if __name__ == '__main__':
result = ListFunc(25)
except InvalidArgType as e:
pass
result = ListFunc(["aap", 25, [1,2], { 'mies' : "meisje"}])
if result != ["AAP", 25, [1,2], { 'mies' : "MEISJE"}]:
result = ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}])
if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
raise Exception("List function failed:{}".format(result))
# DictFunc
......@@ -92,8 +100,8 @@ if __name__ == '__main__':
result = DictFunc(25)
except InvalidArgType as e:
pass
result = DictFunc({ 'mies' : "meisje", "aap" : 125, "noot" : [2,3]})
if result != { 'mies' : "MEISJE", "aap" : 125, "noot" : [2,3]}:
result = DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
raise Exception("Dict function failed:{}".format(result))
print "Functions tested outside RPC: All OK"
......@@ -101,7 +109,7 @@ if __name__ == '__main__':
# Used settings
busname = sys.argv[1] if len(sys.argv) > 1 else "simpletest"
# Register function as a service handler listeneing at Bus busname and ServiceName
# Register functs as a service handler listening at busname and ServiceName
serv1 = Service(busname, "ErrorService", ErrorFunc, numthreads=1)
serv2 = Service(busname, "ExceptionService", ExceptionFunc, numthreads=1)
serv3 = Service(busname, "StringService", StringFunc, numthreads=1)
......@@ -109,64 +117,64 @@ if __name__ == '__main__':
serv5 = Service(busname, "DictService", DictFunc, numthreads=1)
# 'with' sets up the connection context and defines the scope of the service.
with serv1,serv2,serv3,serv4,serv5:
# Start listening in the background. This will start as many threads as defined by the instance
serv1.StartListening()
serv2.StartListening()
serv3.StartListening()
serv4.StartListening()
serv5.StartListening()
# Redo all tests but via through RPC
# ErrorFunc
with RPC(busname, "ErrorService") as rpc:
try:
with serv1, serv2, serv3, serv4, serv5:
# Start listening in the background. This will start as many threads as defined by the instance
serv1.StartListening()
serv2.StartListening()
serv3.StartListening()
serv4.StartListening()
serv5.StartListening()
# Redo all tests but via through RPC
# ErrorFunc
with RPC(busname, "ErrorService") as rpc:
try:
result = rpc("aap noot mies")
except UserException as e:
pass
# ExceptionFunc
with RPC(busname, "ExceptionService") as rpc:
try:
result = rpc("aap noot mies")
except IndexError as e:
pass
# StringFunc
with RPC(busname, "StringService") as rpc:
try:
result = rpc([25])
except InvalidArgType as e:
pass
result = rpc("aap noot mies")
except UserException as e:
pass
# ExceptionFunc
with RPC(busname, "ExceptionService") as rpc:
try:
result = rpc("aap noot mies")
except IndexError as e:
pass
# StringFunc
with RPC(busname, "StringService") as rpc:
try:
result = rpc([25])
except InvalidArgType as e:
pass
result = rpc("aap noot mies")
if result[0] != "AAP NOOT MIES":
raise Exception("String function failed:{}".format(result))
# ListFunc
with RPC(busname, "ListService") as rpc:
try:
result = rpc("25")
except InvalidArgType as e:
pass
result = rpc(["aap", 25, [1,2], { 'mies' : "meisje"}])
if result[0] != ["AAP", 25, [1,2], { 'mies' : "MEISJE"}]:
raise Exception("List function failed:{}".format(result))
# DictFunc
with RPC(busname, "DictService") as rpc:
try:
result = rpc([25])
except InvalidArgType as e:
pass
result = rpc({ 'mies' : "meisje", "aap" : 125, "noot" : [2,3]})
if result[0] != { 'mies' : "MEISJE", "aap" : 125, "noot" : [2,3]}:
raise Exception("Dict function failed:{}".format(result))
print "Functions tested with RPC: All OK"
# Tell all background listener threads to stop and wait for them to finish.
serv1.StopListening()
serv2.StopListening()
serv3.StopListening()
serv4.StopListening()
serv5.StopListening()
if result[0] != "AAP NOOT MIES":
raise Exception("String function failed:{}".format(result))
# ListFunc
with RPC(busname, "ListService") as rpc:
try:
result = rpc("25")
except InvalidArgType as e:
pass
result = rpc(["aap", 25, [1, 2], {'mies' : "meisje"}])
if result[0] != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
raise Exception("List function failed:{}".format(result))
# DictFunc
with RPC(busname, "DictService") as rpc:
try:
result = rpc([25])
except InvalidArgType as e:
pass
result = rpc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
if result[0] != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
raise Exception("Dict function failed:{}".format(result))
print "Functions tested with RPC: All OK"
# Tell all background listener threads to stop and wait for them to finish.
serv1.StopListening()
serv2.StopListening()
serv3.StopListening()
serv4.StopListening()
serv5.StopListening()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment