diff --git a/QA/QA_Service/lib/qa_service.py b/QA/QA_Service/lib/qa_service.py index c78bbc80c26995e0aea6e4806ccdf27022b97edb..321bcfd56b7751a9f5dc3220e9ba115ca93d3a53 100644 --- a/QA/QA_Service/lib/qa_service.py +++ b/QA/QA_Service/lib/qa_service.py @@ -30,6 +30,7 @@ from lofar.qa.service.config import DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX from lofar.common.cep4_utils import * from lofar.parameterset import parameterset from lofar.sas.otdb.otdbrpc import OTDBRPC +from lofar.sas.tmss.util import TMSSsession import logging logger = logging.getLogger(__name__) @@ -77,7 +78,12 @@ class QAFilteringTMSSSubTaskBusListener(TMSSSubTaskBusListener): logger.error('Could not send event message: %s', e) def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str): - self._send_qa_command_message(subtask_id, DEFAULT_DO_QAFILE_CONVERSION_AND_PLOTS_SUBJECT) + with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession: + tmsssession.set_tmss_subtask_status(subtask_id, 'queing') + + self._send_qa_command_message(subtask_id, DEFAULT_DO_QAFILE_CONVERSION_AND_PLOTS_SUBJECT) + + tmsssession.set_tmss_subtask_status(subtask_id, 'queued') def __init__(self, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER): super().__init__(handler_type=QAFilteringTMSSSubTaskBusListener.QAFilteringTMSSSubTaskEventMessageHandler, @@ -133,8 +139,11 @@ class QAService: self.filtering_tmssbuslistener = QAFilteringTMSSSubTaskBusListener(exchange = exchange, broker = broker) self.commands_buslistener = QACommandsBusListener(qa_service=self, exchange = exchange, broker = broker) self._unfinished_otdb_id_map = {} + self.tmsssession = None def __enter__(self): + self.tmsssession = TMSSsession.create_from_dbcreds_for_ldap() + self.tmsssession.open() self.tobus.open() self.filtering_otdbbuslistener.start_listening() self.filtering_tmssbuslistener.start_listening() @@ -146,6 +155,7 @@ class QAService: self.filtering_tmssbuslistener.stop_listening() self.commands_buslistener.stop_listening() self.tobus.close() + self.tmsssession.close() @staticmethod def h5_lustre_filepath(observation_id) -> str: @@ -161,9 +171,16 @@ class QAService: ''' convert a MS or BeamFormed observation to a qa h5 file, and create plots. ''' + if subtask_id: + self.tmsssession.set_tmss_subtask_status(subtask_id, 'starting') + self.tmsssession.set_tmss_subtask_status(subtask_id, 'started') + if self.do_qafile_conversion(otdb_id=otdb_id, subtask_id=subtask_id): self.do_qaplots(otdb_id=otdb_id, subtask_id=subtask_id) + self.tmsssession.set_tmss_subtask_status(subtask_id, 'finishing') + self.tmsssession.set_tmss_subtask_status(subtask_id, 'finished') + def do_qafile_conversion(self, otdb_id=None, subtask_id=None): ''' convert a MS or BeamFormed observation to a qa h5 file diff --git a/QA/QA_Service/test/t_qa_service.py b/QA/QA_Service/test/t_qa_service.py index 5bf33508b46f28b260a2f9248f7031a915177bcc..60c89054d1b28e288f04d70a8cd257a6a9e0ab69 100755 --- a/QA/QA_Service/test/t_qa_service.py +++ b/QA/QA_Service/test/t_qa_service.py @@ -89,20 +89,30 @@ class TestQAService(unittest.TestCase): ''' Tests for the QAService class ''' + @classmethod + def setUpClass(cls) -> None: + cls.TEST_UUID = uuid.uuid1() + cls.TEST_OTDB_ID = 999999 + cls.TEST_TMSS_ID = 2000001#+999999 + + cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID)) + cls.tmp_exchange.open() + + cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address) + cls.tmss_test_env.start() + + @classmethod + def tearDownClass(cls) -> None: + cls.tmss_test_env.stop() + cls.tmp_exchange.close() + + def setUp(self): ''' quite complicated setup to setup test message-exchanges/queues and mock away ssh calls to cep4 and mock away dockerized commands ''' - self.TEST_UUID = uuid.uuid1() - self.TEST_OTDB_ID = 999999 - self.TEST_TMSS_ID = 2000001#+999999 - - self.tmp_exchange = TemporaryExchange("%s_%s" % (__class__.__name__, self.TEST_UUID)) - self.tmp_exchange.open() - self.addCleanup(self.tmp_exchange.close) - # where to store the test results self.TEST_DIR = '/tmp/test_qa_service_%s' % self.TEST_UUID QAService.QA_LUSTRE_BASE_DIR = os.path.join(self.TEST_DIR, 'lustre') @@ -534,17 +544,14 @@ class TestQAService(unittest.TestCase): # start the QAService (the object under test) qaservice = QAService(exchange=self.tmp_exchange.address) - tmss_test_env = TMSSTestEnvironment(exchange=self.tmp_exchange.address) - with qaservice, tmss_test_env, BusListenerJanitor(qaservice.filtering_otdbbuslistener), BusListenerJanitor(qaservice.filtering_tmssbuslistener), BusListenerJanitor(qaservice.commands_buslistener): + with qaservice, BusListenerJanitor(qaservice.filtering_otdbbuslistener), BusListenerJanitor(qaservice.filtering_tmssbuslistener), BusListenerJanitor(qaservice.commands_buslistener): # start listening for QA event messages from the QAService with BusListenerJanitor(SynchronizationQABusListener(exchange=self.tmp_exchange.address)) as qa_listener: # trigger a qa process by setting the tmss subtask to scheduled # this will result in the QAService actually doing its magic - auth = requests.auth.HTTPBasicAuth(tmss_test_env.ldap_server.dbcreds.user, tmss_test_env.ldap_server.dbcreds.password) - requests.patch(url=tmss_test_env.django_server.url+'/subtask/%s/' % self.TEST_TMSS_ID, - json={'state': tmss_test_env.django_server.url+"/subtask_state/scheduled/" }, - auth=auth) + with self.tmss_test_env.create_tmss_client() as client: + client.set_tmss_subtask_status(self.TEST_TMSS_ID, 'scheduled') # start waiting until ConvertedMS2Hdf5 event message received (or timeout) qa_listener.converted_event.wait(30)