diff --git a/Docker/docker-template b/Docker/docker-template
index 7b3987abe45f25ea7a9beadc8595220d3c43eebf..53be24bc26b2e7d7cd7bd500e20a49c978a19df8 100755
--- a/Docker/docker-template
+++ b/Docker/docker-template
@@ -44,7 +44,8 @@ while getopts "hv:" opt; do
 done
 
 # Make sure we obtain info about the project source!
-VERSION_INFO=`$VERSION_DOCKER`
+# Drop stderr to prevent logger output from contaminating our output
+VERSION_INFO=`$VERSION_DOCKER 2>/dev/null`
 
 # Extract branch name w.r.t. repository root, e.g. branches/LOFAR-Task1234
 export LOFAR_BRANCH_NAME=`echo "$VERSION_INFO" | perl -ne 'print "$1" if /branch += +(.+)/;'`
diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py
index 6452fe0f5bc649fa0043afb2301a4ba68094b030..40517b11a80c7d974cea4cf13e8a655c38116614 100755
--- a/MAC/Services/src/PipelineControl.py
+++ b/MAC/Services/src/PipelineControl.py
@@ -77,7 +77,7 @@ import logging
 logger = logging.getLogger(__name__)
 
 def runCommand(cmdline, input=None):
-    logger.info("Running '%s'", cmdline)
+    logger.info("runCommand starting: %s", cmdline)
 
     # Start command
     proc = subprocess.Popen(
@@ -90,8 +90,9 @@
     )
 
     # Feed input and wait for termination
+    logger.info("runCommand input: %s", input)
     stdout, _ = proc.communicate(input)
-    logger.debug(stdout)
+    logger.info("runCommand output: %s", stdout)
 
     # Check exit status, bail on error
     if proc.returncode != 0:
@@ -124,6 +125,10 @@ class Parset(dict):
     def processingCluster(self):
         return self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.clusterName"] or "CEP2"
 
+    @staticmethod
+    def dockerRepository():
+        return "nexus.cep4.control.lofar:18080"
+
     @staticmethod
     def defaultDockerImage():
         return runCommand("docker-template", "lofar-pipeline:${LOFAR_TAG}")
@@ -146,15 +151,19 @@
         # TODO: Derive SLURM partition name
         self.partition = "cpu"
 
-    def _runCommand(self, cmdline):
+    def _runCommand(self, cmdline, input=None):
         cmdline = "ssh %s %s" % (self.headnode, cmdline)
-        runCommand(cmdline)
+        return runCommand(cmdline, input)
 
     def submit(self, jobName, cmdline, sbatch_params=None):
         if sbatch_params is None:
             sbatch_params = []
 
-        stdout = self._runCommand("sbatch --partition=%s --job-name=%s %s bash -c '%s'" % (self.partition, jobName, " ".join(sbatch_params), cmdline))
+        script = """#!/bin/bash
+{cmdline}
+""".format(cmdline = cmdline)
+
+        stdout = self._runCommand("sbatch --partition=%s --job-name=%s %s" % (self.partition, jobName, " ".join(sbatch_params)), script)
 
         # Returns "Submitted batch job 3" -- extract ID
         match = re.search("Submitted batch job (\d+)", stdout)
@@ -327,7 +336,7 @@
             "--time=31-0",
 
             # TODO: Compute nr nodes
-            "--nodes=50",
+            "--nodes=24",
 
             # Define better places to write the output
             os.path.expandvars("--error=/data/log/runPipeline-%s.stderr" % (otdbId,)),
@@ -385,11 +394,13 @@
             ),
 
             sbatch_params=[
-                "--cpus-per=task=1",
-                "--ntasks=1"
+                "--cpus-per-task=1",
+                "--ntasks=1",
                 "--dependency=afternotok:%s" % slurm_job_id,
                 "--kill-on-invalid-dep=yes",
                 "--requeue",
+                "--error=/data/log/pipelineAborted-%s.stderr" % (otdbId,),
+                "--output=/data/log/pipelineAborted-%s.log" % (otdbId,),
             ]
         )
         logger.info("Scheduled SLURM job %s" % (slurm_cancel_job_id,))