diff --git a/.gitattributes b/.gitattributes index 4b05331683f0e3ad9e320f794623612a5a3b8707..9652efcd8bdc221f76b95198ccf24344e6b145c1 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1642,6 +1642,7 @@ LCS/Messaging/python/messaging/CMakeLists.txt -text LCS/Messaging/python/messaging/RPC.py -text LCS/Messaging/python/messaging/Service.py -text LCS/Messaging/python/messaging/__init__.py -text +LCS/Messaging/python/messaging/broker.py -text LCS/Messaging/python/messaging/exceptions.py -text LCS/Messaging/python/messaging/messagebus.py -text LCS/Messaging/python/messaging/messages.py -text diff --git a/LCS/Messaging/python/messaging/CMakeLists.txt b/LCS/Messaging/python/messaging/CMakeLists.txt index 7da1a11a0d96fee3aa46de0b60518c58f9edb52f..efe03f8ed791a430b53b29328ef32c5f3f06842e 100644 --- a/LCS/Messaging/python/messaging/CMakeLists.txt +++ b/LCS/Messaging/python/messaging/CMakeLists.txt @@ -11,6 +11,7 @@ set(_py_files messages.py RPC.py Service.py + broker.py ) python_install(${_py_files} DESTINATION lofar/messaging) diff --git a/LCS/Messaging/python/messaging/RPC.py b/LCS/Messaging/python/messaging/RPC.py index 4a03bbeea2e381aa456ec4f75c0ffe7fd7a22d04..f80694b7fec79f173bf57fb8ceae5723fa7d3e6d 100644 --- a/LCS/Messaging/python/messaging/RPC.py +++ b/LCS/Messaging/python/messaging/RPC.py @@ -97,6 +97,7 @@ class RPC(): self.BusName = kwargs.pop("busname", None) self.ServiceName = service self.broker = broker if broker else 'localhost' + if self.BusName is None: self.Request = ToBus(self.ServiceName, broker=self.broker) else: diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index 2c4aa63d4fff498f84bcab38bc3a1b0cfc20b449..ba1c9ebd582266ee8c1a85840f5358a75c1b6cec 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -236,7 +236,7 @@ class Service(AbstractBusListener): # send the result to the RPC client try: - with ToBus(reply_to) as dest: + with ToBus(reply_to, broker=self.broker) as dest: dest.send(reply_msg) except MessageBusError as e: logger.error("Failed to send reply messgage to reply address %s. Error: %s", reply_to, e) diff --git a/LCS/Messaging/python/messaging/broker.py b/LCS/Messaging/python/messaging/broker.py new file mode 100644 index 0000000000000000000000000000000000000000..5344b67c0ae2520009acb7fdce2be57fbef1bb6a --- /dev/null +++ b/LCS/Messaging/python/messaging/broker.py @@ -0,0 +1,112 @@ +# pretty much taken from the Proton example code +# https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/examples/broker.py.html + +import collections, optparse, uuid +from proton import Endpoint +from proton.handlers import MessagingHandler +from proton.reactor import Container + +class Queue(object): + def __init__(self, dynamic=False): + self.dynamic = dynamic + self.queue = collections.deque() + self.consumers = [] + + def subscribe(self, consumer): + self.consumers.append(consumer) + + def unsubscribe(self, consumer): + if consumer in self.consumers: + self.consumers.remove(consumer) + return len(self.consumers) == 0 and (self.dynamic or self.queue.count == 0) + + def publish(self, message): + self.queue.append(message) + self.dispatch() + + def dispatch(self, consumer=None): + if consumer: + c = [consumer] + else: + c = self.consumers + while self._deliver_to(c): pass + + def _deliver_to(self, consumers): + try: + result = False + for c in consumers: + if c.credit: + c.send(self.queue.popleft()) + result = True + return result + except IndexError: # no more messages + return False + +class Broker(MessagingHandler): + def __init__(self, url): + super(Broker, self).__init__() + self.url = url + self.queues = {} + + def on_start(self, event): + self.acceptor = event.container.listen(self.url) + + def _queue(self, address): + if address not in self.queues: + self.queues[address] = Queue() + return self.queues[address] + + def on_link_opening(self, event): + if event.link.is_sender: + if event.link.remote_source.dynamic: + address = str(uuid.uuid4()) + event.link.source.address = address + q = Queue(True) + self.queues[address] = q + q.subscribe(event.link) + elif event.link.remote_source.address: + event.link.source.address = event.link.remote_source.address + self._queue(event.link.source.address).subscribe(event.link) + elif event.link.remote_target.address: + event.link.target.address = event.link.remote_target.address + + def _unsubscribe(self, link): + if link.source.address in self.queues and self.queues[link.source.address].unsubscribe(link): + del self.queues[link.source.address] + + def on_link_closing(self, event): + if event.link.is_sender: + self._unsubscribe(event.link) + + def on_connection_closing(self, event): + self.remove_stale_consumers(event.connection) + + def on_disconnected(self, event): + self.remove_stale_consumers(event.connection) + + def remove_stale_consumers(self, connection): + l = connection.link_head(Endpoint.REMOTE_ACTIVE) + while l: + if l.is_sender: + self._unsubscribe(l) + l = l.next(Endpoint.REMOTE_ACTIVE) + + def on_sendable(self, event): + self._queue(event.link.source.address).dispatch(event.link) + + def on_message(self, event): + address = event.link.target.address + if address is None: + address = event.message.address + self._queue(address).publish(event.message) + + +if __name__ == '__main__': + parser = optparse.OptionParser(usage="usage: %prog [options]") + parser.add_option("-a", "--address", default="localhost:5672", + help="address router listens on (default %default)") + opts, args = parser.parse_args() + + try: + Container(Broker(opts.address)).run() + except KeyboardInterrupt: pass \ No newline at end of file diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 35888a367be92aa3b1116cb0e7e756451c81dffd..77a8b25abbbd5d5a737167691b4ce53eb021d63b 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -39,6 +39,7 @@ import sys import uuid import threading from copy import deepcopy +import re logger = logging.getLogger(__name__) @@ -222,7 +223,7 @@ class FromBus(object): if hasattr(self, 'subject') and self.subject is not None: logger.debug("got subject: %s | filter for subject: %s" % (msg.subject, self.subject)) # ...check if the message subject differs from the one we filter for - if msg.subject != self.subject: + if not re.match(re.compile(self.subject), msg.subject): pass # ignore, and receive next one else: break # handle this message diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py b/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py index d8fc25e24838a527849213662b408750f56e69cd..32f61d836dbdb8dfa38d9665b41125e991e38298 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py @@ -7,11 +7,13 @@ import logging from lofar.messaging import Service from lofar.sas.resourceassignment.resourceassignmentservice.service import createService from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC, RARPCException -from qpid.messaging.exceptions import * +from lofar.messaging.broker import Broker +import threading try: - from qpid.messaging import Connection - from qpidtoollibs import BrokerAgent + import proton + import proton.utils + #from qpidtoollibs import BrokerAgent except ImportError: print('Cannot run test without qpid tools') print('Please source qpid profile') @@ -23,17 +25,20 @@ from unittest.mock import patch connection = None broker = None + try: logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) - # setup broker connection - connection = Connection.establish('127.0.0.1') - broker = BrokerAgent(connection) + # setup broker + address = 'localhost:5673' # todo: auto-discover a suitable port for this + broker = proton.utils.Container(Broker(address)) + broker_thread = threading.Thread(target=broker.run).start() # todo: have this happen in a context + #connection = proton.utils.BlockingConnection(address) # add test service busname busname = 'test-lofarbus-%s' % (uuid.uuid1()) - broker.addExchange('topic', busname) + #broker.addExchange('topic', busname) # the system under test is the service and the rpc, not the RADatabase # so, patch (mock) the RADatabase class during these tests. @@ -59,7 +64,7 @@ try: def test(self): '''basic test ''' - rpc = RARPC(busname=busname) + rpc = RARPC(broker=address, busname=busname) self.assertEqual(mock.getTaskStatuses.return_value, rpc.getTaskStatuses()) self.assertEqual(mock.getTaskTypes.return_value, rpc.getTaskTypes()) self.assertEqual(mock.getResourceClaimStatuses.return_value, rpc.getResourceClaimStatuses()) @@ -86,17 +91,17 @@ try: #self.assertTrue('got an unexpected keyword argument \'fooarg\'' in str(cm.exception)) # create and run the service - with createService(busname=busname): + with createService(broker=address, busname=busname): # and run all tests unittest.main() -except ConnectError as ce: - logger.error(ce) +except proton.ProtonException as ce: + logging.error(ce) exit(3) finally: # cleanup test bus and exit if broker: - broker.delExchange(busname) - if connection: - connection.close() + broker.stop() + # if connection: + # connection.close()