diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py index 4dcf57e8dae0e610460d2c71b365d00470132266..3c59c8f6d7189d690788d164a81895ef833774aa 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py @@ -79,7 +79,7 @@ class IngestEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, IngestEvent def onJobFailed(self, job_dict): if self.is_tmss_job(job_dict): - self.tmss_client.set_subtask_status(job_dict['export_id'], 'error') + self.tmss_client.set_subtask_status(job_dict['export_id'], 'error', 'Ingest job failed') def onJobFinished(self, job_dict): if self.is_tmss_job(job_dict): diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index 04e02d3c5c50dc2ae104910873a2deaf8b1b50d5..343958615496340ce81d3a7b74016962b0d0c196 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -483,17 +483,22 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler): os.path.expandvars("--output=/data/log/pipeline-%s-%%j.log" % (subtask_id,)), ] - def setStatus_cmdline(status): + def setStatus_cmdline(status, error_reason=None): + if error_reason is not None: + reason = f"-e '{error_reason}'" + else: + reason = "" return ( "ssh {myhostname} '" "source {lofarroot}/lofarinit.sh && " - "tmss_set_subtask_state {subtaskid} {status}" + "tmss_set_subtask_state {reason} {subtaskid} {status}" "'" .format( myhostname=getfqdn(), lofarroot=os.environ.get("LOFARROOT", ""), subtaskid=subtask_id, - status=status + status=status, + reason=reason )) def getParset_cmdline(): @@ -592,11 +597,15 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler): logger.info("Scheduling SLURM job for pipelineAborted.sh") slurm_cancel_job_id = self.slurm.submit("%s-abort-trigger" % self._jobName(subtask_id), """ + # notify TMSS + {setStatus_error} + # notify ganglia wget -O - -q "http://ganglia.control.lofar/ganglia/api/events.php?action=add&start_time=now&summary=Pipeline {obsid} ABORTED&host_regex=" """ .format( obsid=subtask_id, + setStatus_error=setStatus_cmdline('error', 'Pipeline aborted via SLURM job') ), sbatch_params=[ @@ -615,7 +624,7 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler): self.tmss_client.set_subtask_status(subtask_id, "queued") except Exception as e: logger.error(str(e)) - self.tmss_client.set_subtask_status(subtask_id, "error") + self.tmss_client.set_subtask_status(subtask_id, "error", f'Error when starting pipeline: {e}') class PipelineControlHandler( OTDBEventMessageHandler): diff --git a/QA/QA_Service/lib/qa_service.py b/QA/QA_Service/lib/qa_service.py index b5a6c57dbb96d6ffe93db3172a166b9f655a5ce0..51233871f526f40e2399977cbdf29019060a8f03 100644 --- a/QA/QA_Service/lib/qa_service.py +++ b/QA/QA_Service/lib/qa_service.py @@ -234,7 +234,7 @@ class QAService: return hdf5_file_path if subtask_id: - self.tmsssession.set_subtask_status(subtask_id, 'error') + self.tmsssession.set_subtask_status(subtask_id, 'error', 'could not convert observation to a qa h5 file') return None @@ -273,7 +273,7 @@ class QAService: 'plot_dir_path': plot_dir_path or ''}) else: if subtask_id: - self.tmsssession.set_subtask_status(subtask_id, 'error') + self.tmsssession.set_subtask_status(subtask_id, 'error', 'could not create QA plots') def finalize_qa(self, otdb_id=None, subtask_id=None): ''' diff --git a/SAS/DataManagement/Cleanup/CleanupService/service.py b/SAS/DataManagement/Cleanup/CleanupService/service.py index ac99a0035a6af0ea0e540f26fed7e1873f320295..ef65c57aa3ccb4108c6be8f76a34e13e41a00e52 100644 --- a/SAS/DataManagement/Cleanup/CleanupService/service.py +++ b/SAS/DataManagement/Cleanup/CleanupService/service.py @@ -520,7 +520,7 @@ class TMSSEventMessageHandlerForCleanup(TMSSEventMessageHandler): logger.info("cleanup subtask id=%s: %s", subtask['id'], result.get('message',"")) if any([not r['deleted'] for r in results]): - self._tmss_client.set_subtask_status(subtask['id'], 'error') + self._tmss_client.set_subtask_status(subtask['id'], 'error', 'Error during cleanup: not all predecessor task data could be deleted.') else: self._tmss_client.set_subtask_status(subtask['id'], 'finishing') self._tmss_client.set_subtask_status(subtask['id'], 'finished') @@ -550,7 +550,7 @@ class TMSSEventMessageHandlerForCleanup(TMSSEventMessageHandler): logger.info("cleanup subtask id=%s: %s", subtask['id'], result.get('message',"")) if any([not r['deleted'] for r in results]): - self._tmss_client.set_subtask_status(subtask['id'], 'error') + self._tmss_client.set_subtask_status(subtask['id'], 'error', 'Error during cleanup: not all predecessor task data could be deleted.') else: self._tmss_client.set_subtask_status(subtask['id'], 'finishing') self._tmss_client.set_subtask_status(subtask['id'], 'finished') diff --git a/SAS/TMSS/client/lib/mains.py b/SAS/TMSS/client/lib/mains.py index f817a3fc21cd4a6dcd91793ea9b9e256f9c9bc39..4b6ea7e3816ee12ba79957f1f2a2f3fd370c0380 100644 --- a/SAS/TMSS/client/lib/mains.py +++ b/SAS/TMSS/client/lib/mains.py @@ -99,12 +99,13 @@ def main_set_subtask_state(): parser = argparse.ArgumentParser() parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to set the status on") parser.add_argument("state", help="The state to set") + parser.add_argument('-e', '--error_reason', default=None, help="Optional error message string when setting a subtask to error.") parser.add_argument('-R', '--rest_api_credentials', type=str, default='TMSSClient', help='TMSS django REST API credentials name, default: TMSSClient') args = parser.parse_args() try: with TMSSsession.create_from_dbcreds_for_ldap(dbcreds_name=args.rest_api_credentials) as session: - changed_subtask = session.set_subtask_status(args.subtask_id, args.state) + changed_subtask = session.set_subtask_status(args.subtask_id, args.state, args.error_reason) print("%s now has state %s, see: %s" % (changed_subtask['id'], changed_subtask['state_value'], changed_subtask['url'])) except Exception as e: print(e) diff --git a/SAS/TMSS/client/lib/tmss_http_rest_client.py b/SAS/TMSS/client/lib/tmss_http_rest_client.py index 6e6fb09ba50f1ee162844925e4dde8fe332c4835..20c9c3bc222aa8ef2cd181e083bcb14d8f4fbaf4 100644 --- a/SAS/TMSS/client/lib/tmss_http_rest_client.py +++ b/SAS/TMSS/client/lib/tmss_http_rest_client.py @@ -18,6 +18,7 @@ from lofar.common.dbcredentials import DBCredentials import html from urllib.parse import quote from typing import Union +import socket # usage example: # @@ -151,11 +152,14 @@ class TMSSsession(object): except: pass - def set_subtask_status(self, subtask_id: int, status: str) -> {}: + def set_subtask_status(self, subtask_id: int, status: str, error_reason=None) -> {}: '''set the status for the given subtask, and return the subtask with its new state, or raise on error''' json_doc = {'state': "%s/subtask_state/%s/" % (self.api_url, status)} if status == 'finishing' or status == 'cancelling': json_doc['scheduled_on_sky_stop_time'] = datetime.utcnow().isoformat() + if status == 'error': + hostname = socket.gethostname() + json_doc['error_reason'] = error_reason if error_reason is not None else f"set to error via REST client on host '{hostname}'" logger.info("updating subtask id=%s status to '%s'", subtask_id, status) response = self.session.patch(url='%s/subtask/%s/' % (self.api_url, subtask_id), json=json_doc)