Skip to content
Snippets Groups Projects
Commit a3e1f0ad authored by Jörn Künsemöller's avatar Jörn Künsemöller
Browse files

TMSS-1633: fix race condition by waiting for status queued in pipeline slurm job

parent ef6dc9e6
No related branches found
No related tags found
1 merge request!947TMSS-1633: fix race condition by waiting for status queued in pipeline slurm job
...@@ -501,6 +501,19 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler): ...@@ -501,6 +501,19 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler):
reason=reason reason=reason
)) ))
def waitForStatus_cmdline(status):
return (
"ssh {myhostname} '"
"source {lofarroot}/lofarinit.sh && "
"tmss_wait_for_subtask_state -t 60 {subtaskid} {status}"
"'"
.format(
myhostname=getfqdn(),
lofarroot=os.environ.get("LOFARROOT", ""),
subtaskid=subtask_id,
status=status
))
def getParset_cmdline(): def getParset_cmdline():
return ( return (
"ssh {myhostname} '" "ssh {myhostname} '"
...@@ -537,6 +550,9 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler): ...@@ -537,6 +550,9 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler):
# print some info # print some info
echo Running on $SLURM_NODELIST echo Running on $SLURM_NODELIST
# wait for status queued (set by pipeline control directly after job submision) before advancing
runcmd {waitForStatus_queued}
# notify TMSS that we're starting # notify TMSS that we're starting
runcmd {setStatus_starting} runcmd {setStatus_starting}
...@@ -586,7 +602,8 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler): ...@@ -586,7 +602,8 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler):
setStatus_starting=setStatus_cmdline("starting"), setStatus_starting=setStatus_cmdline("starting"),
setStatus_started=setStatus_cmdline("started"), setStatus_started=setStatus_cmdline("started"),
setStatus_finished=setStatus_cmdline("finished"), setStatus_finished=setStatus_cmdline("finished"),
setStatus_error=setStatus_cmdline("error") setStatus_error=setStatus_cmdline("error"),
waitForStatus_queued=waitForStatus_cmdline("queued")
), ),
sbatch_params=sbatch_params sbatch_params=sbatch_params
......
...@@ -12,3 +12,4 @@ lofar_add_bin_scripts(tmss_get_setting) ...@@ -12,3 +12,4 @@ lofar_add_bin_scripts(tmss_get_setting)
lofar_add_bin_scripts(tmss_set_setting) lofar_add_bin_scripts(tmss_set_setting)
lofar_add_bin_scripts(tmss_populate) lofar_add_bin_scripts(tmss_populate)
lofar_add_bin_scripts(tmss_submit_trigger) lofar_add_bin_scripts(tmss_submit_trigger)
lofar_add_bin_scripts(tmss_wait_for_subtask_state)
\ No newline at end of file
...@@ -11,6 +11,8 @@ import urllib3 ...@@ -11,6 +11,8 @@ import urllib3
urllib3.disable_warnings() urllib3.disable_warnings()
DEFAULT_CLIENT_RETRY_COUNT=5 DEFAULT_CLIENT_RETRY_COUNT=5
DEFAULT_CLIENT_POLL_TIMEOUT = 10
DEFAULT_CLIENT_POLL_INTERVAL = 1.0
def main_get_subtask_parset(): def main_get_subtask_parset():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
...@@ -116,6 +118,24 @@ def main_set_subtask_state(): ...@@ -116,6 +118,24 @@ def main_set_subtask_state():
exit(1) exit(1)
def main_wait_for_subtask_state():
parser = argparse.ArgumentParser()
parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to wait for a particular state on")
parser.add_argument("state", help="The expected state to wait for")
parser.add_argument('-R', '--rest_api_credentials', type=str, default='TMSSClient', help='TMSS django REST API credentials name, default: TMSSClient')
parser.add_argument('-t', '--timeout', type=int, default=DEFAULT_CLIENT_POLL_TIMEOUT, help='Poll for <timeout> seconds until this times out, default: [%default]')
parser.add_argument('-i', '--interval', type=int, default=DEFAULT_CLIENT_POLL_INTERVAL, help='Check every <interval> seconds for current subtask state, default: [%default]')
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap(dbcreds_name=args.rest_api_credentials) as session:
subtask = session.wait_for_subtask_status(args.subtask_id, args.state, timeout=args.timeout, poll_interval=args.interval)
print("%s now has state %s, see: %s" % (subtask['id'], subtask['state_value'], subtask['url']))
except Exception as e:
print(e)
exit(1)
def main_schedule_subtask(): def main_schedule_subtask():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be scheduled") parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be scheduled")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment