diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index 9dddc258c17f751963942f834ec350ad27a374d5..96cda4813ff7f8e31141d0485608a528d30c173a 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -1,7 +1,6 @@ #!/usr/bin/env python -#coding: iso-8859-15 # -# Copyright (C) 2015 +# Copyright (C) 2016 # ASTRON (Netherlands Institute for Radio Astronomy) # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands # @@ -169,9 +168,9 @@ class Parset(dict): runCommand("docker-template", "${LOFAR_TAG}")) def slurmJobName(self): - return str(self.treeId()) + return str(self.otdbId()) - def treeId(self): + def otdbId(self): return int(self[PARSET_PREFIX + "Observation.otdbID"]) class Slurm(object): @@ -277,21 +276,21 @@ class PipelineControl(OTDBBusListener): def _slurmJobIds(self, parsets): return [self.slurm.jobid(p.slurmJobName()) for p in parsets] - def _getParset(self, treeId): - return Parset(self.parset_rpc( OtdbID=treeId, timeout=10 )[0]) + def _getParset(self, otdbId): + return Parset(self.parset_rpc( OtdbID=otdbId, timeout=10 )[0]) def _getPredecessorParsets(self, parset): - treeIds = parset.predecessors() + otdbIds = parset.predecessors() - logger.info("Obtaining predecessor parsets %s", treeIds) + logger.info("Obtaining predecessor parsets %s", otdbIds) - return [self._getParset(treeId) for treeId in treeIds] + return [self._getParset(otdbId) for otdbId in otdbIds] - def onObservationAborted(self, treeId, modificationTime): - logger.info("***** STOP Tree ID %s *****", treeId) + def onObservationAborted(self, otdbId, modificationTime): + logger.info("***** STOP Otdb ID %s *****", otdbId) # Request the parset - parset = self._getParset(treeId) + parset = self._getParset(otdbId) if not self._shouldHandle(parset): return @@ -323,11 +322,11 @@ class PipelineControl(OTDBBusListener): return result - def onObservationScheduled(self, treeId, modificationTime): - logger.info("***** QUEUE Tree ID %s *****", treeId) + def onObservationScheduled(self, otdbId, modificationTime): + logger.info("***** QUEUE Otdb ID %s *****", otdbId) # Request the parset - parset = self._getParset(treeId) + parset = self._getParset(otdbId) if not self._shouldHandle(parset): return @@ -363,8 +362,8 @@ class PipelineControl(OTDBBusListener): "--nodes=50", # Define better places to write the output - os.path.expandvars("--error=$LOFARROOT/var/log/docker-startPython-%s.stderr" % (treeId,)), - os.path.expandvars("--output=$LOFARROOT/var/log/docker-startPython-%s.log" % (treeId,)), + os.path.expandvars("--error=$LOFARROOT/var/log/docker-startPython-%s.stderr" % (otdbId,)), + os.path.expandvars("--output=$LOFARROOT/var/log/docker-startPython-%s.log" % (otdbId,)), ] min_starttime = self._minStartTime([x for x in preparsets if x.isObservation()]) @@ -390,7 +389,7 @@ class PipelineControl(OTDBBusListener): " runPipeline.sh -o {obsid} -c /opt/lofar/share/pipeline/pipeline.cfg.{cluster} -B {status_bus}" .format( lofarenv = os.environ.get("LOFARENV", ""), - obsid = treeId, + obsid = otdbId, tag = parset.dockerTag(), cluster = parset.processingCluster(), status_bus = self.setStatus_busname, @@ -412,7 +411,7 @@ class PipelineControl(OTDBBusListener): " pipelineAborted.sh -o {obsid} -B {status_bus}" .format( lofarenv = os.environ.get("LOFARENV", ""), - obsid = treeId, + obsid = otdbId, tag = parset.dockerTag(), status_bus = self.setStatus_busname, ), @@ -436,7 +435,7 @@ class PipelineControl(OTDBBusListener): # TODO: How to avoid race condition with runPipeline.sh setting the status to STARTED # when the SLURM job starts running? logger.info("Setting status to QUEUED") - self._setStatus(treeId, "queued") + self._setStatus(otdbId, "queued") logger.info("Pipeline processed.") diff --git a/MAC/Services/src/pipelinecontrol b/MAC/Services/src/pipelinecontrol index 8962d1b4f9ef46d2102f1d7b999a5c31a77fef1b..2ad9e0fde0b503d1759e1ff0b743dba0b80207f0 100644 --- a/MAC/Services/src/pipelinecontrol +++ b/MAC/Services/src/pipelinecontrol @@ -1,7 +1,6 @@ #!/usr/bin/env python -#coding: iso-8859-15 # -# Copyright (C) 2015 +# Copyright (C) 2016 # ASTRON (Netherlands Institute for Radio Astronomy) # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands # diff --git a/MAC/Services/test/tPipelineControl.py b/MAC/Services/test/tPipelineControl.py index c3899f508ecc108a375ef5fcc48b168c44c4e2da..aecd19fc8500fd94519982cddadf29c5187a22d6 100644 --- a/MAC/Services/test/tPipelineControl.py +++ b/MAC/Services/test/tPipelineControl.py @@ -1,6 +1,5 @@ #!/usr/bin/env python -# Be able to find service python file import sys from lofar.mac.PipelineControl import * @@ -211,7 +210,7 @@ class TestPipelineControl(unittest.TestCase): self.assertTrue(self.trigger.wait()) # Verify message - self.assertEqual(self.trigger.args[0], 3) # treeId + self.assertEqual(self.trigger.args[0], 3) # otdbId # Check if job was scheduled self.assertIn("3", ps.slurm.scheduled_jobs) @@ -233,7 +232,7 @@ class TestPipelineControl(unittest.TestCase): self.assertTrue(self.trigger.wait()) # Verify message - self.assertEqual(self.trigger.args[0], 1) # treeId + self.assertEqual(self.trigger.args[0], 1) # otdbId # Check if job was scheduled self.assertIn("1", ps.slurm.scheduled_jobs)