diff --git a/QA/QA_Service/lib/QABusListener.py b/QA/QA_Service/lib/QABusListener.py index 6961d69d84da257ea401a8ba8122676e341b0a60..a1c77127c18a9399d03565f2433c6b7aa901fdca 100644 --- a/QA/QA_Service/lib/QABusListener.py +++ b/QA/QA_Service/lib/QABusListener.py @@ -26,6 +26,7 @@ Typical usage is to derive your own subclass from QABusListener and implement th from lofar.messaging.messagebus import BusListener, AbstractMessageHandler, LofarMessage, EventMessage from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.qa.service.config import DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX +from lofar.common.util import single_line_with_single_spaces import logging @@ -62,19 +63,19 @@ class QAEventMessageHandler(AbstractMessageHandler): raise ValueError("QAEventMessageHandler.handleMessage: unknown subject: %s" % msg.subject) def onConvertedMS2Hdf5(self, msg_content): - pass + logger.info("%s.onConvertedMS2Hdf5(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content)) def onClustered(self, msg_content): - pass + logger.info("%s.onClustered(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content)) def onCreatedInspectionPlots(self, msg_content): - pass + logger.info("%s.onCreatedInspectionPlots(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content)) def onFinished(self, msg_content): - pass + logger.info("%s.onFinished(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content)) def onError(self, msg_content): - pass + logger.info("%s.onError(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content)) class QABusListener(BusListener): def __init__(self, @@ -85,9 +86,12 @@ class QABusListener(BusListener): num_threads: int = 1, broker: str = DEFAULT_BROKER): """ - RABusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. - Typical usage is to derive your own subclass from RABusListener and implement the specific on<SomeMessage> methods that you are interested in. + QABusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. + Typical usage is to derive your own subclass from QABusListener and implement the specific on<SomeMessage> methods that you are interested in. """ + if not issubclass(handler_type, QAEventMessageHandler): + raise TypeError("handler_type should be a QAEventMessageHandler subclass") + super().__init__(handler_type, handler_kwargs, exchange, routing_key, None, num_threads, broker) diff --git a/QA/QA_Service/lib/qa_service.py b/QA/QA_Service/lib/qa_service.py index 7895225d95236f6e7cb42a51f25b7bed32e3ceb5..3226cf27d82aaeacef074630fa6dd4670b029ab7 100644 --- a/QA/QA_Service/lib/qa_service.py +++ b/QA/QA_Service/lib/qa_service.py @@ -32,6 +32,8 @@ from lofar.common.cep4_utils import * logger = logging.getLogger(__name__) +QA_BASE_DIR = '/data/qa' + #TODO: idea: convert periodically while observing? class QAOTDBEventMessageHandler(UsingToBusMixin, OTDBEventMessageHandler): @@ -40,7 +42,7 @@ class QAOTDBEventMessageHandler(UsingToBusMixin, OTDBEventMessageHandler): upon observation/pipeline completion. The qa processes convert MS (measurement sets) to hdf5 qa files, and then starts generating plots from the hdf5 file. ''' - def __init__(self, qa_base_dir = '/data/qa'): + def __init__(self, qa_base_dir = QA_BASE_DIR): """ Instantiate a QAService which listens on the given messagebus for Completion messages. See also the superclass, OTDBEventMessageHandler. @@ -245,6 +247,16 @@ class QAOTDBEventMessageHandler(UsingToBusMixin, OTDBEventMessageHandler): logging.exception('error in _cluster_h5_file: %s', e) self._send_event_message('Error', {'otdb_id': otdb_id, 'message': str(e)}) +class QAService(OTDBBusListener): + def __init__(self, qa_base_dir=QA_BASE_DIR, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + """ + The QAService is a QAService handling otdb events via the QAOTDBEventMessageHandler. + :param exchange: valid message exchange address + :param broker: valid broker host (default: None, which means localhost) + """ + super().__init__(handler_type=QAOTDBEventMessageHandler, + handler_kwargs={'qa_base_dir': qa_base_dir}, + exchange=exchange, broker=broker) def main(): @@ -269,8 +281,7 @@ def main(): logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) #start the qa service - with OTDBBusListener(handler_type=QAOTDBEventMessageHandler, - exchange=options.exchange, broker=options.broker): + with QAService(exchange=options.exchange, broker=options.broker): #loop and wait for messages or interrupt. waitForInterrupt() diff --git a/QA/QA_Service/test/t_qa_service.py b/QA/QA_Service/test/t_qa_service.py index 9061968ce9ecdfbbd8b8d691452584bc801fb2c2..03a1986fdbb7af5c7cd7f3aef8a21c47de0d0262 100755 --- a/QA/QA_Service/test/t_qa_service.py +++ b/QA/QA_Service/test/t_qa_service.py @@ -31,47 +31,54 @@ logger = logging.getLogger(__name__) from lofar.qa.service.qa_service import QAService from lofar.qa.service.QABusListener import * from lofar.qa.hdf5_io import * -from lofar.messaging.messagebus import TemporaryQueue +from lofar.messaging.messagebus import TemporaryExchange from lofar.messaging.messages import EventMessage from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_SUBJECT # the tests below test is multi threaded (even multi process) -# define a QABusListener-derivative to handle synchronization (set the *_events) -class SynchronizingQABusListener(QABusListener): +# define a SynchronizationQABusListener-derivative to handle synchronization (set the *_events) +class SynchronizationQABusListener(QABusListener): + class SynchronizationQAEventMessageHandler(QAEventMessageHandler): + def __init__(self, listener): + super().__init__() + self.listener = listener + + def onConvertedMS2Hdf5(self, msg_content): + self.listener.converted_msg_content = msg_content + self.listener.converted_event.set() + + def onCreatedInspectionPlots(self, msg_content): + self.listener.plotted_msg_content = msg_content + self.listener.plotted_event.set() + + def onFinished(self, msg_content): + self.listener.finished_msg_content = msg_content + self.listener.finished_event.set() + + def onClustered(self, msg_content): + self.listener.clustered_msg_content = msg_content + self.listener.clustered_event.set() + + def onError(self, msg_content): + self.listener.error_msg_content = msg_content + self.listener.error_event.set() + ''' - the tests below test is multi threaded (even multi process) + the tests below test are multi threaded (even multi process) this QABusListener-derivative handles synchronization (set the *_events) and stores the msg_content results for expected result checking ''' - def __init__(self, busname): - super(SynchronizingQABusListener, self).__init__(busname=busname) + def __init__(self, exchange): + super().__init__(handler_type=SynchronizationQABusListener.SynchronizationQAEventMessageHandler, + handler_kwargs={'listener':self}, + exchange=exchange) self.converted_event = Event() self.clustered_event = Event() self.plotted_event = Event() self.finished_event = Event() self.error_event = Event() - def onConvertedMS2Hdf5(self, msg_content): - self.converted_msg_content = msg_content - self.converted_event.set() - - def onCreatedInspectionPlots(self, msg_content): - self.plotted_msg_content = msg_content - self.plotted_event.set() - - def onFinished(self, msg_content): - self.finished_msg_content = msg_content - self.finished_event.set() - - def onClustered(self, msg_content): - self.clustered_msg_content = msg_content - self.clustered_event.set() - - def onError(self, msg_content): - self.error_msg_content = msg_content - self.error_event.set() - class TestQAService(unittest.TestCase): ''' @@ -79,21 +86,17 @@ class TestQAService(unittest.TestCase): ''' def setUp(self): ''' - quite complicated setup to setup test qpid-queues + quite complicated setup to setup test message-exchanges/queues and mock away ssh calls to cep4 and mock away dockerized commands ''' - self.tmp_qa_queue = TemporaryQueue(__class__.__name__ + "_qa_notification") - self.tmp_qa_queue.open() - self.addCleanup(self.tmp_qa_queue.close) - - self.tmp_otdb_queue = TemporaryQueue(__class__.__name__ + "_qa_notification") - self.tmp_otdb_queue.open() - self.addCleanup(self.tmp_otdb_queue.close) - self.TEST_UUID = uuid.uuid1() self.TEST_OTDB_ID = 999999 + self.tmp_exchange = TemporaryExchange("%s_%s" % (__class__.__name__, self.TEST_UUID)) + self.tmp_exchange.open() + self.addCleanup(self.tmp_exchange.close) + # where to store the test results self.TEST_DIR = '/tmp/qa_service_%s' % self.TEST_UUID self.TEST_H5_FILE = 'L%s.MS_extract.h5' % (self.TEST_OTDB_ID,) @@ -159,7 +162,7 @@ class TestQAService(unittest.TestCase): def send_otdb_task_completing_event(self): '''helper method: create a ToBus and send a completing EventMessage''' - with self.tmp_otdb_queue.create_tobus() as sender: + with self.tmp_exchange.create_tobus() as sender: msg = EventMessage(subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT, content={"treeID": self.TEST_OTDB_ID, "state": 'completing', @@ -213,12 +216,11 @@ class TestQAService(unittest.TestCase): self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker # start the QAService (the object under test) - with QAService(qa_notification_busname=self.tmp_qa_queue.address, - otdb_notification_busname=self.tmp_otdb_queue.address, + with QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR): # start listening for QA event messages from the QAService - with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener: + with SynchronizationQABusListener(exchange=self.tmp_exchange.address) as qa_listener: # trigger a qa process by sending otdb task completing event # this will result in the QAService actually doing its magic self.send_otdb_task_completing_event() @@ -315,11 +317,11 @@ class TestQAService(unittest.TestCase): self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker # start the QAService (the object under test) - with QAService(qa_notification_busname=self.tmp_qa_queue.address, - otdb_notification_busname=self.tmp_otdb_queue.address, + with QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR): + # start listening for QA event messages from the QAService - with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener: + with SynchronizationQABusListener(exchange=self.tmp_exchange.address) as qa_listener: # trigger a qa process by sending otdb task completing event # this will result in the QAService actually doing its magic self.send_otdb_task_completing_event() @@ -384,11 +386,11 @@ class TestQAService(unittest.TestCase): self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker # start the QAService (the object under test) - with QAService(qa_notification_busname=self.tmp_qa_queue.address, - otdb_notification_busname=self.tmp_otdb_queue.address, + with QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR): + # start listening for QA event messages from the QAService - with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener: + with SynchronizationQABusListener(exchange=self.tmp_exchange.address) as qa_listener: # trigger a qa process by sending otdb task completing event # this will result in the QAService actually doing its magic self.send_otdb_task_completing_event() @@ -436,11 +438,11 @@ class TestQAService(unittest.TestCase): self.wrap_command_in_cep4_cpu_node_ssh_call_mock.side_effect = mocked_wrap_command_in_cep4_cpu_node_ssh_call # start the QAService (the object under test) - with QAService(qa_notification_busname=self.tmp_qa_queue.address, - otdb_notification_busname=self.tmp_otdb_queue.address, + with QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR): + # start listening for QA event messages from the QAService - with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener: + with SynchronizationQABusListener(exchange=self.tmp_exchange.address) as qa_listener: # trigger a qa process by sending otdb task completing event # this will result in the QAService actually doing its magic self.send_otdb_task_completing_event() @@ -461,8 +463,8 @@ class TestQAService(unittest.TestCase): self.ssh_cmd_list_mock.assert_not_called() -if __name__ == '__main__': - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) +if __name__ == '__main__': #run the unit tests unittest.main()