t_messagebus.py 43.4 KB
Newer Older
1
# t_messagebus.py: Test program for the module lofar.messaging.messagebus
2 3 4 5 6
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
7 8
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
9 10 11 12
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
13
# The LOFAR software suite is distributed in the hope that it will be
14 15 16 17 18
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
19
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
20 21 22 23
#
# $Id: t_messagebus.py 1580 2015-09-30 14:18:57Z loose $

"""
24
Test program for the module lofar.messaging.messagebus
25 26
"""

27
import uuid
28
import unittest
29
import requests
30
import logging
31

32
logger = logging.getLogger(__name__)
33
logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.DEBUG)
34

35
from datetime import datetime
36

37 38
from lofar.messaging.messages import *
from lofar.messaging.messagebus import *
39
from lofar.messaging.messagebus import _AbstractBus, can_connect_to_broker
40
from lofar.messaging.messagebus import create_queue, create_exchange, create_binding, create_bound_queue, delete_exchange, delete_queue, exchange_exists, queue_exists
41
from lofar.messaging.config import DEFAULT_USER, DEFAULT_PASSWORD
42
from lofar.messaging.rpc import RequestMessage
43
from lofar.messaging.exceptions import MessageBusError, MessagingRuntimeError, MessagingTimeoutError
44
from lofar.common.datetimeutils import round_to_millisecond_precision
45
from time import sleep
46
from threading import Lock, Event as ThreadingEvent
47
from lofar.common.test_utils import unit_test, integration_test
48

49
TIMEOUT = 1.0
50

51 52 53
class TestCreateDeleteFunctions(unittest.TestCase):
    """Test the various create/delete exchange/queue/binding funcions"""

54
    @integration_test
55 56
    def test_create_delete_exchange(self):
        name = "test-exchange-%s" % (uuid.uuid4())
57 58 59

        self.assertFalse(exchange_exists(name))

60 61 62
        # creating this new unique test exchange should succeed, and return True cause it's a new exchange
        self.assertTrue(create_exchange(name, durable=False))

63 64
        self.assertTrue(exchange_exists(name))

65 66 67 68 69 70
        # creating it again should return False
        self.assertFalse(create_exchange(name, durable=False))

        # deleting it should succeed
        self.assertTrue(delete_exchange(name))

71 72
        self.assertFalse(exchange_exists(name))

73 74 75
        # deleting it again should return False as there is nothing to deleting
        self.assertFalse(delete_exchange(name))

76
    @integration_test
77 78
    def test_create_delete_queue(self):
        name = "test-queue-%s" % (uuid.uuid4())
79 80 81

        self.assertFalse(queue_exists(name))

82 83 84
        # creating this new unique test queue should succeed, and return True cause it's a new queue
        self.assertTrue(create_queue(name, durable=False))

85 86
        self.assertTrue(queue_exists(name))

87 88 89 90 91 92
        # creating it again should return False
        self.assertFalse(create_queue(name, durable=False))

        # deleting it should succeed
        self.assertTrue(delete_queue(name))

93 94
        self.assertFalse(queue_exists(name))

95 96 97
        # deleting it again should return False as there is nothing to deleting
        self.assertFalse(delete_queue(name))

98
    @integration_test
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
    def test_create_binding(self):
        exchange = "test-exchange-%s" % (uuid.uuid4())
        queue = "test-queue-%s" % (uuid.uuid4())

        # try to create the binding on non-existing exchange/queue
        with self.assertRaisesRegex(MessageBusError, ".*does not exist.*"):
            create_binding(exchange=exchange, queue=queue)

        try:
            # now, do make sure the exchange/queue exist
            create_exchange(exchange)
            create_queue(queue)

            # and do the actual binding test
            self.assertTrue(create_binding(exchange=exchange, queue=queue))
        finally:
            # and cleanup the exchange/queue
            delete_queue(queue)
            delete_exchange(exchange)




class TestTemporaryExchangeAndQueue(unittest.TestCase):
    """Test the TemporaryExchange and TemporaryQueue classes"""

125
    @integration_test
126 127 128 129 130 131 132 133 134
    def test_temporary_exchange_is_really_temporary(self):
        """
        test if the temporary exchange is really removed after usage
        """
        tmp_exchange_address = None
        with TemporaryExchange("MyTestExchange") as tmp_exchange:
            tmp_exchange_address = tmp_exchange.address
            self.assertTrue("MyTestExchange" in tmp_exchange_address)

135 136
        self.assertFalse(exchange_exists(tmp_exchange_address))

137 138 139 140 141 142
        # test if the temporary exchange has been deleted when leaving scope
        # We should not be able to connect to it anymore
        with self.assertRaisesRegex(MessageBusError, '.*NOT_FOUND.*'):
            with FromBus(tmp_exchange_address):
                pass

143
    @integration_test
144
    def test_temporary_queue_is_really_temporary(self):
145
        """
146
        test if the temporary queue is really removed after usage
147
        """
148 149 150 151 152
        tmp_queue_address = None
        with TemporaryQueue("MyTestQueue") as tmp_queue:
            tmp_queue_address = tmp_queue.address
            self.assertTrue("MyTestQueue" in tmp_queue_address)

153 154
        self.assertFalse(queue_exists(tmp_queue_address))

155 156
        # test if the temporary queue has been deleted when leaving scope
        # We should not be able to connect to it anymore
157
        with self.assertRaisesRegex(MessageBusError, '.*NOT_FOUND.*'):
158
            with FromBus(tmp_queue_address):
159 160
                pass

161
    @integration_test
162
    def test_send_receive_over_temporary_exchange_and_queue(self):
163
        """
164
        test the usage of the TemporaryExchange and TemporaryQueue in conjunction with normal ToBus and Frombus usage
165
        """
166
        with TemporaryExchange("MyTestExchange") as tmp_exchange:
167
            tmp_exchange_address = tmp_exchange.address
168 169 170 171
            # create a normal ToBus on this tmp_exchange
            with tmp_exchange.create_tobus() as tobus_on_exchange:
                # create a TemporaryQueue, bound to the tmp_exchange
                with TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address) as tmp_queue:
172
                    tmp_queue_address = tmp_queue.address
173 174 175 176
                    # create a normal FromBus on this tmp_queue
                    with tmp_queue.create_frombus() as frombus:
                        # and let's see if the tmp_queue can also create a tobus which then points to the bound_exchange
                        with tmp_queue.create_tobus() as tobus_on_tmp_queue:
177

178
                            self.assertEqual(tobus_on_exchange.exchange, tobus_on_tmp_queue.exchange)
179

180 181 182 183 184 185 186 187 188 189 190 191 192
                            # test sending a message to both "types" of tobuses.
                            for tobus in [tobus_on_exchange, tobus_on_tmp_queue]:
                                # send a message...
                                original_msg = EventMessage(content="foobar")
                                tobus.send(original_msg)

                                # ...receive the message...
                                received_msg = frombus.receive()
                                self.assertIsNotNone(received_msg)

                                # and test if they are equal
                                self.assertEqual(original_msg.id, received_msg.id)
                                self.assertEqual(original_msg.content, received_msg.content)
193

194 195 196
        self.assertFalse(exchange_exists(tmp_exchange_address))
        self.assertFalse(queue_exists(tmp_queue_address))

197
    @integration_test
198 199 200 201
    def test_send_receive_over_temporary_queue_with_subject_filtering(self):
        """
        test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage with additional filtering on subject
        """
202 203 204 205
        SUBJECT = "FooBarSubject"
        SUBJECT2 = "FAKE_SUBJECT"

        with TemporaryQueue("MyTestQueue", routing_key=SUBJECT) as tmp_queue:
206
            tmp_queue_address = tmp_queue.address
207 208 209 210
            # create a normal To/FromBus on this tmp_queue
            NUM_MESSAGES_TO_SEND = 3
            with tmp_queue.create_tobus() as tobus:
                # create a FromBus, which listens for/receives only the messages with the given SUBJECT
211
                with tmp_queue.create_frombus() as frombus:
212 213
                    for i in range(NUM_MESSAGES_TO_SEND):
                        # send a message...
214 215
                        original_msg = EventMessage(subject=SUBJECT,
                                                    content="test message %d with subject='%s'" % (i, SUBJECT))
216 217 218 219 220 221 222 223 224
                        logger.info("Sending message: %s", original_msg)
                        tobus.send(original_msg)

                        # ...receive the message...
                        received_msg = frombus.receive(timeout=0.1)
                        logger.info("received message: %s", received_msg)

                        # and test if they are equal
                        self.assertEqual(original_msg.id, received_msg.id)
225
                        self.assertEqual(original_msg.content, received_msg.content)
226 227 228
                        self.assertEqual(original_msg.subject, received_msg.subject)

                        # now send a message with a different subject...
229
                        original_msg = EventMessage(subject=SUBJECT2, content="foobar")
230 231 232 233 234 235 236 237
                        logger.info("Sending message: %s", original_msg)
                        tobus.send(original_msg)

                        # ... and try to receive it (should yield None, because of the non-matching subject)
                        received_msg = frombus.receive(timeout=0.1)
                        logger.info("received message: %s", received_msg)
                        self.assertEqual(None, received_msg)

238 239
        self.assertFalse(queue_exists(tmp_queue_address))

240
    @integration_test
241 242 243 244 245 246 247 248
    def test_send_receive_over_temporary_exchange_with_queue_with_subject_filtering(self):
        """
        test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage with additional filtering on subject
        """
        SUBJECT = "FooBarSubject"
        SUBJECT2 = "FAKE_SUBJECT"
        NUM_MESSAGES_TO_SEND = 3
        with TemporaryExchange("MyTestExchange") as tmp_exchange:
249
            tmp_exchange_address = tmp_exchange.address
250 251 252
            with tmp_exchange.create_tobus() as tobus:
                # create a TemporaryQueue, which listens for/receives only the messages with the given SUBJECT
                with TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address, routing_key=SUBJECT) as tmp_queue:
253
                    tmp_queue_address = tmp_queue.address
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
                    with tmp_queue.create_frombus() as frombus:
                        for i in range(NUM_MESSAGES_TO_SEND):
                            # send a message...
                            original_msg = EventMessage(subject=SUBJECT,
                                                        content="test message %d with subject='%s'" % (
                                                        i, SUBJECT))
                            logger.info("Sending message: %s", original_msg)
                            tobus.send(original_msg)

                            # ...receive the message...
                            received_msg = frombus.receive(timeout=0.1)
                            logger.info("received message: %s", received_msg)

                            # and test if they are equal
                            self.assertEqual(original_msg.id, received_msg.id)
                            self.assertEqual(original_msg.content, received_msg.content)
                            self.assertEqual(original_msg.subject, received_msg.subject)

                            # now send a message with a different subject...
                            original_msg = EventMessage(subject=SUBJECT2, content="foobar")
                            logger.info("Sending message: %s", original_msg)
                            tobus.send(original_msg)

                            # ... and try to receive it (should yield None, because of the non-matching subject)
                            received_msg = frombus.receive(timeout=0.1)
                            logger.info("received message: %s", received_msg)
                            self.assertEqual(None, received_msg)

282 283 284
        self.assertFalse(exchange_exists(tmp_exchange_address))
        self.assertFalse(queue_exists(tmp_queue_address))

285
    @integration_test
286 287 288 289 290 291
    def test_send_receive_over_temporary_exchange_with_multiple_bound_queues_with_subject_filtering(
            self):
        """
        test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage with additional filtering on subject
        """
        with TemporaryExchange("MyTestExchange") as tmp_exchange:
292
            tmp_exchange_address = tmp_exchange.address
293 294 295 296 297
            with tmp_exchange.create_tobus() as tobus:
                SUBJECT1 = "FooBarSubject"
                SUBJECT2 = "FAKE_SUBJECT"
                with TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address, routing_key=SUBJECT1) as tmp_queue1, \
                     TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address, routing_key=SUBJECT2) as tmp_queue2:
298 299
                    tmp_queue1_address = tmp_queue1.address
                    tmp_queue2_address = tmp_queue2.address
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
                    # create a normal To/FromBus on this tmp_queue
                    NUM_MESSAGES_TO_SEND = 3

                    # create two FromBus'es, which listen for/receive only the messages with their routing_key
                    with tmp_queue1.create_frombus() as frombus1, tmp_queue2.create_frombus() as frombus2:
                        for i in range(NUM_MESSAGES_TO_SEND):
                            # send a message...
                            original_msg = EventMessage(subject=SUBJECT1,
                                                        content="test message %d with subject='%s'" % (
                                                            i, SUBJECT1))
                            logger.info("Sending message: %s", original_msg)
                            tobus.send(original_msg)

                            # ...receive the message...
                            received_msg1 = frombus1.receive(timeout=0.1)
                            received_msg2 = frombus2.receive(timeout=0.1)
                            self.assertIsNotNone(received_msg1)
                            self.assertIsNone(received_msg2)
                            logger.info("received message: %s", received_msg1)

                            # and test if they are equal
                            self.assertEqual(original_msg.id, received_msg1.id)
                            self.assertEqual(original_msg.content, received_msg1.content)
                            self.assertEqual(original_msg.subject, received_msg1.subject)

                            # now send a message with a different subject...
                            original_msg = EventMessage(subject=SUBJECT2, content="foobar")
                            logger.info("Sending message: %s", original_msg)
                            tobus.send(original_msg)

                            # ... and try to receive it
                            received_msg1 = frombus1.receive(timeout=0.1)
                            received_msg2 = frombus2.receive(timeout=0.1)
                            self.assertIsNone(received_msg1)
                            self.assertIsNotNone(received_msg2)
                            logger.info("received message: %s", received_msg2)

                            # and test if they are equal
                            self.assertEqual(original_msg.id, received_msg2.id)
                            self.assertEqual(original_msg.content, received_msg2.content)
                            self.assertEqual(original_msg.subject, received_msg2.subject)
341

342 343 344
        self.assertFalse(exchange_exists(tmp_exchange_address))
        self.assertFalse(queue_exists(tmp_queue1_address))
        self.assertFalse(queue_exists(tmp_queue2_address))
345

346
# ========  FromBus unit tests  ======== #
347

348 349 350 351
class FromBusInitFailed(unittest.TestCase):
    """
    Class to test initialization failures of FromBus
    """
352

353 354 355
    def setUp(self):
        self.test_queue = TemporaryQueue(__class__.__name__)
        self.test_queue.open()
356 357 358 359 360

    def tearDown(self):
        tmp_queue_address = self.test_queue.address
        self.test_queue.close()
        self.assertFalse(queue_exists(tmp_queue_address))
361

362
    @unit_test
363
    def test_no_broker_address(self):
364
        """
365
        Connecting to non-existent broker address must raise MessageBusError
366
        """
367 368
        with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"):
            with FromBus(self.test_queue.address, broker="foo.bar"):
369
                pass
370

371
    @unit_test
372 373 374 375
    def test_connection_refused(self):
        """
        Connecting to broker on wrong port must raise MessageBusError
        """
376
        with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"):
377
            with FromBus("fake" + self.test_queue.address, broker="fdjsafhdjlahflaieoruieow"):
378
                pass
379 380


381 382 383 384
class FromBusInContext(unittest.TestCase):
    """
    Class to test FromBus when inside context.
    """
385

386 387 388
    def setUp(self):
        self.test_queue = TemporaryQueue(__class__.__name__)
        self.test_queue.open()
389 390 391 392 393

    def tearDown(self):
        tmp_queue_address = self.test_queue.address
        self.test_queue.close()
        self.assertFalse(queue_exists(tmp_queue_address))
394

395
    @unit_test
396
    def test_receiver_exists(self):
397
        with FromBus(self.test_queue.address) as frombus:
398
            self.assertTrue(frombus._receiver is not None)
399

400
    @unit_test
401 402 403 404 405 406 407
    def test_connect_fails(self):
        random_non_existing_address = str(uuid.uuid4())

        with self.assertRaisesRegex(MessageBusError, ".*failed*"):
            with FromBus(random_non_existing_address) as frombus:
                self.assertTrue(frombus._receiver is not None)

408
    @unit_test
409 410 411 412 413 414
    def test_receive_timeout(self):
        """
        Getting a message when there's none must yield None after timeout.
        """
        with FromBus(self.test_queue.address) as frombus:
            self.assertIsNone(frombus.receive(timeout=TIMEOUT))
415 416


417
# ========  ToBus unit tests  ======== #
418

419 420 421 422
class ToBusInitFailed(unittest.TestCase):
    """
    Class to test initialization failures of ToBus
    """
423

424
    def setUp(self):
425 426 427 428 429 430 431
        self.test_exchange = TemporaryExchange(__class__.__name__)
        self.test_exchange.open()

    def tearDown(self):
        tmp_exchange_address = self.test_exchange.address
        self.test_exchange.close()
        self.assertFalse(exchange_exists(tmp_exchange_address))
432

433
    @unit_test
434 435 436 437
    def test_no_broker_address(self):
        """
        Connecting to non-existent broker address must raise MessageBusError
        """
438
        with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"):
439
            with ToBus(self.test_exchange.address, broker="foo.bar"):
440
                pass
441

442
    @unit_test
443
    def test_connection_refused(self):
444
        """
445
        Connecting to broker on wrong port must raise MessageBusError
446
        """
447
        with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"):
448
            with ToBus(self.test_exchange.address, broker="fhjlahfowuefohwaueif"):
449
                pass
450

451 452 453 454 455 456 457

class SendReceiveMessage(unittest.TestCase):
    """
    Class to test sending and receiving a message.
    """

    def setUp(self):
458 459 460 461
        self.test_exchange = TemporaryExchange(__class__.__name__)
        self.test_exchange.open()

        self.test_queue = TemporaryQueue(__class__.__name__, exchange=self.test_exchange.address)
462 463
        self.test_queue.open()

464
        self.frombus = self.test_queue.create_frombus()
465 466 467 468 469 470 471 472 473 474
        self.tobus = self.test_exchange.create_tobus()

    def tearDown(self):
        tmp_queue_address = self.test_queue.address
        self.test_queue.close()
        self.assertFalse(queue_exists(tmp_queue_address))

        tmp_exchange_address = self.test_exchange.address
        self.test_exchange.close()
        self.assertFalse(exchange_exists(tmp_exchange_address))
475 476 477 478 479 480 481 482 483

    def _test_sendrecv(self, send_msg):
        """
        Helper class that implements the send/receive logic and message checks.
        :param send_msg: Message to send
        :return the received message
        """
        with self.tobus, self.frombus:
            self.tobus.send(send_msg)
484
            recv_msg = self.frombus.receive(timeout=TIMEOUT)
Jorrit Schaap's avatar
Jorrit Schaap committed
485

486 487 488 489
        self.assertEqual(type(send_msg), type(recv_msg))
        self.assertEqual(send_msg.id, recv_msg.id)
        self.assertEqual(send_msg.subject, recv_msg.subject)
        self.assertEqual(send_msg.content, recv_msg.content)
490 491
        return recv_msg

492
    @integration_test
493 494 495 496 497 498 499
    def test_sendrecv_event_message(self):
        """
        Test send/receive of an EventMessage, containing a string.
        """
        content = "An event message"
        self._test_sendrecv(EventMessage(content))

500
    @integration_test
501 502 503 504
    def test_sendrecv_request_message(self):
        """
        Test send/receive of an RequestMessage, containing a byte array.
        """
505 506
        self._test_sendrecv(RequestMessage(subject="my_request",  reply_to=self.test_queue.address).with_args_kwargs(
                                           request="Do Something", argument="Very Often"))
507

508
    @integration_test
509 510 511 512 513 514
    def test_sendrecv_request_message_with_large_content_map(self):
        """
        Test send/receive of an RequestMessage, containing a dict with a large string value.
        Qpid, cannot (de)serialize strings > 64k in a dict
        We circumvent this in ToBus.send and FromBus.receive by converting long strings in a dict to a buffer and back.
        """
515 516
        self._test_sendrecv(RequestMessage(subject="my_request",  reply_to=self.test_queue.address).with_args_kwargs(
                                           key1="short message", key2="long message " + (2 ** 17) * 'a'))
517

518
    @integration_test
519 520 521 522
    def test_sendrecv_request_message_with_datetime_in_dict(self):
        """
        Test send/receive of an RequestMessage, containing a datetime in the dict.
        """
523 524
        self._test_sendrecv(RequestMessage(subject="my_request", reply_to=self.test_queue.address).with_args_kwargs(
                                           starttime=round_to_millisecond_precision(datetime.utcnow())))
525

526
    @integration_test
527 528 529 530
    def test_sendrecv_request_message_with_datetime_in_list(self):
        """
        Test send/receive of an RequestMessage, containing a datetime in the list.
        """
531 532 533
        my_list = [round_to_millisecond_precision(datetime.utcnow()),round_to_millisecond_precision(datetime.utcnow())]
        self._test_sendrecv(RequestMessage(subject="my_request", reply_to=self.test_queue.address).with_args_kwargs(
                                           my_list=my_list))
534

535
    @integration_test
536 537 538 539
    def test_sendrecv_request_message_with_large_string(self):
        """
        Test send/receive of an RequestMessage, containing a large string
        """
540
        large = ((2**16)+1)*'a' # test if the messages can handle a string with more than 2^16 chars which is aparently a probly for some brokers of messaging libs.
541 542
                                  # so, we need a large enough string, but not too big to overload the broker buffers when running multiple tests at the same time

543 544
        self._test_sendrecv(RequestMessage(subject="my_request", reply_to=self.test_queue.address).with_args_kwargs(
                                           my_string=large))
545

546
    @integration_test
547 548 549 550 551 552 553 554 555 556 557 558
    def test_sendrecv_request_message_with_nested_dicts_and_lists_with_special_types(self):
        """
        Test send/receive of an RequestMessage, containing a datetimes in nested dicts/lists.
        """
        content = {'foo': [ {'timestamp1': round_to_millisecond_precision(datetime.utcnow()),
                             'timestamp2': round_to_millisecond_precision(datetime.utcnow()),
                             'foo': 'bar'},
                            {},
                            {'abc':[round_to_millisecond_precision(datetime.utcnow()), round_to_millisecond_precision(datetime.utcnow())]},
                            {'a': 'b',
                             'c': { 'timestamp': round_to_millisecond_precision(datetime.utcnow())}}],
                   'bar': [],
559 560
                   'large_string': ((2**16)+1)*'a' # test if the messages can handle a string with more than 2^16 chars which is aparently a probly for some brokers of messaging libs.
                                                   # so, we need a large enough string, but not too big to overload the broker buffers when running multiple tests at the same time
561
                   }
562
        self._test_sendrecv(RequestMessage(subject="my_request", reply_to=self.test_queue.address).with_args_kwargs(**content))
563

564
    @integration_test
565 566 567 568
    def test_sendrecv_request_message_with_int_keys(self):
        """
        Test send/receive of an RequestMessage, containing int's as keys
        """
569
        my_dict = { 0: 'foo',
570
                    1: 'bar' }
571 572 573
        recv_msg = self._test_sendrecv(RequestMessage(subject="my_request",  reply_to=self.test_queue.address).with_args_kwargs(
                                                      my_dict=my_dict))
        self.assertEqual(my_dict, recv_msg.content['kwargs']['my_dict'])
574

575
class PriorityTest(unittest.TestCase):
576 577

    @integration_test
578 579
    def test_priority(self):
        with TemporaryExchange(self.__class__.__name__) as tmp_exchange:
580
            tmp_exchange_address = tmp_exchange.address
581
            with tmp_exchange.create_temporary_queue() as tmp_queue:
582
                tmp_queue_address = tmp_queue.address
583 584 585 586 587 588 589 590 591 592 593 594 595 596
                msg1 = EventMessage(priority=4, subject="some.event", content=1)
                msg2 = EventMessage(priority=5, subject="some.event", content=2)

                with tmp_exchange.create_tobus() as tobus:
                    tobus.send(msg1)
                    tobus.send(msg2)

                with tmp_queue.create_frombus() as frombus:
                    result_msg1 = frombus.receive()
                    result_msg2 = frombus.receive()

                    # message with highest priority should arrive first
                    self.assertEqual(msg1.id, result_msg2.id)
                    self.assertEqual(msg2.id, result_msg1.id)
597

598 599 600
        self.assertFalse(exchange_exists(tmp_exchange_address))
        self.assertFalse(queue_exists(tmp_queue_address))

601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620
class Rejector(BusListener):
    handled_messages = 0

    class Handler(AbstractMessageHandler):
        def __init__(self, rejector):
            self.rejector = rejector


        def handle_message(self, msg: LofarMessage):
            self.rejector.handled_messages += 1
            raise Exception("Intentional exception to reject message")

    def __init__(self, exchange):
        super(Rejector, self).__init__(handler_type=Rejector.Handler,
                                       handler_kwargs={"rejector": self},
                                       exchange=exchange,
                                       routing_key="spam")


class RejectorTester(unittest.TestCase):
621 622

    @integration_test
623 624 625 626
    def test_reject_should_result_in_empty_queue(self):
        number_of_messages = 1000

        with TemporaryExchange("Rejection") as tmp_exchange:
627
            tmp_exchange_address = tmp_exchange.address
628
            with BusListenerJanitor(Rejector(tmp_exchange.address)) as rejector:
629
                rejector_address = Rejector.designated_queue_name(tmp_exchange_address)
630 631 632 633
                with tmp_exchange.create_tobus() as spammer:
                    for _ in range(number_of_messages):
                        msg = EventMessage(content="ping", subject="spam")
                        spammer.send(msg)
634

635
                while rejector.handled_messages < number_of_messages:
636
                    logger.info("Handled messages: {}".format(rejector.handled_messages))
637
                    sleep(1)
638

639
                with FromBus(rejector.address) as frombus:
640
                    logger.info("Number of messages on queue: {}".format(frombus.nr_of_messages_in_queue()))
641
                    self.assertEqual(0, frombus.nr_of_messages_in_queue())
642

643 644 645
        self.assertFalse(exchange_exists(tmp_exchange_address))
        self.assertFalse(queue_exists(rejector_address))

646 647


648
class PingPongPlayer(BusListener):
649 650 651
    """
    Helper class with a simple purpose:
        - listen on one queue,
652
        - when receiving a message, send answer on exchange, flipping message contents between ping and pong.
653

654 655
    This is NOT the intended way of using the BusListener and AbstractMessageHandler... This weird construct is
    used to test the multi-threaded BusListener's behaviour, and tests if the underlying messaging lib can cope with multithreading.
656
    """
657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684

    class Handler(AbstractMessageHandler):
        def __init__(self, player, opponent_name):
            self.player = player
            self.opponent_name = opponent_name

        def handle_message(self, msg):
            """Implementation of BusListener.handle_message
            log received message, and send a response message to the pingpong_table_exchange where it will be routed to the opponent's queue,
            flipping ping for pong and vice versa
            """
            logger.info("%s: received %s on %s", self.player.name, msg.content, self.player.address)

            response_msg = EventMessage(content="ping" if msg.content == "pong" else "pong",
                                        subject=self.opponent_name)

            logger.info("%s: sending %s to %s", self.player.name, response_msg.content, self.player.response_bus.exchange)

            # do not lock around the player's response_bus to test internal thread safety
            self.player.response_bus.send(response_msg)

            with self.player.lock: # do lock 'normal' assignement of variables
                self.player.num_turns += 1

            return True


    def __init__(self, name, opponent_name, pingpong_table_exchange, num_threads):
685
        self.name = name
686
        self.opponent_name = opponent_name
687
        self.num_turns = 0
688
        self.response_bus = ToBus(pingpong_table_exchange)
689
        self.lock = Lock() # a lock to keep track of self.num_turns in a multithreaded environment
690 691 692 693 694 695 696 697 698 699 700 701 702
        super(PingPongPlayer, self).__init__(handler_type=PingPongPlayer.Handler,
                                             handler_kwargs={'player': self, 'opponent_name': opponent_name},
                                             exchange=pingpong_table_exchange,
                                             routing_key=self.name,
                                             num_threads=num_threads)

    def start_listening(self):
        self.response_bus.open()
        super(PingPongPlayer, self).start_listening()

    def stop_listening(self):
        super(PingPongPlayer, self).stop_listening()
        self.response_bus.close()
703 704 705 706 707 708 709

    def get_num_turns(self):
        with self.lock:
            return self.num_turns

class PingPongTester(unittest.TestCase):
    """Test an event driven message ping/pong game, where two 'players' respond to each other.
710
    This test should work regardless of the number of threads the each 'player'/BusListener uses"""
711

712
    @integration_test
713 714 715
    def test_single_thread_per_player(self):
        self._play(1)

716
    @integration_test
717 718 719
    def test_two_threads_per_player(self):
        self._play(2)

720
    @integration_test
721 722 723 724 725 726
    def test_ten_threads_per_player(self):
        self._play(10)

    def _play(self, num_threads_per_player):
        """simulate a ping/pong event driven loop until each player played a given amount of turns, or timeout"""

727 728 729
        # game parameters
        NUM_TURNS = 10
        GAME_TIMEOUT = 10
730

731 732
        # setup temporary exchange, on which the player can publish their messages (ping/pong balls)
        with TemporaryExchange("PingPongTable") as tmp_exchange:
733 734
            tmp_exchange_address = tmp_exchange.address

735 736 737
            # create two players, on "both sides of the table"
            # i.e.: they each play on the tmp_exchange, but have the auto-generated designated listen queues for incoming balls
            with BusListenerJanitor(PingPongPlayer("Player1", "Player2", tmp_exchange.address, num_threads_per_player)) as player1:
738 739 740
                player1_address = player1.address
                with BusListenerJanitor(PingPongPlayer("Player2", "Player1", tmp_exchange.address, num_threads_per_player)) as player2:
                    player2_address = player2.address
741 742
                    start_timestamp = datetime.utcnow()

743 744 745 746 747
                    # first serve, referee throws a ping ball on the table in the direction of player1
                    with tmp_exchange.create_tobus() as referee:
                        first_msg = EventMessage(content="ping", subject="Player1")
                        logger.info("first message: sending %s to %s", first_msg.content, tmp_exchange.address)
                        referee.send(first_msg)
748 749 750 751 752 753 754 755 756

                    # play the game!
                    # run the "event loop". Actually there are multiple loops: num_threads per player
                    # this loop just tracks game progress.
                    while True:
                        player1_num_turns = player1.get_num_turns()
                        player2_num_turns = player2.get_num_turns()
                        time_remaining = GAME_TIMEOUT - (datetime.utcnow() - start_timestamp).total_seconds()

757
                        logger.info("PingPongTester STATUS: player1_num_turns=%d/%d player2_num_turns=%d/%d time_remaining=%.1fsec",
758 759 760 761 762 763 764 765 766 767 768 769 770 771
                                    player1_num_turns, NUM_TURNS, player2_num_turns, NUM_TURNS, time_remaining)

                        # assert on deadlocked game (should never happen!)
                        self.assertGreater(time_remaining, 0)

                        if player1_num_turns >= NUM_TURNS and player2_num_turns >= NUM_TURNS :
                            break

                        sleep(0.1)

                    # assert on players who did not finish the game
                    self.assertGreaterEqual(player1.get_num_turns(), NUM_TURNS)
                    self.assertGreaterEqual(player2.get_num_turns(), NUM_TURNS)

772 773 774 775
                    logger.info("SUCCESS! player1_num_turns=%d/%d player2_num_turns=%d/%d num_threads_per_player=%d #msg_per_sec=%.1f",
                                player1_num_turns, NUM_TURNS, player2_num_turns, NUM_TURNS,
                                num_threads_per_player, 2*NUM_TURNS/(datetime.utcnow() - start_timestamp).total_seconds())

776 777 778 779
        self.assertFalse(exchange_exists(tmp_exchange_address))
        self.assertFalse(queue_exists(player1_address))
        self.assertFalse(queue_exists(player2_address))

780 781

class MessageHandlerTester(unittest.TestCase):
782 783

    @unit_test
784 785 786 787 788 789 790 791
    def test_handler_init_raises(self):
        # define a MessageHandler that raises on init
        class RaisingHandler(AbstractMessageHandler):
            def __init__(self):
                raise Exception("intentional test exception")

        # try to start a BusListener using this handler. Should fail and raise a MessagingRuntimeError
        with TemporaryExchange(self.__class__.__name__) as tmp_exchange:
792
            tmp_exchange_name = tmp_exchange.address
793 794
            with self.assertRaises(MessagingRuntimeError):
                with BusListenerJanitor(BusListener(handler_type=RaisingHandler,
795
                                                    exchange=tmp_exchange_name)) as listener:
796 797
                    pass

798 799 800
        self.assertFalse(exchange_exists(tmp_exchange_name))
        self.assertFalse(queue_exists(BusListener.designated_queue_name(tmp_exchange_name)))

801
    @unit_test
802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843
    def test_empty_template_handler(self):
        # define a MessageHandler with a template for callback on<something> methods
        class BaseTemplateHandler(AbstractMessageHandler):
            def handle_message(self, msg: LofarMessage):
                if 'foo' in msg.subject:
                    self.on_foo()
                if 'bar' in msg.subject:
                    self.on_bar(msg.content)

            def on_foo(self):
                pass

            def on_bar(self, some_arg):
                pass

        self.assertTrue(BaseTemplateHandler().is_empty_template_handler())

        class ConcreteHandler1(BaseTemplateHandler):
            def on_foo(self):
                return 42

        self.assertFalse(ConcreteHandler1().is_empty_template_handler())

        class ConcreteHandler2(BaseTemplateHandler):
            def on_bar(self, some_arg):
                if some_arg:
                    return 3.14
                return 2.71

        self.assertFalse(ConcreteHandler2().is_empty_template_handler())

        class SimpleNonTemplateHandler(AbstractMessageHandler):
            def handle_message(self, msg: LofarMessage):
                if 'foo' in msg.subject:
                    return 42
                elif 'bar' in msg.subject:
                    if msg.content:
                        return 3.14
                    return 2.71

        self.assertFalse(SimpleNonTemplateHandler().is_empty_template_handler())

844
    @unit_test
845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861
    def test_empty_template_handler_raises(self):
        # define a MessageHandler with a template for callback on<something> methods
        class BaseTemplateHandler(AbstractMessageHandler):
            def handle_message(self, msg: LofarMessage):
                if 'foo' in msg.subject:
                    self.on_foo()
                if 'bar' in msg.subject:
                    self.on_bar(msg.content)

            def on_foo(self):
                pass

            def on_bar(self, some_arg):
                pass

        # try to start a BusListener using a BaseTemplateHandler. Should fail and raise a TypeError
        with TemporaryExchange(self.__class__.__name__) as tmp_exchange:
862
            tmp_exchange_name = tmp_exchange.address
863 864
            with self.assertRaises(RuntimeError):
                with BusListenerJanitor(BusListener(handler_type=BaseTemplateHandler,
865
                                                    exchange=tmp_exchange_name)) as listener:
866 867
                    pass

868 869 870 871
        self.assertFalse(exchange_exists(tmp_exchange_name))
        self.assertFalse(queue_exists(BusListener.designated_queue_name(tmp_exchange_name)))


872 873 874 875 876 877 878 879
class ReconnectOnConnectionLossTests(unittest.TestCase):
    def setUp(self):
        self.tmp_exchange = TemporaryExchange()
        self.tmp_queue = self.tmp_exchange.create_temporary_queue()

        self.tmp_exchange.open()
        self.tmp_queue.open()

880 881 882 883 884 885 886 887 888
    def tearDown(self):
        tmp_queue_address = self.tmp_queue.address
        self.tmp_queue.close()
        self.assertFalse(queue_exists(tmp_queue_address))

        tmp_exchange_address = self.tmp_exchange.address
        self.tmp_exchange.close()
        self.assertFalse(exchange_exists(tmp_exchange_address))

889 890 891 892 893 894
    def _can_connect_to_rabbitmq_admin_site(self, hostname: str):
        try:
            url = 'http://%s:15672/api' % (hostname,)
            return requests.get(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD)).status_code in [200, 202]
        except requests.ConnectionError:
            return False
895

896
    def _close_connection_of_bus_on_broker(self, bus: _AbstractBus):
897 898 899
        if not self._can_connect_to_rabbitmq_admin_site(bus.broker):
            raise unittest.SkipTest("Cannot connect tot RabbitMQ admin server to close connection %s" % (bus.connection_name))

900 901 902 903 904 905 906 907 908
        # use the http REST API using request to forcefully close the connection on the broker-side
        url = "http://%s:15672/api/connections/%s" % (bus.broker, bus.connection_name)

        # rabbitmq http api is sometimes lagging a bit behind...
        # wait until the connection url responds with 200-ok.
        while True:
            response = requests.get(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD))
            if response.status_code == 200:
                break
909
            sleep(0.25)
910 911 912 913 914

        # now we can delete it.
        response = requests.delete(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD))
        self.assertEqual(204, response.status_code)

915
    @integration_test
Jorrit Schaap's avatar
Jorrit Schaap committed
916
    def test_tobus_send_handling_connection_loss(self):
917 918
        with ToBus(self.tmp_exchange.address) as tobus:
            tobus.send(EventMessage())
919 920

            # force server-side connection loss
921
            self._close_connection_of_bus_on_broker(tobus)
922 923 924 925 926 927 928

            # try to send with timeout of 0 (so there is no opportunity for reconnection) ->  MessagingTimeoutError
            with self.assertRaises(MessagingTimeoutError):
                tobus.send(EventMessage(), timeout=0)

            # send with normal timeout, should just succeed (and not raise)
            tobus.send(EventMessage(), timeout=5)
929

930
    @integration_test
931 932 933 934 935 936 937 938 939 940 941 942 943 944 945
    def test_frombus_send_handling_connection_loss(self):
        with ToBus(self.tmp_exchange.address) as tobus:
            with self.tmp_exchange.create_temporary_queue(auto_delete_on_last_disconnect=False) as tmp_queue:
                with tmp_queue.create_frombus() as frombus:
                    # test normal send/receive -> should work
                    tobus.send(EventMessage())
                    self.assertIsNotNone(frombus.receive())

                    # force server-side connection loss for the receiving frombus connection
                    self._close_connection_of_bus_on_broker(frombus)

                    # test normal send/receive -> should work
                    tobus.send(EventMessage())
                    self.assertIsNotNone(frombus.receive())

946
    @integration_test
947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976
    def test_buslistener_handling_connection_loss(self):
        msg_handled_event = ThreadingEvent()

        class SynchonizingHandler(AbstractMessageHandler):
            def handle_message(self, msg: LofarMessage):
                logger.info("handle_message(%s) ... setting msg_handled_event", msg)
                msg_handled_event.set()

        with BusListenerJanitor(BusListener(handler_type=SynchonizingHandler,
                                            exchange=self.tmp_exchange.address)) as listener:
            with ToBus(self.tmp_exchange.address) as tobus:
                # send test message
                tobus.send(EventMessage())

                # wait until mesage is handled...
                self.assertTrue(msg_handled_event.wait(2))
                msg_handled_event.clear()

                # magic lookup of the listeners receiver...
                frombus = list(listener._threads.values())[0]['receiver']
                # ... to force server-side connection loss
                self._close_connection_of_bus_on_broker(frombus)

                # send another test message...
                tobus.send(EventMessage())

                # listener should have handled the 2ns msg as well, even though the connection was broken
                # thanks to auto reconnect
                self.assertTrue(msg_handled_event.wait(2))

977

978 979 980 981 982 983
def load_tests(loader, tests, ignore):
    """add the doctests from lofar.messaging.messagebus to the unittest tests"""
    import doctest
    import lofar.messaging.messagebus
    tests.addTests(doctest.DocTestSuite(lofar.messaging.messagebus))
    return tests
984

985

986
if __name__ == '__main__':
Jorrit Schaap's avatar
Jorrit Schaap committed
987 988
    if not can_connect_to_broker():
        logger.error("Cannot connect to default rabbitmq broker. Skipping test.")
989 990
        exit(3)

Jorrit Schaap's avatar
Jorrit Schaap committed
991
    unittest.main()
992