Skip to content
Snippets Groups Projects
Commit 0187294b authored by Wouter Klijn's avatar Wouter Klijn
Browse files

Task #5994: Add the messaging framework

parent 52814885
Branches
Tags
No related merge requests found
Showing
with 482 additions and 0 deletions
......@@ -1156,6 +1156,20 @@ CEP/LAPS/GRIDInterface/docs/Presentation[!!-~]LOFAR[!!-~]DPU[!!-~]XML[!!-~]inter
CEP/LAPS/GRIDInterface/src/dpu_xml_interface.py eol=lf
CEP/LAPS/GRIDInterface/src/pcombine.py eol=lf
CEP/LAPS/GRIDInterface/src/pipeline_job.py eol=lf
CEP/LAPS/Messaging/commandlineUtils/addqueue.sh eol=lf
CEP/LAPS/Messaging/commandlineUtils/addtopic.sh eol=lf
CEP/LAPS/Messaging/commandlineUtils/cleanupq.sh eol=lf
CEP/LAPS/Messaging/commandlineUtils/delqueue.sh eol=lf
CEP/LAPS/Messaging/commandlineUtils/deltopic.sh eol=lf
CEP/LAPS/Messaging/commandlineUtils/fed.sh eol=lf
CEP/LAPS/Messaging/commandlineUtils/listqueues.sh eol=lf
CEP/LAPS/Messaging/commandlineUtils/qls.sh eol=lf
CEP/LAPS/Messaging/examples/client.py eol=lf
CEP/LAPS/Messaging/examples/receivemsg.py eol=lf
CEP/LAPS/Messaging/examples/sendmsg.py eol=lf
CEP/LAPS/Messaging/examples/server.py eol=lf
CEP/LAPS/Messaging/src/MsgBus.py eol=lf
CEP/LAPS/Messaging/src/__init__.py eol=lf
CEP/LAPS/ParsetCombiner/src/pcombine.py eol=lf
CEP/LAPS/QToPipeline/src/LapsServer.py.orig -text
CEP/LAPS/QToPipeline/src/QToPipeline.py eol=lf
......
# $Id$
lofar_package(Laps-messaging 0.1)
add_subdirectory(src)
add_subdirectory(commandlineUtils)
\ No newline at end of file
install(PROGRAMS
qls.sh
listqueues.sh
fed.sh
deltopic.sh
delqueue.sh
cleanupq.sh
addtopic.sh
addqueue.sh
DESTINATION bin/queue)
\ No newline at end of file
#!/bin/bash
qpid-config -a localhost add queue $1 --durable
#!/bin/bash
qpid-config -a localhost add exchange topic $1
#--durable
#!/bin/bash
QUEUES=`qls |grep Y |awk '{ print $1 }'`
for i in $QUEUES
do
delqueue $i
done
#!/bin/bash
qpid-config -a localhost del queue $1
#!/bin/bash
qpid-config -a localhost add exchange topic $1
#--durable
#!/bin/bash
# fed:
# (2014) Jan Rinze Peterzon
#
# This sets up a queue on two nodes
# and subsequently will add a forwarding route
# between the two queues
# Usage:
#
# fed <source node> <destination node> <queue name>
#
#
# Example:
# fed Groningen Singapore Inbox.Toni
# fed Singapore Groningen Inbox.JR
#
# At Groningen it is now possible to send a
# message to InboxToni and the message will
# automatically be forwarded to Singapore.
#
# JR@Groningen $> sendmsg -a Inbox.Toni -m "Hi Toni!"
#
# Toni@Singapore $> receivemsg -a Inbox.Toni
# Received message: Hi Toni!
#
# Toni@Singapore $> sendmsg -a Inbox.JR -m "Thanks JR!"
#
# JR@Groningen $> receivemsg -a Inbox.JR
# Received message: Thanks JR!
#
argc=$#
echo "num, args: " $argc
myname=$( echo $0 |sed 's|.*/||g' )
if [ "$argc" -lt 3 ]; then
echo $myname " can be used to setup a forwarded queue."
echo "Usage: " $myname " <QueueName> <SourceNode> <DestinationNode>"
exit -1
fi
tmpfile=$( mktemp )
haserror=0
echo "setup queue " $1 " at " $2
now=$( date )
echo -n "$now : qpid-config -b $2 add queue $1 :" >>"$tmpfile"
qpid-config -b $2 add queue $1 2>>"$tmpfile" >>"$tmpfile"
if [ "$?" -gt 0 ]; then
echo "failed to create queue " $1 " on " $2
haserror=1
else
echo "OK" >> "$tmpfile"
fi
echo "setup queue " $1 " at " $3
echo -n "$now qpid-config -b $3 add queue $1 :" >>"$tmpfile"
qpid-config -b $3 add queue $1 2>>"$tmpfile" >>"$tmpfile"
if [ "$?" -gt 0 ]; then
echo "failed to create queue " $1 " on " $2
haserror=1
else
echo "OK" >> "$tmpfile"
fi
echo "setup forward route for queue " $1 " from " $2 " to " $3
echo -n "$now qpid-route queue add $3 $2 '' $1 :" >>"$tmpfile"
qpid-route queue add $3 $2 '' $1 2>>"$tmpfile" >>"$tmpfile"
if [ "$?" -gt 0 ]; then
echo "failed to create forward route for queue " $1 " from " $2 " to " $3
haserror=1
else
echo "OK" >> "$tmpfile"
fi
if [ "$haserror" -gt 0 ]; then
echo "log is in "$tmpfile
else
rm $tmpfile
fi
#ping -c 1 $1 2>/dev/null >/dev/null
#result=$?
#if [ "$result" -lt 1 ]; then
# echo Host $1 is alive
#else
# echo Host $1 is not reachable
#fi
#
#!/bin/bash
qpid-stat -q
#!/bin/bash
HEADER="Queues:"
DESCRIP=" queue dur autoDel excl msg msgIn msgOut bytes bytesIn bytesOut cons bind"
BREAKLINE=" ========================================================================================================================="
if [ "$1" != "" ]
then
echo "$HEADER"
echo "$DESCRIP"
echo "$BREAKLINE"
qpid-stat -q |grep $1
else
qpid-stat -q
fi
#!/usr/bin/python
# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
from optparse import OptionParser
import sys, time
from qpid.messaging import *
parser = OptionParser()
parser.add_option("-a", "--address", dest="address", default="testqueue",
help="address (name of queue or topic)", metavar="FILE")
parser.add_option("-b", "--broker", dest="broker", default="localhost",
help="broker hostname")
parser.add_option("-c", "--count", dest="count", default=1,
help="number of messages to be sent")
(options, args) = parser.parse_args()
print "options :" ,
print options
print "args :" ,
print args
broker=options.__dict__['broker']
address=options.__dict__['address']
count=int(options.__dict__['count'])
print " setup connection with ",
print broker
print " on queue or topic :",
print address
print " count of messages :",
print count
connection = Connection(broker)
try:
connection.open()
print " opened "
session = connection.session()
print " session "
sender = session.sender(address)
print " sending message "
while count >0:
#time.sleep(2)
print 'send message: Hello world! %d' %(count)
sender.send(Message('Hello world! %d' %(count)))
count -= 1
except MessagingError,m:
print m
finally:
connection.close()
#!/usr/bin/python
import sys
from qpid.messaging import *
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-a", "--address", dest="address", default="testqueue;{create:always}",
help="address (name of queue or topic)", metavar="FILE")
parser.add_option("-b", "--broker", dest="broker", default="localhost",
help="broker hostname")
parser.add_option("-c", "--count", dest="count", default=1,
help="number of messages to be sent")
(options, args) = parser.parse_args()
print "options :" ,
print options
print "args :" ,
print args
broker=options.__dict__['broker']
address=options.__dict__['address']
count=int(options.__dict__['count'])
print " setup connection "
#if len(sys.argv)<3 else sys.argv[2]
connection = Connection(broker)
try:
connection.open()
print " opened "
session = connection.session()
print " session "
receiver = session.receiver(address)
message = receiver.fetch()
while (message and count):
print "received :",
print message.content
session.acknowledge()
if count>0:
count = count - 1
if count>0:
message = receiver.fetch()
except MessagingError,m:
print m
finally:
connection.close()
#!/usr/bin/python
from optparse import OptionParser
import sys, time
from qpid.messaging import *
parser = OptionParser()
parser.add_option("-a", "--address", dest="address", default="testqueue",
help="address (name of queue or topic)", metavar="FILE")
parser.add_option("-b", "--broker", dest="broker", default="localhost",
help="broker hostname")
parser.add_option("-c", "--count", dest="count", default=1,
help="number of messages to be sent")
parser.add_option("-m", "--message", dest="message", default="void",
help="number of messages to be sent")
(options, args) = parser.parse_args()
print "options :" ,
print options
print "args :" ,
print args
broker=options.__dict__['broker']
address=options.__dict__['address']
count=int(options.__dict__['count'])
message=options.__dict__['message']
print " setup connection with ",
print broker
print " on queue or topic :",
print address
print " count of messages :",
print count
connection = Connection(broker)
try:
connection.open()
print " opened "
session = connection.session()
print " session "
sender = session.sender(address)
print " sending message "
while count >0:
#time.sleep(2)
print 'send message: Hello world! %d' %(count)
if message=="void":
sender.send(Message('Hello world! %d' %(count)))
else:
sender.send(Message(message))
count -= 1
except MessagingError,m:
print m
finally:
connection.close()
#!/usr/bin/python
# Copyright (C) 2012-2014 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
import sys
from qpid.messaging import *
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-a", "--address", dest="address", default="testqueue;{create:always}",
help="address (name of queue or topic)", metavar="FILE")
parser.add_option("-b", "--broker", dest="broker", default="localhost",
help="broker hostname")
parser.add_option("-c", "--count", dest="count", default=1,
help="number of messages to be sent")
(options, args) = parser.parse_args()
print "options :" ,
print options
print "args :" ,
print args
broker=options.__dict__['broker']
address=options.__dict__['address']
count=int(options.__dict__['count'])
print " setup connection "
#if len(sys.argv)<3 else sys.argv[2]
connection = Connection(broker)
try:
connection.open()
print " opened "
session = connection.session()
print " session "
receiver = session.receiver(address)
message = receiver.fetch()
while message:
print "received :",
print message.content
session.acknowledge()
message = receiver.fetch()
except MessagingError,m:
print m
finally:
connection.close()
# $Id$
include(PythonInstall)
python_install(
MsgBus.py
__init__.py
DESTINATION LAPS/MsgBus)
\ No newline at end of file
#!/usr/bin/python
# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
from qpid.messaging import *
# Candidate for a config file
broker="lof022"
address="laps.cep3.pipeline.start"
options="create:always, node: { type: queue, durable: True}"
class Bus():
def __init__(self):
self.connection = Connection(broker)
self.connection.reconnect = True
try:
self.connection.open()
self.session = self.connection.session()
self.receiver = self.session.receiver("%s;{%s}" %(address,options))
self.sender = self.session.sender(address)
except MessagingError,m:
print " OMG!!"
print m
def send(self,parsetdata,subject="defaultfilename.out"):
msg = Message(parsetdata)
msg.subject=subject
msg.durable=True
self.sender.send(msg)
def get(self):
msg= self.receiver.fetch()
return msg.content, msg.subject
def ack(self):
self.session.acknowledge()
# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
all
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment