diff --git a/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py b/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py
index 9bde0ca9846040d236d8a90484a1473e74883a37..cc20068080698af3cf355fec6060d4c8236c91ed 100644
--- a/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py
+++ b/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py
@@ -22,6 +22,7 @@ from lofarpipe.support.lofarexceptions import PipelineQuit
 from lofarpipe.support.jobserver import job_server
 import lofarpipe.support.lofaringredient as ingredient
 from lofarpipe.support.xmllogging import add_child
+from subprocess import Popen, PIPE
 
 # By default, Linux allocates lots more memory than we need(?) for a new stack
 # frame. When multiplexing lots of threads, that will cause memory issues.
@@ -58,9 +59,28 @@ def run_remote_command(config, logger, host, command, env, arguments = None, res
         return run_via_slurm_srun_cep3(logger, command, arguments, host)
     elif method == "custom_cmdline":
         return run_via_custom_cmdline(logger, host, command, env, arguments, config, resources)
+    # Hertfordshire cluster
+    elif method == "pbs_ssh":
+        return run_via_ssh(logger, host, command, env, arguments)
+    # JURECA HPC
+    elif method == "slurm_srun":
+        return run_via_slurm_srun(logger, command, arguments, host)
     else:
         return run_via_ssh(logger, host, command, env, arguments)
+
+def run_via_slurm_srun(logger, command, arguments, host):
+    for arg in arguments:
+        command = command + " " + str(arg)
+    commandarray = ["srun", "-N 1", "--cpu_bind=map_cpu:none", "-w", host, "/bin/sh", "-c", "hostname && " + command]
+    # we have a bug that crashes jobs when too many get started at the same time;
+    # temporary, NOT 100% reliable workaround
+    from random import randint
+    time.sleep(randint(1, 20) / 2.)
+    ##########################
+    return commandarray
+
+
 
 def run_via_slurm_srun_cep3(logger, command, arguments, host):
     logger.debug("Dispatching command to %s with srun" % host)
     for arg in arguments:
@@ -365,6 +385,62 @@ class RemoteCommandRecipeMixIn(object):
         if max_per_node:
             self.logger.info("Limiting to %d simultaneous jobs/node" % max_per_node)
 
+        # External cluster support
+        try:
+            method = self.config.get('remote', 'method')
+        except:
+            method = None
+        # JURECA SLURM
+        if method == 'slurm_srun':
+            nodeliststr = []
+            hargs = ['srun', 'hostname']
+            proc = Popen(hargs, stdout=PIPE, stderr=None)
+            tup = proc.communicate()
+            nodeliststr = tup[0].rstrip('\n').split('\n')
+            # remove duplicates; order is not important
+            nodeliststr = list(set(nodeliststr))
+
+            # equal distribution
+            total = len(jobs)
+            # when nodes crash, the node list and SLURM_NNODES may not match anymore
+            nnodes = len(nodeliststr)
+            # round robin
+            nodelist = []
+            for i in range(total):
+                nodelist.append(nodeliststr[i % nnodes])
+
+            for i, job in enumerate(jobs):
+                job.host = nodelist[i]
+
+        # Hertfordshire cluster
+        if method == 'pbs_ssh':
+            # special case: get the list of nodes from the PBS job
+            nodeliststr = []
+
+            try:
+                filename = os.environ['PBS_NODEFILE']
+            except KeyError:
+                self.logger.error('PBS_NODEFILE not found.')
+                raise PipelineQuit()
+
+            with open(filename, 'r') as f:
+                for line in f:
+                    node_name = line.split()[0]
+                    if node_name not in nodeliststr:
+                        nodeliststr.append(node_name)
+
+            # equal distribution
+            total = len(jobs)
+            # when nodes crash, the node list may not match the requested node count anymore
+            nnodes = len(nodeliststr)
+            # round robin
+            nodelist = []
+            for i in range(total):
+                nodelist.append(nodeliststr[i % nnodes])
+
+            for i, job in enumerate(jobs):
+                job.host = nodelist[i]
+
         with job_server(self.logger, jobpool, self.error) as (jobhost, jobport):
             self.logger.debug("Job dispatcher at %s:%d" % (jobhost, jobport))
             for job_id, job in enumerate(jobs):
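
Both new branches in RemoteCommandRecipeMixIn assign hosts to jobs with the same round-robin pattern. Below is a minimal standalone sketch of that distribution step; the Job class, node names, and function name are illustrative stand-ins and are not part of the patch.

    # Sketch of the round-robin host assignment used by both the slurm_srun and
    # pbs_ssh branches above. "Job" is a hypothetical stand-in for the pipeline's
    # job objects; only the .host attribute matters for this step.
    class Job(object):
        def __init__(self, name):
            self.name = name
            self.host = None

    def assign_round_robin(jobs, nodes):
        # Cycle through the available nodes so jobs are spread evenly across them.
        for i, job in enumerate(jobs):
            job.host = nodes[i % len(nodes)]

    if __name__ == "__main__":
        nodes = ["node01", "node02", "node03"]   # e.g. from PBS_NODEFILE or `srun hostname`
        jobs = [Job("job%d" % i) for i in range(7)]
        assign_round_robin(jobs, nodes)
        for job in jobs:
            print("%s -> %s" % (job.name, job.host))   # job0 -> node01, job1 -> node02, ...

With seven jobs and three nodes, the assignment wraps around after node03, so node01 and node02 each receive three and two jobs respectively; the patch relies on the same modulo indexing to balance work when there are more jobs than nodes.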