diff --git a/.gitattributes b/.gitattributes index 1548656895700a0544f6f0134451e1108fded62c..6337b2a1d43db524efff0ca261dff3885bf4b81f 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1141,6 +1141,7 @@ CEP/LAPS/Messaging/examples/receivemsg.py eol=lf CEP/LAPS/Messaging/examples/sendmsg.py eol=lf CEP/LAPS/Messaging/examples/server.py eol=lf CEP/LAPS/Messaging/src/MsgBus/Bus.py eol=lf +CEP/LAPS/Messaging/src/MsgBus/MsgBus.py -text CEP/LAPS/Messaging/src/MsgBus/__init__.py eol=lf CEP/LAPS/Messaging/src/__init__.py eol=lf CEP/LAPS/ParsetCombiner/src/pcombine.py eol=lf diff --git a/CEP/LAPS/Messaging/src/MsgBus/CMakeLists.txt b/CEP/LAPS/Messaging/src/MsgBus/CMakeLists.txt index 15cbd152b3fe420bf2545ce61029af49c4cd52fa..e5464848fd7d8b299f2b97a9fdb70ffa8ba772b9 100644 --- a/CEP/LAPS/Messaging/src/MsgBus/CMakeLists.txt +++ b/CEP/LAPS/Messaging/src/MsgBus/CMakeLists.txt @@ -4,5 +4,5 @@ include(PythonInstall) python_install( __init__.py - Bus.py - DESTINATION LAPS/MsgBus) \ No newline at end of file + MsgBus.py + DESTINATION LAPS/) \ No newline at end of file diff --git a/CEP/LAPS/Messaging/src/MsgBus/MsgBus.py b/CEP/LAPS/Messaging/src/MsgBus/MsgBus.py new file mode 100644 index 0000000000000000000000000000000000000000..b6d9d6281067d19a30e21681a807c88d6c8fb9c2 --- /dev/null +++ b/CEP/LAPS/Messaging/src/MsgBus/MsgBus.py @@ -0,0 +1,91 @@ +#!/usr/bin/python +# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# id.. TDB +from qpid.messaging import * + +# Candidate for a config file +broker="localhost" +address="laps.defualtqueue" +options="create:always, node: { type: queue, durable: True}" + +class Bus(): + def __init__(self, address=address, broker=broker, options=options): + self.connection = Connection(broker) + self.connection.reconnect = True + + try: + self.connection.open() + self.session = self.connection.session() + self.receiver = self.session.receiver("%s;{%s}" %(address,options)) + self.receiver.capacity = 32 + self.sender = self.session.sender(address) + + except MessagingError,m: + print " OMG!!" + print m + + def send(self,parsetdata,subject="defaultfilename.out"): + msg = Message(parsetdata) + msg.subject=subject + msg.durable=True + self.sender.send(msg) + + def get(self): + msg= self.receiver.fetch() + return msg.content, msg.subject + + def ack(self): + self.session.acknowledge() + +class MultiReceiveBus(): + def __init__(self, handler, address=address, broker=broker, options=options): + self.connection = Connection(broker) + self.connection.reconnect = True + self.handlers={} + try: + self.connection.open() + self.session = self.connection.session() + receiver = self.session.receiver("%s;{%s}" %(address,options)) + receiver.capacity = 32 + self.handlers[receiver] = handler + + except MessagingError,m: + print " OMG!!" + print m + + def add(self,handler,address,options=options): + try: + receiver=self.session.receiver("%s;{%s}" %(address,options)) + receiver.capacity = 32 + self.handlers[receiver]=handler + except MessagingError,m: + print "Error adding receiver" + print m + + def HandleMessages(self): + while True: + print "waiting for messages" + receiver = self.session.next_receiver() + print "got incoming message" + handler = self.handlers[receiver] + msg = receiver.fetch() + handler(self,msg.content,msg.subject) + + def ack(self): + self.session.acknowledge()