diff --git a/lib/slurm_cli/slurm_control.py b/lib/slurm_cli/slurm_control.py index 2f3ffaea914b3b193deab7eb2512de11df533962..2cf42cbed71f1ead744a6cc93c36b1ed67042c30 100644 --- a/lib/slurm_cli/slurm_control.py +++ b/lib/slurm_cli/slurm_control.py @@ -1,6 +1,9 @@ -from typing import List, Tuple, Union +from typing import List, Tuple, Union, Dict from .jobs import SlurmJobStatus -import subprocess +from subprocess import run as run_process + +__SQUEUE_PATH='squeue' +__SRUN_PATH='srun' class EmptyListException(Exception): @@ -21,8 +24,8 @@ def __list_contains_valid_ids(ids_list): raise EmptyListException() -def slurm_get_processes_status(job_ids: Union[List, Tuple] = ()): - cmd = ['squeue', '--states=all', '-h'] +def __compose_get_processes_status_cmd(job_ids: Union[List, Tuple] = ()): + cmd = ['--states=all', '-h'] fmt = '%i;%j;%t;%T;%r' cmd += ['--format=%s' % fmt] if job_ids: @@ -30,13 +33,30 @@ def slurm_get_processes_status(job_ids: Union[List, Tuple] = ()): else: cmd += ['-a'] - process_status = subprocess.run(cmd) + return cmd + + +def __execute_squeue(args): + process_status = run_process([__SQUEUE_PATH] + args) if process_status.returncode > 0: raise SlurmCallError() output = process_status.stdout + return output + + +def __parse_squeue_output(squeue_output) -> List[SlurmJobStatus]: + ''' + Parses the output of squeue + e.g. + 123;test_job;CD;COMPLETED;None + :param squeue_output: + :return: + ''' jobs_found = [] - if process_status.stdout: - for line in output.split('\n'): + if squeue_output: + for line in squeue_output.split('\n'): + if not line: + continue job_id, job_name, status_code, status, reason = line.split(';') jobs_found.append(SlurmJobStatus(job_id=job_id, job_name=job_name, @@ -44,4 +64,15 @@ def slurm_get_processes_status(job_ids: Union[List, Tuple] = ()): status=status, reason=reason)) - return jobs_found \ No newline at end of file + return jobs_found + + +def __map_job_status_per_jobid(job_status_list: List[SlurmJobStatus]) -> Dict[str, SlurmJobStatus]: + return {job_status.job_id: job_status for job_status in job_status_list} + + +def get_jobs_status(job_ids: Union[List, Tuple] = ()): + args = __compose_get_processes_status_cmd(job_ids) + output = __execute_squeue(args) + parsed_output = __parse_squeue_output(output) + return __map_job_status_per_jobid(parsed_output) \ No newline at end of file diff --git a/lib/slurm_executor/slurm.py b/lib/slurm_executor/slurm.py index aa4998480fa3289d75c8671cde89613592f9a68f..8bf1b2998e21a192f4a13cc5151769575b133308 100644 --- a/lib/slurm_executor/slurm.py +++ b/lib/slurm_executor/slurm.py @@ -1,6 +1,7 @@ from airflow.plugins_manager import AirflowPlugin from airflow.executors.base_executor import BaseExecutor from airflow.utils.state import State +from slurm_cli.slurm_control import get_jobs_status import subprocess # Will show up under airflow.executors.slurm.SlurmExecutor @@ -40,5 +41,5 @@ class SlurmExecutorPlugin(AirflowPlugin): import sys if __name__=='__main__': - print('output', _slurm_get_processes_status()) - print('output', _slurm_get_processes_status(sys.argv[1:])) + print('output', get_jobs_status()) + print('output', get_jobs_status(sys.argv[1:]))