diff --git a/LCS/Messaging/python/messaging/RPC.py b/LCS/Messaging/python/messaging/RPC.py index 800444fd632451fb9bbf82fa973e292625c5378d..7e7875436d29564c2487db35bbd50594232f8847 100644 --- a/LCS/Messaging/python/messaging/RPC.py +++ b/LCS/Messaging/python/messaging/RPC.py @@ -20,83 +20,83 @@ # # RPC invocation with possible timeout -from lofar.messaging.messagebus import ToBus,FromBus -from lofar.messaging.messages import ServiceMessage,ReplyMessage +from lofar.messaging.messagebus import ToBus, FromBus +from lofar.messaging.messages import ServiceMessage, ReplyMessage import uuid + class RPC(): - def __init__(self,bus,service,timeout=None,ForwardExceptions=None): - self.timeout=timeout - self.ForwardExceptions=False - if (ForwardExceptions==True): - self.ForwardExceptions=True - self.BusName=bus - self.ServiceName=service - self.Request = ToBus(self.BusName+"/"+self.ServiceName, - options={"create":"always", "link":"{x-declare: {arguments:{ \"qpid.default_mandatory_topic\": True}}}"}) - self.ReplyAddress="reply."+str(uuid.uuid4()) - self.Reply = FromBus(self.BusName+"/"+self.ReplyAddress) + def __init__(self, bus, service, timeout=None, ForwardExceptions=None, Verbose=None): + self.timeout = timeout + self.ForwardExceptions = False + self.Verbose = False + if ForwardExceptions is True: + self.ForwardExceptions = True + if Verbose is True: + self.Verbose = True + self.BusName = bus + self.ServiceName = service + self.Request = ToBus(self.BusName + "/" + self.ServiceName) + self.ReplyAddress = "reply." + str(uuid.uuid4()) + self.Reply = FromBus(self.BusName + "/" + self.ReplyAddress) + + def __enter__(self): + self.Request.open() + self.Reply.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.Request.close() + self.Reply.close() + + def __call__(self, msg, timeout=None): + if timeout is None: + timeout = self.timeout - def __enter__(self): - self.Request.open() - self.Reply.open() - return self + MyMsg = ServiceMessage(msg, self.ReplyAddress) + self.Request.send(MyMsg) + answer = self.Reply.receive(timeout) - def __exit__(self, exc_type, exc_val, exc_tb): - self.Request.close() - self.Reply.close() + # Check for Time-Out + if answer is None: + status = [] + status["state"] = "TIMEOUT" + status["errmsg"] = "RPC Timed out" + status["backtrace"] = "" + return (None, status) - def __call__(self,msg,timeout=None): - if (timeout==None): - timeout=self.timeout - MyMsg=ServiceMessage(msg,self.ReplyAddress) - #MyMsg.reply_to=self.ReplyAddress - self.Request.send(MyMsg) - print("sent") - answer=self.Reply.receive(timeout) - print("received or timed out") - if (answer!=None): - if (isinstance(answer,ReplyMessage)): - status={} - exception=None - errmsg=None - errbacktrace=None - try: - if (answer.status!="OK"): - status["state"]=answer.status - status["errmsg"]=answer.errmsg - status["backtrace"]=answer.backtrace - #if (answer.exception!=""): - # exception=pickle.loads(answer.exception) - if (answer.errmsg!=""): - errmsg=answer.errmsg - if (answer.backtrace!=""): - errbacktrace=answer.backtrace - else: - status="OK" - except Exception as e: - status["state"]="ERROR" - status["errmsg"]="Return state in message not found" - status["backtrace"]="" - else: - if (self.ForwardExceptions==True): - #if (exception!=None): - # raise exception[0],exception[1],exception[2] - #else: - if (errmsg!=None): - excep_mod=__import__("exceptions") - excep_class_= getattr(excep_mod,errmsg.split(':')[0],None) - if (excep_class_!=None): - instance=excep_class_(errbacktrace) - raise(instance) - else: - raise(Exception(errmsg)) + # Check for illegal message type + if isinstance(answer, ReplyMessage) is False: + # if we come here we had a Time-Out + status = [] + status["state"] = "ERROR" + status["errmsg"] = "Incorrect messagetype (" + str(type(answer)) + ") received." + status["backtrace"] = "" + return (None, status) + + # return content and status if status is 'OK' + if (answer.status == "OK"): + return (answer.content, answer.status) + + # Compile error handling from status + status = {} try: - answer=(answer.content,status) + status["state"] = answer.status + status["errmsg"] = answer.errmsg + status["backtrace"] = answer.backtrace except Exception as e: - # we can't properly convert to a result message. - answer=(None,{"ERROR":"Malformed return message"}) - else: - # if we come here we had a Time-Out - answer=(None,"RPC Timed out") - return answer + status["state"] = "ERROR" + status["errmsg"] = "Return state in message not found" + status["backtrace"] = "" + return (Null, status) + + # Does the client expect us to throw the exception? + if self.ForwardExceptions is True: + excep_mod = __import__("exceptions") + excep_class_ = getattr(excep_mod, answer.errmsg.split(':')[0], None) + if excep_class_ not is None: + instance = excep_class_(answer.backtrace) + raise (instance) + else: + raise (Exception(answer.errmsg)) + return (None,status)