Skip to content
Snippets Groups Projects
Commit 63d8f36d authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #8437: Add support for global fs in node scripts to prevent rsyncing...

Task #8437: Add support for global fs in node scripts to prevent rsyncing files within the same filesystem
parent 706dd95c
No related branches found
No related tags found
No related merge requests found
......@@ -236,6 +236,8 @@ class copier(MasterNodeInterface):
self.logger.info("Starting copier run")
super(copier, self).go()
globalfs = self.config.has_option("remote", "globalfs") and self.config.getboolean("remote", "globalfs")
# Load data from mapfiles
self.source_map = DataMap.load(self.inputs['mapfile_source'])
self.target_map = DataMap.load(self.inputs['mapfile_target'])
......@@ -246,8 +248,8 @@ class copier(MasterNodeInterface):
# Run the compute nodes with the node specific mapfiles
for source, target in zip(self.source_map, self.target_map):
args = [source.host, source.file, target.file]
self.append_job(target.host, args)
args = [source.host, source.file, target.file, globalfs]
self.append_job(target.host args)
# start the jobs, return the exit status.
return self.run_jobs()
......
......@@ -160,6 +160,8 @@ class imager_prepare(BaseRecipe, RemoteCommandRecipeMixIn):
paths_to_image_mapfiles = []
n_subband_groups = len(output_map) # needed for subsets in sb list
globalfs = self.config.has_option("remote", "globalfs") and self.config.getboolean("remote", "globalfs")
for idx_sb_group, item in enumerate(output_map):
#create the input files for this node
self.logger.debug("Creating input data subset for processing"
......@@ -203,7 +205,8 @@ class imager_prepare(BaseRecipe, RemoteCommandRecipeMixIn):
self.inputs['msselect_executable'],
self.inputs['rficonsole_executable'],
self.inputs['do_rficonsole'],
self.inputs['add_beam_tables']]
self.inputs['add_beam_tables'],
globalfs]
jobs.append(ComputeJob(item.host, node_command, arguments))
......
......@@ -166,6 +166,8 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
paths_to_image_mapfiles = []
n_subband_groups = len(output_map)
globalfs = self.config.has_option("remote", "globalfs") and self.config.getboolean("remote", "globalfs")
output_map.iterator = final_output_map.iterator = DataMap.SkipIterator
for idx_sb_group, (output_item, final_item) in enumerate(zip(output_map,
final_output_map)):
......@@ -204,6 +206,7 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
self.inputs['msselect_executable'],
self.inputs['rficonsole_executable'],
self.inputs['add_beam_tables'],
globalfs,
final_item.file]
jobs.append(ComputeJob(output_item.host, node_command, arguments))
......
......@@ -21,7 +21,8 @@ class copier(LOFARnodeTCP):
"""
Node script for copying files between nodes. See master script for full public interface
"""
def run(self, source_node, source_path, target_path):
def run(self, source_node, source_path, target_path, globalfs):
self.globalfs = globalfs
# Time execution of this job
with log_time(self.logger):
return self._copy_single_file_using_rsync(
......@@ -51,7 +52,7 @@ class copier(LOFARnodeTCP):
# construct copy command: Copy to the dir
# if process runs on local host use a simple copy command.
if source_node=="localhost":
if self.globalfs or source_node=="localhost":
command = ["cp", "-r","{0}".format(source_path),"{0}".format(target_path)]
else:
command = ["rsync", "-r",
......
......@@ -44,11 +44,12 @@ class imager_prepare(LOFARnodeTCP):
ndppp_executable, output_measurement_set,
time_slices_per_image, subbands_per_group, input_ms_mapfile,
asciistat_executable, statplot_executable, msselect_executable,
rficonsole_executable, do_rficonsole, add_beam_tables):
rficonsole_executable, do_rficonsole, add_beam_tables, globalfs):
"""
Entry point for the node recipe
"""
self.environment.update(environment)
self.globalfs = globalfs
with log_time(self.logger):
input_map = DataMap.load(input_ms_mapfile)
......@@ -178,7 +179,7 @@ class imager_prepare(LOFARnodeTCP):
command = ["rsync", "-r", "{0}:{1}".format(
input_item.host, input_item.file),
"{0}".format(processed_ms_dir)]
if input_item.host == "localhost":
if self.globalfs or input_item.host == "localhost":
command = ["cp", "-r", "{0}".format(input_item.file),
"{0}".format(processed_ms_dir)]
......
......@@ -45,11 +45,12 @@ class long_baseline(LOFARnodeTCP):
ndppp_executable, output_measurement_set,
subbandgroups_per_ms, subbands_per_subbandgroup, ms_mapfile,
asciistat_executable, statplot_executable, msselect_executable,
rficonsole_executable, add_beam_tables, final_output_path):
rficonsole_executable, add_beam_tables, globalfs, final_output_path):
"""
Entry point for the node recipe
"""
self.environment.update(environment)
self.globalfs = globalfs
with log_time(self.logger):
input_map = DataMap.load(ms_mapfile)
......@@ -194,7 +195,7 @@ class long_baseline(LOFARnodeTCP):
command = ["rsync", "-r", "{0}:{1}".format(
input_item.host, input_item.file),
"{0}".format(processed_ms_dir)]
if input_item.host == "localhost":
if self.globalfs or input_item.host == "localhost":
command = ["cp", "-r", "{0}".format(input_item.file),
"{0}".format(processed_ms_dir)]
......
......@@ -51,6 +51,7 @@ method = none
[remote]
method = custom_cmdline
globalfs = yes
# We take the following path to start a remote container:
#
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment