diff --git a/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py b/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py
index c0665a181f613341704474fc64707c7f49a35a48..96ebacfad03eba34cd6450be9611924c78baffd9 100644
--- a/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py
+++ b/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py
@@ -61,7 +61,7 @@ class ParamikoWrapper(object):
     def kill(self):
         self.chan.close()
 
-def run_remote_command(config, logger, host, command, env, arguments=None):
+def run_remote_command(config, logger, host, command, env, arguments = None):
     """
     Run command on host, passing it arguments from the arguments list and
     exporting key/value pairs from env (a dictionary).
@@ -86,6 +86,12 @@ def run_remote_command(config, logger, host, command, env, arguments=None):
         return run_via_paramiko(logger, host, command, env, arguments, key_filename)
     elif method == "mpirun":
         return run_via_mpirun(logger, host, command, env, arguments)
+    elif method == "local":
+        return run_via_local(logger, command, arguments)
+    elif method == "juropa_mpi":
+        return run_via_mpiexec(logger, command, arguments, host)
+    elif method == "cep_mpi":
+        return run_via_mpiexec_cep(logger, command, arguments, host)
     else:
         return run_via_ssh(logger, host, command, env, arguments)
 
@@ -101,16 +107,45 @@ def run_via_mpirun(logger, host, command, environment, arguments):
     for key in environment.keys():
         mpi_cmd.extend(["-x", key])
     mpi_cmd.append("--")
-    mpi_cmd.extend(command.split()) # command is split into (python, script)
+    mpi_cmd.extend(command.split())  # command is split into (python, script)
     mpi_cmd.extend(str(arg) for arg in arguments)
     env = os.environ
     env.update(environment)
-    process = spawn_process(mpi_cmd, logger, env=env)
+    process = spawn_process(mpi_cmd, logger, env = env)
     # mpirun should be killed with a SIGTERM to enable it to shut down the
     # remote command.
     process.kill = lambda : os.kill(process.pid, signal.SIGTERM)
     return process
 
+# Let the MPI daemon manage free resources when starting jobs.
+def run_via_mpiexec(logger, command, arguments, host):
+    for arg in arguments:
+        command = command + " " + str(arg)
+    commandstring = ["mpiexec", "-x", "-np=1", "/bin/sh", "-c", "hostname && " + command]
+    process = spawn_process(commandstring, logger)
+    process.kill = lambda : os.kill(process.pid, signal.SIGKILL)
+    return process
+
+# Start an MPI run on CEP.
+# TODO: rsync fails on a missing ssh key??
+def run_via_mpiexec_cep(logger, command, arguments, host):
+    for arg in arguments:
+        command = command + " " + str(arg)
+    commandstring = ["mpiexec", "-x", "PYTHONPATH", "-x", "LD_LIBRARY_PATH", "-x", "PATH", "-H", host, "/bin/sh", "-c", "hostname ; " + command]
+    process = spawn_process(commandstring, logger)
+    process.kill = lambda : os.kill(process.pid, signal.SIGKILL)
+    return process
+
+
+def run_via_local(logger, command, arguments):
+    commandstring = ["/bin/sh", "-c"]
+    for arg in arguments:
+        command = command + " " + str(arg)
+    commandstring.append(command)
+    process = spawn_process(commandstring, logger)
+    process.kill = lambda : os.kill(process.pid, signal.SIGKILL)
+    return process
+
 def run_via_ssh(logger, host, command, environment, arguments):
     """
     Dispatch a remote command via SSH.
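Note: to make clear what the new `cep_mpi` branch ends up executing, here is a minimal stand-alone sketch of the argv it constructs. `build_cep_mpi_argv` is an illustrative name and the host/script values are made up; the construction itself mirrors `run_via_mpiexec_cep` above:

```python
# Sketch: reconstructs the command list built inside run_via_mpiexec_cep.
# The job arguments are appended to the command string, the relevant
# environment variables are forwarded with -x, and -H pins the job to
# a single host; /bin/sh runs "hostname" first for log traceability.
def build_cep_mpi_argv(command, arguments, host):
    for arg in arguments:
        command = command + " " + str(arg)
    return ["mpiexec",
            "-x", "PYTHONPATH", "-x", "LD_LIBRARY_PATH", "-x", "PATH",
            "-H", host,
            "/bin/sh", "-c", "hostname ; " + command]

print(build_cep_mpi_argv("python node_script.py", [1, "locus001", 5000], "locus001"))
# ['mpiexec', '-x', 'PYTHONPATH', '-x', 'LD_LIBRARY_PATH', '-x', 'PATH',
#  '-H', 'locus001', '/bin/sh', '-c',
#  'hostname ; python node_script.py 1 locus001 5000']
```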
@@ -139,7 +174,7 @@ def run_via_paramiko(logger, host, command, environment, arguments, key_filename
     import paramiko
     client = paramiko.SSHClient()
     client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-    client.connect(host, key_filename=key_filename)
+    client.connect(host, key_filename = key_filename)
     commandstring = ["%s=%s" % (key, value) for key, value in environment.items()]
     commandstring.append(command)
     commandstring.extend(re.escape(str(arg)) for arg in arguments)
@@ -156,7 +191,7 @@ class ProcessLimiter(defaultdict):
     :param nproc: Bound value for semaphore (ie, maximum number of jobs)
     :type nproc: integer or none
     """
-    def __init__(self, nproc=None):
+    def __init__(self, nproc = None):
         if nproc:
             super(ProcessLimiter, self).__init__(
                 lambda: BoundedSemaphore(int(nproc))
@@ -181,12 +216,12 @@ class ComputeJob(object):
     :param command: Full path to command to be run on target host
     :param arguments: List of arguments which will be passed to command
     """
-    def __init__(self, host, command, arguments=[]):
+    def __init__(self, host, command, arguments = []):
         self.host = host
         self.command = command
         self.arguments = arguments
         self.results = {}
-        self.results['returncode'] = 123456 # Default to obscure code to allow
+        self.results['returncode'] = 123456  # Default to obscure code to allow
                                              # test of failing ssh connections
 
     def dispatch(self, logger, config, limiter, id, jobhost, jobport,
@@ -214,10 +249,11 @@ class ComputeJob(object):
             self.host,
             self.command,
             {
+                "PATH": os.environ.get('PATH'),
                 "PYTHONPATH": os.environ.get('PYTHONPATH'),
                 "LD_LIBRARY_PATH": os.environ.get('LD_LIBRARY_PATH')
             },
-            arguments=[id, jobhost, jobport]
+            arguments = [id, jobhost, jobport]
         )
         # Wait for process to finish. In the meantime, if the killswitch
         # is set (by an exception in the main thread), forcibly kill our
@@ -296,7 +332,7 @@ class RemoteCommandRecipeMixIn(object):
     """
     Mix-in for recipes to dispatch jobs using the remote command mechanism.
     """
-    def _schedule_jobs(self, jobs, max_per_node=None):
+    def _schedule_jobs(self, jobs, max_per_node = None):
         """
         Schedule a series of compute jobs. Blocks until completion.
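Note: while `ComputeJob.__init__` is being touched anyway, the `arguments = []` default is the classic shared-mutable-default pitfall. It predates this patch, but it is worth flagging; a minimal demonstration of the hazard and the usual fix (illustrative code, not LOFAR code):

```python
# The default list is created once, at function definition time, and is
# shared by every call that omits the argument.
class Job(object):
    def __init__(self, arguments=[]):
        self.arguments = arguments

a, b = Job(), Job()
a.arguments.append("only meant for a")
print(b.arguments)  # ['only meant for a'] -- b observes a's mutation

# Usual fix: default to None and create a fresh list per instance.
class SafeJob(object):
    def __init__(self, arguments=None):
        self.arguments = list(arguments) if arguments is not None else []
```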
@@ -307,6 +343,8 @@ class RemoteCommandRecipeMixIn(object):
         """
         threadpool = []
         jobpool = {}
+        if not max_per_node and self.config.has_option('remote', 'max_per_node'):
+            max_per_node = self.config.getint('remote', 'max_per_node')
         limiter = ProcessLimiter(max_per_node)
         killswitch = threading.Event()
 
@@ -319,8 +357,8 @@ class RemoteCommandRecipeMixIn(object):
             jobpool[job_id] = job
             threadpool.append(
                 threading.Thread(
-                    target=job.dispatch,
-                    args=(
+                    target = job.dispatch,
+                    args = (
                         self.logger, self.config, limiter, job_id,
                         jobhost, jobport, self.error, killswitch
                     )
@@ -355,7 +393,7 @@ class RemoteCommandRecipeMixIn(object):
         # this allows backward compatible logging: if the additional
         # output is not read, it does not matter
         self.outputs._fields["return_xml"] = ingredient.StringField(
-            help="XML return data.")
-        self.outputs["return_xml"] = node_durations.toxml(encoding="ascii")
+            help = "XML return data.")
+        self.outputs["return_xml"] = node_durations.toxml(encoding = "ascii")
 
         return jobpool
diff --git a/CEP/Pipeline/recipes/sip/nodes/copier.py b/CEP/Pipeline/recipes/sip/nodes/copier.py
index 65082bd88c83d57cf5608bef84590d925270fb12..a35f66e5d1c2bb038a7a933be57bc2f7a40f8683 100644
--- a/CEP/Pipeline/recipes/sip/nodes/copier.py
+++ b/CEP/Pipeline/recipes/sip/nodes/copier.py
@@ -50,7 +50,11 @@ class copier(LOFARnodeTCP):
 
         # construct copy command: copy to the dir
-        command = ["rsync", "-r",
+        # If the process runs on the local host, use a plain copy command.
+        if source_node == "localhost":
+            command = ["cp", "-r", "{0}".format(source_path), "{0}".format(target_path)]
+        else:
+            command = ["rsync", "-r",
                    "{0}:{1}/".format(source_node, source_path),
                    "{0}".format(target_path)]
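Note: the same local-versus-remote decision made above in copier.py reappears below in imager_prepare.py; a small shared helper could keep the two in sync. A sketch under that assumption (`make_copy_command` is a suggested name, not part of the patch):

```python
# Builds the copy command used by the node scripts: a plain recursive cp
# when source and target are the same machine (avoiding ssh/rsync and the
# missing-key failures noted in the TODO above), rsync-over-ssh otherwise.
def make_copy_command(source_node, source_path, target_path):
    if source_node == "localhost":
        return ["cp", "-r", source_path, target_path]
    return ["rsync", "-r",
            "{0}:{1}/".format(source_node, source_path),
            target_path]
```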
diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py
index 69b6ff2afd3c124062f11e9276fc224c19b0728b..c59b71b1be8a8b91552455045f1b0fb3f80c7219 100644
--- a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py
+++ b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py
@@ -10,6 +10,7 @@ import shutil
 import os
 import subprocess
 import copy
+import pyrap.tables as pt  # the order of the pyrap import influences the type conversion binding
 from lofarpipe.support.pipelinelogging import CatchLog4CPlus
 from lofarpipe.support.pipelinelogging import log_time
 from lofarpipe.support.utilities import patch_parset
@@ -19,7 +20,6 @@ from lofarpipe.support.utilities import create_directory
 from lofarpipe.support.data_map import DataMap
 from lofarpipe.support.subprocessgroup import SubProcessGroup
 
-import pyrap.tables as pt
 
 # Some constant settings for the recipe
 _time_slice_dir_name = "time_slices"
@@ -157,6 +157,10 @@ class imager_prepare(LOFARnodeTCP):
                 if input_item.skip == True:
                     exit_status = 1 #
+                # Skip the copy if the host is the same (execution on localhost);
+                # make sure the data is in the correct directory. For now: working_dir/[jobname]/subbands
+                if input_item.host == "localhost":
+                    continue
                 # construct copy command
                 command = ["rsync", "-r", "{0}:{1}".format(
                                 input_item.host, input_item.file),
diff --git a/CEP/Pipeline/recipes/sip/pipeline.cfg.in b/CEP/Pipeline/recipes/sip/pipeline.cfg.in
index beaac19b51f26d7a558443b95a1ecb6aeaac725c..8732c7a30820b5cd5c51e3140dc170f37a663c5d 100644
--- a/CEP/Pipeline/recipes/sip/pipeline.cfg.in
+++ b/CEP/Pipeline/recipes/sip/pipeline.cfg.in
@@ -22,4 +22,4 @@ engine_lpath = %(lofarroot)s/lib:%(casaroot)s/lib:%(pyraproot)s/lib:%(hdf5root)s
 
 [logging]
 log_file = %(runtime_directory)s/%(job_name)s/logs/%(start_time)s/pipeline.log
-xml_stat_file = %(runtime_directory)s/%(job_name)s/logs/%(start_time)s/statistics.xml
\ No newline at end of file
+xml_stat_file = %(runtime_directory)s/%(job_name)s/logs/%(start_time)s/statistics.xml
diff --git a/CEP/Pipeline/test/regression_tests/target_pipeline.py b/CEP/Pipeline/test/regression_tests/target_pipeline.py
index edc2529ad0456c81b1d68fb35f75333ea5aeede0..4979b9dfb7c69440d51a798fc71b25d0edeaee0e 100644
--- a/CEP/Pipeline/test/regression_tests/target_pipeline.py
+++ b/CEP/Pipeline/test/regression_tests/target_pipeline.py
@@ -26,8 +26,8 @@ def load_and_compare_data_sets(ms1, ms2):
                 div_array[idx][0][idy] = div_value
     print "maximum different value between measurement sets: {0}".format(div_max)
-
-    if div_max != 0:
+    # Use a delta of about single-precision float accuracy
+    if div_max > 1e-6:
         print "The measurement sets contain different values"
         print "failed delta test!"
         return False
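Note: replacing the exact `div_max != 0` test with a tolerance is the right call, since binary floats rarely compare exactly equal after arithmetic. A minimal illustration, plus a relative-tolerance variant worth considering if the compared values span several orders of magnitude (hypothetical, not in the patch):

```python
a = 0.1 + 0.2
print(a == 0.3)             # False: 0.1 and 0.2 have no exact binary form
print(abs(a - 0.3) < 1e-6)  # True: tolerance-based comparison succeeds

# Combined relative + absolute tolerance, robust across magnitudes.
def close(x, y, rel=1e-6, abs_tol=1e-9):
    return abs(x - y) <= max(rel * max(abs(x), abs(y)), abs_tol)
```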