Commit 00c5b706 authored by Jorrit Schaap's avatar Jorrit Schaap

TMSS-163: fixed kombu connection for gitlab CI tests

parent 8b977fd1
......@@ -35,12 +35,15 @@ if isProductionEnvironment() or isTestEnvironment():
# dynamically determine port where RabbitMQ server runs by trying to connect
DEFAULT_PORT = -1
def broker_url(hostname: str=DEFAULT_BROKER, port: int=DEFAULT_PORT, userid: str=DEFAULT_USER, password :str=DEFAULT_PASSWORD) -> str:
return 'amqp://%s:%s@%s:%d//' % (userid, password, hostname, port)
for port in [5672, 5675]:
try:
logger.debug("trying to connect to broker: hostname=%s port=%s userid=%s password=***",
DEFAULT_BROKER, port, DEFAULT_USER)
with kombu.Connection(hostname=DEFAULT_BROKER, port=port, userid=DEFAULT_USER, password=DEFAULT_PASSWORD,
max_retries=0, connect_timeout=1) as connection:
with kombu.Connection(broker_url(port=port), max_retries=0, connect_timeout=1, ) as connection:
connection.connect()
DEFAULT_PORT = port
logger.info("detected rabbitmq broker to which we can connect with hostname=%s port=%s userid=%s password=***",
......
......@@ -204,7 +204,7 @@ logger = logging.getLogger(__name__)
from lofar.messaging.exceptions import *
from lofar.messaging import adaptNameToEnvironment
from lofar.messaging.messages import *
from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_PORT, DEFAULT_USER, DEFAULT_PASSWORD
from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_PORT, DEFAULT_USER, DEFAULT_PASSWORD, broker_url
from lofar.common.threading_utils import TimeoutLock
from lofar.common.util import program_name
from lofar.common.util import is_empty_function
......@@ -222,7 +222,7 @@ def can_connect_to_broker(broker: str=DEFAULT_BROKER, port: int=DEFAULT_PORT) ->
try:
logger.debug("trying to connect to broker: hostname=%s port=%s userid=%s password=***",
broker, port, DEFAULT_USER)
with kombu.Connection(hostname=broker, port=port, userid=DEFAULT_USER, password=DEFAULT_PASSWORD,
with kombu.Connection(broker_url(hostname=broker, port=port, userid=DEFAULT_USER, password=DEFAULT_PASSWORD),
max_retries=0, connect_timeout=1) as connection:
connection.connect()
logger.debug("can connect to broker with hostname=%s port=%s userid=%s password=***",
......@@ -244,7 +244,7 @@ def create_exchange(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, l
:return True if created, False if not-created (because it already exists)
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
exchange = kombu.Exchange(name, durable=durable, type='topic')
try:
exchange.declare(channel=connection.default_channel, passive=True)
......@@ -266,7 +266,7 @@ def delete_exchange(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEB
:return True if deleted, False if not-deleted (because it does not exist)
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
exchange = kombu.Exchange(name, channel=connection)
try:
exchange.declare(channel=connection.default_channel, passive=True)
......@@ -286,7 +286,7 @@ def exchange_exists(name: str, broker: str=DEFAULT_BROKER) -> bool:
:return True if it exists, False if not.
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
exchange = kombu.Exchange(name, channel=connection)
try:
exchange.declare(channel=connection.default_channel, passive=True)
......@@ -309,7 +309,7 @@ def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, log_
:return True if created, False if not-created (because it already exists)
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
queue = kombu.Queue(name,
durable=durable,
auto_delete=auto_delete,
......@@ -335,7 +335,7 @@ def delete_queue(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG)
:return True if deleted, False if not-deleted (because it does not exist)
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
queue = kombu.Queue(name, no_declare=True, channel=connection)
try:
queue.queue_declare(channel=connection.default_channel, passive=True)
......@@ -355,7 +355,7 @@ def queue_exists(name: str, broker: str=DEFAULT_BROKER) -> bool:
:return True if it exists, False if not.
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
queue = kombu.Queue(name, no_declare=True, channel=connection)
try:
queue.queue_declare(channel=connection.default_channel, passive=True)
......@@ -389,7 +389,7 @@ def create_binding(exchange: str, queue: str, routing_key: str='#', durable: boo
:param log_level: optional logging level (to add/reduce spamming)
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
kombu_exchange = kombu.Exchange(exchange, durable=durable, type='topic', no_declare=True)
kombu_queue = kombu.Queue(queue, exchange=kombu_exchange, routing_key=routing_key, durable=durable, no_declare=True)
if not kombu_queue.is_bound:
......@@ -483,7 +483,7 @@ class _AbstractBus:
return
logger.debug("[%s] Connecting to broker: %s", self.__class__.__name__, self.broker)
self._connection = kombu.Connection(hostname=self.broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)
self._connection = kombu.Connection(broker_url(hostname=self.broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD))
self._connection.connect()
logger.debug("[%s] Connected to broker: %s (%s)", self.__class__.__name__, self.broker, self.connection_name)
......
......@@ -374,7 +374,7 @@ class FromBusInitFailed(unittest.TestCase):
Connecting to broker on wrong port must raise MessageBusError
"""
with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"):
with FromBus("fake" + self.test_queue.address, broker="localhost:4"):
with FromBus("fake" + self.test_queue.address, broker="fdjsafhdjlahflaieoruieow"):
pass
......@@ -445,7 +445,7 @@ class ToBusInitFailed(unittest.TestCase):
Connecting to broker on wrong port must raise MessageBusError
"""
with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"):
with ToBus(self.test_exchange.address, broker="localhost:4"):
with ToBus(self.test_exchange.address, broker="fhjlahfowuefohwaueif"):
pass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment