Commit 78180d5f authored by mancini

First commit and project setup

parent 5432970f
@@ -6,3 +6,10 @@ slurm as an executor provided that the scheduler is executed
on a machine where the slurm client is configured properly.
To use the plugin, after installing the package, change the Airflow
configuration file (airflow.cfg) as follows:
```
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = slurm.SlurmExecutor
```
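One quick sanity check is to read the setting back through Airflow's own configuration object. This is only an illustrative sketch and assumes Airflow is installed in the active environment with the edited airflow.cfg in effect:
```
# Hedged check: print the executor Airflow will load from the current configuration.
# It should report "slurm.SlurmExecutor" once the setting above is in place (assumption).
from airflow.configuration import conf

print(conf.get('core', 'executor'))
```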
from dataclasses import dataclass


@dataclass
class SlurmJobStatus:
    """Parsed representation of a single line of squeue output."""
    job_id: str
    job_name: str
    status_code: str
    status: str
    reason: str
\ No newline at end of file
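For reference, a line of squeue output formatted with `%i;%j;%t;%T;%r` (the format string used by the CLI helper below) maps onto these fields one-to-one. The sample line in this sketch is invented for illustration and assumes SlurmJobStatus is in scope:
```
# Illustrative only: the sample squeue line below is made up.
sample_line = '123456;my_task;R;RUNNING;None'

job_id, job_name, status_code, status, reason = sample_line.split(';')
job = SlurmJobStatus(job_id=job_id, job_name=job_name,
                     status_code=status_code, status=status, reason=reason)
assert job.status == 'RUNNING'
```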
from typing import List, Tuple, Union
import subprocess

from .jobs import SlurmJobStatus


class EmptyListException(Exception):
    pass


class SlurmCallError(Exception):
    pass


def __list_contains_valid_ids(ids_list):
    """Return True when every entry is an int or a digit-only string."""
    if not ids_list:
        raise EmptyListException()
    for item in ids_list:
        if not isinstance(item, int) and not item.isdigit():
            return False
    return True
def slurm_get_processes_status(job_ids: Union[List, Tuple] = ()):
    """Query squeue and return a list of SlurmJobStatus records."""
    cmd = ['squeue', '--states=all', '-h']
    fmt = '%i;%j;%t;%T;%r'
    cmd += ['--format=%s' % fmt]
    if job_ids:
        if not __list_contains_valid_ids(job_ids):
            raise ValueError('job_ids must contain only integers or digit strings')
        # Restrict the query to the requested job ids.
        cmd += ['--jobs=%s' % ','.join(str(job_id) for job_id in job_ids)]
    else:
        # No ids given: list every job squeue can see.
        cmd += ['-a']
    process_status = subprocess.run(cmd, capture_output=True, text=True)
    if process_status.returncode != 0:
        raise SlurmCallError(process_status.stderr)
    output = process_status.stdout
    jobs_found = []
    if output:
        for line in output.splitlines():
            if not line.strip():
                continue
            job_id, job_name, status_code, status, reason = line.split(';')
            jobs_found.append(SlurmJobStatus(job_id=job_id,
                                             job_name=job_name,
                                             status_code=status_code,
                                             status=status,
                                             reason=reason))
    return jobs_found
\ No newline at end of file
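A short usage sketch for the helper above; the job ids are hypothetical, and the snippet assumes `slurm_get_processes_status` is in scope and that squeue is available on the machine running it:
```
# Hedged usage sketch; the job ids below are hypothetical.
for job in slurm_get_processes_status(['123456', '123457']):
    print(job.job_id, job.status, job.reason)

# With no ids, the helper falls back to `squeue -a` and lists every visible job.
all_jobs = slurm_get_processes_status()
print(len(all_jobs), 'jobs visible to squeue')
```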
from airflow.plugins_manager import AirflowPlugin
from airflow.executors.base_executor import BaseExecutor
from airflow.utils.state import State
import subprocess


# Will show up under airflow.executors.slurm.SlurmExecutor
class SlurmExecutor(BaseExecutor):
    def __init__(self):
        super().__init__()
        self.commands_to_run = []

    def execute_async(self, key, command, queue=None, executor_config=None):
        # Queue the command; it is actually run during the next sync() pass.
        self.log.debug("execute_async called for key %s", key)
        self.commands_to_run.append((key, command,))

    def trigger_tasks(self, open_slots):
        self.log.debug("trigger_tasks called with %s open slots", open_slots)
        super().trigger_tasks(open_slots)

    def sync(self):
        # Run every queued command and report its state back to the scheduler.
        for key, command in self.commands_to_run:
            self.log.info("Executing command with key %s: %s", key, command)
            try:
                subprocess.check_call(command, close_fds=True)
                self.change_state(key, State.SUCCESS)
            except subprocess.CalledProcessError as e:
                self.change_state(key, State.FAILED)
                self.log.error("Failed to execute task: %s", str(e))
        self.commands_to_run = []

    def end(self):
        # Flush any remaining state before the executor shuts down.
        self.heartbeat()


# Defining the plugin class
class SlurmExecutorPlugin(AirflowPlugin):
    name = "slurm"
    executors = [SlurmExecutor]
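Note that sync() above runs each queued command locally with subprocess rather than handing it to slurm. A natural next step, sketched below, would be to wrap the command in an sbatch submission. This is not part of this commit; `--wrap` is a standard sbatch option, but the error handling and job-id parsing shown here are assumptions:
```
# Illustrative only: one way sync() could submit work to slurm instead of
# running it locally. Assumes sbatch is on PATH where the scheduler runs.
import subprocess

def submit_to_slurm(command):
    # sbatch --wrap takes a shell command line and prints "Submitted batch job <id>".
    wrapped = ['sbatch', '--wrap', ' '.join(command)]
    result = subprocess.run(wrapped, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError('sbatch failed: %s' % result.stderr.strip())
    # Hypothetical parsing of the job id from sbatch's stdout line.
    return result.stdout.strip().split()[-1]
```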
import sys

if __name__ == '__main__':
    # Query all jobs, then only the job ids passed on the command line (if any).
    print('output', slurm_get_processes_status())
    print('output', slurm_get_processes_status(sys.argv[1:]))
from setuptools import setup

setup(
    name='slurm-executor-plugin',
    version='0.1',
    description='Slurm executor plugin for Airflow',
    url='https://git.astron.nl/eosc/slurmexecutorplugin/',
    package_dir={'': 'lib'},
    packages=['slurm_cli', 'slurm_executor'],
    entry_points={
        'airflow.plugins': [
            'slurm = slurm_executor.slurm:SlurmExecutorPlugin'
        ]
    }
)
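Once the package is installed (for example with pip install .), the entry point declared above can be verified from Python. This is a minimal sketch using pkg_resources; the 'slurm' entry only appears after installation into the active environment:
```
# Hedged check: list the airflow.plugins entry points visible in this environment.
import pkg_resources

for entry_point in pkg_resources.iter_entry_points('airflow.plugins'):
    print(entry_point.name, '->', entry_point.module_name)
```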