Skip to content
Snippets Groups Projects
Commit b28d04f8 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-699: make distinction between DEFAULT_BUS_TIMEOUT and DEFAULT_RPC_TIMEOUT

parent be426bc6
No related branches found
No related tags found
No related merge requests found
......@@ -23,5 +23,3 @@ except:
# default exchange to use for publishing messages
DEFAULT_BUSNAME = adaptNameToEnvironment("lofar")
# default timeout in seconds
DEFAULT_TIMEOUT = 5
\ No newline at end of file
......@@ -184,7 +184,7 @@ some.event bar
from lofar.messaging.exceptions import *
from lofar.messaging.messages import *
from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_TIMEOUT, DEFAULT_PORT, DEFAULT_USER, DEFAULT_PASSWORD
from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_PORT, DEFAULT_USER, DEFAULT_PASSWORD
from lofar.common.threading_utils import TimeoutLock
from lofar.common.util import program_name
......@@ -206,7 +206,8 @@ kombu.enable_insecure_serializers(['pickle'])
# make default kombu/amqp logger less spammy
logging.getLogger("amqp").setLevel(logging.INFO)
# default receive timeout in seconds
DEFAULT_BUS_TIMEOUT = 5
def create_exchange(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG) -> bool:
"""
......@@ -535,7 +536,7 @@ class FromBus(_AbstractBus):
logger.exception(error_msg)
raise MessagingError(error_msg)
def receive(self, timeout: float=DEFAULT_TIMEOUT, acknowledge: bool = True) -> Optional[LofarMessage]:
def receive(self, timeout: float=DEFAULT_BUS_TIMEOUT, acknowledge: bool = True) -> Optional[LofarMessage]:
"""
Receive the next message from the queue we're listening on.
:param timeout: maximum time in seconds to wait for a message.
......@@ -1341,7 +1342,7 @@ class BusListenerJanitor:
# do not expose create/delete_queue/exchange etc methods in all, it's not part of the public API
__all__ = ['FromBus', 'ToBus', 'BusListener', 'BusListenerJanitor', 'TemporaryQueue', 'TemporaryExchange', 'AbstractMessageHandler']
__all__ = ['DEFAULT_BUS_TIMEOUT', 'FromBus', 'ToBus', 'BusListener', 'BusListenerJanitor', 'TemporaryQueue', 'TemporaryExchange', 'AbstractMessageHandler']
if __name__ == "__main__":
logging.basicConfig(format='%(levelname)s %(message)s', level=logging.INFO)
......
......@@ -60,7 +60,7 @@ foo was called. my_param = whatever
bar was called.
"""
from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_TIMEOUT
from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging.messagebus import ToBus, BusListener, AbstractMessageHandler, TemporaryQueue
from lofar.messaging.messages import LofarMessage, MessageFactory
from typing import Optional
......@@ -68,6 +68,8 @@ from datetime import datetime, timedelta
import logging
import inspect
DEFAULT_RPC_TIMEOUT = 60
logger = logging.getLogger(__name__)
class RequestMessage(LofarMessage):
......@@ -296,7 +298,7 @@ class RPCClient():
def __init__(self, service_name: str,
exchange: str = DEFAULT_BUSNAME,
broker: str = DEFAULT_BROKER,
timeout: int=DEFAULT_TIMEOUT):
timeout: int = DEFAULT_RPC_TIMEOUT):
"""
Create an RPCClient instance, enabling the execution of remote procedure calls on the given <service_name>
via the given <exchange> and <broker>.
......@@ -414,7 +416,7 @@ class RPCClientContextManagerMixin:
self.close()
__all__ = ['ServiceMessageHandler', 'RPCService', 'RPCClient', 'RPCClientContextManagerMixin', 'RequestMessage', 'ReplyMessage']
__all__ = ['DEFAULT_RPC_TIMEOUT', 'ServiceMessageHandler', 'RPCService', 'RPCClient', 'RPCClientContextManagerMixin', 'RequestMessage', 'ReplyMessage']
if __name__ == "__main__":
logging.basicConfig(format='%(levelname)s %(threadName)s %(message)s', level=logging.DEBUG)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment