From 78180d5fe8d0dd429c227e02caf8c4cebc2175a4 Mon Sep 17 00:00:00 2001 From: mancini <mancini@astron.nl> Date: Tue, 17 Sep 2019 10:43:16 +0200 Subject: [PATCH] First commit and project setup --- README.md | 7 +++++ lib/slurm_cli/__init__.py | 0 lib/slurm_cli/jobs.py | 10 ++++++++ lib/slurm_cli/slurm_control.py | 47 ++++++++++++++++++++++++++++++++++ lib/slurm_executor/__init__.py | 0 lib/slurm_executor/slurm.py | 44 +++++++++++++++++++++++++++++++ setup.py | 16 ++++++++++++ 7 files changed, 124 insertions(+) create mode 100644 lib/slurm_cli/__init__.py create mode 100644 lib/slurm_cli/jobs.py create mode 100644 lib/slurm_cli/slurm_control.py create mode 100644 lib/slurm_executor/__init__.py create mode 100644 lib/slurm_executor/slurm.py create mode 100644 setup.py diff --git a/README.md b/README.md index 721200f..a90a3cf 100644 --- a/README.md +++ b/README.md @@ -6,3 +6,10 @@ slurm as an executor provided that the scheduler is executed on a machine where the slurm client is configured properly. +To use the plugin after installing the package change +the AIRFLOW configuration file as such: +``` +# The executor class that airflow should use. Choices include +# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor +executor = slurm.SlurmExecutor +``` diff --git a/lib/slurm_cli/__init__.py b/lib/slurm_cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/slurm_cli/jobs.py b/lib/slurm_cli/jobs.py new file mode 100644 index 0000000..ff6ec2e --- /dev/null +++ b/lib/slurm_cli/jobs.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass + + +@dataclass +class SlurmJobStatus: + job_id: str + job_name: str + status_code: str + status: str + reason: str \ No newline at end of file diff --git a/lib/slurm_cli/slurm_control.py b/lib/slurm_cli/slurm_control.py new file mode 100644 index 0000000..2f3ffae --- /dev/null +++ b/lib/slurm_cli/slurm_control.py @@ -0,0 +1,47 @@ +from typing import List, Tuple, Union +from .jobs import SlurmJobStatus +import subprocess + + +class EmptyListException(Exception): + pass + + +class SlurmCallError(Exception): + pass + + +def __list_contains_valid_ids(ids_list): + if ids_list: + for item in ids_list: + if not isinstance(item, int) and not item.is_digit(): + return False + return True + else: + raise EmptyListException() + + +def slurm_get_processes_status(job_ids: Union[List, Tuple] = ()): + cmd = ['squeue', '--states=all', '-h'] + fmt = '%i;%j;%t;%T;%r' + cmd += ['--format=%s' % fmt] + if job_ids: + cmd += ','.join(job_ids) + else: + cmd += ['-a'] + + process_status = subprocess.run(cmd) + if process_status.returncode > 0: + raise SlurmCallError() + output = process_status.stdout + jobs_found = [] + if process_status.stdout: + for line in output.split('\n'): + job_id, job_name, status_code, status, reason = line.split(';') + jobs_found.append(SlurmJobStatus(job_id=job_id, + job_name=job_name, + status_code=status_code, + status=status, + reason=reason)) + + return jobs_found \ No newline at end of file diff --git a/lib/slurm_executor/__init__.py b/lib/slurm_executor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/slurm_executor/slurm.py b/lib/slurm_executor/slurm.py new file mode 100644 index 0000000..aa49984 --- /dev/null +++ b/lib/slurm_executor/slurm.py @@ -0,0 +1,44 @@ +from airflow.plugins_manager import AirflowPlugin +from airflow.executors.base_executor import BaseExecutor +from airflow.utils.state import State +import subprocess + +# Will show up under airflow.executors.slurm.SlurmExecutor +class SlurmExecutor(BaseExecutor): + def __init__(self): + super().__init__() + self.commands_to_run = [] + + def execute_async(self, key, command, queue=None, executor_config=None): + print("execute async called") + self.commands_to_run.append((key, command,)) + + def trigger_tasks(self, open_slots): + print('trigger tasks called', open_slots) + super().trigger_tasks(open_slots) + + def sync(self): + for key, command in self.commands_to_run: + self.log.info("Executing command with key %s: %s", key, command) + + try: + subprocess.check_call(command, close_fds=True) + self.change_state(key, State.SUCCESS) + except subprocess.CalledProcessError as e: + self.change_state(key, State.FAILED) + self.log.error("Failed to execute task %s.", str(e)) + + self.commands_to_run = [] + + def end(self): + self.heartbeat() + +# Defining the plugin class +class SlurmExecutorPlugin(AirflowPlugin): + name = "slurm" + executors = [SlurmExecutor] + +import sys +if __name__=='__main__': + print('output', _slurm_get_processes_status()) + print('output', _slurm_get_processes_status(sys.argv[1:])) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..3511458 --- /dev/null +++ b/setup.py @@ -0,0 +1,16 @@ +from setuptools import setup + +setup( + name="slurm-executor-plugin", + version='0.1', + description='Slurm executor plugin for AIRFLOW', + url='https://git.astron.nl/eosc/slurmexecutorplugin/', + package_dir = {'': 'lib'}, + packages = ['slurm_cli', 'slurm_executor'], + entry_points = { + 'airflow.plugins': [ + 'slurm = slurm_executor.slurm:SlurmExecutorPlugin' + ] + } +) + -- GitLab