diff --git a/.gitattributes b/.gitattributes index 310267b59bf7283e7b2c5531fd8325e53819cee4..36ebd7b52b19f8bb175a40e401764ea375339311 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1156,6 +1156,20 @@ CEP/LAPS/GRIDInterface/docs/Presentation[!!-~]LOFAR[!!-~]DPU[!!-~]XML[!!-~]inter CEP/LAPS/GRIDInterface/src/dpu_xml_interface.py eol=lf CEP/LAPS/GRIDInterface/src/pcombine.py eol=lf CEP/LAPS/GRIDInterface/src/pipeline_job.py eol=lf +CEP/LAPS/Messaging/commandlineUtils/addqueue.sh eol=lf +CEP/LAPS/Messaging/commandlineUtils/addtopic.sh eol=lf +CEP/LAPS/Messaging/commandlineUtils/cleanupq.sh eol=lf +CEP/LAPS/Messaging/commandlineUtils/delqueue.sh eol=lf +CEP/LAPS/Messaging/commandlineUtils/deltopic.sh eol=lf +CEP/LAPS/Messaging/commandlineUtils/fed.sh eol=lf +CEP/LAPS/Messaging/commandlineUtils/listqueues.sh eol=lf +CEP/LAPS/Messaging/commandlineUtils/qls.sh eol=lf +CEP/LAPS/Messaging/examples/client.py eol=lf +CEP/LAPS/Messaging/examples/receivemsg.py eol=lf +CEP/LAPS/Messaging/examples/sendmsg.py eol=lf +CEP/LAPS/Messaging/examples/server.py eol=lf +CEP/LAPS/Messaging/src/MsgBus.py eol=lf +CEP/LAPS/Messaging/src/__init__.py eol=lf CEP/LAPS/ParsetCombiner/src/pcombine.py eol=lf CEP/LAPS/QToPipeline/src/LapsServer.py.orig -text CEP/LAPS/QToPipeline/src/QToPipeline.py eol=lf diff --git a/CEP/LAPS/Messaging/CMakeLists.txt b/CEP/LAPS/Messaging/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..bf70f941643fca4bb8ae147bcad20d54eb9de2c4 --- /dev/null +++ b/CEP/LAPS/Messaging/CMakeLists.txt @@ -0,0 +1,6 @@ +# $Id$ + +lofar_package(Laps-messaging 0.1) + +add_subdirectory(src) +add_subdirectory(commandlineUtils) \ No newline at end of file diff --git a/CEP/LAPS/Messaging/commandlineUtils/CMakeLists.txt b/CEP/LAPS/Messaging/commandlineUtils/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..dbad1b5263094d600759bb44fd3b86f22c3a1df7 --- /dev/null +++ b/CEP/LAPS/Messaging/commandlineUtils/CMakeLists.txt @@ -0,0 +1,10 @@ +install(PROGRAMS + qls.sh + listqueues.sh + fed.sh + deltopic.sh + delqueue.sh + cleanupq.sh + addtopic.sh + addqueue.sh + DESTINATION bin/queue) \ No newline at end of file diff --git a/CEP/LAPS/Messaging/commandlineUtils/addqueue.sh b/CEP/LAPS/Messaging/commandlineUtils/addqueue.sh new file mode 100755 index 0000000000000000000000000000000000000000..175fa567ed7b35aeb1dcbe559a962dbc01d338da --- /dev/null +++ b/CEP/LAPS/Messaging/commandlineUtils/addqueue.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +qpid-config -a localhost add queue $1 --durable diff --git a/CEP/LAPS/Messaging/commandlineUtils/addtopic.sh b/CEP/LAPS/Messaging/commandlineUtils/addtopic.sh new file mode 100755 index 0000000000000000000000000000000000000000..ee019327ea3aaf43a5b83260a72583ba10ae53ae --- /dev/null +++ b/CEP/LAPS/Messaging/commandlineUtils/addtopic.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +qpid-config -a localhost add exchange topic $1 +#--durable diff --git a/CEP/LAPS/Messaging/commandlineUtils/cleanupq.sh b/CEP/LAPS/Messaging/commandlineUtils/cleanupq.sh new file mode 100755 index 0000000000000000000000000000000000000000..8da187f12ebdcc4fd0e5dced3478576bd0007a66 --- /dev/null +++ b/CEP/LAPS/Messaging/commandlineUtils/cleanupq.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +QUEUES=`qls |grep Y |awk '{ print $1 }'` +for i in $QUEUES +do + delqueue $i +done + diff --git a/CEP/LAPS/Messaging/commandlineUtils/delqueue.sh b/CEP/LAPS/Messaging/commandlineUtils/delqueue.sh new file mode 100755 index 0000000000000000000000000000000000000000..9705708b5d7ddc735ecf34ea678f6128c9a57265 --- /dev/null +++ b/CEP/LAPS/Messaging/commandlineUtils/delqueue.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +qpid-config -a localhost del queue $1 diff --git a/CEP/LAPS/Messaging/commandlineUtils/deltopic.sh b/CEP/LAPS/Messaging/commandlineUtils/deltopic.sh new file mode 100755 index 0000000000000000000000000000000000000000..ee019327ea3aaf43a5b83260a72583ba10ae53ae --- /dev/null +++ b/CEP/LAPS/Messaging/commandlineUtils/deltopic.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +qpid-config -a localhost add exchange topic $1 +#--durable diff --git a/CEP/LAPS/Messaging/commandlineUtils/fed.sh b/CEP/LAPS/Messaging/commandlineUtils/fed.sh new file mode 100755 index 0000000000000000000000000000000000000000..1f5a1172fbe6c156e74ad66f89b519ae1b398684 --- /dev/null +++ b/CEP/LAPS/Messaging/commandlineUtils/fed.sh @@ -0,0 +1,88 @@ +#!/bin/bash +# fed: +# (2014) Jan Rinze Peterzon +# +# This sets up a queue on two nodes +# and subsequently will add a forwarding route +# between the two queues + +# Usage: +# +# fed <source node> <destination node> <queue name> +# +# +# Example: +# fed Groningen Singapore Inbox.Toni +# fed Singapore Groningen Inbox.JR +# +# At Groningen it is now possible to send a +# message to InboxToni and the message will +# automatically be forwarded to Singapore. +# +# JR@Groningen $> sendmsg -a Inbox.Toni -m "Hi Toni!" +# +# Toni@Singapore $> receivemsg -a Inbox.Toni +# Received message: Hi Toni! +# +# Toni@Singapore $> sendmsg -a Inbox.JR -m "Thanks JR!" +# +# JR@Groningen $> receivemsg -a Inbox.JR +# Received message: Thanks JR! +# +argc=$# +echo "num, args: " $argc +myname=$( echo $0 |sed 's|.*/||g' ) +if [ "$argc" -lt 3 ]; then + echo $myname " can be used to setup a forwarded queue." + echo "Usage: " $myname " <QueueName> <SourceNode> <DestinationNode>" + exit -1 +fi + +tmpfile=$( mktemp ) +haserror=0 + +echo "setup queue " $1 " at " $2 +now=$( date ) +echo -n "$now : qpid-config -b $2 add queue $1 :" >>"$tmpfile" +qpid-config -b $2 add queue $1 2>>"$tmpfile" >>"$tmpfile" +if [ "$?" -gt 0 ]; then + echo "failed to create queue " $1 " on " $2 + haserror=1 +else + echo "OK" >> "$tmpfile" +fi + +echo "setup queue " $1 " at " $3 +echo -n "$now qpid-config -b $3 add queue $1 :" >>"$tmpfile" +qpid-config -b $3 add queue $1 2>>"$tmpfile" >>"$tmpfile" +if [ "$?" -gt 0 ]; then + echo "failed to create queue " $1 " on " $2 + haserror=1 +else + echo "OK" >> "$tmpfile" +fi +echo "setup forward route for queue " $1 " from " $2 " to " $3 +echo -n "$now qpid-route queue add $3 $2 '' $1 :" >>"$tmpfile" +qpid-route queue add $3 $2 '' $1 2>>"$tmpfile" >>"$tmpfile" +if [ "$?" -gt 0 ]; then + echo "failed to create forward route for queue " $1 " from " $2 " to " $3 + haserror=1 +else + echo "OK" >> "$tmpfile" +fi + +if [ "$haserror" -gt 0 ]; then + echo "log is in "$tmpfile +else + rm $tmpfile +fi + + +#ping -c 1 $1 2>/dev/null >/dev/null +#result=$? +#if [ "$result" -lt 1 ]; then +# echo Host $1 is alive +#else +# echo Host $1 is not reachable +#fi +# diff --git a/CEP/LAPS/Messaging/commandlineUtils/listqueues.sh b/CEP/LAPS/Messaging/commandlineUtils/listqueues.sh new file mode 100755 index 0000000000000000000000000000000000000000..286a6c9218799d22e11e5163e16cc54679f1080f --- /dev/null +++ b/CEP/LAPS/Messaging/commandlineUtils/listqueues.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +qpid-stat -q diff --git a/CEP/LAPS/Messaging/commandlineUtils/qls.sh b/CEP/LAPS/Messaging/commandlineUtils/qls.sh new file mode 100755 index 0000000000000000000000000000000000000000..c6c2fa8af00c1d0eb1ef7610745aa277618381ea --- /dev/null +++ b/CEP/LAPS/Messaging/commandlineUtils/qls.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +HEADER="Queues:" +DESCRIP=" queue dur autoDel excl msg msgIn msgOut bytes bytesIn bytesOut cons bind" +BREAKLINE=" =========================================================================================================================" + +if [ "$1" != "" ] +then + echo "$HEADER" + echo "$DESCRIP" + echo "$BREAKLINE" + qpid-stat -q |grep $1 +else + qpid-stat -q +fi diff --git a/CEP/LAPS/Messaging/examples/client.py b/CEP/LAPS/Messaging/examples/client.py new file mode 100644 index 0000000000000000000000000000000000000000..7598cfd14994522bdc69476f2c909e6d475fb69b --- /dev/null +++ b/CEP/LAPS/Messaging/examples/client.py @@ -0,0 +1,68 @@ +#!/usr/bin/python +# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id$ +from optparse import OptionParser +import sys, time +from qpid.messaging import * +parser = OptionParser() +parser.add_option("-a", "--address", dest="address", default="testqueue", + help="address (name of queue or topic)", metavar="FILE") +parser.add_option("-b", "--broker", dest="broker", default="localhost", + help="broker hostname") +parser.add_option("-c", "--count", dest="count", default=1, + help="number of messages to be sent") + +(options, args) = parser.parse_args() + +print "options :" , +print options +print "args :" , +print args + +broker=options.__dict__['broker'] +address=options.__dict__['address'] +count=int(options.__dict__['count']) + +print " setup connection with ", +print broker +print " on queue or topic :", +print address +print " count of messages :", +print count + +connection = Connection(broker) + +try: + connection.open() + print " opened " + session = connection.session() + print " session " + sender = session.sender(address) + print " sending message " + while count >0: + #time.sleep(2) + print 'send message: Hello world! %d' %(count) + sender.send(Message('Hello world! %d' %(count))) + count -= 1 + + +except MessagingError,m: + print m +finally: + connection.close() diff --git a/CEP/LAPS/Messaging/examples/receivemsg.py b/CEP/LAPS/Messaging/examples/receivemsg.py new file mode 100644 index 0000000000000000000000000000000000000000..21871ce53d9d79788c8eb6244a2d2fbd3da28d7d --- /dev/null +++ b/CEP/LAPS/Messaging/examples/receivemsg.py @@ -0,0 +1,52 @@ +#!/usr/bin/python + +import sys +from qpid.messaging import * +from optparse import OptionParser + + +parser = OptionParser() +parser.add_option("-a", "--address", dest="address", default="testqueue;{create:always}", + help="address (name of queue or topic)", metavar="FILE") +parser.add_option("-b", "--broker", dest="broker", default="localhost", + help="broker hostname") +parser.add_option("-c", "--count", dest="count", default=1, + help="number of messages to be sent") + +(options, args) = parser.parse_args() + +print "options :" , +print options +print "args :" , +print args + +broker=options.__dict__['broker'] +address=options.__dict__['address'] +count=int(options.__dict__['count']) + + +print " setup connection " +#if len(sys.argv)<3 else sys.argv[2] + +connection = Connection(broker) + +try: + connection.open() + print " opened " + session = connection.session() + print " session " + receiver = session.receiver(address) + message = receiver.fetch() + while (message and count): + print "received :", + print message.content + session.acknowledge() + if count>0: + count = count - 1 + if count>0: + message = receiver.fetch() + +except MessagingError,m: + print m +finally: + connection.close() diff --git a/CEP/LAPS/Messaging/examples/sendmsg.py b/CEP/LAPS/Messaging/examples/sendmsg.py new file mode 100644 index 0000000000000000000000000000000000000000..7571a4f067ab59233705d425bd8118325dcf998d --- /dev/null +++ b/CEP/LAPS/Messaging/examples/sendmsg.py @@ -0,0 +1,56 @@ +#!/usr/bin/python +from optparse import OptionParser +import sys, time +from qpid.messaging import * +parser = OptionParser() +parser.add_option("-a", "--address", dest="address", default="testqueue", + help="address (name of queue or topic)", metavar="FILE") +parser.add_option("-b", "--broker", dest="broker", default="localhost", + help="broker hostname") +parser.add_option("-c", "--count", dest="count", default=1, + help="number of messages to be sent") +parser.add_option("-m", "--message", dest="message", default="void", + help="number of messages to be sent") + +(options, args) = parser.parse_args() + +print "options :" , +print options +print "args :" , +print args + +broker=options.__dict__['broker'] +address=options.__dict__['address'] +count=int(options.__dict__['count']) +message=options.__dict__['message'] + +print " setup connection with ", +print broker +print " on queue or topic :", +print address +print " count of messages :", +print count + +connection = Connection(broker) + +try: + connection.open() + print " opened " + session = connection.session() + print " session " + sender = session.sender(address) + print " sending message " + while count >0: + #time.sleep(2) + print 'send message: Hello world! %d' %(count) + if message=="void": + sender.send(Message('Hello world! %d' %(count))) + else: + sender.send(Message(message)) + count -= 1 + + +except MessagingError,m: + print m +finally: + connection.close() diff --git a/CEP/LAPS/Messaging/examples/server.py b/CEP/LAPS/Messaging/examples/server.py new file mode 100644 index 0000000000000000000000000000000000000000..caa8ce7ebdebd2a0c630aa96cfbfd5f4bac29010 --- /dev/null +++ b/CEP/LAPS/Messaging/examples/server.py @@ -0,0 +1,66 @@ +#!/usr/bin/python +# Copyright (C) 2012-2014 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id$ +import sys +from qpid.messaging import * +from optparse import OptionParser + + +parser = OptionParser() +parser.add_option("-a", "--address", dest="address", default="testqueue;{create:always}", + help="address (name of queue or topic)", metavar="FILE") +parser.add_option("-b", "--broker", dest="broker", default="localhost", + help="broker hostname") +parser.add_option("-c", "--count", dest="count", default=1, + help="number of messages to be sent") + +(options, args) = parser.parse_args() + +print "options :" , +print options +print "args :" , +print args + +broker=options.__dict__['broker'] +address=options.__dict__['address'] +count=int(options.__dict__['count']) + + +print " setup connection " +#if len(sys.argv)<3 else sys.argv[2] + +connection = Connection(broker) + +try: + connection.open() + print " opened " + session = connection.session() + print " session " + receiver = session.receiver(address) + message = receiver.fetch() + while message: + print "received :", + print message.content + session.acknowledge() + message = receiver.fetch() + +except MessagingError,m: + print m +finally: + connection.close() diff --git a/CEP/LAPS/Messaging/src/CMakeLists.txt b/CEP/LAPS/Messaging/src/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..9d1f9d539db3e03e6b7fece81772f01cf522b6e8 --- /dev/null +++ b/CEP/LAPS/Messaging/src/CMakeLists.txt @@ -0,0 +1,9 @@ +# $Id$ + +include(PythonInstall) + +python_install( + MsgBus.py + __init__.py + + DESTINATION LAPS/MsgBus) \ No newline at end of file diff --git a/CEP/LAPS/Messaging/src/MsgBus.py b/CEP/LAPS/Messaging/src/MsgBus.py new file mode 100644 index 0000000000000000000000000000000000000000..7019326ec044172c659442659a819379ed95d94a --- /dev/null +++ b/CEP/LAPS/Messaging/src/MsgBus.py @@ -0,0 +1,54 @@ +#!/usr/bin/python +# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id$ +from qpid.messaging import * + +# Candidate for a config file +broker="lof022" +address="laps.cep3.pipeline.start" +options="create:always, node: { type: queue, durable: True}" + +class Bus(): + def __init__(self): + self.connection = Connection(broker) + self.connection.reconnect = True + + try: + self.connection.open() + self.session = self.connection.session() + self.receiver = self.session.receiver("%s;{%s}" %(address,options)) + self.sender = self.session.sender(address) + + except MessagingError,m: + print " OMG!!" + print m + + def send(self,parsetdata,subject="defaultfilename.out"): + msg = Message(parsetdata) + msg.subject=subject + msg.durable=True + self.sender.send(msg) + + def get(self): + msg= self.receiver.fetch() + return msg.content, msg.subject + + def ack(self): + self.session.acknowledge() + diff --git a/CEP/LAPS/Messaging/src/__init__.py b/CEP/LAPS/Messaging/src/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..efe4be4a176f6fe8f1183019e09d0e600062003a --- /dev/null +++ b/CEP/LAPS/Messaging/src/__init__.py @@ -0,0 +1,19 @@ +# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id$ +all