Skip to content
Snippets Groups Projects
Commit ce9a061f authored by Jan Rinze Peterzon's avatar Jan Rinze Peterzon
Browse files

Task #8571: Added comment for functions and fix corner case for messages being...

Task #8571: Added comment for functions and fix corner case for messages being received from a previous timed-out rpc call. Also fixes a bug with using queues instead of a bus exchange.
parent e5c38fb9
No related branches found
No related tags found
No related merge requests found
......@@ -27,6 +27,16 @@ import uuid
class RPC():
def __init__(self, bus, service, timeout=None, ForwardExceptions=None, Verbose=None):
"""
Initialize an Remote procedure call using:
bus= <str> Bus Name
service= <str> Service Name
timeout= <float> Time to wait in seconds before the call is considered a failure.
Verbose= <bool> If True output extra logging to stdout.
Use with extra care: ForwardExceptions= <bool>
This enables forwarding exceptions from the server side tobe raised at the client side durting RPC invocation.
"""
self.timeout = timeout
self.ForwardExceptions = False
self.Verbose = False
......@@ -36,30 +46,52 @@ class RPC():
self.Verbose = True
self.BusName = bus
self.ServiceName = service
self.Request = ToBus(self.BusName + "/" + self.ServiceName)
self.ReplyAddress = "reply." + str(uuid.uuid4())
self.Reply = FromBus(self.BusName + "/" + self.ReplyAddress)
if self.BusName is None:
self.Request = ToBus(self.ServiceName)
else:
self.Request = ToBus(self.BusName + "/" + self.ServiceName)
def __enter__(self):
"""
Internal use only. (handles scope 'with')
"""
self.Request.open()
self.Reply.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Internal use only. (handles scope 'with')
"""
self.Request.close()
self.Reply.close()
def __call__(self, msg, timeout=None):
"""
Enable the use of the object to directly invoke the RPC.
example:
with RPC(bus,service) as myrpc:
result=myrpc(request)
"""
if timeout is None:
timeout = self.timeout
# create unique reply address for this rpc call
options={'create':'always','delete':'receiver'}
ReplyAddress= "reply." + str(uuid.uuid4())
if self.BusName is None:
Reply = FromBus(ReplyAddress+" ; "+str(options))
else:
Reply = FromBus(self.BusName + "/" + ReplyAddress)
with Reply:
MyMsg = ServiceMessage(msg, ReplyAddress)
MyMsg.ttl = timeout
self.Request.send(MyMsg)
answer = Reply.receive(timeout)
MyMsg = ServiceMessage(msg, self.ReplyAddress)
self.Request.send(MyMsg)
answer = self.Reply.receive(timeout)
status = {}
# Check for Time-Out
if answer is None:
status = []
status["state"] = "TIMEOUT"
status["errmsg"] = "RPC Timed out"
status["backtrace"] = ""
......@@ -68,7 +100,6 @@ class RPC():
# Check for illegal message type
if isinstance(answer, ReplyMessage) is False:
# if we come here we had a Time-Out
status = []
status["state"] = "ERROR"
status["errmsg"] = "Incorrect messagetype (" + str(type(answer)) + ") received."
status["backtrace"] = ""
......@@ -79,7 +110,6 @@ class RPC():
return (answer.content, answer.status)
# Compile error handling from status
status = {}
try:
status["state"] = answer.status
status["errmsg"] = answer.errmsg
......
......@@ -42,6 +42,14 @@ class Service:
"""
def __init__(self, busname, servicename, servicehandler, options=None, exclusive=True, numthreads=1, parsefullmessage=False, startonwith=False, verbose=False):
"""
Initialize Service object with busname (str) ,servicename (str) and servicehandler function.
additional parameters:
options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: True)
numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False)
"""
self.BusName = busname
self.ServiceName = servicename
self.ServiceHandler = servicehandler
......@@ -65,10 +73,16 @@ class Service:
self.options[key] = val
def _debug(self, txt):
"""
Internal use only.
"""
if self.Verbose is True:
print(txt)
def StartListening(self, numthreads=None):
"""
Start the background threads and process incoming messages.
"""
if numthreads is not None:
self._numthreads = numthreads
if self.connected is False:
......@@ -85,6 +99,9 @@ class Service:
self._tr[i].start()
def StopListening(self):
"""
Stop the background threads that listen to incoming messages.
"""
# stop all running threads
if self.running is True:
self.running = False
......@@ -94,6 +111,9 @@ class Service:
print(" %d messages received and %d processed OK." % (self.reccounter[i], self.okcounter[i]))
def WaitForInterrupt(self):
"""
Useful (low cpu load) loop that waits for keyboard interrupt.
"""
looping = True
while looping:
try:
......@@ -103,6 +123,9 @@ class Service:
print("Keyboard interrupt received.")
def __enter__(self):
"""
Internal use only. Handles scope with keyword 'with'
"""
# Usually a service will be listening on a 'bus' implemented by a topic exchange
if self.BusName is not None:
self.Listen = FromBus(self.BusName+"/"+self.ServiceName, options=self.options)
......@@ -112,9 +135,9 @@ class Service:
# Handle case when queues are used
else:
# assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies.
self.Listen = FromBus(self.BusName+"/"+self.ServiceName, options=self.options)
self.Listen = FromBus(self.ServiceName, options=self.options)
self.Listen.open()
self.Reply=self.replyto
self.Reply=None
self.connected = True
......@@ -124,6 +147,9 @@ class Service:
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Internal use only. Handles scope with keyword 'with'
"""
self.StopListening()
# close the listeners
if self.connected is True:
......@@ -134,6 +160,9 @@ class Service:
self.Reply.close()
def _send_reply(self, replymessage, status, reply_to, errtxt="",backtrace=""):
"""
Internal use only. Send a reply message to the RPC client including exception info.
"""
# Compose Reply message from reply and status.
if isinstance(replymessage,ReplyMessage):
ToSend = replymessage
......@@ -145,7 +174,6 @@ class Service:
# show the message content if required by the Verbose flag.
if self.Verbose is True:
msg.show()
ToSend.show()
# send the result to the RPC client
......@@ -153,11 +181,17 @@ class Service:
ToSend.subject = reply_to
self.Reply.send(ToSend)
else:
with ToBus(reply_to) as dest:
dest.send(ToSend)
try:
with ToBus(reply_to) as dest:
dest.send(ToSend)
except MessageBusError as e:
print("Failed to send reply to reply address %s" %(reply_to))
def _loop(self, index):
"""
Internal use only. Message listener loop that receives messages and starts the attached function with the message content as argument.
"""
print( "Thread %d START Listening for messages on Bus %s and service name %s." %(index, self.BusName, self.ServiceName))
while self.running:
try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment