From 7f929c9d06a739876443a1b8650f60250b601792 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Wed, 14 Nov 2018 11:39:08 +0000 Subject: [PATCH] SW-507: initial skeleton implementation of the tbbservice + the rpc client + the needed start_datawriters_and_wait_until_finished method + a helper/test bin script tbb_start_datawriters --- .gitattributes | 19 +++ CMake/LofarPackageList.cmake | 7 +- MAC/Services/CMakeLists.txt | 5 +- MAC/Services/TBB/CMakeLists.txt | 9 ++ MAC/Services/TBB/TBBClient/CMakeLists.txt | 6 + MAC/Services/TBB/TBBClient/bin/CMakeLists.txt | 3 + .../TBB/TBBClient/bin/tbb_start_datawriters | 31 ++++ MAC/Services/TBB/TBBClient/lib/CMakeLists.txt | 5 + MAC/Services/TBB/TBBClient/lib/__init__.py | 40 +++++ MAC/Services/TBB/TBBClient/lib/rpc.py | 26 +++ .../TBB/TBBClient/lib/tbbbuslistener.py | 110 +++++++++++++ MAC/Services/TBB/TBBServer/CMakeLists.txt | 5 + MAC/Services/TBB/TBBServer/bin/CMakeLists.txt | 8 + MAC/Services/TBB/TBBServer/bin/tbbservice | 5 + MAC/Services/TBB/TBBServer/bin/tbbservice.ini | 8 + MAC/Services/TBB/TBBServer/lib/CMakeLists.txt | 5 + MAC/Services/TBB/TBBServer/lib/tbbservice.py | 149 ++++++++++++++++++ .../TBB/TBBServer/test/CMakeLists.txt | 5 + .../TBB/TBBServer/test/t_tbbserver.py | 57 +++++++ .../TBB/TBBServer/test/t_tbbserver.run | 6 + .../TBB/TBBServer/test/t_tbbserver.sh | 3 + MAC/Services/TBB/config.py | 11 ++ 22 files changed, 519 insertions(+), 4 deletions(-) create mode 100644 MAC/Services/TBB/CMakeLists.txt create mode 100644 MAC/Services/TBB/TBBClient/CMakeLists.txt create mode 100644 MAC/Services/TBB/TBBClient/bin/CMakeLists.txt create mode 100755 MAC/Services/TBB/TBBClient/bin/tbb_start_datawriters create mode 100644 MAC/Services/TBB/TBBClient/lib/CMakeLists.txt create mode 100644 MAC/Services/TBB/TBBClient/lib/__init__.py create mode 100644 MAC/Services/TBB/TBBClient/lib/rpc.py create mode 100644 MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py create mode 100644 MAC/Services/TBB/TBBServer/CMakeLists.txt create mode 100644 MAC/Services/TBB/TBBServer/bin/CMakeLists.txt create mode 100755 MAC/Services/TBB/TBBServer/bin/tbbservice create mode 100644 MAC/Services/TBB/TBBServer/bin/tbbservice.ini create mode 100644 MAC/Services/TBB/TBBServer/lib/CMakeLists.txt create mode 100644 MAC/Services/TBB/TBBServer/lib/tbbservice.py create mode 100644 MAC/Services/TBB/TBBServer/test/CMakeLists.txt create mode 100755 MAC/Services/TBB/TBBServer/test/t_tbbserver.py create mode 100755 MAC/Services/TBB/TBBServer/test/t_tbbserver.run create mode 100755 MAC/Services/TBB/TBBServer/test/t_tbbserver.sh create mode 100644 MAC/Services/TBB/config.py diff --git a/.gitattributes b/.gitattributes index 4e01e398cc3..e8fbc310778 100644 --- a/.gitattributes +++ b/.gitattributes @@ -3655,6 +3655,25 @@ MAC/Navigator2/scripts/readStationConfigs.ctl -text MAC/Navigator2/scripts/readStationConnections.ctl -text MAC/Navigator2/scripts/setSumAlerts.ctl -text MAC/Navigator2/scripts/transferMPs.ctl -text +MAC/Services/TBB/CMakeLists.txt -text +MAC/Services/TBB/TBBClient/CMakeLists.txt -text +MAC/Services/TBB/TBBClient/bin/CMakeLists.txt -text +MAC/Services/TBB/TBBClient/bin/tbb_start_datawriters -text +MAC/Services/TBB/TBBClient/lib/CMakeLists.txt -text +MAC/Services/TBB/TBBClient/lib/__init__.py -text +MAC/Services/TBB/TBBClient/lib/rpc.py -text +MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py -text +MAC/Services/TBB/TBBServer/CMakeLists.txt -text +MAC/Services/TBB/TBBServer/bin/CMakeLists.txt -text +MAC/Services/TBB/TBBServer/bin/tbbservice -text +MAC/Services/TBB/TBBServer/bin/tbbservice.ini -text +MAC/Services/TBB/TBBServer/lib/CMakeLists.txt -text +MAC/Services/TBB/TBBServer/lib/tbbservice.py -text +MAC/Services/TBB/TBBServer/test/CMakeLists.txt -text +MAC/Services/TBB/TBBServer/test/t_tbbserver.py -text +MAC/Services/TBB/TBBServer/test/t_tbbserver.run -text +MAC/Services/TBB/TBBServer/test/t_tbbserver.sh -text +MAC/Services/TBB/config.py -text MAC/Services/TaskManagement/CMakeLists.txt -text MAC/Services/TaskManagement/Client/CMakeLists.txt -text MAC/Services/TaskManagement/Client/lib/CMakeLists.txt -text diff --git a/CMake/LofarPackageList.cmake b/CMake/LofarPackageList.cmake index ef02966164e..a992e22a9f3 100644 --- a/CMake/LofarPackageList.cmake +++ b/CMake/LofarPackageList.cmake @@ -1,7 +1,7 @@ # - Create for each LOFAR package a variable containing the absolute path to # its source directory. # -# Generated by gen_LofarPackageList_cmake.sh at wo 18 jul 2018 15:04:44 CEST +# Generated by gen_LofarPackageList_cmake.sh at di 13 nov 2018 15:44:22 CET # # ---- DO NOT EDIT ---- # @@ -36,9 +36,9 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) set(TestDynDPPP_SOURCE_DIR ${CMAKE_SOURCE_DIR}/CEP/DP3/TestDynDPPP) set(PythonDPPP_SOURCE_DIR ${CMAKE_SOURCE_DIR}/CEP/DP3/PythonDPPP) set(DPPP_AOFlag_SOURCE_DIR ${CMAKE_SOURCE_DIR}/CEP/DP3/DPPP_AOFlag) - set(DPPP_Interpolate_SOURCE_DIR ${CMAKE_SOURCE_DIR}/CEP/DP3/DPPP_Interpolate) set(SPW_Combine_SOURCE_DIR ${CMAKE_SOURCE_DIR}/CEP/DP3/SPWCombine) set(DPPP_DDECal_SOURCE_DIR ${CMAKE_SOURCE_DIR}/CEP/DP3/DPPP_DDECal) + set(DPPP_Interpolate_SOURCE_DIR ${CMAKE_SOURCE_DIR}/CEP/DP3/DPPP_Interpolate) set(LofarFT_SOURCE_DIR ${CMAKE_SOURCE_DIR}/CEP/Imager/LofarFT) set(AWImager2_SOURCE_DIR ${CMAKE_SOURCE_DIR}/CEP/Imager/AWImager2) set(Laps-GRIDInterface_SOURCE_DIR ${CMAKE_SOURCE_DIR}/CEP/LAPS/GRIDInterface) @@ -147,6 +147,9 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) set(GCFTM_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/GCF/TM) set(GCFPVSS_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/GCF/PVSS) set(GCFRTDB_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/GCF/RTDB) + set(TBB_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Services/TBB) + set(TBBClient_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Services/TBB/TBBClient) + set(TBBServer_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Services/TBB/TBBServer) set(TaskManagementClient_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Services/TaskManagement/Client) set(TaskManagementCommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Services/TaskManagement/Common) set(TaskManagementServer_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Services/TaskManagement/Server) diff --git a/MAC/Services/CMakeLists.txt b/MAC/Services/CMakeLists.txt index 67a2918d974..4791d0668b0 100644 --- a/MAC/Services/CMakeLists.txt +++ b/MAC/Services/CMakeLists.txt @@ -1,6 +1,7 @@ # $Id$ -lofar_package(MAC_Services 1.0 DEPENDS PyMessaging OTDB_Services pyparameterset Docker ResourceAssignmentService) +lofar_package(MAC_Services 1.0 DEPENDS PyMessaging OTDB_Services pyparameterset Docker ResourceAssignmentService TBB) add_subdirectory(src) -add_subdirectory(test) \ No newline at end of file +add_subdirectory(test) +lofar_add_package(TBB) diff --git a/MAC/Services/TBB/CMakeLists.txt b/MAC/Services/TBB/CMakeLists.txt new file mode 100644 index 00000000000..2214fe290a2 --- /dev/null +++ b/MAC/Services/TBB/CMakeLists.txt @@ -0,0 +1,9 @@ + +include(PythonInstall) +python_install(config.py + DESTINATION lofar/mac/tbb) + +lofar_add_package(TBBClient) +lofar_add_package(TBBServer) + +lofar_package(TBB 2.0 DEPENDS TBBClient TBBServer) diff --git a/MAC/Services/TBB/TBBClient/CMakeLists.txt b/MAC/Services/TBB/TBBClient/CMakeLists.txt new file mode 100644 index 00000000000..b541e8c2701 --- /dev/null +++ b/MAC/Services/TBB/TBBClient/CMakeLists.txt @@ -0,0 +1,6 @@ +lofar_package(TBBClient 2.0 DEPENDS PyMessaging PyCommon) + +include(PythonInstall) + +add_subdirectory(lib) +add_subdirectory(bin) diff --git a/MAC/Services/TBB/TBBClient/bin/CMakeLists.txt b/MAC/Services/TBB/TBBClient/bin/CMakeLists.txt new file mode 100644 index 00000000000..272729e00aa --- /dev/null +++ b/MAC/Services/TBB/TBBClient/bin/CMakeLists.txt @@ -0,0 +1,3 @@ + +lofar_add_bin_scripts(tbb_start_datawriters) + diff --git a/MAC/Services/TBB/TBBClient/bin/tbb_start_datawriters b/MAC/Services/TBB/TBBClient/bin/tbb_start_datawriters new file mode 100755 index 00000000000..4d9e8812296 --- /dev/null +++ b/MAC/Services/TBB/TBBClient/bin/tbb_start_datawriters @@ -0,0 +1,31 @@ +#!/usr/bin/python + +import logging +from optparse import OptionParser +from lofar.mac.tbb.config import DEFAULT_TBB_BUSNAME, DEFAULT_TBB_SERVICENAME, DEFAULT_BROKER +from lofar.mac.tbb.config import DEFAULT_TBB_NOTIFICATION_BUSNAME +from lofar.mac.tbb.client import start_datawriters_and_wait_until_finished + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + # Check the invocation arguments + parser = OptionParser('%prog [options]', + description='issue a command to the tbb service to start the datawriters, and wait until they are done.') + parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') + + parser.add_option("-b", "--tbb_service_busname", dest="tbb_service_busname", type="string", + default=DEFAULT_TBB_BUSNAME, + help="Name of the bus on which the tbb service listens. [default: %default]") + parser.add_option("-s", "--tbb_service_name", dest="tbb_service_name", type="string", + default=DEFAULT_TBB_SERVICENAME, + help="Name of the tbb service. [default: %default]") + parser.add_option("-n", "--tbb_notification_busname", dest="tbb_notification_busname", type="string", + default=DEFAULT_TBB_NOTIFICATION_BUSNAME, + help='Name of the notification bus exchange on the qpid broker on which the tbb notifications are published, default: %default') + (options, args) = parser.parse_args() + + start_datawriters_and_wait_until_finished(service_busname=options.tbb_service_busname, + service_name=options.tbb_service_name, + notification_busname=options.tbb_notification_busname, + broker=options.broker) diff --git a/MAC/Services/TBB/TBBClient/lib/CMakeLists.txt b/MAC/Services/TBB/TBBClient/lib/CMakeLists.txt new file mode 100644 index 00000000000..c08c73edc4b --- /dev/null +++ b/MAC/Services/TBB/TBBClient/lib/CMakeLists.txt @@ -0,0 +1,5 @@ + +python_install(__init__.py + tbbbuslistener.py + rpc.py + DESTINATION lofar/mac/tbb/client) diff --git a/MAC/Services/TBB/TBBClient/lib/__init__.py b/MAC/Services/TBB/TBBClient/lib/__init__.py new file mode 100644 index 00000000000..1af728131ad --- /dev/null +++ b/MAC/Services/TBB/TBBClient/lib/__init__.py @@ -0,0 +1,40 @@ +#!/usr/bin/python + +from threading import Event + +import logging +logger = logging.getLogger(__name__) + +from lofar.mac.tbb.config import DEFAULT_TBB_BUSNAME, DEFAULT_TBB_SERVICENAME, DEFAULT_BROKER +from lofar.mac.tbb.config import DEFAULT_TBB_NOTIFICATION_BUSNAME +from lofar.mac.tbb.client.rpc import TBBRPC +from lofar.mac.tbb.client.tbbbuslistener import TBBBusListener + +def start_datawriters_and_wait_until_finished(service_busname=DEFAULT_TBB_BUSNAME, + service_name=DEFAULT_TBB_SERVICENAME, + notification_busname=DEFAULT_TBB_NOTIFICATION_BUSNAME, + broker=DEFAULT_BROKER): + ''' + convenience method which issues a start_datawriters command to the tbbservice, and waits until all writers are done + ''' + + class Waiter(TBBBusListener): + '''Helper class which overrides TBBBusListener.onDataWritersFinished + and waits for synchronization using threading Events''' + def __init__(self, busname=notification_busname, broker=broker, **kwargs): + self.wait_event = Event() + super(Waiter, self).__init__(busname=busname, broker=broker, **kwargs) + + def onDataWritersFinished(self, msg_content): + # signal that we're done waiting + self.wait_event.set() + + def wait(self, timeout=24*3600): + # wait until onDataWritersFinished + self.wait_event.wait(timeout) + + with Waiter() as waiter: + with TBBRPC(busname=service_busname, servicename=service_name, broker=broker) as rpc: + rpc.start_datawriters() + waiter.wait() + diff --git a/MAC/Services/TBB/TBBClient/lib/rpc.py b/MAC/Services/TBB/TBBClient/lib/rpc.py new file mode 100644 index 00000000000..c538ff88e3e --- /dev/null +++ b/MAC/Services/TBB/TBBClient/lib/rpc.py @@ -0,0 +1,26 @@ +#!/usr/bin/python + +import logging +from lofar.messaging.RPC import RPC, RPCException, RPCWrapper +from lofar.mac.tbb.config import DEFAULT_TBB_BUSNAME, DEFAULT_TBB_SERVICENAME, DEFAULT_BROKER + +logger = logging.getLogger(__name__) + +class TBBRPC(RPCWrapper): + def __init__(self, busname=DEFAULT_TBB_BUSNAME, + servicename=DEFAULT_TBB_SERVICENAME, + broker=DEFAULT_BROKER): + super(TBBRPC, self).__init__(busname=busname, + servicename=servicename, + broker=broker) + + def start_datawriters(self): + return self.rpc('start_datawriters') + +if __name__ == '__main__': + '''little example usage''' + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', + level=logging.INFO) + import pprint + with TBBRPC() as rpc: + pprint.pprint(rpc.start_datawriters()) diff --git a/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py b/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py new file mode 100644 index 00000000000..6e00667e0d0 --- /dev/null +++ b/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python + +# Copyright (C) 2015 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# + +from lofar.common.util import humanreadablesize +from lofar.messaging.messagebus import AbstractBusListener +from lofar.mac.tbb.config import DEFAULT_TBB_NOTIFICATION_BUSNAME, DEFAULT_TBB_NOTIFICATION_SUBJECTS, DEFAULT_BROKER + +import logging +logger = logging.getLogger() + +class TBBBusListener(AbstractBusListener): + def __init__(self, busname=DEFAULT_TBB_NOTIFICATION_BUSNAME, subjects=DEFAULT_TBB_NOTIFICATION_SUBJECTS, broker=DEFAULT_BROKER, **kwargs): + """ + TBBBusListener listens on the given notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. + Typical usage is to derive your own subclass from TBBBusListener and implement the specific on<SomeMessage> methods that you are interested in. + :param busname: valid Qpid address + :param subjects: the subjects filter string to listen for. + :param broker: valid Qpid broker host + """ + self.subject_prefix = (subjects.split('.')[0]+'.') if '.' in subjects else '' + + address = "%s/%s" % (busname, subjects) + super(TBBBusListener, self).__init__(address, broker, **kwargs) + + def _handleMessage(self, msg): + # try to handle an incoming message, and call the associated on<SomeMessage> method + try: + logger.info("on%s: %s", msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' ')) + + if msg.subject == '%sTBBServiceStarted' % self.subject_prefix: + self.onTBBServiceStarted(msg.content) + elif msg.subject == '%sTBBServiceStopped' % self.subject_prefix: + self.onTBBServiceStopped(msg.content) + elif msg.subject == '%sDataWritersStarted' % self.subject_prefix: + self.onDataWritersStarted(msg.content) + elif msg.subject == '%sDataWritersFinished' % self.subject_prefix: + self.onDataWritersFinished(msg.content) + else: + logger.error("TBBBusListener.handleMessage: unknown subject: %s", msg.subject) + except Exception as e: + logger.exception("TBBBusListener.handleMessage: %s", e) + raise + + def onTBBServiceStarted(self, msg_content): + '''onTBBServiceStarted is called upon receiving a TBBServiceStarted message. + :param msg_content: content of the TBBServiceStarted message.''' + pass + + def onTBBServiceStopped(self, msg_content): + '''onTBBServiceStopped is called upon receiving a TBBServiceStopped message. + :param msg_content: content of the TBBServiceStopped message.''' + pass + + def onDataWritersStarted(self, msg_content): + '''onDataWritersStarted is called upon receiving a DataWritersStarted message. + :param msg_content: dictionary with the info on which datawrited started on which host/port''' + pass + + def onDataWritersFinished(self, msg_content): + '''onDataWritersFinished is called upon receiving a DataWritersFinished message. + :param msg_content: dictionary with the info on which datawrited started on which host/port''' + pass + + +def main(): + from lofar.common.util import waitForInterrupt + from optparse import OptionParser + import os, sys + + # make sure we run in UTC timezone + os.environ['TZ'] = 'UTC' + + # Check the invocation arguments + parser = OptionParser('%prog [options]', + description='run the ingest job monitor') + parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') + parser.add_option("-n", "--tbb_notification_busname", dest="tbb_notification_busname", type="string", + default=DEFAULT_TBB_NOTIFICATION_BUSNAME, + help='Name of the notification exchange where to listen for the published tbb notifications, default: %default') + (options, args) = parser.parse_args() + + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', + level=logging.INFO, + stream=sys.stdout) + + with TBBBusListener(busname=options.tbb_notification_busname, + subjects=DEFAULT_TBB_NOTIFICATION_SUBJECTS, + broker=options.broker): + waitForInterrupt() + +if __name__ == '__main__': + main() diff --git a/MAC/Services/TBB/TBBServer/CMakeLists.txt b/MAC/Services/TBB/TBBServer/CMakeLists.txt new file mode 100644 index 00000000000..9fcff3e3d1d --- /dev/null +++ b/MAC/Services/TBB/TBBServer/CMakeLists.txt @@ -0,0 +1,5 @@ +lofar_package(TBBServer 2.0 DEPENDS TBBClient PyMessaging) + +add_subdirectory(lib) +add_subdirectory(bin) +add_subdirectory(test) diff --git a/MAC/Services/TBB/TBBServer/bin/CMakeLists.txt b/MAC/Services/TBB/TBBServer/bin/CMakeLists.txt new file mode 100644 index 00000000000..ae00445ca87 --- /dev/null +++ b/MAC/Services/TBB/TBBServer/bin/CMakeLists.txt @@ -0,0 +1,8 @@ + +lofar_add_bin_scripts(tbbservice) + +# supervisord config files +install(FILES + tbbservice.ini + DESTINATION etc/supervisord.d) + diff --git a/MAC/Services/TBB/TBBServer/bin/tbbservice b/MAC/Services/TBB/TBBServer/bin/tbbservice new file mode 100755 index 00000000000..9ab556b77d3 --- /dev/null +++ b/MAC/Services/TBB/TBBServer/bin/tbbservice @@ -0,0 +1,5 @@ +#!/usr/bin/python + +if __name__ == '__main__': + from lofar.mac.tbb.server.tbbservice import main + main() diff --git a/MAC/Services/TBB/TBBServer/bin/tbbservice.ini b/MAC/Services/TBB/TBBServer/bin/tbbservice.ini new file mode 100644 index 00000000000..39d7d8ec325 --- /dev/null +++ b/MAC/Services/TBB/TBBServer/bin/tbbservice.ini @@ -0,0 +1,8 @@ +[program:tbbservice] +command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec tbbservice' +user=ingest +stopsignal=INT ; KeyboardInterrupt +stopasgroup=true ; bash does not propagate signals +stdout_logfile=%(program_name)s.log +redirect_stderr=true +stderr_logfile=NONE diff --git a/MAC/Services/TBB/TBBServer/lib/CMakeLists.txt b/MAC/Services/TBB/TBBServer/lib/CMakeLists.txt new file mode 100644 index 00000000000..544817c69f3 --- /dev/null +++ b/MAC/Services/TBB/TBBServer/lib/CMakeLists.txt @@ -0,0 +1,5 @@ + +include(PythonInstall) + +python_install(tbbservice.py + DESTINATION lofar/mac/tbb/server) diff --git a/MAC/Services/TBB/TBBServer/lib/tbbservice.py b/MAC/Services/TBB/TBBServer/lib/tbbservice.py new file mode 100644 index 00000000000..db1901d813c --- /dev/null +++ b/MAC/Services/TBB/TBBServer/lib/tbbservice.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python + +# Copyright (C) 2015 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +import os +from datetime import datetime +from socket import gethostname +from optparse import OptionParser + +import logging +logger = logging.getLogger() + +from lofar.messaging import Service +from lofar.messaging.Service import MessageHandlerInterface +from lofar.messaging.messages import EventMessage +from lofar.messaging.messagebus import ToBus +from lofar.common.util import waitForInterrupt +from lofar.mac.tbb.config import * + + +class TBBServiceMessageHandler(MessageHandlerInterface): + ''' + TBBServiceMessageHandler implements the MessageHandlerInterface + and glues the message handling in Service to the methods in TBBControlService + ''' + def __init__(self, tbb_control_service, **kwargs): + self._tbb_control_service = tbb_control_service + + # set/overwrite defaults + kwargs['use_service_methods'] = True + kwargs['numthreads'] = 1 + + # init MessageHandlerInterface with supplied kwargs + super(TBBServiceMessageHandler, self).__init__(**kwargs) + + # set the mapping for the MessageHandlerInterface from message command + # to actual method in the _tbb_control_service + self.service2MethodMap = { 'start_datawriters': self._tbb_control_service.start_datawriters } + +class TBBControlService: + def __init__(self, + busname=DEFAULT_TBB_BUSNAME, + servicename=DEFAULT_TBB_SERVICENAME, + notification_busname=DEFAULT_TBB_NOTIFICATION_BUSNAME, + broker=DEFAULT_BROKER): + + # the event_bus is used to send out notifications about what this TBBControlService is doing + self.event_bus = ToBus(notification_busname, broker=broker) + + # create a service to receive and handle the messages + # supply 'self' as argument to the TBBServiceMessageHandler, + # so the methods in this TBBControlService can be called when a service method is called. + self.service = Service(servicename, + TBBServiceMessageHandler, + busname=busname, + broker=broker, + handler_args={'tbb_control_service': self}, + use_service_methods=True) + + + def __enter__(self): + '''enter context, open connections''' + self.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + '''leave context, close connections''' + self.close() + + def open(self): + '''open the qpid connections for the event bus and the service''' + self.event_bus.open() + self.service.start_listening() + + self._send_event_message('TBBServiceStarted', 'tbb service started at %s on host %s. rpc-service is listening on %s' % ( + datetime.utcnow(), gethostname(), self.service.address)) + + def close(self): + '''close the qpid connections for the event bus and the service''' + self.service.stop_listening() + self._send_event_message('TBBServiceStopped', 'tbb service stopped at %s on host %s. stopped rpc-service on %s' % ( + datetime.utcnow(), gethostname(), self.service.address)) + self.event_bus.close() + + def _send_event_message(self, subject, content): + self.event_bus.send(EventMessage(context='%s%s' % (DEFAULT_TBB_NOTIFICATION_PREFIX, subject), + content=content)) + + def start_datawriters(self): + '''start the tbb datawriters and notify when done''' + self._send_event_message('DataWritersStarted', {}) + + # TODO: implement start_datawriters + import time + time.sleep(1) + + self._send_event_message('DataWritersFinished', {}) + +def main(): + '''main method which starts the TBBControlService with the cmdline supplied options and then waits until stopped by an interrupt.''' + # make sure we run in UTC timezone + os.environ['TZ'] = 'UTC' + + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + # Check the invocation arguments + parser = OptionParser('%prog [options]', + description='run the tbb service which currently has the sole responsibility of starting the datawriters') + parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') + + parser.add_option("-b", "--tbb_service_busname", dest="tbb_service_busname", type="string", + default=DEFAULT_TBB_BUSNAME, + help="Name of the bus on which the tbb service listens. [default: %default]") + parser.add_option("-s", "--tbb_service_name", dest="tbb_service_name", type="string", + default=DEFAULT_TBB_SERVICENAME, + help="Name of the tbb service. [default: %default]") + parser.add_option("-n", "--tbb_notification_busname", dest="tbb_notification_busname", type="string", + default=DEFAULT_TBB_NOTIFICATION_BUSNAME, + help='Name of the notification bus exchange on the qpid broker on which the tbb notifications are published, default: %default') + (options, args) = parser.parse_args() + + with TBBControlService(busname=options.tbb_service_busname, + servicename=options.tbb_service_name, + notification_busname=options.tbb_notification_busname, + broker=options.broker) as service: + logger.info('*****************************************') + logger.info('Started TBBService') + logger.info('*****************************************') + waitForInterrupt() + +if __name__ == '__main__': + main() + diff --git a/MAC/Services/TBB/TBBServer/test/CMakeLists.txt b/MAC/Services/TBB/TBBServer/test/CMakeLists.txt new file mode 100644 index 00000000000..c45ff5a6f18 --- /dev/null +++ b/MAC/Services/TBB/TBBServer/test/CMakeLists.txt @@ -0,0 +1,5 @@ +include(LofarCTest) + +lofar_add_test(t_tbbserver) + + diff --git a/MAC/Services/TBB/TBBServer/test/t_tbbserver.py b/MAC/Services/TBB/TBBServer/test/t_tbbserver.py new file mode 100755 index 00000000000..96e4d1d909c --- /dev/null +++ b/MAC/Services/TBB/TBBServer/test/t_tbbserver.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python + +import unittest +import uuid +from qpid.messaging.exceptions import * +from lofar.messaging.messagebus import FromBus, ToBus +from lofar.messaging.messages import CommandMessage, EventMessage + +import logging +logger = logging.getLogger(__name__) + +try: + from qpid.messaging import Connection + from qpidtoollibs import BrokerAgent +except ImportError: + print 'Cannot run test without qpid tools' + print 'Please source qpid profile' + exit(3) + +#TODO: add tests for tbbservice + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + exit_code = 0 + test_exchange_name = 't_tbbserver_test_exchange_%s' % uuid.uuid1() + + connection = None + broker = None + + try: + # setup broker connection + connection = Connection.establish('127.0.0.1') + broker = BrokerAgent(connection) + + # add test service exchanges/queues + logger.info('adding test exchange to broker: %s', test_exchange_name) + broker.addExchange('topic', test_exchange_name) + + # and run all tests + unittest.main() + + except ConnectError as ce: + logger.error(ce) + exit_code = 3 + except Exception as e: + logger.error(e) + exit_code = 1 + finally: + # cleanup test exchanges/queues and exit + if broker: + logger.info('removing test exchange from broker: %s', test_exchange_name) + broker.delExchange(test_exchange_name) + if connection: + connection.close() + + exit(exit_code) diff --git a/MAC/Services/TBB/TBBServer/test/t_tbbserver.run b/MAC/Services/TBB/TBBServer/test/t_tbbserver.run new file mode 100755 index 00000000000..77cb1493a51 --- /dev/null +++ b/MAC/Services/TBB/TBBServer/test/t_tbbserver.run @@ -0,0 +1,6 @@ +#!/bin/bash + +# Run the unit test +source python-coverage.sh +python_coverage_test "*TBB*" t_tbbserver.py + diff --git a/MAC/Services/TBB/TBBServer/test/t_tbbserver.sh b/MAC/Services/TBB/TBBServer/test/t_tbbserver.sh new file mode 100755 index 00000000000..7fe968b6abd --- /dev/null +++ b/MAC/Services/TBB/TBBServer/test/t_tbbserver.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh t_tbbserver diff --git a/MAC/Services/TBB/config.py b/MAC/Services/TBB/config.py new file mode 100644 index 00000000000..9ec6b58bab7 --- /dev/null +++ b/MAC/Services/TBB/config.py @@ -0,0 +1,11 @@ +from lofar.messaging import adaptNameToEnvironment +from lofar.common import isProductionEnvironment + +DEFAULT_TBB_BUSNAME = adaptNameToEnvironment('lofar.tbb.command') +DEFAULT_TBB_SERVICENAME = 'TBBService' + +DEFAULT_TBB_NOTIFICATION_BUSNAME = adaptNameToEnvironment('lofar.tbb.notification') +DEFAULT_TBB_NOTIFICATION_PREFIX = 'TBB.' +DEFAULT_TBB_NOTIFICATION_SUBJECTS=DEFAULT_TBB_NOTIFICATION_PREFIX+'*' + +DEFAULT_BROKER = 'scu001.control.lofar' if isProductionEnvironment() else 'localhost' -- GitLab