Skip to content
Snippets Groups Projects
Commit be63b190 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-796: fixed t_qa_service

parent 2eb326cc
No related branches found
No related tags found
2 merge requests!40Merge Lofar release 4 0 into master,!26Resolve SW-796
......@@ -26,6 +26,7 @@ Typical usage is to derive your own subclass from QABusListener and implement th
from lofar.messaging.messagebus import BusListener, AbstractMessageHandler, LofarMessage, EventMessage
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.qa.service.config import DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX
from lofar.common.util import single_line_with_single_spaces
import logging
......@@ -62,19 +63,19 @@ class QAEventMessageHandler(AbstractMessageHandler):
raise ValueError("QAEventMessageHandler.handleMessage: unknown subject: %s" % msg.subject)
def onConvertedMS2Hdf5(self, msg_content):
pass
logger.info("%s.onConvertedMS2Hdf5(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content))
def onClustered(self, msg_content):
pass
logger.info("%s.onClustered(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content))
def onCreatedInspectionPlots(self, msg_content):
pass
logger.info("%s.onCreatedInspectionPlots(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content))
def onFinished(self, msg_content):
pass
logger.info("%s.onFinished(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content))
def onError(self, msg_content):
pass
logger.info("%s.onError(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content))
class QABusListener(BusListener):
def __init__(self,
......@@ -85,9 +86,12 @@ class QABusListener(BusListener):
num_threads: int = 1,
broker: str = DEFAULT_BROKER):
"""
RABusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from RABusListener and implement the specific on<SomeMessage> methods that you are interested in.
QABusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from QABusListener and implement the specific on<SomeMessage> methods that you are interested in.
"""
if not issubclass(handler_type, QAEventMessageHandler):
raise TypeError("handler_type should be a QAEventMessageHandler subclass")
super().__init__(handler_type, handler_kwargs, exchange, routing_key, None, num_threads, broker)
......
......@@ -32,6 +32,8 @@ from lofar.common.cep4_utils import *
logger = logging.getLogger(__name__)
QA_BASE_DIR = '/data/qa'
#TODO: idea: convert periodically while observing?
class QAOTDBEventMessageHandler(UsingToBusMixin, OTDBEventMessageHandler):
......@@ -40,7 +42,7 @@ class QAOTDBEventMessageHandler(UsingToBusMixin, OTDBEventMessageHandler):
upon observation/pipeline completion. The qa processes convert MS (measurement sets) to hdf5 qa files,
and then starts generating plots from the hdf5 file.
'''
def __init__(self, qa_base_dir = '/data/qa'):
def __init__(self, qa_base_dir = QA_BASE_DIR):
"""
Instantiate a QAService which listens on the given messagebus for Completion messages.
See also the superclass, OTDBEventMessageHandler.
......@@ -245,6 +247,16 @@ class QAOTDBEventMessageHandler(UsingToBusMixin, OTDBEventMessageHandler):
logging.exception('error in _cluster_h5_file: %s', e)
self._send_event_message('Error', {'otdb_id': otdb_id, 'message': str(e)})
class QAService(OTDBBusListener):
def __init__(self, qa_base_dir=QA_BASE_DIR, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
"""
The QAService is a QAService handling otdb events via the QAOTDBEventMessageHandler.
:param exchange: valid message exchange address
:param broker: valid broker host (default: None, which means localhost)
"""
super().__init__(handler_type=QAOTDBEventMessageHandler,
handler_kwargs={'qa_base_dir': qa_base_dir},
exchange=exchange, broker=broker)
def main():
......@@ -269,8 +281,7 @@ def main():
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
#start the qa service
with OTDBBusListener(handler_type=QAOTDBEventMessageHandler,
exchange=options.exchange, broker=options.broker):
with QAService(exchange=options.exchange, broker=options.broker):
#loop and wait for messages or interrupt.
waitForInterrupt()
......
......@@ -31,47 +31,54 @@ logger = logging.getLogger(__name__)
from lofar.qa.service.qa_service import QAService
from lofar.qa.service.QABusListener import *
from lofar.qa.hdf5_io import *
from lofar.messaging.messagebus import TemporaryQueue
from lofar.messaging.messagebus import TemporaryExchange
from lofar.messaging.messages import EventMessage
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_SUBJECT
# the tests below test is multi threaded (even multi process)
# define a QABusListener-derivative to handle synchronization (set the *_events)
class SynchronizingQABusListener(QABusListener):
# define a SynchronizationQABusListener-derivative to handle synchronization (set the *_events)
class SynchronizationQABusListener(QABusListener):
class SynchronizationQAEventMessageHandler(QAEventMessageHandler):
def __init__(self, listener):
super().__init__()
self.listener = listener
def onConvertedMS2Hdf5(self, msg_content):
self.listener.converted_msg_content = msg_content
self.listener.converted_event.set()
def onCreatedInspectionPlots(self, msg_content):
self.listener.plotted_msg_content = msg_content
self.listener.plotted_event.set()
def onFinished(self, msg_content):
self.listener.finished_msg_content = msg_content
self.listener.finished_event.set()
def onClustered(self, msg_content):
self.listener.clustered_msg_content = msg_content
self.listener.clustered_event.set()
def onError(self, msg_content):
self.listener.error_msg_content = msg_content
self.listener.error_event.set()
'''
the tests below test is multi threaded (even multi process)
the tests below test are multi threaded (even multi process)
this QABusListener-derivative handles synchronization (set the *_events)
and stores the msg_content results for expected result checking
'''
def __init__(self, busname):
super(SynchronizingQABusListener, self).__init__(busname=busname)
def __init__(self, exchange):
super().__init__(handler_type=SynchronizationQABusListener.SynchronizationQAEventMessageHandler,
handler_kwargs={'listener':self},
exchange=exchange)
self.converted_event = Event()
self.clustered_event = Event()
self.plotted_event = Event()
self.finished_event = Event()
self.error_event = Event()
def onConvertedMS2Hdf5(self, msg_content):
self.converted_msg_content = msg_content
self.converted_event.set()
def onCreatedInspectionPlots(self, msg_content):
self.plotted_msg_content = msg_content
self.plotted_event.set()
def onFinished(self, msg_content):
self.finished_msg_content = msg_content
self.finished_event.set()
def onClustered(self, msg_content):
self.clustered_msg_content = msg_content
self.clustered_event.set()
def onError(self, msg_content):
self.error_msg_content = msg_content
self.error_event.set()
class TestQAService(unittest.TestCase):
'''
......@@ -79,21 +86,17 @@ class TestQAService(unittest.TestCase):
'''
def setUp(self):
'''
quite complicated setup to setup test qpid-queues
quite complicated setup to setup test message-exchanges/queues
and mock away ssh calls to cep4
and mock away dockerized commands
'''
self.tmp_qa_queue = TemporaryQueue(__class__.__name__ + "_qa_notification")
self.tmp_qa_queue.open()
self.addCleanup(self.tmp_qa_queue.close)
self.tmp_otdb_queue = TemporaryQueue(__class__.__name__ + "_qa_notification")
self.tmp_otdb_queue.open()
self.addCleanup(self.tmp_otdb_queue.close)
self.TEST_UUID = uuid.uuid1()
self.TEST_OTDB_ID = 999999
self.tmp_exchange = TemporaryExchange("%s_%s" % (__class__.__name__, self.TEST_UUID))
self.tmp_exchange.open()
self.addCleanup(self.tmp_exchange.close)
# where to store the test results
self.TEST_DIR = '/tmp/qa_service_%s' % self.TEST_UUID
self.TEST_H5_FILE = 'L%s.MS_extract.h5' % (self.TEST_OTDB_ID,)
......@@ -159,7 +162,7 @@ class TestQAService(unittest.TestCase):
def send_otdb_task_completing_event(self):
'''helper method: create a ToBus and send a completing EventMessage'''
with self.tmp_otdb_queue.create_tobus() as sender:
with self.tmp_exchange.create_tobus() as sender:
msg = EventMessage(subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
content={"treeID": self.TEST_OTDB_ID,
"state": 'completing',
......@@ -213,12 +216,11 @@ class TestQAService(unittest.TestCase):
self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker
# start the QAService (the object under test)
with QAService(qa_notification_busname=self.tmp_qa_queue.address,
otdb_notification_busname=self.tmp_otdb_queue.address,
with QAService(exchange=self.tmp_exchange.address,
qa_base_dir=self.TEST_DIR):
# start listening for QA event messages from the QAService
with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener:
with SynchronizationQABusListener(exchange=self.tmp_exchange.address) as qa_listener:
# trigger a qa process by sending otdb task completing event
# this will result in the QAService actually doing its magic
self.send_otdb_task_completing_event()
......@@ -315,11 +317,11 @@ class TestQAService(unittest.TestCase):
self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker
# start the QAService (the object under test)
with QAService(qa_notification_busname=self.tmp_qa_queue.address,
otdb_notification_busname=self.tmp_otdb_queue.address,
with QAService(exchange=self.tmp_exchange.address,
qa_base_dir=self.TEST_DIR):
# start listening for QA event messages from the QAService
with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener:
with SynchronizationQABusListener(exchange=self.tmp_exchange.address) as qa_listener:
# trigger a qa process by sending otdb task completing event
# this will result in the QAService actually doing its magic
self.send_otdb_task_completing_event()
......@@ -384,11 +386,11 @@ class TestQAService(unittest.TestCase):
self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker
# start the QAService (the object under test)
with QAService(qa_notification_busname=self.tmp_qa_queue.address,
otdb_notification_busname=self.tmp_otdb_queue.address,
with QAService(exchange=self.tmp_exchange.address,
qa_base_dir=self.TEST_DIR):
# start listening for QA event messages from the QAService
with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener:
with SynchronizationQABusListener(exchange=self.tmp_exchange.address) as qa_listener:
# trigger a qa process by sending otdb task completing event
# this will result in the QAService actually doing its magic
self.send_otdb_task_completing_event()
......@@ -436,11 +438,11 @@ class TestQAService(unittest.TestCase):
self.wrap_command_in_cep4_cpu_node_ssh_call_mock.side_effect = mocked_wrap_command_in_cep4_cpu_node_ssh_call
# start the QAService (the object under test)
with QAService(qa_notification_busname=self.tmp_qa_queue.address,
otdb_notification_busname=self.tmp_otdb_queue.address,
with QAService(exchange=self.tmp_exchange.address,
qa_base_dir=self.TEST_DIR):
# start listening for QA event messages from the QAService
with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener:
with SynchronizationQABusListener(exchange=self.tmp_exchange.address) as qa_listener:
# trigger a qa process by sending otdb task completing event
# this will result in the QAService actually doing its magic
self.send_otdb_task_completing_event()
......@@ -461,8 +463,8 @@ class TestQAService(unittest.TestCase):
self.ssh_cmd_list_mock.assert_not_called()
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
if __name__ == '__main__':
#run the unit tests
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment