From a3f0b26f728dee232a42330db6bcc4c77b5a90fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20J=C3=BCrges?= <jurges@astron.nl> Date: Mon, 26 Nov 2018 14:42:37 +0000 Subject: [PATCH] Task SW-533: Add client function to stop data writers --- .gitattributes | 1 + .../TBB/TBBClient/bin/tbb_stop_datawriters | 31 ++++++++++++++ MAC/Services/TBB/TBBClient/lib/__init__.py | 42 ++++++++++++------- 3 files changed, 58 insertions(+), 16 deletions(-) create mode 100755 MAC/Services/TBB/TBBClient/bin/tbb_stop_datawriters diff --git a/.gitattributes b/.gitattributes index b1ab61338c4..53a7641bfd0 100644 --- a/.gitattributes +++ b/.gitattributes @@ -3663,6 +3663,7 @@ MAC/Services/TBB/CMakeLists.txt -text MAC/Services/TBB/TBBClient/CMakeLists.txt -text MAC/Services/TBB/TBBClient/bin/CMakeLists.txt -text MAC/Services/TBB/TBBClient/bin/tbb_start_datawriters -text +MAC/Services/TBB/TBBClient/bin/tbb_stop_datawriters -text MAC/Services/TBB/TBBClient/lib/CMakeLists.txt -text MAC/Services/TBB/TBBClient/lib/__init__.py -text MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py -text diff --git a/MAC/Services/TBB/TBBClient/bin/tbb_stop_datawriters b/MAC/Services/TBB/TBBClient/bin/tbb_stop_datawriters new file mode 100755 index 00000000000..14b0f7fffa2 --- /dev/null +++ b/MAC/Services/TBB/TBBClient/bin/tbb_stop_datawriters @@ -0,0 +1,31 @@ +#!/usr/bin/python + +import logging +from optparse import OptionParser +from lofar.mac.tbbservice.config import DEFAULT_TBB_BUSNAME, DEFAULT_TBB_SERVICENAME, DEFAULT_BROKER +from lofar.mac.tbbservice.config import DEFAULT_TBB_NOTIFICATION_BUSNAME +from lofar.mac.tbbservice.client import stop_datawriters_and_wait_until_finished + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + # Check the invocation arguments + parser = OptionParser('%prog [options]', + description='issue a command to the tbb service to stop the datawriters, and wait until they are done.') + parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') + + parser.add_option("-b", "--tbb_service_busname", dest="tbb_service_busname", type="string", + default=DEFAULT_TBB_BUSNAME, + help="Name of the bus on which the tbb service listens. [default: %default]") + parser.add_option("-s", "--tbb_service_name", dest="tbb_service_name", type="string", + default=DEFAULT_TBB_SERVICENAME, + help="Name of the tbb service. [default: %default]") + parser.add_option("-n", "--tbb_notification_busname", dest="tbb_notification_busname", type="string", + default=DEFAULT_TBB_NOTIFICATION_BUSNAME, + help='Name of the notification bus exchange on the qpid broker on which the tbb notifications are published, default: %default') + (options, args) = parser.parse_args() + + stop_datawriters_and_wait_until_finished(service_busname=options.tbb_service_busname, + service_name=options.tbb_service_name, + notification_busname=options.tbb_notification_busname, + broker=options.broker) diff --git a/MAC/Services/TBB/TBBClient/lib/__init__.py b/MAC/Services/TBB/TBBClient/lib/__init__.py index 1af728131ad..63b263683ce 100644 --- a/MAC/Services/TBB/TBBClient/lib/__init__.py +++ b/MAC/Services/TBB/TBBClient/lib/__init__.py @@ -10,6 +10,21 @@ from lofar.mac.tbb.config import DEFAULT_TBB_NOTIFICATION_BUSNAME from lofar.mac.tbb.client.rpc import TBBRPC from lofar.mac.tbb.client.tbbbuslistener import TBBBusListener +class Waiter(TBBBusListener): + '''Helper class which overrides TBBBusListener.onDataWritersFinished + and waits for synchronization using threading Events''' + def __init__(self, busname=notification_busname, broker=broker, **kwargs): + self.wait_event = Event() + super(Waiter, self).__init__(busname=busname, broker=broker, **kwargs) + + def onDataWritersFinished(self, msg_content): + # signal that we're done waiting + self.wait_event.set() + + def wait(self, timeout=24*3600): + # wait until onDataWritersFinished + self.wait_event.wait(timeout) + def start_datawriters_and_wait_until_finished(service_busname=DEFAULT_TBB_BUSNAME, service_name=DEFAULT_TBB_SERVICENAME, notification_busname=DEFAULT_TBB_NOTIFICATION_BUSNAME, @@ -17,24 +32,19 @@ def start_datawriters_and_wait_until_finished(service_busname=DEFAULT_TBB_BUSNAM ''' convenience method which issues a start_datawriters command to the tbbservice, and waits until all writers are done ''' - - class Waiter(TBBBusListener): - '''Helper class which overrides TBBBusListener.onDataWritersFinished - and waits for synchronization using threading Events''' - def __init__(self, busname=notification_busname, broker=broker, **kwargs): - self.wait_event = Event() - super(Waiter, self).__init__(busname=busname, broker=broker, **kwargs) - - def onDataWritersFinished(self, msg_content): - # signal that we're done waiting - self.wait_event.set() - - def wait(self, timeout=24*3600): - # wait until onDataWritersFinished - self.wait_event.wait(timeout) - with Waiter() as waiter: with TBBRPC(busname=service_busname, servicename=service_name, broker=broker) as rpc: rpc.start_datawriters() waiter.wait() +def stop_datawriters_and_wait_until_finished(service_busname=DEFAULT_TBB_BUSNAME, + service_name=DEFAULT_TBB_SERVICENAME, + notification_busname=DEFAULT_TBB_NOTIFICATION_BUSNAME, + broker=DEFAULT_BROKER): + ''' + convenience method which issues a stop_datawriters command to the tbbservice, and waits until all writers are done + ''' + with Waiter() as waiter: + with TBBRPC(busname=service_busname, servicename=service_name, broker=broker) as rpc: + rpc.stop_datawriters() + waiter.wait() -- GitLab