Skip to content
Snippets Groups Projects
Commit 0612e9e6 authored by Jan Rinze Peterzon's avatar Jan Rinze Peterzon
Browse files

Task #8531: start and stop listening can now be used without -with- which...

Task #8531: start and stop listening can now be used without -with- which defaults to start listening now. refactored some code and added support in RPC for multiple arguments.
parent db257fc6
Branches
Tags
No related merge requests found
......@@ -41,7 +41,7 @@ class RPC():
As a side-effect the sender and session are destroyed.
"""
def __init__(self, service, busname=None, timeout=None, ForwardExceptions=None, Verbose=None):
def __init__(self, service, **kwargs ): #busname=None, timeout=None, ForwardExceptions=None, Verbose=None):
"""
Initialize an Remote procedure call using:
service= <str> Service Name
......@@ -52,19 +52,17 @@ class RPC():
Use with extra care: ForwardExceptions= <bool>
This enables forwarding exceptions from the server side tobe raised at the client side durting RPC invocation.
"""
self.timeout = timeout
self.ForwardExceptions = False
self.Verbose = False
if ForwardExceptions is True:
self.ForwardExceptions = True
if Verbose is True:
self.Verbose = True
self.BusName = busname
self.timeout = kwargs.pop("timeout",None)
self.ForwardExceptions = kwargs.pop("ForwardExceptions",False)
self.Verbose = kwargs.pop("Verbose",False)
self.BusName = kwargs.pop("busname",None)
self.ServiceName = service
if self.BusName is None:
self.Request = ToBus(self.ServiceName)
else:
self.Request = ToBus(self.BusName + "/" + self.ServiceName)
if len(kwargs):
raise AttributeError("Unexpected argument passed to RPC class: %s", kwargs)
def __enter__(self):
"""
......@@ -79,7 +77,7 @@ class RPC():
"""
self.Request.close()
def __call__(self, msg, timeout=None):
def __call__(self, *msg, **kwargs):
"""
Enable the use of the object to directly invoke the RPC.
......@@ -89,8 +87,27 @@ class RPC():
result=myrpc(request)
"""
if timeout is None:
timeout = self.timeout
timeout= kwargs.pop("timeout",self.timeout)
Content=list(msg)
HasKwArgs=(len(kwargs)>0)
# more than one argument given?
HasArgs=(len(msg)> 1 ) or (( len(kwargs)>0 ) and (len(msg)>0))
if HasArgs:
# convert arguments to list
Content = list(msg)
if HasKwArgs:
# if both positional and named arguments then
# we add the kwargs dictionary as the last item in the list
Content.append(kwargs)
else:
if HasKwArgs:
# we have only one named argument
Content=kwargs
else:
# we have only one positional argument
Content=Content[0]
# create unique reply address for this rpc call
options={'create':'always','delete':'receiver'}
ReplyAddress= "reply." + str(uuid.uuid4())
......@@ -99,7 +116,7 @@ class RPC():
else:
Reply = FromBus(self.BusName + "/" + ReplyAddress)
with Reply:
MyMsg = ServiceMessage(msg, ReplyAddress)
MyMsg = ServiceMessage(Content, ReplyAddress , has_args=HasArgs, has_kwargs=HasKwArgs)
MyMsg.ttl = timeout
self.Request.send(MyMsg)
answer = Reply.receive(timeout)
......
......@@ -117,10 +117,11 @@ class Service(object):
self.options = {"capacity": self._numthreads*20}
options = kwargs.pop("options", None)
self.parsefullmessage = kwargs.pop("parsefullmessage", False)
self.startonwith = kwargs.pop("startonwith", False)
self.startonwith = kwargs.pop("startonwith", None)
self.handler_args = kwargs.pop("handler_args", None)
self.listening = False
if len(kwargs):
raise AttributeError("Unexpected argument passed to Serice class: %s", kwargs)
raise AttributeError("Unexpected argument passed to Service class: %s", kwargs)
# Set appropriate flags for exclusive binding
if self.exclusive is True:
......@@ -144,12 +145,29 @@ class Service(object):
"""
Start the background threads and process incoming messages.
"""
if self.listening is True:
return
# Usually a service will be listening on a 'bus' implemented by a topic exchange
if self.busname is not None:
self.listener = FromBus(self.busname+"/"+self.service_name, options=self.options)
self.reply_bus = ToBus(self.busname)
self.listener.open()
self.reply_bus.open()
# Handle case when queues are used
else:
# assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies.
self.listener = FromBus(self.service_name, options=self.options)
self.listener.open()
self.reply_bus=None
self.connected = True
if numthreads is not None:
self._numthreads = numthreads
if self.connected is False:
raise Exception("start_listening Called on closed connections")
self.running = True
# use a list to ensure that threads always 'see' changes in the running state.
self.running = [ True ]
self._tr = []
self.reccounter = []
self.okcounter =[]
......@@ -175,12 +193,21 @@ class Service(object):
Stop the background threads that listen to incoming messages.
"""
# stop all running threads
if self.running is True:
self.running = False
if self.running[0] is True:
self.running[0] = False
for i in range(self._numthreads):
self._tr[i].join()
logger.info("Thread %2d: STOPPED Listening for messages on Bus %s and service name %s." % (i, self.busname, self.service_name))
logger.info(" %d messages received and %d processed OK." % (self.reccounter[i], self.okcounter[i]))
self.listening = False
# close the listeners
if self.connected is True:
if isinstance(self.listener, FromBus):
self.listener.close()
if isinstance(self.reply_bus, ToBus):
self.reply_bus.close()
self.connected = False
def wait_for_interrupt(self):
"""
......@@ -199,23 +226,6 @@ class Service(object):
"""
Internal use only. Handles scope with keyword 'with'
"""
# Usually a service will be listening on a 'bus' implemented by a topic exchange
if self.busname is not None:
self.listener = FromBus(self.busname+"/"+self.service_name, options=self.options)
self.reply_bus = ToBus(self.busname)
self.listener.open()
self.reply_bus.open()
# Handle case when queues are used
else:
# assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies.
self.listener = FromBus(self.service_name, options=self.options)
self.listener.open()
self.reply_bus=None
self.connected = True
# If required start listening on 'with'
if self.startonwith is True:
self.start_listening()
return self
......@@ -224,13 +234,6 @@ class Service(object):
Internal use only. Handles scope with keyword 'with'
"""
self.stop_listening()
# close the listeners
if self.connected is True:
self.connected = False
if isinstance(self.listener, FromBus):
self.listener.close()
if isinstance(self.reply_bus, ToBus):
self.reply_bus.close()
def _send_reply(self, replymessage, status, reply_to, errtxt="",backtrace=""):
"""
......@@ -285,7 +288,7 @@ class Service(object):
except Exception as e:
logger.error("prepare_loop() failed with %s", e)
while self.running:
while self.running[0]:
try:
service_handler.prepare_receive()
except Exception as e:
......@@ -313,8 +316,27 @@ class Service(object):
self._debug("Running handler")
if self.parsefullmessage is True:
replymessage = service_handler.handle_message(msg)
else:
# check for positional arguments and named arguments
if msg.has_args=="True":
rpcargs=msg.content
if msg.has_kwargs=="True":
# both positional and named arguments
rpckwargs=rpcargs[-1]
del rpcargs[-1]
rpcargs=tuple(rpcargs)
replymessage = service_handler.handle_message(*rpcargs,**rpckwargs)
else:
# only positional arguments
rpcargs=tuple(rpcargs)
replymessage = service_handler.handle_message(*rpcargs)
else:
if msg.has_kwargs=="True":
# only named arguments
replymessage = service_handler.handle_message(**(msg.content))
else:
replymessage = service_handler.handle_message(msg.content)
self._debug("finished handler")
self._send_reply(replymessage,"OK",msg.reply_to)
self.okcounter[thread_idx] += 1
......
......@@ -281,11 +281,15 @@ class ServiceMessage(LofarMessage):
subsystem. A service message must contain a valid ``ReplyTo`` property.
"""
def __init__(self, content=None, reply_to=None):
def __init__(self, content=None, reply_to=None,**kwargs): #reply_to=None, has_args=None, has_kwargs=None):
super(ServiceMessage, self).__init__(content)
if (reply_to!=None):
#if (len(kwargs)>0):
#reply_to = kwargs.pop("reply_to",None)
#if (reply_to!=None):
self.reply_to = reply_to
self.has_args = str(kwargs.pop("has_args",False))
self.has_kwargs = str(kwargs.pop("has_kwargs",False))
class ReplyMessage(LofarMessage):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment