diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index 1eacf13a2719a0231c4bb8e860b3191510dc194d..222317afafcdca81d6190ab2a47596365a3ba3ee 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -29,181 +29,168 @@ import sys import traceback import pickle + # create service: -class Service(): - """ - Service class for registering python functions with a Service name on a messgage bus. - """ - def __init__(self,BusName,ServiceName,ServiceHandler,options=None,exclusive=True,numthreads=1,Verbose=False): - self.BusName=BusName - self.ServiceName=ServiceName - self.ServiceHandler=ServiceHandler - self.connected=False - self.running=False - self.exclusive=exclusive - self.link_uuid=str(uuid.uuid4()) - self._numthreads=numthreads - self.Verbose=Verbose - self.options={} - self.options["capacity"]=numthreads*20 - if (self.exclusive==True): - self.options["link"]="{name:\""+self.link_uuid+"\", x-bindings:[{key:" + self.ServiceName + ", arguments: {\"qpid.exclusive-binding\":True}}]}" - if (isinstance(options,dict)): - for key,val in options.iter_items(): - self.options[key]=val - if (self.BusName!=None): - self.Listen=FromBus(self.BusName+"/"+self.ServiceName,options=self.options) - self.Reply=ToBus(self.BusName) - else: - self.Listen=FromBus(self.ServiceName,options=self.options) - self.Reply=self.replyto - - def _debug(self,txt): - if (self.Verbose==True): - print(txt) - - def StartListening(self,numthreads=None): - if (numthreads!=None): - self._numthreads=numthreads - self.connected=True - self.running=True - self._tr=[] - self.counter=[] - for i in range(self._numthreads): - self._tr.append(threading.Thread(target=self.loop,args=[i])) - self.counter.append(0) - self._tr[i].start() - - def __enter__(self): - if (isinstance(self.Listen,FromBus)): - self.Listen.open() - if (isinstance(self.Reply,ToBus)): - self.Reply.open() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.StopListening() - - def loop(self,index): - print( "Thread %d START Listening for messages on Bus %s and service name %s." %(index,self.BusName,self.ServiceName)) - while self.running: - msg=None - try: - # get the next message - msg=self.Listen.receive(1) - except Exception as e: - print e - - try: - if (isinstance(msg,ServiceMessage)): - # Initial status is unknown - status="unknown" - backtrace=None - errtxt=None - #exception=None - - # Keep track of number of processed messages - self.counter[index]+=1 - replymessage="" - # Execute the service handler function and send reply back to client - try: - self._debug("Running handler") - replymessage=self.ServiceHandler(msg.content) - self._debug("finished handler") - status="OK" - self._debug(status) - except Exception as e: - # Any thrown exceptions either Service exception or unhandled exception - # during the execution of the service handler is caught here. - self._debug("handling exception") - exc_info = sys.exc_info() - backtrace=traceback.format_exception(*exc_info) - errtxt=backtrace[-1] - self._debug(backtrace) - del backtrace[1] - del backtrace[0] - del backtrace[-1] - status="ERROR" - backtrace= ''.join(backtrace).encode('latin-1').decode('unicode_escape') - self._debug(backtrace) - if self.Verbose==True: - print status - print errtxt - print backtrace - replymessage=None - #exception=pickle.dumps(exc_info) - - - self._debug("Done call") - # Compose Reply message from reply and status. - ToSend=ReplyMessage(replymessage,msg.reply_to) - ToSend.status=status - if (status!="OK"): - if (errtxt!=None): - ToSend.errmsg=errtxt - else: - ToSend.errmsg="" - if (backtrace!=None): - ToSend.backtrace=backtrace - else: - ToSend.backtrace="" - #if (exception!=None): - # ToSend.exception=exception - #else: - # ToSend.exception="" - - - # ensure to deliver at the destination in the reply_to field - #ToSend.subject=msg.reply_to - - # show the message content if required by the Verbose flag. - if (self.Verbose==True): - msg.show() - ToSend.show() - - # send the result to the RPC client - if (isinstance(self.Reply,ToBus)): - self.Reply.send(ToSend) - else: - dest=ToBus(self.Reply) - with dest: - dest.send(ToSend) - - # acknowledge the message to the messaging subsystem - self.Listen.ack(msg) - else: - # Report if message wasn't a ServiceMessage. - if (msg!=None): - print "Received wrong messagetype %s, ServiceMessage expected." %(str(type(msg))) - - except Exception as e: - # Unknown problem in the library. Report this and continue. - excinfo = sys.exc_info() - print "ERROR during processing of incoming message." - traceback.print_exception(*excinfo) - print "Thread %d: Resuming listening on bus %s for service %s" %(index,self.BusName,self.ServiceName) - - print("Thread %d STOPPED Listening for messages on Bus %s and service name %s and %d processed." %(index,self.BusName,self.ServiceName,self.counter[index])) - - def StopListening(self): - # stop all running threads - if (self.running): - self.running=False - for i in range(self._numthreads): - self._tr[i].join() - # possibly doubly defined.. - if (self.connected): - self.connected=False - if (isinstance(self.Listen,FromBus)): - self.Listen.close() - if (isinstance(self.Reply,ToBus)): - self.Reply.close() - - def WaitForInterrupt(self): - looping=True - while looping: - try: - time.sleep(100) - except (KeyboardInterrupt): - looping=False - print("Keyboard interrupt received.") +class Service: + """ + Service class for registering python functions with a Service name on a messgage bus. + """ + def __init__(self, busname, servicename, servicehandler, options=None, exclusive=True, numthreads=1, verbose=False): + self.BusName = busname + self.ServiceName = servicename + self.ServiceHandler = servicehandler + self.connected = False + self.running = False + self.exclusive = exclusive + self.link_uuid = str(uuid.uuid4()) + self._numthreads = numthreads + self.Verbose = verbose + self.options = {"capacity": numthreads*20} + # Set appropriate flags for exclusive binding + if self.exclusive is True: + self.options["link"] = '{name:"' + self.link_uuid + '", x-bindings:[{key:' + self.ServiceName + ', arguments: {"qpid.exclusive-binding":True}}]}' + # only add options if it is given as a dictionary + if isinstance(options,dict): + for key,val in options.iteritems(): + self.options[key] = val + # Usually a service will be listening on a 'bus' implemented by a topic exchange + if self.BusName is not None: + self.Listen = FromBus(self.BusName+"/"+self.ServiceName,options=self.options) + self.Reply = ToBus(self.BusName) + else: + # assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies. + self.Listen = FromBus(self.ServiceName,options=self.options) + self.Reply=self.replyto + + def _debug(self, txt): + if self.Verbose is True: + print(txt) + + def StartListening(self, numthreads=None): + if numthreads is not None: + self._numthreads = numthreads + self.connected = True + self.running = True + self._tr = [] + self.reccounter = [] + self.okcounter =[] + for i in range(self._numthreads): + self._tr.append(threading.Thread(target=self.loop, args=[i])) + self.reccounter.append(0) + self.okcounter.append(0) + self._tr[i].start() + + def __enter__(self): + if isinstance(self.Listen,FromBus): + self.Listen.open() + if isinstance(self.Reply,ToBus): + self.Reply.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.StopListening() + + def send_reply(self, replymessage, status, reply_to, errtxt="",backtrace=""): + # Compose Reply message from reply and status. + ToSend = ReplyMessage(replymessage, reply_to) + ToSend.status = status + ToSend.errmsg = errtxt + ToSend.backtrace = backtrace + + # show the message content if required by the Verbose flag. + if self.Verbose is True: + msg.show() + ToSend.show() + + # send the result to the RPC client + if isinstance(self.Reply,ToBus): + self.Reply.send(ToSend) + else: + with ToBus(self.Reply) as dest: + dest.send(ToSend) + + + def loop(self, index): + print( "Thread %d START Listening for messages on Bus %s and service name %s." %(index, self.BusName, self.ServiceName)) + while self.running: + try: + # get the next message + msg = self.Listen.receive(1) + # loop until we get a valid message. + if msg is None: + continue + + # report if messages are not Service Messages + if isinstance(msg, ServiceMessage) is not True: + print "Received wrong messagetype %s, ServiceMessage expected." %(str(type(msg))) + self.Listen.ack(msg) + continue + + # Keep track of number of received messages + self.reccounter[index] += 1 + + # Execute the service handler function and send reply back to client + try: + self._debug("Running handler") + replymessage = self.ServiceHandler(msg.content) + self._debug("finished handler") + self.send_reply(replymessage,"OK",msg.reply_to) + self.okcounter[index] += 1 + self.Listen.ack(msg) + continue + + except Exception as e: + # Any thrown exceptions either Service exception or unhandled exception + # during the execution of the service handler is caught here. + self._debug("handling exception") + exc_info = sys.exc_info() + status="ERROR" + rawbacktrace = traceback.format_exception(*exc_info) + errtxt = rawbacktrace[-1] + self._debug(rawbacktrace) + # cleanup the backtrace print by removing the first two lines and the last + del rawbacktrace[1] + del rawbacktrace[0] + del rawbacktrace[-1] + backtrace= ''.join(rawbacktrace).encode('latin-1').decode('unicode_escape') + self._debug(backtrace) + if self.Verbose is True: + print status + print errtxt + print backtrace + self.send_reply(None,status,msg.reply_to,errtxt=errtxt,backtrace=backtrace) + + + except Exception as e: + # Unknown problem in the library. Report this and continue. + excinfo = sys.exc_info() + print "ERROR during processing of incoming message." + traceback.print_exception(*excinfo) + print "Thread %d: Resuming listening on bus %s for service %s" %(index,self.BusName,self.ServiceName) + + print("Thread %2d: STOPPED Listening for messages on Bus %s and service name %s." %(index,self.BusName,self.ServiceName)) + print(" %d messages received and %d processed OK." %(self.reccounter[index],self.okcounter[index])) + + def StopListening(self): + # stop all running threads + if self.running is True: + self.running = False + for i in range(self._numthreads): + self._tr[i].join() + + # possibly doubly defined.. + if self.connected is True: + self.connected = False + if isinstance(self.Listen, FromBus): + self.Listen.close() + if isinstance(self.Reply, ToBus): + self.Reply.close() + + def WaitForInterrupt(self): + looping = True + while looping: + try: + time.sleep(100) + except KeyboardInterrupt: + looping = False + print("Keyboard interrupt received.")