diff --git a/LCS/MessageDaemons/src/MessageRouter b/LCS/MessageDaemons/src/MessageRouter index 0bf2b1a457c0b5d6b9f3672895a30e9ebfde81b6..2ead89f38cf41b3020c04d095e1296f9605d316c 100644 --- a/LCS/MessageDaemons/src/MessageRouter +++ b/LCS/MessageDaemons/src/MessageRouter @@ -116,6 +116,32 @@ class RouterConfig(SafeConfigParser): def destinations(self, source): return [field.strip() for field in self.get('multicast', source).split(',')] + +def process_command_queue(command_queue): + """ + Listen on the given queue, and process the received commands. + + Supported commands: + + stop - Do a graceful shutdown of this service + """ + + log("INFO","[main] Listening for commands on %s" % (command_queue,)) + + cmdq = msgbus.FromBus(command_queue, "create: always, delete: always") + while True: + msg = cmdq.get(1.0) + if msg is None: + continue + + command = msg.raw_content() + log("INFO","[main] Received command: %s" % (command,)) + + cmdq.ack(msg) + + if command == "stop": + break + if __name__ == "__main__": """ Apply the routing specified in router.conf and router.conf.`hostname`; @@ -156,23 +182,7 @@ if __name__ == "__main__": log("INFO","[main] Running %s threads" % (len(threadlist),)) # Listen for commands on a special queue - command_queue = "messagerouter.command" - - log("INFO","[main] Listening for commands on %s" % (command_queue,)) - - cmdq = msgbus.FromBus(command_queue, "create: always, delete: always") - while True: - msg = cmdq.get(1.0) - if msg is None: - continue - - command = msg.raw_content() - log("INFO","[main] Received command: %s" % (command,)) - - cmdq.ack(msg) - - if command == "stop": - break + process_command_queue("messagerouter.command") log("INFO","[main] Shutting down")