diff --git a/.gitattributes b/.gitattributes index 13bce7cc5d0b344ad0dcab25285e6326b8149dad..00f45ddf3b538cbac5280c1cb5f5ea033a54e3a5 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1581,6 +1581,7 @@ LCS/MessageDaemons/ObservationStartListener/test/tObservationStartListener.sh eo LCS/MessageDaemons/src/MessageRouter -text LCS/MessageDaemons/src/MessageRouter.conf.ccu001 -text LCS/MessageDaemons/src/MessageRouter.conf.ccu199 -text +LCS/MessageDaemons/test/tMessageRouter.py eol=lf LCS/MessageDaemons/test/tMessageRouter.run eol=lf LCS/MessageDaemons/test/tMessageRouter.sh eol=lf LCS/MessageDaemons/webmonitor/QPIDWebserverJSON -text diff --git a/LCS/MessageBus/src/messagebus.py b/LCS/MessageBus/src/messagebus.py index 3ab922a1c3d25dcd60b394cb846935addaff3adb..a36fdee46a5860f3d72847857bb5cd18f13c5872 100644 --- a/LCS/MessageBus/src/messagebus.py +++ b/LCS/MessageBus/src/messagebus.py @@ -37,6 +37,9 @@ options="create:never" logger=logging.getLogger("MessageBus") +#TODO: replace this version of the messagebus by the version in LCS/Messaging/python/messaging +logger.warning("This version of the messagebus (lofar.messagebus.messagebus) is deprecated and will be replaced by lofar.messaging.messagebus") + # Set to True if the caller parses LOFARENV -- required for surface between MessageBus and Messaging libs IGNORE_LOFARENV=False diff --git a/LCS/MessageDaemons/src/CMakeLists.txt b/LCS/MessageDaemons/src/CMakeLists.txt index 89821db1ee3b2a69c5c724524c9ea48865fec2db..a192ccdc382af4624674cc5b8835f545db1dee9a 100644 --- a/LCS/MessageDaemons/src/CMakeLists.txt +++ b/LCS/MessageDaemons/src/CMakeLists.txt @@ -7,14 +7,10 @@ set(messagedaemons_LIB_SRCS lofar_add_library(messagedaemons ${messagedaemons_LIB_SRCS}) -install(FILES - MessageRouter - DESTINATION bin) +lofar_add_bin_scripts(MessageRouter) file(GLOB MessageRouterConfs MessageRouter.conf* ) -install(FILES - ${MessageRouterConfs} - DESTINATION etc) +lofar_add_sysconf_files(${MessageRouterConfs}) diff --git a/LCS/MessageDaemons/src/MessageRouter b/LCS/MessageDaemons/src/MessageRouter old mode 100644 new mode 100755 index d3701b47b014ffd38d9ebad824975922ab6280f8..7e7ff2b06da6660ba91e70239b7757e77780fdfe --- a/LCS/MessageDaemons/src/MessageRouter +++ b/LCS/MessageDaemons/src/MessageRouter @@ -21,10 +21,14 @@ """ import logging -logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) +logger = logging.getLogger(__name__) + +#TODO: replace MessageRouter, and just configure the qpid broker to provide the same functionality. +logger.warning("MessageRouter is deprecated and should/will be replaced by using the qpid-broker's functionality to copy messages to multiple queue's.") import lofar.messagebus.messagebus as messagebus import lofar.messagebus.message as message +from lofar.common.util import waitForInterrupt import threading from configparser import ConfigParser @@ -32,8 +36,6 @@ import os.path import sys from datetime import datetime -def log(level, msg): - print("%s %-4s %s" % (str(datetime.now())[:-3], level, msg)) class BusMulticast(threading.Thread): """ @@ -44,17 +46,19 @@ class BusMulticast(threading.Thread): self.source = source self.destlist = destlist self.done = False + logger.info("Setting up multicast from %s to %s" % (self.source, self.destlist)) def stop(self): self.done = True def run(self): + logger.info("Starting multicast from %s to %s" % (self.source, self.destlist)) + try: inbus = messagebus.FromBus(self.source) outbusses = [messagebus.ToBus(dest) for dest in self.destlist] - outdump = messagebus.ToBus("dump.%s" % (self.source,), "create:always,delete:always,node:{type:topic}") - log("INFO","[%s] Forwarding to %s" % (self.source,self.destlist)) + logger.info("[%s] Forwarding to %s" % (self.source,self.destlist)) while not self.done: # TODO: Use a transaction (not supported yet in qpid 0.30) @@ -65,27 +69,24 @@ class BusMulticast(threading.Thread): try: content = msg.content() - log("INFO","[%s] [%s] Message received" % (self.source, content)) + logger.info("[%s] [%s] Message received" % (self.source, content)) except Exception as e: content = "<unknown>" - log("WARN","[%s] Non-compliant message received" % (self.source,)) + logger.warning("[%s] Non-compliant message received" % (self.source,)) - - outdump.send(msg) for outbus in outbusses: outbus.send(msg) inbus.ack(msg) - log("INFO", "[%s] [%s] Forwarded to %s" % (self.source, content, self.destlist)) + logger.info("[%s] [%s] Forwarded to %s" % (self.source, content, self.destlist)) except Exception as e: - log("FATAL", "[%s] Caught exception: %s" % (self.source, e)) + logger.fatal("[%s] Caught exception: %s" % (self.source, e)) # Abort everything, to avoid half-broken situations - cmdbus = messagebus.ToBus("messagerouter.command") - cmdbus.send("stop") + exit(1) - log("INFO", "[%s] Done" % (self.source,)) + logger.info("[%s] Done" % (self.source,)) class RouterConfig(ConfigParser): """ @@ -109,12 +110,14 @@ class RouterConfig(ConfigParser): self.read(filename) def read(self, filename): - log("INFO","[RouterConfig] Considering reading %s" % (filename,)) + logger.info("[RouterConfig] Considering reading %s" % (filename,)) if not os.path.isfile(filename): + logger.info("[RouterConfig] not found...: %s" % (filename,)) return False - log("INFO","[RouterConfig] Reading %s" % (filename,)) + logger.info("[RouterConfig] Reading %s" % (filename,)) ConfigParser.read(self, filename) + logger.info("[RouterConfig] Read %s" % (filename,)) return True def sources(self): @@ -123,37 +126,11 @@ class RouterConfig(ConfigParser): def destinations(self, source): return [field.strip() for field in self.get('multicast', source).split(',')] - -def process_command_queue(command_queue): - """ - Listen on the given queue, and process the received commands. - - Supported commands: - - stop - Do a graceful shutdown of this service - """ - - log("INFO","[main] Listening for commands on %s" % (command_queue,)) - - cmdq = messagebus.FromBus(command_queue, "create: always, delete: always") - while True: - msg = cmdq.get(1.0) - if msg is None: - continue - - command = msg.raw_content() - log("INFO","[main] Received command: %s" % (command,)) - - cmdq.ack(msg) - - if command == "stop": - break - if __name__ == "__main__": """ - Apply the routing specified in router.conf and router.conf.`hostname`; - both configration files are found in $LOFARROOT/etc, or . if $LOFARROOT is - not set. + Apply the routing specified in MessageRouter.conf and MessageRouter.conf.`hostname`; + both configration files are found in $LOFARROOT/etc. + The file in the current working dir takes precedence over $LOFARROOT/etc Application runs forever, regardless of the number of routes. Also runs forever if no routing is required, to keep behaviour consistent across @@ -163,6 +140,10 @@ if __name__ == "__main__": import platform import time + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + logger.info("Starting MessageRouter") + def hostname(): possibly_fqdn = platform.node().lower() if '.' in possibly_fqdn: @@ -170,7 +151,9 @@ if __name__ == "__main__": else: return possibly_fqdn - if "LOFARROOT" in os.environ: + # always read MessageRouter.conf from current working directory if present + # otherwise try it from $LOFARROOT/etc + if "LOFARROOT" in os.environ and not os.path.exists("MessageRouter.conf"): path = os.path.expandvars("$LOFARROOT/etc") else: path = "." @@ -180,7 +163,7 @@ if __name__ == "__main__": # read host-specific config file my_configfile = '%s/MessageRouter.conf.%s' % (path, hostname()) - log("INFO", "Reading configuration file %s" % my_configfile) + logger.info("Reading configuration file %s" % my_configfile) config.read(my_configfile) threadlist = [] @@ -194,12 +177,11 @@ if __name__ == "__main__": threadlist.append(t) - log("INFO","[main] Running %s threads" % (len(threadlist),)) + logger.info("[main] Running %s threads" % (len(threadlist),)) - # Listen for commands on a special queue - process_command_queue("messagerouter.command") + waitForInterrupt() - log("INFO","[main] Shutting down") + logger.info("[main] Shutting down") # signal end of processing for t in threadlist: @@ -209,4 +191,4 @@ if __name__ == "__main__": for t in threadlist: t.join() - log("INFO","[main] Done") + logger.info("[main] Done") diff --git a/LCS/MessageDaemons/test/CMakeLists.txt b/LCS/MessageDaemons/test/CMakeLists.txt index b9fdacc60db31c2c36597eefc1dc86c0d0630fc5..37d02646e99b64581256e618711a18daab925480 100644 --- a/LCS/MessageDaemons/test/CMakeLists.txt +++ b/LCS/MessageDaemons/test/CMakeLists.txt @@ -2,6 +2,9 @@ include(LofarCTest) +message(STATUS "QPID_FOUND = ${QPID_FOUND}") +message(STATUS "HAVE_QPID = ${HAVE_QPID}") + if(HAVE_QPID) lofar_add_test(tMessageRouter) endif(HAVE_QPID) diff --git a/LCS/MessageDaemons/test/tMessageRouter.py b/LCS/MessageDaemons/test/tMessageRouter.py new file mode 100755 index 0000000000000000000000000000000000000000..64d5822b9e687855b6a434bf34c6d78a12959670 --- /dev/null +++ b/LCS/MessageDaemons/test/tMessageRouter.py @@ -0,0 +1,84 @@ +# t_messagebus.py: Test program for the module lofar.messaging.messagebus +# +# Copyright (C) 2015 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id$ + +""" +Test program for the module lofar.messaging.messagebus +""" + +from copy import deepcopy +import os +import unittest +import logging +logger = logging.getLogger(__name__) + +from lofar.messaging.messages import * +from lofar.messaging.messagebus import * +from subprocess import Popen +from signal import SIGINT + +class TestMessageRouter(unittest.TestCase): + """Test the MessageRouter class""" + + def test_1(self): + """ + test 1 + """ + with TemporaryQueue("in") as in_queue, \ + TemporaryQueue("out1") as out1_queue, \ + TemporaryQueue("out2") as out2_queue: + + # write config file in cur working dir, so MessageRouter uses our tmp queues + config = "[multicast]\n%s: %s, %s\n" % (in_queue.address,out1_queue.address, out2_queue.address) + with open("MessageRouter.conf", 'w') as config_file: + config_file.write(config) + + + # create 2 receivers + with out1_queue.create_frombus() as recveiver1, \ + out2_queue.create_frombus() as recveiver2: + + # start a MessageRouter (which reads the config in cur-work-dir with our tmp queues) + # "fake" the PRODUCTION LOFARENV environment, so the queue names are not messed up. + env = deepcopy(os.environ) + env['LOFARENV'] = 'PRODUCTION' + proc = Popen(["MessageRouter"], env=env) + + msg_in = EventMessage(content="foo") + with in_queue.create_tobus() as sender: + sender.send(msg_in) + + msg_out1 = recveiver1.receive() + msg_out2 = recveiver2.receive() + + self.assertEqual(msg_in.id, msg_out1.id) + self.assertEqual(msg_in.id, msg_out2.id) + + self.assertEqual(msg_in.body, msg_out1.body) + self.assertEqual(msg_in.body, msg_out2.body) + + proc.send_signal(SIGINT) + self.assertEqual(0, proc.wait(5)) + + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s TestMessageRouter %(levelname)s %(message)s', level=logging.DEBUG) + unittest.main() diff --git a/LCS/MessageDaemons/test/tMessageRouter.run b/LCS/MessageDaemons/test/tMessageRouter.run index 1b7503aedbde3cbe3ca23a42fe632d7edc164419..16185cc3efae3d0a02e6da273c45eb650be17fbf 100755 --- a/LCS/MessageDaemons/test/tMessageRouter.run +++ b/LCS/MessageDaemons/test/tMessageRouter.run @@ -1,111 +1,7 @@ -#!/bin/bash -ev +#!/bin/bash -e -source MessageFuncs.sh +# Run the unit test +source python-coverage.sh +python_coverage_test "MessageRouter" tMessageRouter.py -unset LOFARROOT # Make sure MessageRouter reads ./MessageRouter.conf - -# Allow timeouts on commands. Use: alarm <timeout> <cmd> <args> -alarm() { perl -e 'alarm shift; exec @ARGV' "$@"; } - -# -# ---- TEST: normal configuration ----- -# - -# Create a routing configuration -create_queue in1 -create_queue in2 -create_queue out1 -create_queue out2 -create_queue out3 - -echo ' -[multicast] -in1: out1, out2 -in2: out3 -' > MessageRouter.conf - -# Start MessageRouter -alarm 60 python3 $srcdir/../src/MessageRouter >&2 & -PID=$! - -# Inject messages into all input queues -send_msg in1 "test1" -send_msg in2 "test2" - -# Check output queues, we only want ONE message in each! -[ "`wait_msg 1 out1`" == "test1" ] -wait_msg 0 out1 && exit 1 -[ "`wait_msg 1 out2`" == "test1" ] -wait_msg 0 out2 && exit 1 -[ "`wait_msg 1 out3`" == "test2" ] -wait_msg 0 out3 && exit 1 - -# Check whether input queues have been emptied -wait_msg 0 in1 && exit 1 -wait_msg 0 in2 && exit 1 - -# Stop MessageRouter -send_msg messagerouter.command "stop" -wait $PID - -# -# ---- TEST: non-existing input queue ---- -# - -echo ' -[multicast] -in.notexist: out1 -' > MessageRouter.conf - -# Start MessageRouter -- should crash -python3 $srcdir/../src/MessageRouter >&2 || true - -# -# ---- TEST: non-existing output queue ---- -# - -echo ' -[multicast] -in1: out.notexist -' > MessageRouter.conf - -# Start MessageRouter -- should crash -python3 $srcdir/../src/MessageRouter >&2 || true - -# -# ---- TEST: fowarding to dump.<inputqueue> topic ----- -# - -echo ' -[multicast] -in1: out1 -' > MessageRouter.conf - -# Start MessageRouter -python3 $srcdir/../src/MessageRouter >&2 & -PID=$! - -# Wait for topic to become available -sleep 1 - -# Start listening already, since topics are not persistent! -wait_msg 3 dump.in1 > tMessageRouter.out & -MSGPID=$! - -# Wait for listener to attach to topic -sleep 1 - -# Inject a message -send_msg in1 "test1" - -# Receive the message and verify the content -wait $MSGPID -diff <(echo "test1") tMessageRouter.out - -# Stop MessageRouter -send_msg messagerouter.command "stop" -wait $PID - -# Technically unnecessary with "bash -e" -exit 0