diff --git a/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py b/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py
index 984fbc4ecb428bb5e391d59175718c81a3f7464c..4609d07b7193fcfcd69d1fb5b43b870a18d1a2f0 100644
--- a/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py
+++ b/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py
@@ -17,7 +17,7 @@ import time
 import xml.dom.minidom as xml
 
 from lofarpipe.support.pipelinelogging import log_process_output
-from lofarpipe.support.subprocess import spawn_process
+from lofarpipe.support.subprocessgroup import SubProcessGroup
 from lofarpipe.support.lofarexceptions import PipelineQuit
 from lofarpipe.support.jobserver import job_server
 import lofarpipe.support.lofaringredient as ingredient
@@ -33,8 +33,7 @@ def run_remote_command(config, logger, host, command, env, arguments = None, res
     Run command on host, passing it arguments from the arguments list and
     exporting key/value pairs from env(a dictionary).
 
-    Returns an object with poll() and communicate() methods, similar to
-    subprocess.Popen.
+    Returns an array of command line arguments to start.
 
     This is a generic interface to potentially multiple ways of running
     commands (SSH, mpirun, etc). The appropriate method is chosen from the
@@ -66,16 +65,14 @@ def run_via_slurm_srun_cep3(logger, command, arguments, host):
     logger.debug("Dispatching command to %s with srun" % host)
     for arg in arguments:
         command = command + " " + str(arg)
-    commandstring = ["srun","-N 1","-n 1","-w",host, "/bin/sh", "-c", "hostname && " + command]
+    commandarray = ["srun","-N 1","-n 1","-w",host, "/bin/sh", "-c", "hostname && " + command]
     #commandstring = ["srun","-N 1","--cpu_bind=map_cpu:none","-w",host, "/bin/sh", "-c", "hostname && " + command]
     # we have a bug that crashes jobs when too many get startet at the same time
     # temporary NOT 100% reliable workaround
     #from random import randint
     #time.sleep(randint(0,10))
     ##########################
-    process = spawn_process(commandstring, logger)
-    process.kill = lambda : os.kill(process.pid, signal.SIGKILL)
-    return process
+    return commandarray
 
 def run_via_mpirun(logger, host, command, environment, arguments):
     """
@@ -86,47 +83,35 @@ def run_via_mpirun(logger, host, command, environment, arguments):
     """
     logger.debug("Dispatching command to %s with mpirun" % host)
     mpi_cmd = ["/usr/bin/mpirun", "-host", host]
-    for key in environment.keys():
-        mpi_cmd.extend(["-x", key])
+    for key,value in environment.iteritems():
+        mpi_cmd.extend(["-x", "%s=%s" % (key,value)])
     mpi_cmd.append("--")
     mpi_cmd.extend(command.split()) # command is split into (python, script)
     mpi_cmd.extend(str(arg) for arg in arguments)
-    env = os.environ
-    env.update(environment)
-    process = spawn_process(mpi_cmd, logger, env = env)
-    # mpirun should be killed with a SIGTERM to enable it to shut down the
-    # remote command.
-    process.kill = lambda : os.kill(process.pid, signal.SIGTERM)
-    return process
+    return mpi_cmd
 
 # let the mpi demon manage free resources to start jobs
 def run_via_mpiexec(logger, command, arguments, host):
     for arg in arguments:
         command = command + " " + str(arg)
-    commandstring = ["mpiexec", "-x", "-np=1", "/bin/sh", "-c", "hostname && " + command]
-    process = spawn_process(commandstring, logger)
-    process.kill = lambda : os.kill(process.pid, signal.SIGKILL)
-    return process
+    commandarray = ["mpiexec", "-x", "-np=1", "/bin/sh", "-c", "hostname && " + command]
+    return commandarray
 
 # start mpi run on cep
 # TODO: rsync fails on missing ssh key??
 def run_via_mpiexec_cep(logger, command, arguments, host):
     for arg in arguments:
         command = command + " " + str(arg)
-    commandstring = ["mpiexec", "-x", "PYTHONPATH", "-x", "LD_LIBRARY_PATH", "-x", "PATH", "-H", host, "/bin/sh", "-c", "hostname ; " + command]
-    process = spawn_process(commandstring, logger)
-    process.kill = lambda : os.kill(process.pid, signal.SIGKILL)
-    return process
+    commandarray = ["mpiexec", "-x", "PYTHONPATH", "-x", "LD_LIBRARY_PATH", "-x", "PATH", "-H", host, "/bin/sh", "-c", "hostname ; " + command]
+    return commandarray
 
 def run_via_local(logger, command, arguments):
-    commandstring = ["/bin/sh", "-c"]
+    commandarray = ["/bin/sh", "-c"]
     for arg in arguments:
         command = command + " " + str(arg)
-    commandstring.append(command)
-    process = spawn_process(commandstring, logger)
-    process.kill = lambda : os.kill(process.pid, signal.SIGKILL)
-    return process
+    commandarray.append(command)
+    return commandarray
 
 def run_via_ssh(logger, host, command, environment, arguments):
     """
@@ -142,9 +127,7 @@ def run_via_ssh(logger, host, command, environment, arguments):
     commandstring.append(command)
     commandstring.extend(re.escape(str(arg)) for arg in arguments)
     ssh_cmd.append('"' + " ".join(commandstring) + '"')
-    process = spawn_process(ssh_cmd, logger)
-    process.kill = lambda : os.kill(process.pid, signal.SIGKILL)
-    return process
+    return ssh_cmd
 
 def run_via_custom_cmdline(logger, host, command, environment, arguments, config, resources):
     """
@@ -205,10 +188,7 @@ def run_via_custom_cmdline(logger, host, command, environment, arguments, config
     ).split(' ')
 
     logger.debug("Dispatching command to %s with custom_cmdline: %s" % (host, full_command_line))
-
-    process = spawn_process(full_command_line, logger)
-    process.kill = lambda : os.kill(process.pid, signal.SIGKILL)
-    return process
+    return full_command_line
 
 class ProcessLimiter(defaultdict):
     """
@@ -276,7 +256,7 @@ class ComputeJob(object):
                 self.results['returncode'] = 1
                 error.set()
                 return 1
-            process = run_remote_command(
+            cmdarray = run_remote_command(
                 config,
                 logger,
                 self.host,
@@ -291,18 +271,12 @@ class ComputeJob(object):
                 arguments = [id, jobhost, jobport],
                 resources = self.resources
             )
-            # Wait for process to finish. In the meantime, if the killswitch
-            # is set (by an exception in the main thread), forcibly kill our
-            # job off.
-            while process.poll() == None:
-                if killswitch.isSet():
-                    process.kill()
-                else:
-                    time.sleep(1)
-            sout, serr = process.communicate()
-            serr = serr.replace("Connection to %s closed.\r\n" % self.host, "")
-            log_process_output("Remote command", sout, serr, logger)
+            # Run and wait for process to finish.
+            pg = SubProcessGroup(logger=logger, killSwitch=killswitch)
+            pg.run(cmdarray)
+            pg.wait_for_finish()
+
         except Exception, e:
             logger.exception("Failed to run remote process %s (%s)" %
                 (self.command, str(e)))
             self.results['returncode'] = 1
diff --git a/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py b/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py
index 7dc11f4b406d49e10a212c23a9cb91fffe0db9eb..d5bb3292586890668bb4d1bea7699d1ee54f9b34 100644
--- a/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py
+++ b/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py
@@ -1,7 +1,8 @@
 import subprocess
 import select
-import fcntl
 import os
+import signal
+import fcntl
 import time
 
 from lofarpipe.support.lofarexceptions import PipelineException
@@ -20,6 +21,7 @@ class SubProcess(object):
             print line
 
         self.cmd = cmd
+        self.killed = False
         self.completed = False
         self.logger = logger.info if logger else print_logger
 
@@ -84,6 +86,13 @@ class SubProcess(object):
 
         return True
 
+    def kill(self):
+        if self.killed:
+            return
+
+        os.kill(self.pid, signal.SIGTERM)
+        self.killed = True
+
     def fds(self):
         return self.output_streams.values()
 
@@ -126,7 +135,8 @@ class SubProcessGroup(object):
                  max_concurrent_processes = 8,
                  # poll each 10 seconds: we have a mix of short and long
                  # running processes
-                 polling_interval = 10):
+                 polling_interval = 10,
+                 killSwitch = None):
         self.process_group = []
         self.logger = logger
         self.usageStats = usageStats
@@ -138,10 +148,17 @@ class SubProcessGroup(object):
         self.processes_waiting_for_execution = []
         self.polling_interval = polling_interval
 
+        self.killSwitch = killSwitch
+
     def _start_process(self, cmd, cwd):
         """
         Helper function collection all the coded needed to start a process
         """
+
+        # Do nothing if we're stopping
+        if self.killSwitch and self.killSwitch.isSet():
+            return
+
         # About to start a process, increase the counter
         self.running_process_count += 1
 
@@ -193,6 +210,11 @@ class SubProcessGroup(object):
         # collect all unfinished processes
         processes = [p for p in self.process_group if not p.completed]
 
+        # check whether we're stopping
+        if self.killSwitch and self.killSwitch.isSet():
+            for process in processes:
+                process.kill()
+
         # collect fds we need to poll
         fds = []
         for process in processes:
diff --git a/CEP/Pipeline/framework/lofarpipe/support/utilities.py b/CEP/Pipeline/framework/lofarpipe/support/utilities.py
index de7ec3e27956cfdca45d3183911eec56d57bbe12..37c883f128c058771b36882d19c9999eaa4d258a 100644
--- a/CEP/Pipeline/framework/lofarpipe/support/utilities.py
+++ b/CEP/Pipeline/framework/lofarpipe/support/utilities.py
@@ -217,6 +217,9 @@ def string_to_list(my_string):
 
 def spawn_process(cmd, logger, cwd = None, env = None, max_tries = 2, max_timeout = 30):
     """
+    DEPRECATED -- spawn_process leads to custom, and thus bad, output handling. Use
+    support.subprocessgroup.SubProcessGroup instead.
+
     Tries to spawn a process.
 
     If it hits an OSError due to lack of memory or too many open files, it
@@ -225,6 +228,9 @@ def spawn_process(cmd, logger, cwd = None, env = None, max_tries = 2, max_timeou
     If successful, the process object is returned. Otherwise, we eventually
     propagate the exception.
     """
+
+    logger.error("support.utilities.spawn_process is DEPRECATED. Please use support.subprocessgroup.SubProcessGroup")
+
    trycounter = 0
    while True:
        logger.debug(
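
For reference, a minimal sketch of the call pattern this patch introduces. The
run_via_* dispatchers now return a plain command array, which the caller hands
to SubProcessGroup together with the pipeline's kill switch (a threading.Event,
as in ComputeJob.run above). The logger name and the shell command below are
illustrative only; SubProcessGroup(logger=..., killSwitch=...), run() and
wait_for_finish() are the calls visible in the diff.

    import logging
    import threading

    from lofarpipe.support.subprocessgroup import SubProcessGroup

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger("example")  # illustrative logger name

    # Setting the kill switch from another thread (e.g. on an exception in
    # the main thread) makes the group SIGTERM all unfinished children on
    # its next poll, replacing the old per-process poll()/kill() loop that
    # lived in ComputeJob.run.
    killswitch = threading.Event()

    pg = SubProcessGroup(logger=logger, killSwitch=killswitch)
    pg.run(["/bin/sh", "-c", "hostname && sleep 2"])  # any command array, e.g. from run_remote_command()
    pg.wait_for_finish()

Returning an argument array instead of a Popen-like object means all dispatch
flavours (srun, mpirun, mpiexec, ssh, local, custom cmdline) share a single
spawning, output-logging and kill path inside SubProcessGroup, rather than each
patching its own kill lambda onto a freshly spawned process.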