From d81359531ec5218f9e625a6759b0c16e44327948 Mon Sep 17 00:00:00 2001
From: Jorrit Schaap <schaap@astron.nl>
Date: Mon, 26 Apr 2021 19:51:53 +0200
Subject: [PATCH] TMSS-717: wait for queued/error/cancelled state before starting. Submit all jobs at once

---
 .../lib/ingesttmssadapter.py | 29 +++++++++++++++++--
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py
index fc6e8e93050..0e6210ef221 100644
--- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py
+++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py
@@ -34,7 +34,7 @@ from lofar.common.util import waitForInterrupt
 from threading import Thread
 import time
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import Union
 
 import logging
@@ -62,7 +62,25 @@ class IngestEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, IngestEvent
 
     def onJobStarted(self, job_dict):
         if self.is_tmss_job(job_dict):
-            self.tmss_client.set_subtask_status(job_dict['export_id'], 'started')
+            subtask_id = job_dict['export_id']
+            subtask = self.tmss_client.get_subtask(subtask_id)
+
+            if subtask['state_value'] == 'started':
+                pass # the ingest subtask was already started
+            else:
+                # wait until subtask was fully queued (or in error/cancelled)
+                start_wait_timestamp = datetime.utcnow()
+                while subtask['state_value'] not in ('queued', 'cancelled', 'error'):
+                    if datetime.utcnow() - start_wait_timestamp > timedelta(seconds=60):
+                        raise TimeoutError("Timeout while waiting for ingest subtask id=%s to get status queued/cancelled/error. Current status is %s" % (subtask_id, subtask['state_value']))
+                    time.sleep(1)
+                    subtask = self.tmss_client.get_subtask(subtask_id)
+
+                if subtask['state_value'] == 'queued':
+                    # the ingest subtask was fully queued, and this is the first ingest transfer job that started
+                    # so, set the ingest subtask to starting->started
+                    self.tmss_client.set_subtask_status(subtask_id, 'starting')
+                    self.tmss_client.set_subtask_status(subtask_id, 'started')
 
     def onJobFailed(self, job_dict):
         if self.is_tmss_job(job_dict):
@@ -150,13 +168,15 @@ class TMSSEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, TMSSEventMess
                 self.tmss_client.set_subtask_status(subtask['id'], 'queueing')
 
                 # gather all relevant and needed info...
-                task_blueprint = self.tmss_client.get_url_as_json_object(next(subtask['task_blueprints']))
+                task_blueprint = self.tmss_client.get_url_as_json_object(subtask['task_blueprints'][0])
                 task_draft = self.tmss_client.get_url_as_json_object(task_blueprint['draft'])
                 scheduling_unit_draft = self.tmss_client.get_url_as_json_object(task_draft['scheduling_unit_draft'])
                 scheduling_set = self.tmss_client.get_url_as_json_object(scheduling_unit_draft['scheduling_set'])
                 project = self.tmss_client.get_url_as_json_object(scheduling_set['project'])
 
                 # create an ingest xml job for each input dataproduct
+                # store the jobs in a list, and submit them in one go to the queue
+                jobs = []
                 for input_dp in input_dataproducts:
                     dp_global_identifier = self.tmss_client.get_url_as_json_object(input_dp['global_identifier'])
                     producer = self.tmss_client.get_url_as_json_object(input_dp['producer'])
@@ -169,7 +189,10 @@ class TMSSEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, TMSSEventMess
                                        location=subtask['cluster_name']+':'+os.path.join(input_dp['directory'], input_dp['filename']),
                                        tmss_ingest_subtask_id=subtask['id'],
                                        tmss_input_dataproduct_id=input_dp['id'])
+                    jobs.append(job)
 
+                # submit all jobs in one go to the ingest-incoming-job-queue
+                for job in jobs:
                     msg = CommandMessage(content=job, subject=DEFAULT_INGEST_INCOMING_JOB_SUBJECT)
                     logger.info('submitting job %s to exchange %s with subject %s at broker %s',
                                 parseJobXml(job)['JobId'], self._tobus.exchange, msg.subject, self._tobus.broker)
--
GitLab
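Below is a minimal sketch (not part of the patch) of the wait-then-promote pattern that the new onJobStarted code applies: poll the subtask state until it leaves its transitional state, give up after a timeout, and only advance it through 'starting' to 'started' once it is fully 'queued'. The names wait_for_state, get_state and set_state are hypothetical stand-ins for the TMSS client calls; only the control flow mirrors the patch.

# illustrative sketch, assuming a callable get_state() returning a state string
# and a callable set_state(state) performing the transition; not TMSS client API
from datetime import datetime, timedelta
import time


def wait_for_state(get_state, end_states=('queued', 'cancelled', 'error'),
                   timeout=timedelta(seconds=60), poll_interval=1.0):
    """Poll get_state() until it returns one of end_states, or raise TimeoutError."""
    start = datetime.utcnow()
    state = get_state()
    while state not in end_states:
        if datetime.utcnow() - start > timeout:
            raise TimeoutError("timeout while waiting for state in %s, current state is %s"
                               % (end_states, state))
        time.sleep(poll_interval)
        state = get_state()
    return state


def on_first_transfer_job_started(get_state, set_state):
    """Promote the subtask to 'starting'/'started' only once it is fully 'queued'."""
    if get_state() == 'started':
        return  # already started by an earlier transfer job
    if wait_for_state(get_state) == 'queued':
        set_state('starting')
        set_state('started')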