Skip to content
Snippets Groups Projects

[RAP-318] Additional cleanup of leftover files by e.g. Toil

Merged [RAP-318] Additional cleanup of leftover files by e.g. Toil
All threads resolved!
Merged Frits Sweijen requested to merge RAP-318 into master
All threads resolved!
+ 19
2
@@ -4,6 +4,7 @@ Classes that wrap the CWL runners that Rapthor supports.
from __future__ import annotations
import logging
import os
import shutil
import subprocess
from typing import TYPE_CHECKING
@@ -103,6 +104,7 @@ class CWLRunner:
if self.operation.field.use_mpi:
self._delete_mpi_config_file()
def run(self) -> bool:
"""
Start the runner in a subprocess.
@@ -155,8 +157,8 @@ class ToilRunner(CWLRunner):
"""
super().setup()
self.args.extend(['--batchSystem', self.operation.batch_system])
self.args.extend(['--bypass-file-store'])
if self.operation.batch_system == 'slurm':
self.args.extend(['--bypass-file-store'])
self.args.extend(['--disableCaching'])
self.args.extend(['--defaultCores', str(self.operation.cpus_per_task)])
self.args.extend(['--defaultMemory', self.operation.mem_per_node_gb])
@@ -187,7 +189,6 @@ class ToilRunner(CWLRunner):
self.args.extend(['--coordinationDir', self.operation.coordination_dir])
self.args.extend(['--clean', 'never']) # preserves the job store for future runs
self.args.extend(['--servicePollingInterval', '10'])
self.args.extend(['--stats'])
if self.operation.debug_workflow:
self.args.extend(['--cleanWorkDir', 'never'])
self.args.extend(['--debugWorker']) # NOTE: stdout/stderr are not redirected to the log
@@ -201,6 +202,17 @@ class ToilRunner(CWLRunner):
for key in self.toil_env_variables:
del os.environ[key]
super().teardown()
if not self.operation.debug_workflow:
# Use the logs to find the temporary directory we ran in.
workerlogs = os.popen("grep 'Redirecting logging to' " + os.path.join(self.operation.log_dir, 'pipeline.log') + " | awk '{print $NF}'").read()
leftover_tempdirs = []
for f in workerlogs.splitlines():
tempdir = '/'.join(f.split('/')[:-2])
if tempdir not in leftover_tempdirs:
leftover_tempdirs.append(tempdir)
for t in leftover_tempdirs:
logger.debug('Cleaning up temporary directory {:s} of {:s}'.format(t, self.operation.name))
shutil.rmtree(t, ignore_errors=True)
class CWLToolRunner(CWLRunner):
@@ -224,6 +236,11 @@ class CWLToolRunner(CWLRunner):
self.args.extend(['--tmp-outdir-prefix', prefix])
if self.operation.debug_workflow:
self.args.extend(["--debug"])
self.args.extend(["--leave-tmpdir"])
def teardown(self) -> None:
return super().teardown()
def create_cwl_runner(runner: str, operation: Operation) -> CWLRunner:
Loading