Commit e8a4b35f authored by Jorrit Schaap

Task #9607: fixed some typos. Made sure we choose sane defaults in case incorrect processing keys are specified. Added a check on startup for already scheduled pipelines, to see whether they can be handed to SLURM.
parent da106019
@@ -130,13 +130,23 @@ class Parset(dict):
         return self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.clusterName"] or "CEP2"
 
     def processingPartition(self):
-        return self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.clusterPartition"] or "cpu"
+        result = self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.clusterPartition"] or "cpu"
+        if '/' in result:
+            logger.error('clusterPartition contains invalid value: %s. Defaulting clusterPartition to \'cpu\'', result)
+            return 'cpu'
+        return result
 
     def processingNumberOfCoresPerTask(self):
-        return int(self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"]) or "20"
+        result = int(self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"]) or "20"
+        if result < 1 or result > 20:
+            logger.warn('Invalid Observation.Cluster.ProcessingCluster.numberOfCoresPerTask: %s, defaulting to %s', result, max(1, min(20, result)))
+        return max(1, min(20, result))
 
     def processingNumberOfTasks(self):
-        return int(self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.numberOfTasks"]) or "24"
+        result = int(self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.numberOfTasks"]) or "24"
+        if result < 1 or result > 48:
+            logger.warn('Invalid Observation.Cluster.ProcessingCluster.numberOfTasks: %s, defaulting to %s', result, max(1, min(48, result)))
+        return max(1, min(48, result))
 
     @staticmethod
     def dockerRepository():
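These three getters now share a clamp-and-warn pattern. One quirk: the 'or "20"' / 'or "24"' fallbacks produce a string when the key parses to 0, so the min()/max() calls then lean on Python 2's mixed-type comparison ordering (which happens to still return an int). A standalone sketch of the same pattern, using a hypothetical _clampedInt helper (not part of this commit) that keeps everything as ints:

import logging

logger = logging.getLogger(__name__)

def _clampedInt(value, minimum, maximum, default, key):
    # Parse the parset value, falling back to an int default instead of a string.
    result = int(value) if value else default
    clamped = max(minimum, min(maximum, result))
    if result != clamped:
        logger.warning('Invalid %s: %s, defaulting to %s', key, result, clamped)
    return clamped

# _clampedInt("200", 1, 20, 20, "numberOfCoresPerTask") -> 20, with a warning logged
# _clampedInt("", 1, 48, 24, "numberOfTasks")           -> 24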
@@ -246,7 +256,7 @@ class PipelineDependencies(object):
             raise TaskNotFoundException("otdb_id %s not found in RADB" % (otdb_id,))
 
         successor_radb_ids = radb_task['successor_ids']
-        successor_tasks = self.rarpc.getTasks(task_ids=successor_ids) if successor_radb_ids else []
+        successor_tasks = self.rarpc.getTasks(task_ids=successor_radb_ids) if successor_radb_ids else []
         successor_otdb_ids = [t["otdb_id"] for t in successor_tasks]
 
         logger.debug("getSuccessorIds(%s) = %s", otdb_id, successor_otdb_ids)
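The removed line referenced successor_ids, which is never defined. Because a conditional expression only evaluates its left side when the condition is truthy, the NameError surfaced only for tasks that actually had successors, which is presumably how it slipped through. A minimal reproduction with hypothetical values:

successor_radb_ids = []                                    # empty: bad name never evaluated, bug hidden
tasks = list(successor_ids) if successor_radb_ids else []  # -> []

successor_radb_ids = [42]                                  # non-empty: left side is now evaluated
tasks = list(successor_ids) if successor_radb_ids else []  # -> NameError: successor_ids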
@@ -294,12 +304,37 @@ class PipelineControl(OTDBBusListener):
         super(PipelineControl, self).start_listening(**kwargs)
 
+        self._checkScheduledPipelines()
+
     def stop_listening(self, **kwargs):
         super(PipelineControl, self).stop_listening(**kwargs)
 
         self.dependencies.close()
         self.otdbrpc.close()
 
+    def _checkScheduledPipelines(self):
+        try:
+            scheduled_pipelines = self.dependencies.rarpc.getTasks(task_status='scheduled', task_type='pipeline')
+            logger.info("Checking %s scheduled pipelines if they can start.", len(scheduled_pipelines))
+
+            for pipeline in scheduled_pipelines:
+                logger.info("Checking if scheduled pipeline otdbId=%s can start.", pipeline['otdb_id'])
+                try:
+                    otdbId = pipeline['otdb_id']
+                    parset = self._getParset(otdbId)
+                    if not self._shouldHandle(parset):
+                        return
+
+                    # Maybe the pipeline can start already
+                    if self.dependencies.canStart(otdbId):
+                        self._startPipeline(otdbId, parset)
+                    else:
+                        logger.info("Job %s was set to scheduled, but cannot start yet.", otdbId)
+                except Exception as e:
+                    logger.error(e)
+        except Exception as e:
+            logger.error(e)
+
     @staticmethod
     def _shouldHandle(parset):
         if not parset.isPipeline():
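One thing to watch in the new loop: the return after _shouldHandle exits _checkScheduledPipelines entirely, so the first pipeline that should not be handled also stops the scan of the remaining scheduled ones. If the intent is to skip only that pipeline, a continue would do it. A sketch of the loop body under that assumption (not what this commit does):

for pipeline in scheduled_pipelines:
    otdbId = pipeline['otdb_id']
    try:
        parset = self._getParset(otdbId)
        if not self._shouldHandle(parset):
            continue  # assumed intent: skip this pipeline, keep checking the rest

        if self.dependencies.canStart(otdbId):
            self._startPipeline(otdbId, parset)
        else:
            logger.info("Job %s was set to scheduled, but cannot start yet.", otdbId)
    except Exception as e:
        logger.error(e)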
@@ -328,6 +363,8 @@ class PipelineControl(OTDBBusListener):
             logger.info("Pipeline %s is already queued or running in SLURM.", otdbId)
             return
 
+        logger.info("***** START Otdb ID %s *****", otdbId)
+
         # Determine SLURM parameters
         sbatch_params = [
                          # Only run job if all nodes are ready
@@ -344,11 +381,11 @@ class PipelineControl(OTDBBusListener):
                          # Lower priority to drop below inspection plots
                          "--nice=1000",
 
                          "--partition=%s" % parset.processingPartition(),
                          "--nodes=%s" % parset.processingNumberOfTasks(),
                          "--cpus-per-task=%s" % parset.processingNumberOfCoresPerTask(),
 
                          # Define better places to write the output
                          os.path.expandvars("--output=/data/log/pipeline-%s-%%j.log" % (otdbId,)),
                        ]
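A subtlety in the --output argument above: %%j escapes Python's %-formatting, so SLURM still receives its own %j (job id) placeholder. A quick check:

otdbId = 123456  # hypothetical otdb id
print("--output=/data/log/pipeline-%s-%%j.log" % (otdbId,))
# --output=/data/log/pipeline-123456-%j.log   (SLURM later expands %j itself)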
@@ -391,7 +428,7 @@ class PipelineControl(OTDBBusListener):
             " -e SLURM_JOB_ID=$SLURM_JOB_ID"
             " -v /data:/data"
             " {image}"
-            " runPipeline.sh -o {obsid} -c /opt/lofar/share/pipeline/pipeline.cfg.{cluster} -P /data/parsets || exit $?\n"
+            " runPipeline.sh -o {obsid} -c /opt/lofar/share/pipeline/pipeline.cfg.{cluster} || exit $?\n"
 
             # notify that we're tearing down
             "{setStatus_completing}\n"
@@ -458,7 +495,7 @@ class PipelineControl(OTDBBusListener):
     def _startSuccessors(self, otdbId):
         try:
             successor_ids = self.dependencies.getSuccessorIds(otdbId)
-        except TaskNotFoundException, e:
+        except PipelineDependencies.TaskNotFoundException, e:
             logger.error("_startSuccessors(%s): Error obtaining task successors, not starting them: %s", otdbId, e)
             return
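The fix qualifies the exception with its enclosing class: a Python class body does not create a scope that code outside it can see, so if TaskNotFoundException is nested inside PipelineDependencies (as the fix suggests), the unqualified name raises a NameError at the except clause. A self-contained illustration, assuming that nesting:

class PipelineDependencies(object):
    class TaskNotFoundException(Exception):
        pass

try:
    raise PipelineDependencies.TaskNotFoundException("otdb_id 123 not found in RADB")
except PipelineDependencies.TaskNotFoundException as e:
    print(e)   # caught; an unqualified 'except TaskNotFoundException' would itself be a NameError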
@@ -468,7 +505,6 @@ class PipelineControl(OTDBBusListener):
                 continue
 
             if self.dependencies.canStart(s):
-                logger.info("***** START Otdb ID %s *****", otdbId)
                 self._startPipeline(s, parset)
             else:
                 logger.info("Job %s still cannot start yet.", otdbId)
@@ -480,7 +516,6 @@ class PipelineControl(OTDBBusListener):
 
         # Maybe the pipeline can start already
         if self.dependencies.canStart(otdbId):
-            logger.info("***** START Otdb ID %s *****", otdbId)
             self._startPipeline(otdbId, parset)
         else:
             logger.info("Job %s was set to scheduled, but cannot start yet.", otdbId)