diff --git a/CEP/Pipeline/recipes/sip/master/copier.py b/CEP/Pipeline/recipes/sip/master/copier.py index 4846c7a5beb6e36de3713196c3f137a472701212..0f9f8405a822c1a2a651adfaf9e5e6a615e75a0c 100644 --- a/CEP/Pipeline/recipes/sip/master/copier.py +++ b/CEP/Pipeline/recipes/sip/master/copier.py @@ -236,6 +236,8 @@ class copier(MasterNodeInterface): self.logger.info("Starting copier run") super(copier, self).go() + globalfs = self.config.has_option("remote", "globalfs") and self.config.getboolean("remote", "globalfs") + # Load data from mapfiles self.source_map = DataMap.load(self.inputs['mapfile_source']) self.target_map = DataMap.load(self.inputs['mapfile_target']) @@ -246,8 +248,8 @@ class copier(MasterNodeInterface): # Run the compute nodes with the node specific mapfiles for source, target in zip(self.source_map, self.target_map): - args = [source.host, source.file, target.file] - self.append_job(target.host, args) + args = [source.host, source.file, target.file, globalfs] + self.append_job(target.host args) # start the jobs, return the exit status. return self.run_jobs() diff --git a/CEP/Pipeline/recipes/sip/master/imager_prepare.py b/CEP/Pipeline/recipes/sip/master/imager_prepare.py index 3d06ab9dcd50b41a67c98e774b60389364fb2104..dc44bad684dfd602f21d45f48cbad3c6e21309e5 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_prepare.py +++ b/CEP/Pipeline/recipes/sip/master/imager_prepare.py @@ -160,6 +160,8 @@ class imager_prepare(BaseRecipe, RemoteCommandRecipeMixIn): paths_to_image_mapfiles = [] n_subband_groups = len(output_map) # needed for subsets in sb list + globalfs = self.config.has_option("remote", "globalfs") and self.config.getboolean("remote", "globalfs") + for idx_sb_group, item in enumerate(output_map): #create the input files for this node self.logger.debug("Creating input data subset for processing" @@ -203,7 +205,8 @@ class imager_prepare(BaseRecipe, RemoteCommandRecipeMixIn): self.inputs['msselect_executable'], self.inputs['rficonsole_executable'], self.inputs['do_rficonsole'], - self.inputs['add_beam_tables']] + self.inputs['add_beam_tables'], + globalfs] jobs.append(ComputeJob(item.host, node_command, arguments)) diff --git a/CEP/Pipeline/recipes/sip/master/long_baseline.py b/CEP/Pipeline/recipes/sip/master/long_baseline.py index f61764bc78ba0740108235c66aa2919aab20a808..9e2c6f9a16e33712974e17c0f6928a97ec937202 100644 --- a/CEP/Pipeline/recipes/sip/master/long_baseline.py +++ b/CEP/Pipeline/recipes/sip/master/long_baseline.py @@ -166,6 +166,8 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn): paths_to_image_mapfiles = [] n_subband_groups = len(output_map) + globalfs = self.config.has_option("remote", "globalfs") and self.config.getboolean("remote", "globalfs") + output_map.iterator = final_output_map.iterator = DataMap.SkipIterator for idx_sb_group, (output_item, final_item) in enumerate(zip(output_map, final_output_map)): @@ -204,6 +206,7 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn): self.inputs['msselect_executable'], self.inputs['rficonsole_executable'], self.inputs['add_beam_tables'], + globalfs, final_item.file] jobs.append(ComputeJob(output_item.host, node_command, arguments)) diff --git a/CEP/Pipeline/recipes/sip/nodes/copier.py b/CEP/Pipeline/recipes/sip/nodes/copier.py index a35f66e5d1c2bb038a7a933be57bc2f7a40f8683..b972b06c0042f50985d7162214ca5146816f5413 100644 --- a/CEP/Pipeline/recipes/sip/nodes/copier.py +++ b/CEP/Pipeline/recipes/sip/nodes/copier.py @@ -21,7 +21,8 @@ class copier(LOFARnodeTCP): """ Node script for copying files between nodes. See master script for full public interface """ - def run(self, source_node, source_path, target_path): + def run(self, source_node, source_path, target_path, globalfs): + self.globalfs = globalfs # Time execution of this job with log_time(self.logger): return self._copy_single_file_using_rsync( @@ -51,7 +52,7 @@ class copier(LOFARnodeTCP): # construct copy command: Copy to the dir # if process runs on local host use a simple copy command. - if source_node=="localhost": + if self.globalfs or source_node=="localhost": command = ["cp", "-r","{0}".format(source_path),"{0}".format(target_path)] else: command = ["rsync", "-r", diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py index 63328ef3f2a3e43e84483635109ce1d645d6f66d..e1ed5c185d129360df63f6ea49c18e7861562514 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py @@ -44,11 +44,12 @@ class imager_prepare(LOFARnodeTCP): ndppp_executable, output_measurement_set, time_slices_per_image, subbands_per_group, input_ms_mapfile, asciistat_executable, statplot_executable, msselect_executable, - rficonsole_executable, do_rficonsole, add_beam_tables): + rficonsole_executable, do_rficonsole, add_beam_tables, globalfs): """ Entry point for the node recipe """ self.environment.update(environment) + self.globalfs = globalfs with log_time(self.logger): input_map = DataMap.load(input_ms_mapfile) @@ -178,7 +179,7 @@ class imager_prepare(LOFARnodeTCP): command = ["rsync", "-r", "{0}:{1}".format( input_item.host, input_item.file), "{0}".format(processed_ms_dir)] - if input_item.host == "localhost": + if self.globalfs or input_item.host == "localhost": command = ["cp", "-r", "{0}".format(input_item.file), "{0}".format(processed_ms_dir)] diff --git a/CEP/Pipeline/recipes/sip/nodes/long_baseline.py b/CEP/Pipeline/recipes/sip/nodes/long_baseline.py index cea1e3c7752e639ae6f12cd68feb3268ecb4b4a1..60826d9fcabff25c6c0b31814b5fb283c5447963 100644 --- a/CEP/Pipeline/recipes/sip/nodes/long_baseline.py +++ b/CEP/Pipeline/recipes/sip/nodes/long_baseline.py @@ -45,11 +45,12 @@ class long_baseline(LOFARnodeTCP): ndppp_executable, output_measurement_set, subbandgroups_per_ms, subbands_per_subbandgroup, ms_mapfile, asciistat_executable, statplot_executable, msselect_executable, - rficonsole_executable, add_beam_tables, final_output_path): + rficonsole_executable, add_beam_tables, globalfs, final_output_path): """ Entry point for the node recipe """ self.environment.update(environment) + self.globalfs = globalfs with log_time(self.logger): input_map = DataMap.load(ms_mapfile) @@ -194,7 +195,7 @@ class long_baseline(LOFARnodeTCP): command = ["rsync", "-r", "{0}:{1}".format( input_item.host, input_item.file), "{0}".format(processed_ms_dir)] - if input_item.host == "localhost": + if self.globalfs or input_item.host == "localhost": command = ["cp", "-r", "{0}".format(input_item.file), "{0}".format(processed_ms_dir)] diff --git a/CEP/Pipeline/recipes/sip/pipeline.cfg.CEP4.tmpl b/CEP/Pipeline/recipes/sip/pipeline.cfg.CEP4.tmpl index 6ff3730d5481d20202048feae1e2044aaa417d3b..105f03b84e2b1a99f48703cd28f8015bdc79627d 100644 --- a/CEP/Pipeline/recipes/sip/pipeline.cfg.CEP4.tmpl +++ b/CEP/Pipeline/recipes/sip/pipeline.cfg.CEP4.tmpl @@ -51,6 +51,7 @@ method = none [remote] method = custom_cmdline +globalfs = yes # We take the following path to start a remote container: #