Commit e7a9fa2d authored by Jan David Mol

Task #8887: Fixes for docker/slurm command line, and some minor tweaking

parent b2e1c38c
@@ -44,7 +44,8 @@ while getopts "hv:" opt; do
 done

 # Make sure we obtain info about the project source!
-VERSION_INFO=`$VERSION_DOCKER`
+# Drop stderr to prevent logger output from contaminating our output
+VERSION_INFO=`$VERSION_DOCKER 2>/dev/null`

 # Extract branch name w.r.t. repository root, e.g. branches/LOFAR-Task1234
 export LOFAR_BRANCH_NAME=`echo "$VERSION_INFO" | perl -ne 'print "$1" if /branch += +(.+)/;'`
...
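Note on the change above: backticks capture only stdout, so redirecting stderr to /dev/null keeps any logger chatter from the version tool out of the text that the perl one-liner parses. A minimal Python sketch of the same idea (the echo command stands in for whatever $VERSION_DOCKER expands to):

    import re
    import subprocess

    # Capture only stdout; discard stderr so logger output cannot
    # contaminate the captured version report.
    version_info = subprocess.check_output(
        ["echo", "branch = branches/LOFAR-Task1234"],  # stand-in for $VERSION_DOCKER
        stderr=subprocess.DEVNULL,
        universal_newlines=True,
    )

    # Equivalent of the perl one-liner: extract the branch name field.
    match = re.search(r"branch += +(.+)", version_info)
    print(match.group(1) if match else "")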
@@ -77,7 +77,7 @@ import logging
 logger = logging.getLogger(__name__)

 def runCommand(cmdline, input=None):
-    logger.info("Running '%s'", cmdline)
+    logger.info("runCommand starting: %s", cmdline)

     # Start command
     proc = subprocess.Popen(
@@ -90,8 +90,9 @@ def runCommand(cmdline, input=None):
     )

     # Feed input and wait for termination
+    logger.info("runCommand input: %s", input)
     stdout, _ = proc.communicate(input)
-    logger.debug(stdout)
+    logger.info("runCommand output: %s", stdout)

     # Check exit status, bail on error
     if proc.returncode != 0:
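With these changes, runCommand logs both the input it feeds and the output it receives at info level. It reads roughly as below; the Popen keyword arguments and the exception raised on a non-zero exit code are outside this hunk, so they are assumptions here:

    import logging
    import subprocess

    logger = logging.getLogger(__name__)

    def runCommand(cmdline, input=None):
        # Run cmdline in a shell, feed it input, and return its stdout.
        logger.info("runCommand starting: %s", cmdline)

        # Start command (keyword arguments assumed; the hunk does not show them)
        proc = subprocess.Popen(
            cmdline,
            shell=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )

        # Feed input and wait for termination
        logger.info("runCommand input: %s", input)
        stdout, _ = proc.communicate(input)
        logger.info("runCommand output: %s", stdout)

        # Check exit status, bail on error (exception type is an assumption)
        if proc.returncode != 0:
            raise RuntimeError("%s failed with exit code %s" % (cmdline, proc.returncode))

        return stdout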
@@ -124,6 +125,10 @@ class Parset(dict):
     def processingCluster(self):
         return self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.clusterName"] or "CEP2"

+    @staticmethod
+    def dockerRepository():
+        return "nexus.cep4.control.lofar:18080"
+
     @staticmethod
     def defaultDockerImage():
         return runCommand("docker-template", "lofar-pipeline:${LOFAR_TAG}")
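The new dockerRepository() hard-codes the private registry host and port. How the pipeline combines it with an image name is not visible in this diff; the helper below is only a hypothetical illustration of the registry[:port]/name[:tag] reference format:

    # Hypothetical helper, not part of the change above.
    def fullImageName(repository, image):
        # Docker image references take the form registry[:port]/name[:tag]
        return "%s/%s" % (repository, image)

    print(fullImageName("nexus.cep4.control.lofar:18080", "lofar-pipeline:latest"))
    # -> nexus.cep4.control.lofar:18080/lofar-pipeline:latest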
@@ -146,15 +151,19 @@ class Slurm(object):
         # TODO: Derive SLURM partition name
         self.partition = "cpu"

-    def _runCommand(self, cmdline):
+    def _runCommand(self, cmdline, input=None):
         cmdline = "ssh %s %s" % (self.headnode, cmdline)
-        runCommand(cmdline)
+        return runCommand(cmdline, input)

     def submit(self, jobName, cmdline, sbatch_params=None):
         if sbatch_params is None:
             sbatch_params = []

-        stdout = self._runCommand("sbatch --partition=%s --job-name=%s %s bash -c '%s'" % (self.partition, jobName, " ".join(sbatch_params), cmdline))
+        script = """#!/bin/bash
+{cmdline}
+""".format(cmdline = cmdline)
+
+        stdout = self._runCommand("sbatch --partition=%s --job-name=%s %s" % (self.partition, jobName, " ".join(sbatch_params)), script)

         # Returns "Submitted batch job 3" -- extract ID
         match = re.search("Submitted batch job (\d+)", stdout)
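The submit() change stops embedding the command line in a quoted bash -c argument and instead generates a small batch script that is fed to sbatch on its stdin (via the new input parameter of _runCommand), which sidesteps shell-quoting problems in cmdline. A self-contained sketch of that flow, assuming runCommand forwards input to the child's stdin:

    import re
    import subprocess

    def submit_via_stdin(headnode, partition, job_name, cmdline, sbatch_params=()):
        # Wrap the command line in a minimal batch script.
        script = "#!/bin/bash\n%s\n" % cmdline

        # sbatch reads the job script from stdin when no script file is given.
        ssh_cmd = "ssh %s sbatch --partition=%s --job-name=%s %s" % (
            headnode, partition, job_name, " ".join(sbatch_params))

        proc = subprocess.Popen(ssh_cmd, shell=True,
                                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                universal_newlines=True)
        stdout, _ = proc.communicate(script)

        # sbatch answers "Submitted batch job <id>" -- extract the job ID.
        match = re.search(r"Submitted batch job (\d+)", stdout)
        return match.group(1) if match else None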
@@ -327,7 +336,7 @@ class PipelineControl(OTDBBusListener):
             "--time=31-0",

             # TODO: Compute nr nodes
-            "--nodes=50",
+            "--nodes=24",

             # Define better places to write the output
             os.path.expandvars("--error=/data/log/runPipeline-%s.stderr" % (otdbId,)),
@@ -385,11 +394,13 @@ class PipelineControl(OTDBBusListener):
             ),

             sbatch_params=[
-                "--cpus-per=task=1",
-                "--ntasks=1"
+                "--cpus-per-task=1",
+                "--ntasks=1",
                 "--dependency=afternotok:%s" % slurm_job_id,
                 "--kill-on-invalid-dep=yes",
                 "--requeue",
+                "--error=/data/log/pipelineAborted-%s.stderr" % (otdbId,),
+                "--output=/data/log/pipelineAborted-%s.log" % (otdbId,),
             ]
         )
         logger.info("Scheduled SLURM job %s" % (slurm_cancel_job_id,))
...
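The corrected sbatch_params configure a cleanup job that only runs if the pipeline job fails: afternotok triggers on an unsuccessful exit of slurm_job_id, kill-on-invalid-dep removes the job when that dependency can never be satisfied, and the new --error/--output options give it its own log files. A minimal sketch of assembling those options:

    def abortJobSbatchParams(slurm_job_id, otdbId):
        # Options for a job that fires only when the watched pipeline job fails.
        return [
            "--cpus-per-task=1",
            "--ntasks=1",
            # Run only if the watched job terminates unsuccessfully.
            "--dependency=afternotok:%s" % slurm_job_id,
            # Drop this job if the dependency can never be met (pipeline succeeded).
            "--kill-on-invalid-dep=yes",
            "--requeue",
            # Dedicated log files for the abort handler.
            "--error=/data/log/pipelineAborted-%s.stderr" % (otdbId,),
            "--output=/data/log/pipelineAborted-%s.log" % (otdbId,),
        ]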