diff --git a/LCS/Messaging/python/messaging/RPC.py b/LCS/Messaging/python/messaging/RPC.py index 164ade03098de92410dbbd36c27a8fabbcd3bfa6..ff458a4ed5819411c363ea6244178148f61cef12 100644 --- a/LCS/Messaging/python/messaging/RPC.py +++ b/LCS/Messaging/python/messaging/RPC.py @@ -27,6 +27,16 @@ import uuid class RPC(): def __init__(self, bus, service, timeout=None, ForwardExceptions=None, Verbose=None): + """ + Initialize an Remote procedure call using: + bus= <str> Bus Name + service= <str> Service Name + timeout= <float> Time to wait in seconds before the call is considered a failure. + Verbose= <bool> If True output extra logging to stdout. + + Use with extra care: ForwardExceptions= <bool> + This enables forwarding exceptions from the server side tobe raised at the client side durting RPC invocation. + """ self.timeout = timeout self.ForwardExceptions = False self.Verbose = False @@ -36,30 +46,52 @@ class RPC(): self.Verbose = True self.BusName = bus self.ServiceName = service - self.Request = ToBus(self.BusName + "/" + self.ServiceName) - self.ReplyAddress = "reply." + str(uuid.uuid4()) - self.Reply = FromBus(self.BusName + "/" + self.ReplyAddress) + if self.BusName is None: + self.Request = ToBus(self.ServiceName) + else: + self.Request = ToBus(self.BusName + "/" + self.ServiceName) def __enter__(self): + """ + Internal use only. (handles scope 'with') + """ self.Request.open() - self.Reply.open() return self def __exit__(self, exc_type, exc_val, exc_tb): + """ + Internal use only. (handles scope 'with') + """ self.Request.close() - self.Reply.close() def __call__(self, msg, timeout=None): + """ + Enable the use of the object to directly invoke the RPC. + + example: + + with RPC(bus,service) as myrpc: + result=myrpc(request) + + """ if timeout is None: timeout = self.timeout + # create unique reply address for this rpc call + options={'create':'always','delete':'receiver'} + ReplyAddress= "reply." + str(uuid.uuid4()) + if self.BusName is None: + Reply = FromBus(ReplyAddress+" ; "+str(options)) + else: + Reply = FromBus(self.BusName + "/" + ReplyAddress) + with Reply: + MyMsg = ServiceMessage(msg, ReplyAddress) + MyMsg.ttl = timeout + self.Request.send(MyMsg) + answer = Reply.receive(timeout) - MyMsg = ServiceMessage(msg, self.ReplyAddress) - self.Request.send(MyMsg) - answer = self.Reply.receive(timeout) - + status = {} # Check for Time-Out if answer is None: - status = [] status["state"] = "TIMEOUT" status["errmsg"] = "RPC Timed out" status["backtrace"] = "" @@ -68,7 +100,6 @@ class RPC(): # Check for illegal message type if isinstance(answer, ReplyMessage) is False: # if we come here we had a Time-Out - status = [] status["state"] = "ERROR" status["errmsg"] = "Incorrect messagetype (" + str(type(answer)) + ") received." status["backtrace"] = "" @@ -79,7 +110,6 @@ class RPC(): return (answer.content, answer.status) # Compile error handling from status - status = {} try: status["state"] = answer.status status["errmsg"] = answer.errmsg diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index f658ac5ce971da2aa4d1beddbc246433e8e34702..d2213db66ee023b8fd2361de3f9bb2178f984fe1 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -42,6 +42,14 @@ class Service: """ def __init__(self, busname, servicename, servicehandler, options=None, exclusive=True, numthreads=1, parsefullmessage=False, startonwith=False, verbose=False): + """ + Initialize Service object with busname (str) ,servicename (str) and servicehandler function. + additional parameters: + options= <dict> Dictionary of options passed to QPID + exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: True) + numthreads= <int> Number of parallel threads processing messages (default: 1) + verbose= <bool> Output extra logging over stdout (default: False) + """ self.BusName = busname self.ServiceName = servicename self.ServiceHandler = servicehandler @@ -65,10 +73,16 @@ class Service: self.options[key] = val def _debug(self, txt): + """ + Internal use only. + """ if self.Verbose is True: print(txt) def StartListening(self, numthreads=None): + """ + Start the background threads and process incoming messages. + """ if numthreads is not None: self._numthreads = numthreads if self.connected is False: @@ -85,6 +99,9 @@ class Service: self._tr[i].start() def StopListening(self): + """ + Stop the background threads that listen to incoming messages. + """ # stop all running threads if self.running is True: self.running = False @@ -94,6 +111,9 @@ class Service: print(" %d messages received and %d processed OK." % (self.reccounter[i], self.okcounter[i])) def WaitForInterrupt(self): + """ + Useful (low cpu load) loop that waits for keyboard interrupt. + """ looping = True while looping: try: @@ -103,6 +123,9 @@ class Service: print("Keyboard interrupt received.") def __enter__(self): + """ + Internal use only. Handles scope with keyword 'with' + """ # Usually a service will be listening on a 'bus' implemented by a topic exchange if self.BusName is not None: self.Listen = FromBus(self.BusName+"/"+self.ServiceName, options=self.options) @@ -112,9 +135,9 @@ class Service: # Handle case when queues are used else: # assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies. - self.Listen = FromBus(self.BusName+"/"+self.ServiceName, options=self.options) + self.Listen = FromBus(self.ServiceName, options=self.options) self.Listen.open() - self.Reply=self.replyto + self.Reply=None self.connected = True @@ -124,6 +147,9 @@ class Service: return self def __exit__(self, exc_type, exc_val, exc_tb): + """ + Internal use only. Handles scope with keyword 'with' + """ self.StopListening() # close the listeners if self.connected is True: @@ -134,6 +160,9 @@ class Service: self.Reply.close() def _send_reply(self, replymessage, status, reply_to, errtxt="",backtrace=""): + """ + Internal use only. Send a reply message to the RPC client including exception info. + """ # Compose Reply message from reply and status. if isinstance(replymessage,ReplyMessage): ToSend = replymessage @@ -145,7 +174,6 @@ class Service: # show the message content if required by the Verbose flag. if self.Verbose is True: - msg.show() ToSend.show() # send the result to the RPC client @@ -153,11 +181,17 @@ class Service: ToSend.subject = reply_to self.Reply.send(ToSend) else: - with ToBus(reply_to) as dest: - dest.send(ToSend) + try: + with ToBus(reply_to) as dest: + dest.send(ToSend) + except MessageBusError as e: + print("Failed to send reply to reply address %s" %(reply_to)) def _loop(self, index): + """ + Internal use only. Message listener loop that receives messages and starts the attached function with the message content as argument. + """ print( "Thread %d START Listening for messages on Bus %s and service name %s." %(index, self.BusName, self.ServiceName)) while self.running: try: