Commit 68d6c962 authored by Stefan Froehlich's avatar Stefan Froehlich


Task #9800: support for remote clusters in Hertfordshire and Juelich. Changes in command construction and special handling of node setup.
parent 78448689
@@ -22,6 +22,7 @@ from lofarpipe.support.lofarexceptions import PipelineQuit
from lofarpipe.support.jobserver import job_server
import lofarpipe.support.lofaringredient as ingredient
from lofarpipe.support.xmllogging import add_child
from subprocess import Popen, PIPE
# By default, Linux allocates lots more memory than we need(?) for a new stack
# frame. When multiplexing lots of threads, that will cause memory issues.
@@ -58,9 +59,28 @@ def run_remote_command(config, logger, host, command, env, arguments = None, res
        return run_via_slurm_srun_cep3(logger, command, arguments, host)
    elif method == "custom_cmdline":
        return run_via_custom_cmdline(logger, host, command, env, arguments, config, resources)
    # Hertfordshire cluster
    elif method == "pbs_ssh":
        return run_via_ssh(logger, host, command, env, arguments)
    # JURECA HPC
    elif method == "slurm_srun":
        return run_via_slurm_srun(logger, command, arguments, host)
    else:
        return run_via_ssh(logger, host, command, env, arguments)
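For context, `method` here comes from the `[remote]` section of the pipeline configuration. A minimal sketch of the corresponding pipeline.cfg entry (the value shown is one of the newly supported methods; the exact file layout is an assumption, not part of this diff):

    [remote]
    method = slurm_srun

Setting `method = pbs_ssh` would select the Hertfordshire path instead; leaving the option unset falls through to plain ssh dispatch.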
def run_via_slurm_srun(logger, command, arguments, host):
    logger.debug("Dispatching command to %s with srun" % host)
    for arg in arguments:
        command = command + " " + str(arg)
    commandarray = ["srun", "-N", "1", "--cpu_bind=map_cpu:none", "-w", host,
                    "/bin/sh", "-c", "hostname && " + command]
    # Workaround for a bug that crashes jobs when too many are started at the
    # same time: stagger start-up with a short random delay.
    # Temporary and not 100% reliable.
    from random import randint
    time.sleep(randint(1, 20) / 2.)
    return commandarray
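The returned argument list is presumably launched by the pipeline's job dispatcher. A hypothetical, self-contained illustration of what running such an array amounts to (the host and command are made up; this is not code from the change itself):

    from subprocess import Popen, PIPE

    # Hypothetical: launch one job step on a single node via srun.
    commandarray = ["srun", "-N", "1", "--cpu_bind=map_cpu:none", "-w",
                    "jrc0001", "/bin/sh", "-c", "hostname && echo job"]
    proc = Popen(commandarray, stdout=PIPE, stderr=PIPE)
    stdout, stderr = proc.communicate()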
def run_via_slurm_srun_cep3(logger, command, arguments, host):
    logger.debug("Dispatching command to %s with srun" % host)
    for arg in arguments:
@@ -365,6 +385,62 @@ class RemoteCommandRecipeMixIn(object):
        if max_per_node:
            self.logger.info("Limiting to %d simultaneous jobs/node" % max_per_node)
        # Support for external clusters.
        try:
            method = self.config.get('remote', 'method')
        except Exception:
            method = None

        # JURECA SLURM
        if method == 'slurm_srun':
            # Ask the allocation for its node names: srun launches one
            # 'hostname' task per allocated node.
            hargs = ['srun', 'hostname']
            proc = Popen(hargs, stdout=PIPE)
            tup = proc.communicate()
            nodeliststr = tup[0].rstrip('\n').split('\n')
            # Remove duplicates; the order is not important.
            nodeliststr = list(set(nodeliststr))
            # Distribute the jobs equally over the nodes.
            total = len(jobs)
            # If nodes crashed, the node list and the SLURM_NNODES environment
            # variable may disagree, so count the list we actually received.
            nnodes = len(nodeliststr)
            # Round-robin assignment of jobs to nodes.
            nodelist = []
            for i in range(total):
                nodelist.append(nodeliststr[i % nnodes])
            for i, job in enumerate(jobs):
                job.host = nodelist[i]
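To make the round-robin assignment concrete, a small hypothetical example (node names made up): five jobs over two nodes are assigned as

    nodeliststr = ['jrc0001', 'jrc0002']
    nodelist = [nodeliststr[i % len(nodeliststr)] for i in range(5)]
    # nodelist == ['jrc0001', 'jrc0002', 'jrc0001', 'jrc0002', 'jrc0001']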
        # Hertfordshire cluster
        if method == 'pbs_ssh':
            # Special case: read the list of nodes from the PBS node file.
            nodeliststr = []
            try:
                filename = os.environ['PBS_NODEFILE']
            except KeyError:
                self.logger.error('PBS_NODEFILE not found.')
                raise PipelineQuit()
            with open(filename, 'r') as nodefile:
                for line in nodefile:
                    node_name = line.split()[0]
                    if node_name not in nodeliststr:
                        nodeliststr.append(node_name)
            # Distribute the jobs equally over the nodes.
            total = len(jobs)
            # If nodes crashed, the node file may list fewer nodes than
            # requested, so count the list we actually read.
            nnodes = len(nodeliststr)
            # Round-robin assignment of jobs to nodes.
            nodelist = []
            for i in range(total):
                nodelist.append(nodeliststr[i % nnodes])
            for i, job in enumerate(jobs):
                job.host = nodelist[i]
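Both branches above repeat the same round-robin logic; a hypothetical helper that would factor it out (the name and placement are illustrative, not part of this change):

    def assign_hosts_round_robin(jobs, nodes):
        """Assign each job a host, cycling through the available nodes."""
        for i, job in enumerate(jobs):
            job.host = nodes[i % len(nodes)]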
        with job_server(self.logger, jobpool, self.error) as (jobhost, jobport):
            self.logger.debug("Job dispatcher at %s:%d" % (jobhost, jobport))
            for job_id, job in enumerate(jobs):
......