From 05c8860e56f443f2e8149615ec4f7ea0d78cb3a0 Mon Sep 17 00:00:00 2001
From: Jan Rinze Peterzon <peterzon@astron.nl>
Date: Mon, 3 Nov 2014 14:22:52 +0000
Subject: [PATCH] Task #6737: Added MsgBus.py and updated CMakeList.txt to
 include in library.

---
 .gitattributes                               |  1 +
 CEP/LAPS/Messaging/src/MsgBus/CMakeLists.txt |  4 +-
 CEP/LAPS/Messaging/src/MsgBus/MsgBus.py      | 91 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 2 deletions(-)
 create mode 100644 CEP/LAPS/Messaging/src/MsgBus/MsgBus.py

diff --git a/.gitattributes b/.gitattributes
index 15486568957..6337b2a1d43 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1141,6 +1141,7 @@ CEP/LAPS/Messaging/examples/receivemsg.py eol=lf
 CEP/LAPS/Messaging/examples/sendmsg.py eol=lf
 CEP/LAPS/Messaging/examples/server.py eol=lf
 CEP/LAPS/Messaging/src/MsgBus/Bus.py eol=lf
+CEP/LAPS/Messaging/src/MsgBus/MsgBus.py -text
 CEP/LAPS/Messaging/src/MsgBus/__init__.py eol=lf
 CEP/LAPS/Messaging/src/__init__.py eol=lf
 CEP/LAPS/ParsetCombiner/src/pcombine.py eol=lf
diff --git a/CEP/LAPS/Messaging/src/MsgBus/CMakeLists.txt b/CEP/LAPS/Messaging/src/MsgBus/CMakeLists.txt
index 15cbd152b3f..e5464848fd7 100644
--- a/CEP/LAPS/Messaging/src/MsgBus/CMakeLists.txt
+++ b/CEP/LAPS/Messaging/src/MsgBus/CMakeLists.txt
@@ -4,5 +4,5 @@ include(PythonInstall)
 
 python_install(
   __init__.py
-  Bus.py
-  DESTINATION LAPS/MsgBus)
\ No newline at end of file
+  MsgBus.py
+  DESTINATION LAPS/)
\ No newline at end of file
diff --git a/CEP/LAPS/Messaging/src/MsgBus/MsgBus.py b/CEP/LAPS/Messaging/src/MsgBus/MsgBus.py
new file mode 100644
index 00000000000..b6d9d628106
--- /dev/null
+++ b/CEP/LAPS/Messaging/src/MsgBus/MsgBus.py
@@ -0,0 +1,91 @@
+#!/usr/bin/python
+# Copyright (C) 2012-2013  ASTRON (Netherlands Institute for Radio Astronomy)
+# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
+#
+# This file is part of the LOFAR software suite.
+# The LOFAR software suite is free software: you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# The LOFAR software suite is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+#
+# id.. TDB
+from qpid.messaging import *
+
+# Candidate for a config file
+broker="localhost" 
+address="laps.defualtqueue"
+options="create:always, node: { type: queue, durable: True}"
+
+class Bus():
+    def __init__(self, address=address, broker=broker, options=options):
+        self.connection = Connection(broker)
+        self.connection.reconnect = True
+
+        try:
+            self.connection.open()
+            self.session = self.connection.session() 
+            self.receiver = self.session.receiver("%s;{%s}" %(address,options))
+	    self.receiver.capacity = 32
+            self.sender = self.session.sender(address)
+
+        except MessagingError,m:
+            print " OMG!!"
+            print m
+
+    def send(self,parsetdata,subject="defaultfilename.out"):
+        msg = Message(parsetdata)
+        msg.subject=subject
+        msg.durable=True
+        self.sender.send(msg)
+
+    def get(self):
+        msg= self.receiver.fetch()
+        return msg.content, msg.subject
+
+    def ack(self):
+          self.session.acknowledge()
+
+class MultiReceiveBus():
+    def __init__(self, handler, address=address, broker=broker, options=options):
+        self.connection = Connection(broker)
+        self.connection.reconnect = True
+        self.handlers={}
+        try:
+            self.connection.open()
+            self.session = self.connection.session()
+            receiver = self.session.receiver("%s;{%s}" %(address,options))
+            receiver.capacity = 32
+            self.handlers[receiver] = handler
+
+        except MessagingError,m:
+            print " OMG!!"
+            print m
+
+    def add(self,handler,address,options=options):
+        try:
+            receiver=self.session.receiver("%s;{%s}" %(address,options))
+            receiver.capacity = 32
+            self.handlers[receiver]=handler
+        except MessagingError,m:
+            print "Error adding receiver"
+            print m
+
+    def HandleMessages(self):
+        while True:
+                print "waiting for messages"
+                receiver = self.session.next_receiver()
+                print "got incoming message"
+                handler = self.handlers[receiver]
+                msg = receiver.fetch()
+                handler(self,msg.content,msg.subject)
+
+    def ack(self):
+          self.session.acknowledge()
-- 
GitLab