Skip to content
Snippets Groups Projects
Commit c0e0167a authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-61: execute the TMSS subtasks for qa_fileconversion and for qa_plots, and report state back

parent 2026c20d
No related branches found
No related tags found
1 merge request!154Resolve TMSS-60 and TMSS-171 and TMSS-198
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
# $Id: qa_service.py 43930 2019-08-30 07:57:17Z klazema $ # $Id: qa_service.py 43930 2019-08-30 07:57:17Z klazema $
import os.path import os.path
import json
from subprocess import call from subprocess import call
from optparse import OptionParser, OptionGroup from optparse import OptionParser, OptionGroup
from lofar.common.util import waitForInterrupt from lofar.common.util import waitForInterrupt
...@@ -69,22 +70,28 @@ class QAFilteringOTDBBusListener(OTDBBusListener): ...@@ -69,22 +70,28 @@ class QAFilteringOTDBBusListener(OTDBBusListener):
class QAFilteringTMSSSubTaskBusListener(TMSSSubTaskBusListener): class QAFilteringTMSSSubTaskBusListener(TMSSSubTaskBusListener):
class QAFilteringTMSSSubTaskEventMessageHandler(UsingToBusMixin, TMSSSubTaskEventMessageHandler): class QAFilteringTMSSSubTaskEventMessageHandler(UsingToBusMixin, TMSSSubTaskEventMessageHandler):
def _send_qa_command_message(self, subtask_id: int, command_subject: str): def _send_qa_command_message(self, subtask_id: int, command_subject: str):
try:
content = {"subtask_id": subtask_id }
msg = CommandMessage(subject=command_subject, content=content)
logger.info('sending command message subject:\'%s\' content: %s', msg.subject, content)
self.send(msg)
except Exception as e:
logger.error('Could not send event message: %s', e)
def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str):
with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession: with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession:
tmsssession.set_subtask_status(subtask_id, 'queing') tmsssession.set_subtask_status(subtask_id, 'queueing')
self._send_qa_command_message(subtask_id, DEFAULT_DO_QAFILE_CONVERSION_AND_PLOTS_SUBJECT) try:
content = {"subtask_id": subtask_id }
msg = CommandMessage(subject=command_subject, content=content)
logger.info('sending command message subject:\'%s\' content: %s', msg.subject, content)
self.send(msg)
except Exception as e:
logger.error('Could not send event message: %s', e)
tmsssession.set_subtask_status(subtask_id, 'queued') tmsssession.set_subtask_status(subtask_id, 'queued')
def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str):
with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession:
subtask = tmsssession.get_subtask(subtask_id)
spec = json.loads(tmsssession.session.get(subtask['specifications_template']).content)
if '/qa_files/' in spec['type']:
self._send_qa_command_message(subtask_id, DEFAULT_DO_QAFILE_CONVERSION_SUBJECT)
elif '/qa_plots/' in spec['type']:
self._send_qa_command_message(subtask_id, DEFAULT_DO_QAPLOTS_SUBJECT)
def __init__(self, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER): def __init__(self, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER):
super().__init__(handler_type=QAFilteringTMSSSubTaskBusListener.QAFilteringTMSSSubTaskEventMessageHandler, super().__init__(handler_type=QAFilteringTMSSSubTaskBusListener.QAFilteringTMSSSubTaskEventMessageHandler,
exchange=exchange, exchange=exchange,
...@@ -109,7 +116,10 @@ class QACommandsBusListener(BusListener): ...@@ -109,7 +116,10 @@ class QACommandsBusListener(BusListener):
elif msg.subject == DEFAULT_DO_QAPLOTS_SUBJECT: elif msg.subject == DEFAULT_DO_QAPLOTS_SUBJECT:
self.qa_service.do_qaplots(otdb_id=msg.content.get('otdb_id'), subtask_id=msg.content.get('subtask_id')) self.qa_service.do_qaplots(otdb_id=msg.content.get('otdb_id'), subtask_id=msg.content.get('subtask_id'))
elif msg.subject == DEFAULT_DO_QAFILE_CONVERSION_AND_PLOTS_SUBJECT: elif msg.subject == DEFAULT_DO_QAFILE_CONVERSION_AND_PLOTS_SUBJECT:
self.qa_service.do_qa(otdb_id=msg.content.get('otdb_id'), subtask_id=msg.content.get('subtask_id')) if msg.content.get('subtask_id'):
raise ValueError("%s: cannot do qa file conversion and plotting in one call for TMSS subtask id=%s. These steps are modelled seperately" % (self.__class__.__name__, msg.content.get('subtask_id')))
self.qa_service.do_qa(otdb_id=msg.content.get('otdb_id'))
elif msg.subject == DEFAULT_DO_QAFILE_FINALIZE_SUBJECT: elif msg.subject == DEFAULT_DO_QAFILE_FINALIZE_SUBJECT:
self.qa_service.finalize_qa(otdb_id=msg.content.get('otdb_id'), subtask_id=msg.content.get('subtask_id')) self.qa_service.finalize_qa(otdb_id=msg.content.get('otdb_id'), subtask_id=msg.content.get('subtask_id'))
else: else:
...@@ -167,19 +177,12 @@ class QAService: ...@@ -167,19 +177,12 @@ class QAService:
plots_dirname = 'L%s' % observation_id plots_dirname = 'L%s' % observation_id
return os.path.join(QAService.QA_LUSTRE_BASE_DIR, 'plots', plots_dirname) return os.path.join(QAService.QA_LUSTRE_BASE_DIR, 'plots', plots_dirname)
def do_qa(self, otdb_id=None, subtask_id=None): def do_qa(self, otdb_id=None):
''' '''
convert a MS or BeamFormed observation to a qa h5 file, and create plots. convert a MS or BeamFormed observation to a qa h5 file, and create plots.
''' '''
if subtask_id: if self.do_qafile_conversion(otdb_id=otdb_id):
self.tmsssession.set_subtask_status(subtask_id, 'starting') self.do_qaplots(otdb_id=otdb_id)
self.tmsssession.set_subtask_status(subtask_id, 'started')
if self.do_qafile_conversion(otdb_id=otdb_id, subtask_id=subtask_id):
self.do_qaplots(otdb_id=otdb_id, subtask_id=subtask_id)
self.tmsssession.set_subtask_status(subtask_id, 'finishing')
self.tmsssession.set_subtask_status(subtask_id, 'finished')
def do_qafile_conversion(self, otdb_id=None, subtask_id=None): def do_qafile_conversion(self, otdb_id=None, subtask_id=None):
''' '''
...@@ -203,8 +206,10 @@ class QAService: ...@@ -203,8 +206,10 @@ class QAService:
else: else:
logger.info("No uv or cs dataproducts avaiblable to convert for otdb_id %s", otdb_id) logger.info("No uv or cs dataproducts avaiblable to convert for otdb_id %s", otdb_id)
return return
elif subtask_id:
self.tmsssession.set_subtask_status(subtask_id, 'starting')
self.tmsssession.set_subtask_status(subtask_id, 'started')
if subtask_id:
hdf5_file_path = self._convert_ms2hdf5(otdb_id=otdb_id, subtask_id=subtask_id) hdf5_file_path = self._convert_ms2hdf5(otdb_id=otdb_id, subtask_id=subtask_id)
if hdf5_file_path: if hdf5_file_path:
...@@ -212,20 +217,43 @@ class QAService: ...@@ -212,20 +217,43 @@ class QAService:
self._cluster_h5_file(hdf5_file_path, otdb_id=otdb_id, subtask_id=subtask_id) self._cluster_h5_file(hdf5_file_path, otdb_id=otdb_id, subtask_id=subtask_id)
self._copy_hdf5_to_nfs_dir(hdf5_file_path) self._copy_hdf5_to_nfs_dir(hdf5_file_path)
if subtask_id:
self.tmsssession.set_subtask_status(subtask_id, 'finishing')
self.tmsssession.set_subtask_status(subtask_id, 'finished')
return hdf5_file_path return hdf5_file_path
if subtask_id:
self.tmsssession.set_subtask_status(subtask_id, 'error')
return None return None
def do_qaplots(self, otdb_id=None, subtask_id=None): def do_qaplots(self, otdb_id=None, subtask_id=None):
if subtask_id:
self.tmsssession.set_subtask_status(subtask_id, 'starting')
self.tmsssession.set_subtask_status(subtask_id, 'started')
hdf5_path = self.h5_lustre_filepath(otdb_id or subtask_id) hdf5_path = self.h5_lustre_filepath(otdb_id or subtask_id)
plot_dir_path = self._create_plots_for_h5_file(hdf5_path, otdb_id, subtask_id) plot_dir_path = self._create_plots_for_h5_file(hdf5_path, otdb_id, subtask_id)
if plot_dir_path: if plot_dir_path:
if subtask_id:
self.tmsssession.set_subtask_status(subtask_id, 'finishing')
plot_dir_path = self._move_plots_to_nfs_dir(plot_dir_path) plot_dir_path = self._move_plots_to_nfs_dir(plot_dir_path)
if subtask_id:
self.tmsssession.set_subtask_status(subtask_id, 'finished')
# and notify that we're finished # and notify that we're finished
self._send_event_message('Finished', {'otdb_id': otdb_id, self._send_event_message('Finished', {'otdb_id': otdb_id,
'subtask_id': subtask_id, 'subtask_id': subtask_id,
'hdf5_file_path': hdf5_path, 'hdf5_file_path': hdf5_path,
'plot_dir_path': plot_dir_path or ''}) 'plot_dir_path': plot_dir_path or ''})
else:
if subtask_id:
self.tmsssession.set_subtask_status(subtask_id, 'error')
def finalize_qa(self, otdb_id=None, subtask_id=None): def finalize_qa(self, otdb_id=None, subtask_id=None):
''' '''
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment