Skip to content
Snippets Groups Projects
Commit ad0705a0 authored by Jan Rinze Peterzon's avatar Jan Rinze Peterzon
Browse files

Task #8571: Refactored Service.py

parent 2efba3af
No related branches found
No related tags found
No related merge requests found
......@@ -29,181 +29,168 @@ import sys
import traceback
import pickle
# create service:
class Service():
"""
Service class for registering python functions with a Service name on a messgage bus.
"""
def __init__(self,BusName,ServiceName,ServiceHandler,options=None,exclusive=True,numthreads=1,Verbose=False):
self.BusName=BusName
self.ServiceName=ServiceName
self.ServiceHandler=ServiceHandler
self.connected=False
self.running=False
self.exclusive=exclusive
self.link_uuid=str(uuid.uuid4())
self._numthreads=numthreads
self.Verbose=Verbose
self.options={}
self.options["capacity"]=numthreads*20
if (self.exclusive==True):
self.options["link"]="{name:\""+self.link_uuid+"\", x-bindings:[{key:" + self.ServiceName + ", arguments: {\"qpid.exclusive-binding\":True}}]}"
if (isinstance(options,dict)):
for key,val in options.iter_items():
self.options[key]=val
if (self.BusName!=None):
self.Listen=FromBus(self.BusName+"/"+self.ServiceName,options=self.options)
self.Reply=ToBus(self.BusName)
else:
self.Listen=FromBus(self.ServiceName,options=self.options)
self.Reply=self.replyto
def _debug(self,txt):
if (self.Verbose==True):
print(txt)
def StartListening(self,numthreads=None):
if (numthreads!=None):
self._numthreads=numthreads
self.connected=True
self.running=True
self._tr=[]
self.counter=[]
for i in range(self._numthreads):
self._tr.append(threading.Thread(target=self.loop,args=[i]))
self.counter.append(0)
self._tr[i].start()
def __enter__(self):
if (isinstance(self.Listen,FromBus)):
self.Listen.open()
if (isinstance(self.Reply,ToBus)):
self.Reply.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.StopListening()
def loop(self,index):
print( "Thread %d START Listening for messages on Bus %s and service name %s." %(index,self.BusName,self.ServiceName))
while self.running:
msg=None
try:
# get the next message
msg=self.Listen.receive(1)
except Exception as e:
print e
try:
if (isinstance(msg,ServiceMessage)):
# Initial status is unknown
status="unknown"
backtrace=None
errtxt=None
#exception=None
# Keep track of number of processed messages
self.counter[index]+=1
replymessage=""
# Execute the service handler function and send reply back to client
try:
self._debug("Running handler")
replymessage=self.ServiceHandler(msg.content)
self._debug("finished handler")
status="OK"
self._debug(status)
except Exception as e:
# Any thrown exceptions either Service exception or unhandled exception
# during the execution of the service handler is caught here.
self._debug("handling exception")
exc_info = sys.exc_info()
backtrace=traceback.format_exception(*exc_info)
errtxt=backtrace[-1]
self._debug(backtrace)
del backtrace[1]
del backtrace[0]
del backtrace[-1]
status="ERROR"
backtrace= ''.join(backtrace).encode('latin-1').decode('unicode_escape')
self._debug(backtrace)
if self.Verbose==True:
print status
print errtxt
print backtrace
replymessage=None
#exception=pickle.dumps(exc_info)
self._debug("Done call")
# Compose Reply message from reply and status.
ToSend=ReplyMessage(replymessage,msg.reply_to)
ToSend.status=status
if (status!="OK"):
if (errtxt!=None):
ToSend.errmsg=errtxt
else:
ToSend.errmsg=""
if (backtrace!=None):
ToSend.backtrace=backtrace
else:
ToSend.backtrace=""
#if (exception!=None):
# ToSend.exception=exception
#else:
# ToSend.exception=""
# ensure to deliver at the destination in the reply_to field
#ToSend.subject=msg.reply_to
# show the message content if required by the Verbose flag.
if (self.Verbose==True):
msg.show()
ToSend.show()
# send the result to the RPC client
if (isinstance(self.Reply,ToBus)):
self.Reply.send(ToSend)
else:
dest=ToBus(self.Reply)
with dest:
dest.send(ToSend)
# acknowledge the message to the messaging subsystem
self.Listen.ack(msg)
else:
# Report if message wasn't a ServiceMessage.
if (msg!=None):
print "Received wrong messagetype %s, ServiceMessage expected." %(str(type(msg)))
except Exception as e:
# Unknown problem in the library. Report this and continue.
excinfo = sys.exc_info()
print "ERROR during processing of incoming message."
traceback.print_exception(*excinfo)
print "Thread %d: Resuming listening on bus %s for service %s" %(index,self.BusName,self.ServiceName)
print("Thread %d STOPPED Listening for messages on Bus %s and service name %s and %d processed." %(index,self.BusName,self.ServiceName,self.counter[index]))
def StopListening(self):
# stop all running threads
if (self.running):
self.running=False
for i in range(self._numthreads):
self._tr[i].join()
# possibly doubly defined..
if (self.connected):
self.connected=False
if (isinstance(self.Listen,FromBus)):
self.Listen.close()
if (isinstance(self.Reply,ToBus)):
self.Reply.close()
def WaitForInterrupt(self):
looping=True
while looping:
try:
time.sleep(100)
except (KeyboardInterrupt):
looping=False
print("Keyboard interrupt received.")
class Service:
"""
Service class for registering python functions with a Service name on a messgage bus.
"""
def __init__(self, busname, servicename, servicehandler, options=None, exclusive=True, numthreads=1, verbose=False):
self.BusName = busname
self.ServiceName = servicename
self.ServiceHandler = servicehandler
self.connected = False
self.running = False
self.exclusive = exclusive
self.link_uuid = str(uuid.uuid4())
self._numthreads = numthreads
self.Verbose = verbose
self.options = {"capacity": numthreads*20}
# Set appropriate flags for exclusive binding
if self.exclusive is True:
self.options["link"] = '{name:"' + self.link_uuid + '", x-bindings:[{key:' + self.ServiceName + ', arguments: {"qpid.exclusive-binding":True}}]}'
# only add options if it is given as a dictionary
if isinstance(options,dict):
for key,val in options.iteritems():
self.options[key] = val
# Usually a service will be listening on a 'bus' implemented by a topic exchange
if self.BusName is not None:
self.Listen = FromBus(self.BusName+"/"+self.ServiceName,options=self.options)
self.Reply = ToBus(self.BusName)
else:
# assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies.
self.Listen = FromBus(self.ServiceName,options=self.options)
self.Reply=self.replyto
def _debug(self, txt):
if self.Verbose is True:
print(txt)
def StartListening(self, numthreads=None):
if numthreads is not None:
self._numthreads = numthreads
self.connected = True
self.running = True
self._tr = []
self.reccounter = []
self.okcounter =[]
for i in range(self._numthreads):
self._tr.append(threading.Thread(target=self.loop, args=[i]))
self.reccounter.append(0)
self.okcounter.append(0)
self._tr[i].start()
def __enter__(self):
if isinstance(self.Listen,FromBus):
self.Listen.open()
if isinstance(self.Reply,ToBus):
self.Reply.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.StopListening()
def send_reply(self, replymessage, status, reply_to, errtxt="",backtrace=""):
# Compose Reply message from reply and status.
ToSend = ReplyMessage(replymessage, reply_to)
ToSend.status = status
ToSend.errmsg = errtxt
ToSend.backtrace = backtrace
# show the message content if required by the Verbose flag.
if self.Verbose is True:
msg.show()
ToSend.show()
# send the result to the RPC client
if isinstance(self.Reply,ToBus):
self.Reply.send(ToSend)
else:
with ToBus(self.Reply) as dest:
dest.send(ToSend)
def loop(self, index):
print( "Thread %d START Listening for messages on Bus %s and service name %s." %(index, self.BusName, self.ServiceName))
while self.running:
try:
# get the next message
msg = self.Listen.receive(1)
# loop until we get a valid message.
if msg is None:
continue
# report if messages are not Service Messages
if isinstance(msg, ServiceMessage) is not True:
print "Received wrong messagetype %s, ServiceMessage expected." %(str(type(msg)))
self.Listen.ack(msg)
continue
# Keep track of number of received messages
self.reccounter[index] += 1
# Execute the service handler function and send reply back to client
try:
self._debug("Running handler")
replymessage = self.ServiceHandler(msg.content)
self._debug("finished handler")
self.send_reply(replymessage,"OK",msg.reply_to)
self.okcounter[index] += 1
self.Listen.ack(msg)
continue
except Exception as e:
# Any thrown exceptions either Service exception or unhandled exception
# during the execution of the service handler is caught here.
self._debug("handling exception")
exc_info = sys.exc_info()
status="ERROR"
rawbacktrace = traceback.format_exception(*exc_info)
errtxt = rawbacktrace[-1]
self._debug(rawbacktrace)
# cleanup the backtrace print by removing the first two lines and the last
del rawbacktrace[1]
del rawbacktrace[0]
del rawbacktrace[-1]
backtrace= ''.join(rawbacktrace).encode('latin-1').decode('unicode_escape')
self._debug(backtrace)
if self.Verbose is True:
print status
print errtxt
print backtrace
self.send_reply(None,status,msg.reply_to,errtxt=errtxt,backtrace=backtrace)
except Exception as e:
# Unknown problem in the library. Report this and continue.
excinfo = sys.exc_info()
print "ERROR during processing of incoming message."
traceback.print_exception(*excinfo)
print "Thread %d: Resuming listening on bus %s for service %s" %(index,self.BusName,self.ServiceName)
print("Thread %2d: STOPPED Listening for messages on Bus %s and service name %s." %(index,self.BusName,self.ServiceName))
print(" %d messages received and %d processed OK." %(self.reccounter[index],self.okcounter[index]))
def StopListening(self):
# stop all running threads
if self.running is True:
self.running = False
for i in range(self._numthreads):
self._tr[i].join()
# possibly doubly defined..
if self.connected is True:
self.connected = False
if isinstance(self.Listen, FromBus):
self.Listen.close()
if isinstance(self.Reply, ToBus):
self.Reply.close()
def WaitForInterrupt(self):
looping = True
while looping:
try:
time.sleep(100)
except KeyboardInterrupt:
looping = False
print("Keyboard interrupt received.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment