Skip to content
Snippets Groups Projects
Commit a3f0b26f authored by Thomas Jürges's avatar Thomas Jürges
Browse files

Task SW-533: Add client function to stop data writers

parent 63a21729
No related branches found
No related tags found
No related merge requests found
...@@ -3663,6 +3663,7 @@ MAC/Services/TBB/CMakeLists.txt -text ...@@ -3663,6 +3663,7 @@ MAC/Services/TBB/CMakeLists.txt -text
MAC/Services/TBB/TBBClient/CMakeLists.txt -text MAC/Services/TBB/TBBClient/CMakeLists.txt -text
MAC/Services/TBB/TBBClient/bin/CMakeLists.txt -text MAC/Services/TBB/TBBClient/bin/CMakeLists.txt -text
MAC/Services/TBB/TBBClient/bin/tbb_start_datawriters -text MAC/Services/TBB/TBBClient/bin/tbb_start_datawriters -text
MAC/Services/TBB/TBBClient/bin/tbb_stop_datawriters -text
MAC/Services/TBB/TBBClient/lib/CMakeLists.txt -text MAC/Services/TBB/TBBClient/lib/CMakeLists.txt -text
MAC/Services/TBB/TBBClient/lib/__init__.py -text MAC/Services/TBB/TBBClient/lib/__init__.py -text
MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py -text MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py -text
......
#!/usr/bin/python
import logging
from optparse import OptionParser
from lofar.mac.tbbservice.config import DEFAULT_TBB_BUSNAME, DEFAULT_TBB_SERVICENAME, DEFAULT_BROKER
from lofar.mac.tbbservice.config import DEFAULT_TBB_NOTIFICATION_BUSNAME
from lofar.mac.tbbservice.client import stop_datawriters_and_wait_until_finished
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Check the invocation arguments
parser = OptionParser('%prog [options]',
description='issue a command to the tbb service to stop the datawriters, and wait until they are done.')
parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default')
parser.add_option("-b", "--tbb_service_busname", dest="tbb_service_busname", type="string",
default=DEFAULT_TBB_BUSNAME,
help="Name of the bus on which the tbb service listens. [default: %default]")
parser.add_option("-s", "--tbb_service_name", dest="tbb_service_name", type="string",
default=DEFAULT_TBB_SERVICENAME,
help="Name of the tbb service. [default: %default]")
parser.add_option("-n", "--tbb_notification_busname", dest="tbb_notification_busname", type="string",
default=DEFAULT_TBB_NOTIFICATION_BUSNAME,
help='Name of the notification bus exchange on the qpid broker on which the tbb notifications are published, default: %default')
(options, args) = parser.parse_args()
stop_datawriters_and_wait_until_finished(service_busname=options.tbb_service_busname,
service_name=options.tbb_service_name,
notification_busname=options.tbb_notification_busname,
broker=options.broker)
...@@ -10,6 +10,21 @@ from lofar.mac.tbb.config import DEFAULT_TBB_NOTIFICATION_BUSNAME ...@@ -10,6 +10,21 @@ from lofar.mac.tbb.config import DEFAULT_TBB_NOTIFICATION_BUSNAME
from lofar.mac.tbb.client.rpc import TBBRPC from lofar.mac.tbb.client.rpc import TBBRPC
from lofar.mac.tbb.client.tbbbuslistener import TBBBusListener from lofar.mac.tbb.client.tbbbuslistener import TBBBusListener
class Waiter(TBBBusListener):
'''Helper class which overrides TBBBusListener.onDataWritersFinished
and waits for synchronization using threading Events'''
def __init__(self, busname=notification_busname, broker=broker, **kwargs):
self.wait_event = Event()
super(Waiter, self).__init__(busname=busname, broker=broker, **kwargs)
def onDataWritersFinished(self, msg_content):
# signal that we're done waiting
self.wait_event.set()
def wait(self, timeout=24*3600):
# wait until onDataWritersFinished
self.wait_event.wait(timeout)
def start_datawriters_and_wait_until_finished(service_busname=DEFAULT_TBB_BUSNAME, def start_datawriters_and_wait_until_finished(service_busname=DEFAULT_TBB_BUSNAME,
service_name=DEFAULT_TBB_SERVICENAME, service_name=DEFAULT_TBB_SERVICENAME,
notification_busname=DEFAULT_TBB_NOTIFICATION_BUSNAME, notification_busname=DEFAULT_TBB_NOTIFICATION_BUSNAME,
...@@ -17,24 +32,19 @@ def start_datawriters_and_wait_until_finished(service_busname=DEFAULT_TBB_BUSNAM ...@@ -17,24 +32,19 @@ def start_datawriters_and_wait_until_finished(service_busname=DEFAULT_TBB_BUSNAM
''' '''
convenience method which issues a start_datawriters command to the tbbservice, and waits until all writers are done convenience method which issues a start_datawriters command to the tbbservice, and waits until all writers are done
''' '''
class Waiter(TBBBusListener):
'''Helper class which overrides TBBBusListener.onDataWritersFinished
and waits for synchronization using threading Events'''
def __init__(self, busname=notification_busname, broker=broker, **kwargs):
self.wait_event = Event()
super(Waiter, self).__init__(busname=busname, broker=broker, **kwargs)
def onDataWritersFinished(self, msg_content):
# signal that we're done waiting
self.wait_event.set()
def wait(self, timeout=24*3600):
# wait until onDataWritersFinished
self.wait_event.wait(timeout)
with Waiter() as waiter: with Waiter() as waiter:
with TBBRPC(busname=service_busname, servicename=service_name, broker=broker) as rpc: with TBBRPC(busname=service_busname, servicename=service_name, broker=broker) as rpc:
rpc.start_datawriters() rpc.start_datawriters()
waiter.wait() waiter.wait()
def stop_datawriters_and_wait_until_finished(service_busname=DEFAULT_TBB_BUSNAME,
service_name=DEFAULT_TBB_SERVICENAME,
notification_busname=DEFAULT_TBB_NOTIFICATION_BUSNAME,
broker=DEFAULT_BROKER):
'''
convenience method which issues a stop_datawriters command to the tbbservice, and waits until all writers are done
'''
with Waiter() as waiter:
with TBBRPC(busname=service_busname, servicename=service_name, broker=broker) as rpc:
rpc.stop_datawriters()
waiter.wait()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment