Skip to content
Snippets Groups Projects
Commit 2e97ad8f authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-658: fixed test for tMessageRouter. Added annotations that MessageRouter is...

SW-658: fixed test for tMessageRouter. Added annotations that MessageRouter is deprecated and should be replaced by qpid-broker-configuration. Added deprecation warnings for lofar.messagebus.messagebus.
parent ab09ecd6
No related branches found
No related tags found
No related merge requests found
......@@ -1581,6 +1581,7 @@ LCS/MessageDaemons/ObservationStartListener/test/tObservationStartListener.sh eo
LCS/MessageDaemons/src/MessageRouter -text
LCS/MessageDaemons/src/MessageRouter.conf.ccu001 -text
LCS/MessageDaemons/src/MessageRouter.conf.ccu199 -text
LCS/MessageDaemons/test/tMessageRouter.py eol=lf
LCS/MessageDaemons/test/tMessageRouter.run eol=lf
LCS/MessageDaemons/test/tMessageRouter.sh eol=lf
LCS/MessageDaemons/webmonitor/QPIDWebserverJSON -text
......
......@@ -37,6 +37,9 @@ options="create:never"
logger=logging.getLogger("MessageBus")
#TODO: replace this version of the messagebus by the version in LCS/Messaging/python/messaging
logger.warning("This version of the messagebus (lofar.messagebus.messagebus) is deprecated and will be replaced by lofar.messaging.messagebus")
# Set to True if the caller parses LOFARENV -- required for surface between MessageBus and Messaging libs
IGNORE_LOFARENV=False
......
......@@ -7,14 +7,10 @@ set(messagedaemons_LIB_SRCS
lofar_add_library(messagedaemons ${messagedaemons_LIB_SRCS})
install(FILES
MessageRouter
DESTINATION bin)
lofar_add_bin_scripts(MessageRouter)
file(GLOB MessageRouterConfs
MessageRouter.conf*
)
install(FILES
${MessageRouterConfs}
DESTINATION etc)
lofar_add_sysconf_files(${MessageRouterConfs})
......@@ -21,10 +21,14 @@
"""
import logging
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
#TODO: replace MessageRouter, and just configure the qpid broker to provide the same functionality.
logger.warning("MessageRouter is deprecated and should/will be replaced by using the qpid-broker's functionality to copy messages to multiple queue's.")
import lofar.messagebus.messagebus as messagebus
import lofar.messagebus.message as message
from lofar.common.util import waitForInterrupt
import threading
from configparser import ConfigParser
......@@ -32,8 +36,6 @@ import os.path
import sys
from datetime import datetime
def log(level, msg):
print("%s %-4s %s" % (str(datetime.now())[:-3], level, msg))
class BusMulticast(threading.Thread):
"""
......@@ -44,17 +46,19 @@ class BusMulticast(threading.Thread):
self.source = source
self.destlist = destlist
self.done = False
logger.info("Setting up multicast from %s to %s" % (self.source, self.destlist))
def stop(self):
self.done = True
def run(self):
logger.info("Starting multicast from %s to %s" % (self.source, self.destlist))
try:
inbus = messagebus.FromBus(self.source)
outbusses = [messagebus.ToBus(dest) for dest in self.destlist]
outdump = messagebus.ToBus("dump.%s" % (self.source,), "create:always,delete:always,node:{type:topic}")
log("INFO","[%s] Forwarding to %s" % (self.source,self.destlist))
logger.info("[%s] Forwarding to %s" % (self.source,self.destlist))
while not self.done:
# TODO: Use a transaction (not supported yet in qpid 0.30)
......@@ -65,27 +69,24 @@ class BusMulticast(threading.Thread):
try:
content = msg.content()
log("INFO","[%s] [%s] Message received" % (self.source, content))
logger.info("[%s] [%s] Message received" % (self.source, content))
except Exception as e:
content = "<unknown>"
log("WARN","[%s] Non-compliant message received" % (self.source,))
logger.warning("[%s] Non-compliant message received" % (self.source,))
outdump.send(msg)
for outbus in outbusses:
outbus.send(msg)
inbus.ack(msg)
log("INFO", "[%s] [%s] Forwarded to %s" % (self.source, content, self.destlist))
logger.info("[%s] [%s] Forwarded to %s" % (self.source, content, self.destlist))
except Exception as e:
log("FATAL", "[%s] Caught exception: %s" % (self.source, e))
logger.fatal("[%s] Caught exception: %s" % (self.source, e))
# Abort everything, to avoid half-broken situations
cmdbus = messagebus.ToBus("messagerouter.command")
cmdbus.send("stop")
exit(1)
log("INFO", "[%s] Done" % (self.source,))
logger.info("[%s] Done" % (self.source,))
class RouterConfig(ConfigParser):
"""
......@@ -109,12 +110,14 @@ class RouterConfig(ConfigParser):
self.read(filename)
def read(self, filename):
log("INFO","[RouterConfig] Considering reading %s" % (filename,))
logger.info("[RouterConfig] Considering reading %s" % (filename,))
if not os.path.isfile(filename):
logger.info("[RouterConfig] not found...: %s" % (filename,))
return False
log("INFO","[RouterConfig] Reading %s" % (filename,))
logger.info("[RouterConfig] Reading %s" % (filename,))
ConfigParser.read(self, filename)
logger.info("[RouterConfig] Read %s" % (filename,))
return True
def sources(self):
......@@ -123,37 +126,11 @@ class RouterConfig(ConfigParser):
def destinations(self, source):
return [field.strip() for field in self.get('multicast', source).split(',')]
def process_command_queue(command_queue):
"""
Listen on the given queue, and process the received commands.
Supported commands:
stop - Do a graceful shutdown of this service
"""
log("INFO","[main] Listening for commands on %s" % (command_queue,))
cmdq = messagebus.FromBus(command_queue, "create: always, delete: always")
while True:
msg = cmdq.get(1.0)
if msg is None:
continue
command = msg.raw_content()
log("INFO","[main] Received command: %s" % (command,))
cmdq.ack(msg)
if command == "stop":
break
if __name__ == "__main__":
"""
Apply the routing specified in router.conf and router.conf.`hostname`;
both configration files are found in $LOFARROOT/etc, or . if $LOFARROOT is
not set.
Apply the routing specified in MessageRouter.conf and MessageRouter.conf.`hostname`;
both configration files are found in $LOFARROOT/etc.
The file in the current working dir takes precedence over $LOFARROOT/etc
Application runs forever, regardless of the number of routes. Also runs
forever if no routing is required, to keep behaviour consistent across
......@@ -163,6 +140,10 @@ if __name__ == "__main__":
import platform
import time
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
logger.info("Starting MessageRouter")
def hostname():
possibly_fqdn = platform.node().lower()
if '.' in possibly_fqdn:
......@@ -170,7 +151,9 @@ if __name__ == "__main__":
else:
return possibly_fqdn
if "LOFARROOT" in os.environ:
# always read MessageRouter.conf from current working directory if present
# otherwise try it from $LOFARROOT/etc
if "LOFARROOT" in os.environ and not os.path.exists("MessageRouter.conf"):
path = os.path.expandvars("$LOFARROOT/etc")
else:
path = "."
......@@ -180,7 +163,7 @@ if __name__ == "__main__":
# read host-specific config file
my_configfile = '%s/MessageRouter.conf.%s' % (path, hostname())
log("INFO", "Reading configuration file %s" % my_configfile)
logger.info("Reading configuration file %s" % my_configfile)
config.read(my_configfile)
threadlist = []
......@@ -194,12 +177,11 @@ if __name__ == "__main__":
threadlist.append(t)
log("INFO","[main] Running %s threads" % (len(threadlist),))
logger.info("[main] Running %s threads" % (len(threadlist),))
# Listen for commands on a special queue
process_command_queue("messagerouter.command")
waitForInterrupt()
log("INFO","[main] Shutting down")
logger.info("[main] Shutting down")
# signal end of processing
for t in threadlist:
......@@ -209,4 +191,4 @@ if __name__ == "__main__":
for t in threadlist:
t.join()
log("INFO","[main] Done")
logger.info("[main] Done")
......@@ -2,6 +2,9 @@
include(LofarCTest)
message(STATUS "QPID_FOUND = ${QPID_FOUND}")
message(STATUS "HAVE_QPID = ${HAVE_QPID}")
if(HAVE_QPID)
lofar_add_test(tMessageRouter)
endif(HAVE_QPID)
# t_messagebus.py: Test program for the module lofar.messaging.messagebus
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
"""
Test program for the module lofar.messaging.messagebus
"""
from copy import deepcopy
import os
import unittest
import logging
logger = logging.getLogger(__name__)
from lofar.messaging.messages import *
from lofar.messaging.messagebus import *
from subprocess import Popen
from signal import SIGINT
class TestMessageRouter(unittest.TestCase):
"""Test the MessageRouter class"""
def test_1(self):
"""
test 1
"""
with TemporaryQueue("in") as in_queue, \
TemporaryQueue("out1") as out1_queue, \
TemporaryQueue("out2") as out2_queue:
# write config file in cur working dir, so MessageRouter uses our tmp queues
config = "[multicast]\n%s: %s, %s\n" % (in_queue.address,out1_queue.address, out2_queue.address)
with open("MessageRouter.conf", 'w') as config_file:
config_file.write(config)
# create 2 receivers
with out1_queue.create_frombus() as recveiver1, \
out2_queue.create_frombus() as recveiver2:
# start a MessageRouter (which reads the config in cur-work-dir with our tmp queues)
# "fake" the PRODUCTION LOFARENV environment, so the queue names are not messed up.
env = deepcopy(os.environ)
env['LOFARENV'] = 'PRODUCTION'
proc = Popen(["MessageRouter"], env=env)
msg_in = EventMessage(content="foo")
with in_queue.create_tobus() as sender:
sender.send(msg_in)
msg_out1 = recveiver1.receive()
msg_out2 = recveiver2.receive()
self.assertEqual(msg_in.id, msg_out1.id)
self.assertEqual(msg_in.id, msg_out2.id)
self.assertEqual(msg_in.body, msg_out1.body)
self.assertEqual(msg_in.body, msg_out2.body)
proc.send_signal(SIGINT)
self.assertEqual(0, proc.wait(5))
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s TestMessageRouter %(levelname)s %(message)s', level=logging.DEBUG)
unittest.main()
#!/bin/bash -ev
#!/bin/bash -e
source MessageFuncs.sh
# Run the unit test
source python-coverage.sh
python_coverage_test "MessageRouter" tMessageRouter.py
unset LOFARROOT # Make sure MessageRouter reads ./MessageRouter.conf
# Allow timeouts on commands. Use: alarm <timeout> <cmd> <args>
alarm() { perl -e 'alarm shift; exec @ARGV' "$@"; }
#
# ---- TEST: normal configuration -----
#
# Create a routing configuration
create_queue in1
create_queue in2
create_queue out1
create_queue out2
create_queue out3
echo '
[multicast]
in1: out1, out2
in2: out3
' > MessageRouter.conf
# Start MessageRouter
alarm 60 python3 $srcdir/../src/MessageRouter >&2 &
PID=$!
# Inject messages into all input queues
send_msg in1 "test1"
send_msg in2 "test2"
# Check output queues, we only want ONE message in each!
[ "`wait_msg 1 out1`" == "test1" ]
wait_msg 0 out1 && exit 1
[ "`wait_msg 1 out2`" == "test1" ]
wait_msg 0 out2 && exit 1
[ "`wait_msg 1 out3`" == "test2" ]
wait_msg 0 out3 && exit 1
# Check whether input queues have been emptied
wait_msg 0 in1 && exit 1
wait_msg 0 in2 && exit 1
# Stop MessageRouter
send_msg messagerouter.command "stop"
wait $PID
#
# ---- TEST: non-existing input queue ----
#
echo '
[multicast]
in.notexist: out1
' > MessageRouter.conf
# Start MessageRouter -- should crash
python3 $srcdir/../src/MessageRouter >&2 || true
#
# ---- TEST: non-existing output queue ----
#
echo '
[multicast]
in1: out.notexist
' > MessageRouter.conf
# Start MessageRouter -- should crash
python3 $srcdir/../src/MessageRouter >&2 || true
#
# ---- TEST: fowarding to dump.<inputqueue> topic -----
#
echo '
[multicast]
in1: out1
' > MessageRouter.conf
# Start MessageRouter
python3 $srcdir/../src/MessageRouter >&2 &
PID=$!
# Wait for topic to become available
sleep 1
# Start listening already, since topics are not persistent!
wait_msg 3 dump.in1 > tMessageRouter.out &
MSGPID=$!
# Wait for listener to attach to topic
sleep 1
# Inject a message
send_msg in1 "test1"
# Receive the message and verify the content
wait $MSGPID
diff <(echo "test1") tMessageRouter.out
# Stop MessageRouter
send_msg messagerouter.command "stop"
wait $PID
# Technically unnecessary with "bash -e"
exit 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment