Skip to content
Snippets Groups Projects
Commit cf458076 authored by Mattia Mancini's avatar Mattia Mancini
Browse files

implemented srun to schedule job

parent c683aa88
Branches
No related tags found
No related merge requests found
......@@ -6,6 +6,14 @@ __SQUEUE_PATH='squeue'
__SRUN_PATH = 'srun'
def execute_on_shell(cmd, args):
process_status = run_process([cmd] + args)
if process_status.returncode > 0:
raise SlurmCallError()
output = process_status.stdout
return output
class EmptyListException(Exception):
pass
......@@ -24,24 +32,24 @@ def __list_contains_valid_ids(ids_list):
raise EmptyListException()
def __compose_get_processes_status_cmd(job_ids: Union[List, Tuple] = ()):
def __compose_get_processes_status_cmd(job_ids: Union[List, Tuple] = (), job_name: Union[List, Tuple] = ()):
cmd = ['--states=all', '-h']
fmt = '%i;%j;%t;%T;%r'
cmd += ['--format=%s' % fmt]
if job_ids:
cmd += ','.join(job_ids)
cmd += [','.join(job_ids)]
else:
cmd += ['-a']
if job_name:
cmd += ['-n', ','.join(job_name)]
return cmd
def __execute_squeue(args):
process_status = run_process([__SQUEUE_PATH] + args)
if process_status.returncode > 0:
raise SlurmCallError()
output = process_status.stdout
return output
return execute_on_shell(__SQUEUE_PATH, args)
def __parse_squeue_output(squeue_output) -> List[SlurmJobStatus]:
......@@ -71,8 +79,34 @@ def __map_job_status_per_jobid(job_status_list: List[SlurmJobStatus]) -> Dict[st
return {job_status.job_id: job_status for job_status in job_status_list}
def get_jobs_status(job_ids: Union[List, Tuple] = ()):
args = __compose_get_processes_status_cmd(job_ids)
def get_jobs_status(job_ids: Union[List, Tuple] = (), job_name: Union[List, Tuple] = ()):
args = __compose_get_processes_status_cmd(job_ids, job_name)
output = __execute_squeue(args)
parsed_output = __parse_squeue_output(output)
return __map_job_status_per_jobid(parsed_output)
def __execute_srun(args):
return execute_on_shell(__SRUN_PATH, args)
def __compose_run_job_arguments(cmd, queue=None, executor_config=None, job_name=None):
if executor_config:
raise NotImplementedError()
args = []
if queue:
args += ['-p', queue]
if job_name:
args += ['-J', job_name]
else:
args += ['-J', cmd]
args += cmd.split(' ')
return args
# TODO collect output
def run_job(cmd, queue=None, executor_config=None, task_name=None):
args = __compose_run_job_arguments(cmd, queue, executor_config, task_name)
_ = __execute_srun(args)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment