Skip to content
Snippets Groups Projects
Commit a81567cc authored by Jan Rinze Peterzon's avatar Jan Rinze Peterzon
Browse files

Task #8571: Refactored RPC.py

parent a86bc595
No related branches found
No related tags found
No related merge requests found
......@@ -24,16 +24,19 @@ from lofar.messaging.messagebus import ToBus,FromBus
from lofar.messaging.messages import ServiceMessage, ReplyMessage
import uuid
class RPC():
def __init__(self,bus,service,timeout=None,ForwardExceptions=None):
def __init__(self, bus, service, timeout=None, ForwardExceptions=None, Verbose=None):
self.timeout = timeout
self.ForwardExceptions = False
if (ForwardExceptions==True):
self.Verbose = False
if ForwardExceptions is True:
self.ForwardExceptions = True
if Verbose is True:
self.Verbose = True
self.BusName = bus
self.ServiceName = service
self.Request = ToBus(self.BusName+"/"+self.ServiceName,
options={"create":"always", "link":"{x-declare: {arguments:{ \"qpid.default_mandatory_topic\": True}}}"})
self.Request = ToBus(self.BusName + "/" + self.ServiceName)
self.ReplyAddress = "reply." + str(uuid.uuid4())
self.Reply = FromBus(self.BusName + "/" + self.ReplyAddress)
......@@ -47,56 +50,53 @@ class RPC():
self.Reply.close()
def __call__(self, msg, timeout=None):
if (timeout==None):
if timeout is None:
timeout = self.timeout
MyMsg = ServiceMessage(msg, self.ReplyAddress)
#MyMsg.reply_to=self.ReplyAddress
self.Request.send(MyMsg)
print("sent")
answer = self.Reply.receive(timeout)
print("received or timed out")
if (answer!=None):
if (isinstance(answer,ReplyMessage)):
# Check for Time-Out
if answer is None:
status = []
status["state"] = "TIMEOUT"
status["errmsg"] = "RPC Timed out"
status["backtrace"] = ""
return (None, status)
# Check for illegal message type
if isinstance(answer, ReplyMessage) is False:
# if we come here we had a Time-Out
status = []
status["state"] = "ERROR"
status["errmsg"] = "Incorrect messagetype (" + str(type(answer)) + ") received."
status["backtrace"] = ""
return (None, status)
# return content and status if status is 'OK'
if (answer.status == "OK"):
return (answer.content, answer.status)
# Compile error handling from status
status = {}
exception=None
errmsg=None
errbacktrace=None
try:
if (answer.status!="OK"):
status["state"] = answer.status
status["errmsg"] = answer.errmsg
status["backtrace"] = answer.backtrace
#if (answer.exception!=""):
# exception=pickle.loads(answer.exception)
if (answer.errmsg!=""):
errmsg=answer.errmsg
if (answer.backtrace!=""):
errbacktrace=answer.backtrace
else:
status="OK"
except Exception as e:
status["state"] = "ERROR"
status["errmsg"] = "Return state in message not found"
status["backtrace"] = ""
else:
if (self.ForwardExceptions==True):
#if (exception!=None):
# raise exception[0],exception[1],exception[2]
#else:
if (errmsg!=None):
return (Null, status)
# Does the client expect us to throw the exception?
if self.ForwardExceptions is True:
excep_mod = __import__("exceptions")
excep_class_= getattr(excep_mod,errmsg.split(':')[0],None)
if (excep_class_!=None):
instance=excep_class_(errbacktrace)
excep_class_ = getattr(excep_mod, answer.errmsg.split(':')[0], None)
if excep_class_ not is None:
instance = excep_class_(answer.backtrace)
raise (instance)
else:
raise(Exception(errmsg))
try:
answer=(answer.content,status)
except Exception as e:
# we can't properly convert to a result message.
answer=(None,{"ERROR":"Malformed return message"})
else:
# if we come here we had a Time-Out
answer=(None,"RPC Timed out")
return answer
raise (Exception(answer.errmsg))
return (None,status)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment