Skip to content
Snippets Groups Projects
Commit 2dd16ee7 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8582: merged changes to PyMessaging back to trunk

parent 512e2547
No related branches found
No related tags found
No related merge requests found
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
# #
# RPC invocation with possible timeout # RPC invocation with possible timeout
from lofar.messaging.messagebus import ToBus, FromBus from .messagebus import ToBus, FromBus
from lofar.messaging.messages import RequestMessage, ReplyMessage from .messages import RequestMessage, ReplyMessage
import uuid import uuid
def _analyze_args(args,kwargs): def _analyze_args(args,kwargs):
...@@ -126,7 +126,7 @@ class RPC(): ...@@ -126,7 +126,7 @@ class RPC():
else: else:
Reply = FromBus("%s/%s" % (self.BusName, ReplyAddress)) Reply = FromBus("%s/%s" % (self.BusName, ReplyAddress))
with Reply: with Reply:
MyMsg = RequestMessage(Content, ReplyAddress , has_args=HasArgs, has_kwargs=HasKwArgs) MyMsg = RequestMessage(content=Content, reply_to=ReplyAddress, has_args=HasArgs, has_kwargs=HasKwArgs)
MyMsg.ttl = timeout MyMsg.ttl = timeout
self.Request.send(MyMsg) self.Request.send(MyMsg)
answer = Reply.receive(timeout) answer = Reply.receive(timeout)
......
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# #
from lofar.messaging.messagebus import ToBus,FromBus from .messagebus import ToBus,FromBus
from lofar.messaging.messages import ReplyMessage,RequestMessage from .messages import ReplyMessage,RequestMessage
import threading import threading
import time import time
import uuid import uuid
......
...@@ -176,8 +176,8 @@ class FromBus(object): ...@@ -176,8 +176,8 @@ class FromBus(object):
raise_exception(MessageBusError, raise_exception(MessageBusError,
"[FromBus] Failed to fetch message from queue: " "[FromBus] Failed to fetch message from queue: "
"%s" % self.address) "%s" % self.address)
logger.info("[FromBus] Message received on queue: %s", self.address) logger.info("[FromBus] Message received on queue: %s subject: %s" % (self.address, msg.subject))
logger.debug("[FromBus] %s", msg) logger.debug("[FromBus] %s" % msg)
try: try:
amsg = MESSAGE_FACTORY.create(msg) amsg = MESSAGE_FACTORY.create(msg)
except MessageFactoryError: except MessageFactoryError:
...@@ -399,7 +399,7 @@ class ToBus(object): ...@@ -399,7 +399,7 @@ class ToBus(object):
raise_exception(MessageBusError, raise_exception(MessageBusError,
"[ToBus] Failed to send message to queue: %s" % "[ToBus] Failed to send message to queue: %s" %
sender.target) sender.target)
logger.info("[ToBus] Message sent to queue: %s", sender.target) logger.info("[ToBus] Message sent to queue: %s subject: %s" % (sender.target, message.subject))
__all__ = ["FromBus", "ToBus"] __all__ = ["FromBus", "ToBus"]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment