Skip to content
Snippets Groups Projects
Commit 6d45d135 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-60: Use TMSSsession with temporary TMSSTestEnvironment to integrate the...

TMSS-60: Use TMSSsession with temporary TMSSTestEnvironment to integrate the qa_service and tmss tests
parent 304adaa1
No related branches found
No related tags found
1 merge request!154Resolve TMSS-60 and TMSS-171 and TMSS-198
......@@ -30,6 +30,7 @@ from lofar.qa.service.config import DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX
from lofar.common.cep4_utils import *
from lofar.parameterset import parameterset
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.tmss.util import TMSSsession
import logging
logger = logging.getLogger(__name__)
......@@ -77,7 +78,12 @@ class QAFilteringTMSSSubTaskBusListener(TMSSSubTaskBusListener):
logger.error('Could not send event message: %s', e)
def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str):
self._send_qa_command_message(subtask_id, DEFAULT_DO_QAFILE_CONVERSION_AND_PLOTS_SUBJECT)
with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession:
tmsssession.set_tmss_subtask_status(subtask_id, 'queing')
self._send_qa_command_message(subtask_id, DEFAULT_DO_QAFILE_CONVERSION_AND_PLOTS_SUBJECT)
tmsssession.set_tmss_subtask_status(subtask_id, 'queued')
def __init__(self, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER):
super().__init__(handler_type=QAFilteringTMSSSubTaskBusListener.QAFilteringTMSSSubTaskEventMessageHandler,
......@@ -133,8 +139,11 @@ class QAService:
self.filtering_tmssbuslistener = QAFilteringTMSSSubTaskBusListener(exchange = exchange, broker = broker)
self.commands_buslistener = QACommandsBusListener(qa_service=self, exchange = exchange, broker = broker)
self._unfinished_otdb_id_map = {}
self.tmsssession = None
def __enter__(self):
self.tmsssession = TMSSsession.create_from_dbcreds_for_ldap()
self.tmsssession.open()
self.tobus.open()
self.filtering_otdbbuslistener.start_listening()
self.filtering_tmssbuslistener.start_listening()
......@@ -146,6 +155,7 @@ class QAService:
self.filtering_tmssbuslistener.stop_listening()
self.commands_buslistener.stop_listening()
self.tobus.close()
self.tmsssession.close()
@staticmethod
def h5_lustre_filepath(observation_id) -> str:
......@@ -161,9 +171,16 @@ class QAService:
'''
convert a MS or BeamFormed observation to a qa h5 file, and create plots.
'''
if subtask_id:
self.tmsssession.set_tmss_subtask_status(subtask_id, 'starting')
self.tmsssession.set_tmss_subtask_status(subtask_id, 'started')
if self.do_qafile_conversion(otdb_id=otdb_id, subtask_id=subtask_id):
self.do_qaplots(otdb_id=otdb_id, subtask_id=subtask_id)
self.tmsssession.set_tmss_subtask_status(subtask_id, 'finishing')
self.tmsssession.set_tmss_subtask_status(subtask_id, 'finished')
def do_qafile_conversion(self, otdb_id=None, subtask_id=None):
'''
convert a MS or BeamFormed observation to a qa h5 file
......
......@@ -89,20 +89,30 @@ class TestQAService(unittest.TestCase):
'''
Tests for the QAService class
'''
@classmethod
def setUpClass(cls) -> None:
cls.TEST_UUID = uuid.uuid1()
cls.TEST_OTDB_ID = 999999
cls.TEST_TMSS_ID = 2000001#+999999
cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
cls.tmp_exchange.open()
cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address)
cls.tmss_test_env.start()
@classmethod
def tearDownClass(cls) -> None:
cls.tmss_test_env.stop()
cls.tmp_exchange.close()
def setUp(self):
'''
quite complicated setup to setup test message-exchanges/queues
and mock away ssh calls to cep4
and mock away dockerized commands
'''
self.TEST_UUID = uuid.uuid1()
self.TEST_OTDB_ID = 999999
self.TEST_TMSS_ID = 2000001#+999999
self.tmp_exchange = TemporaryExchange("%s_%s" % (__class__.__name__, self.TEST_UUID))
self.tmp_exchange.open()
self.addCleanup(self.tmp_exchange.close)
# where to store the test results
self.TEST_DIR = '/tmp/test_qa_service_%s' % self.TEST_UUID
QAService.QA_LUSTRE_BASE_DIR = os.path.join(self.TEST_DIR, 'lustre')
......@@ -534,17 +544,14 @@ class TestQAService(unittest.TestCase):
# start the QAService (the object under test)
qaservice = QAService(exchange=self.tmp_exchange.address)
tmss_test_env = TMSSTestEnvironment(exchange=self.tmp_exchange.address)
with qaservice, tmss_test_env, BusListenerJanitor(qaservice.filtering_otdbbuslistener), BusListenerJanitor(qaservice.filtering_tmssbuslistener), BusListenerJanitor(qaservice.commands_buslistener):
with qaservice, BusListenerJanitor(qaservice.filtering_otdbbuslistener), BusListenerJanitor(qaservice.filtering_tmssbuslistener), BusListenerJanitor(qaservice.commands_buslistener):
# start listening for QA event messages from the QAService
with BusListenerJanitor(SynchronizationQABusListener(exchange=self.tmp_exchange.address)) as qa_listener:
# trigger a qa process by setting the tmss subtask to scheduled
# this will result in the QAService actually doing its magic
auth = requests.auth.HTTPBasicAuth(tmss_test_env.ldap_server.dbcreds.user, tmss_test_env.ldap_server.dbcreds.password)
requests.patch(url=tmss_test_env.django_server.url+'/subtask/%s/' % self.TEST_TMSS_ID,
json={'state': tmss_test_env.django_server.url+"/subtask_state/scheduled/" },
auth=auth)
with self.tmss_test_env.create_tmss_client() as client:
client.set_tmss_subtask_status(self.TEST_TMSS_ID, 'scheduled')
# start waiting until ConvertedMS2Hdf5 event message received (or timeout)
qa_listener.converted_event.wait(30)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment