diff --git a/QA/QA_Service/lib/qa_service.py b/QA/QA_Service/lib/qa_service.py index c538ddba93e30a41a50f89746944517fdd7d9883..7efefe627e66e97767a3122ee068f7fc54019660 100644 --- a/QA/QA_Service/lib/qa_service.py +++ b/QA/QA_Service/lib/qa_service.py @@ -18,6 +18,7 @@ # $Id: qa_service.py 43930 2019-08-30 07:57:17Z klazema $ import os.path +import json from subprocess import call from optparse import OptionParser, OptionGroup from lofar.common.util import waitForInterrupt @@ -69,22 +70,28 @@ class QAFilteringOTDBBusListener(OTDBBusListener): class QAFilteringTMSSSubTaskBusListener(TMSSSubTaskBusListener): class QAFilteringTMSSSubTaskEventMessageHandler(UsingToBusMixin, TMSSSubTaskEventMessageHandler): def _send_qa_command_message(self, subtask_id: int, command_subject: str): - try: - content = {"subtask_id": subtask_id } - msg = CommandMessage(subject=command_subject, content=content) - logger.info('sending command message subject:\'%s\' content: %s', msg.subject, content) - self.send(msg) - except Exception as e: - logger.error('Could not send event message: %s', e) - - def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str): with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession: - tmsssession.set_subtask_status(subtask_id, 'queing') + tmsssession.set_subtask_status(subtask_id, 'queueing') - self._send_qa_command_message(subtask_id, DEFAULT_DO_QAFILE_CONVERSION_AND_PLOTS_SUBJECT) + try: + content = {"subtask_id": subtask_id } + msg = CommandMessage(subject=command_subject, content=content) + logger.info('sending command message subject:\'%s\' content: %s', msg.subject, content) + self.send(msg) + except Exception as e: + logger.error('Could not send event message: %s', e) tmsssession.set_subtask_status(subtask_id, 'queued') + def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str): + with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession: + subtask = tmsssession.get_subtask(subtask_id) + spec = json.loads(tmsssession.session.get(subtask['specifications_template']).content) + if '/qa_files/' in spec['type']: + self._send_qa_command_message(subtask_id, DEFAULT_DO_QAFILE_CONVERSION_SUBJECT) + elif '/qa_plots/' in spec['type']: + self._send_qa_command_message(subtask_id, DEFAULT_DO_QAPLOTS_SUBJECT) + def __init__(self, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER): super().__init__(handler_type=QAFilteringTMSSSubTaskBusListener.QAFilteringTMSSSubTaskEventMessageHandler, exchange=exchange, @@ -109,7 +116,10 @@ class QACommandsBusListener(BusListener): elif msg.subject == DEFAULT_DO_QAPLOTS_SUBJECT: self.qa_service.do_qaplots(otdb_id=msg.content.get('otdb_id'), subtask_id=msg.content.get('subtask_id')) elif msg.subject == DEFAULT_DO_QAFILE_CONVERSION_AND_PLOTS_SUBJECT: - self.qa_service.do_qa(otdb_id=msg.content.get('otdb_id'), subtask_id=msg.content.get('subtask_id')) + if msg.content.get('subtask_id'): + raise ValueError("%s: cannot do qa file conversion and plotting in one call for TMSS subtask id=%s. These steps are modelled seperately" % (self.__class__.__name__, msg.content.get('subtask_id'))) + + self.qa_service.do_qa(otdb_id=msg.content.get('otdb_id')) elif msg.subject == DEFAULT_DO_QAFILE_FINALIZE_SUBJECT: self.qa_service.finalize_qa(otdb_id=msg.content.get('otdb_id'), subtask_id=msg.content.get('subtask_id')) else: @@ -167,19 +177,12 @@ class QAService: plots_dirname = 'L%s' % observation_id return os.path.join(QAService.QA_LUSTRE_BASE_DIR, 'plots', plots_dirname) - def do_qa(self, otdb_id=None, subtask_id=None): + def do_qa(self, otdb_id=None): ''' convert a MS or BeamFormed observation to a qa h5 file, and create plots. ''' - if subtask_id: - self.tmsssession.set_subtask_status(subtask_id, 'starting') - self.tmsssession.set_subtask_status(subtask_id, 'started') - - if self.do_qafile_conversion(otdb_id=otdb_id, subtask_id=subtask_id): - self.do_qaplots(otdb_id=otdb_id, subtask_id=subtask_id) - - self.tmsssession.set_subtask_status(subtask_id, 'finishing') - self.tmsssession.set_subtask_status(subtask_id, 'finished') + if self.do_qafile_conversion(otdb_id=otdb_id): + self.do_qaplots(otdb_id=otdb_id) def do_qafile_conversion(self, otdb_id=None, subtask_id=None): ''' @@ -203,8 +206,10 @@ class QAService: else: logger.info("No uv or cs dataproducts avaiblable to convert for otdb_id %s", otdb_id) return + elif subtask_id: + self.tmsssession.set_subtask_status(subtask_id, 'starting') + self.tmsssession.set_subtask_status(subtask_id, 'started') - if subtask_id: hdf5_file_path = self._convert_ms2hdf5(otdb_id=otdb_id, subtask_id=subtask_id) if hdf5_file_path: @@ -212,20 +217,43 @@ class QAService: self._cluster_h5_file(hdf5_file_path, otdb_id=otdb_id, subtask_id=subtask_id) self._copy_hdf5_to_nfs_dir(hdf5_file_path) + + if subtask_id: + self.tmsssession.set_subtask_status(subtask_id, 'finishing') + self.tmsssession.set_subtask_status(subtask_id, 'finished') + return hdf5_file_path + + if subtask_id: + self.tmsssession.set_subtask_status(subtask_id, 'error') + return None def do_qaplots(self, otdb_id=None, subtask_id=None): + if subtask_id: + self.tmsssession.set_subtask_status(subtask_id, 'starting') + self.tmsssession.set_subtask_status(subtask_id, 'started') + hdf5_path = self.h5_lustre_filepath(otdb_id or subtask_id) plot_dir_path = self._create_plots_for_h5_file(hdf5_path, otdb_id, subtask_id) + if plot_dir_path: + if subtask_id: + self.tmsssession.set_subtask_status(subtask_id, 'finishing') + plot_dir_path = self._move_plots_to_nfs_dir(plot_dir_path) + if subtask_id: + self.tmsssession.set_subtask_status(subtask_id, 'finished') + # and notify that we're finished self._send_event_message('Finished', {'otdb_id': otdb_id, 'subtask_id': subtask_id, 'hdf5_file_path': hdf5_path, 'plot_dir_path': plot_dir_path or ''}) + else: + if subtask_id: + self.tmsssession.set_subtask_status(subtask_id, 'error') def finalize_qa(self, otdb_id=None, subtask_id=None): '''