diff --git a/LCS/Messaging/python/messaging/RPC.py b/LCS/Messaging/python/messaging/RPC.py index 9ae7af44c2eff1227db4aafadef3f629f332ace5..72d73d6d271df5371e195eed5a855c662b5a679c 100644 --- a/LCS/Messaging/python/messaging/RPC.py +++ b/LCS/Messaging/python/messaging/RPC.py @@ -41,7 +41,7 @@ class RPC(): As a side-effect the sender and session are destroyed. """ - def __init__(self, service, busname=None, timeout=None, ForwardExceptions=None, Verbose=None): + def __init__(self, service, **kwargs ): #busname=None, timeout=None, ForwardExceptions=None, Verbose=None): """ Initialize an Remote procedure call using: service= <str> Service Name @@ -52,19 +52,17 @@ class RPC(): Use with extra care: ForwardExceptions= <bool> This enables forwarding exceptions from the server side tobe raised at the client side durting RPC invocation. """ - self.timeout = timeout - self.ForwardExceptions = False - self.Verbose = False - if ForwardExceptions is True: - self.ForwardExceptions = True - if Verbose is True: - self.Verbose = True - self.BusName = busname + self.timeout = kwargs.pop("timeout",None) + self.ForwardExceptions = kwargs.pop("ForwardExceptions",False) + self.Verbose = kwargs.pop("Verbose",False) + self.BusName = kwargs.pop("busname",None) self.ServiceName = service if self.BusName is None: self.Request = ToBus(self.ServiceName) else: self.Request = ToBus(self.BusName + "/" + self.ServiceName) + if len(kwargs): + raise AttributeError("Unexpected argument passed to RPC class: %s", kwargs) def __enter__(self): """ @@ -79,7 +77,7 @@ class RPC(): """ self.Request.close() - def __call__(self, msg, timeout=None): + def __call__(self, *msg, **kwargs): """ Enable the use of the object to directly invoke the RPC. @@ -89,8 +87,27 @@ class RPC(): result=myrpc(request) """ - if timeout is None: - timeout = self.timeout + timeout= kwargs.pop("timeout",self.timeout) + + + Content=list(msg) + HasKwArgs=(len(kwargs)>0) + # more than one argument given? + HasArgs=(len(msg)> 1 ) or (( len(kwargs)>0 ) and (len(msg)>0)) + if HasArgs: + # convert arguments to list + Content = list(msg) + if HasKwArgs: + # if both positional and named arguments then + # we add the kwargs dictionary as the last item in the list + Content.append(kwargs) + else: + if HasKwArgs: + # we have only one named argument + Content=kwargs + else: + # we have only one positional argument + Content=Content[0] # create unique reply address for this rpc call options={'create':'always','delete':'receiver'} ReplyAddress= "reply." + str(uuid.uuid4()) @@ -99,7 +116,7 @@ class RPC(): else: Reply = FromBus(self.BusName + "/" + ReplyAddress) with Reply: - MyMsg = ServiceMessage(msg, ReplyAddress) + MyMsg = ServiceMessage(Content, ReplyAddress , has_args=HasArgs, has_kwargs=HasKwArgs) MyMsg.ttl = timeout self.Request.send(MyMsg) answer = Reply.receive(timeout) diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index dbf7f6f12af7a617b3c1a4b6d49983587dda6379..ee80120d1ecd80efc5ea9ca3c6a9541322c182e0 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -117,10 +117,11 @@ class Service(object): self.options = {"capacity": self._numthreads*20} options = kwargs.pop("options", None) self.parsefullmessage = kwargs.pop("parsefullmessage", False) - self.startonwith = kwargs.pop("startonwith", False) + self.startonwith = kwargs.pop("startonwith", None) self.handler_args = kwargs.pop("handler_args", None) + self.listening = False if len(kwargs): - raise AttributeError("Unexpected argument passed to Serice class: %s", kwargs) + raise AttributeError("Unexpected argument passed to Service class: %s", kwargs) # Set appropriate flags for exclusive binding if self.exclusive is True: @@ -144,12 +145,29 @@ class Service(object): """ Start the background threads and process incoming messages. """ + if self.listening is True: + return + + # Usually a service will be listening on a 'bus' implemented by a topic exchange + if self.busname is not None: + self.listener = FromBus(self.busname+"/"+self.service_name, options=self.options) + self.reply_bus = ToBus(self.busname) + self.listener.open() + self.reply_bus.open() + # Handle case when queues are used + else: + # assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies. + self.listener = FromBus(self.service_name, options=self.options) + self.listener.open() + self.reply_bus=None + + self.connected = True + if numthreads is not None: self._numthreads = numthreads - if self.connected is False: - raise Exception("start_listening Called on closed connections") - self.running = True + # use a list to ensure that threads always 'see' changes in the running state. + self.running = [ True ] self._tr = [] self.reccounter = [] self.okcounter =[] @@ -175,12 +193,21 @@ class Service(object): Stop the background threads that listen to incoming messages. """ # stop all running threads - if self.running is True: - self.running = False + if self.running[0] is True: + self.running[0] = False for i in range(self._numthreads): self._tr[i].join() logger.info("Thread %2d: STOPPED Listening for messages on Bus %s and service name %s." % (i, self.busname, self.service_name)) logger.info(" %d messages received and %d processed OK." % (self.reccounter[i], self.okcounter[i])) + self.listening = False + # close the listeners + if self.connected is True: + if isinstance(self.listener, FromBus): + self.listener.close() + if isinstance(self.reply_bus, ToBus): + self.reply_bus.close() + self.connected = False + def wait_for_interrupt(self): """ @@ -199,24 +226,7 @@ class Service(object): """ Internal use only. Handles scope with keyword 'with' """ - # Usually a service will be listening on a 'bus' implemented by a topic exchange - if self.busname is not None: - self.listener = FromBus(self.busname+"/"+self.service_name, options=self.options) - self.reply_bus = ToBus(self.busname) - self.listener.open() - self.reply_bus.open() - # Handle case when queues are used - else: - # assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies. - self.listener = FromBus(self.service_name, options=self.options) - self.listener.open() - self.reply_bus=None - - self.connected = True - - # If required start listening on 'with' - if self.startonwith is True: - self.start_listening() + self.start_listening() return self def __exit__(self, exc_type, exc_val, exc_tb): @@ -224,13 +234,6 @@ class Service(object): Internal use only. Handles scope with keyword 'with' """ self.stop_listening() - # close the listeners - if self.connected is True: - self.connected = False - if isinstance(self.listener, FromBus): - self.listener.close() - if isinstance(self.reply_bus, ToBus): - self.reply_bus.close() def _send_reply(self, replymessage, status, reply_to, errtxt="",backtrace=""): """ @@ -285,7 +288,7 @@ class Service(object): except Exception as e: logger.error("prepare_loop() failed with %s", e) - while self.running: + while self.running[0]: try: service_handler.prepare_receive() except Exception as e: @@ -314,7 +317,26 @@ class Service(object): if self.parsefullmessage is True: replymessage = service_handler.handle_message(msg) else: - replymessage = service_handler.handle_message(msg.content) + # check for positional arguments and named arguments + if msg.has_args=="True": + rpcargs=msg.content + if msg.has_kwargs=="True": + # both positional and named arguments + rpckwargs=rpcargs[-1] + del rpcargs[-1] + rpcargs=tuple(rpcargs) + replymessage = service_handler.handle_message(*rpcargs,**rpckwargs) + else: + # only positional arguments + rpcargs=tuple(rpcargs) + replymessage = service_handler.handle_message(*rpcargs) + else: + if msg.has_kwargs=="True": + # only named arguments + replymessage = service_handler.handle_message(**(msg.content)) + else: + replymessage = service_handler.handle_message(msg.content) + self._debug("finished handler") self._send_reply(replymessage,"OK",msg.reply_to) self.okcounter[thread_idx] += 1 diff --git a/LCS/Messaging/python/messaging/messages.py b/LCS/Messaging/python/messaging/messages.py index 1ecf010eac46d6a53f1a59df5cb425f1edbab682..fab2e94e21272575b41917b90b6b6ee2ef035e54 100644 --- a/LCS/Messaging/python/messaging/messages.py +++ b/LCS/Messaging/python/messaging/messages.py @@ -281,11 +281,15 @@ class ServiceMessage(LofarMessage): subsystem. A service message must contain a valid ``ReplyTo`` property. """ - def __init__(self, content=None, reply_to=None): + def __init__(self, content=None, reply_to=None,**kwargs): #reply_to=None, has_args=None, has_kwargs=None): super(ServiceMessage, self).__init__(content) if (reply_to!=None): - self.reply_to = reply_to - + #if (len(kwargs)>0): + #reply_to = kwargs.pop("reply_to",None) + #if (reply_to!=None): + self.reply_to = reply_to + self.has_args = str(kwargs.pop("has_args",False)) + self.has_kwargs = str(kwargs.pop("has_kwargs",False)) class ReplyMessage(LofarMessage): """