Skip to content
Snippets Groups Projects
Commit 1efb7e69 authored by Wouter Klijn's avatar Wouter Klijn
Browse files

Task #7491: Merge intro trunk the longbaseline pipeline with skip support.

parents c6c8d964 2284dbff
No related branches found
No related tags found
No related merge requests found
...@@ -134,7 +134,7 @@ class msss_imager_pipeline(control): ...@@ -134,7 +134,7 @@ class msss_imager_pipeline(control):
# ****************************************************************** # ******************************************************************
# (1) prepare phase: copy and collect the ms # (1) prepare phase: copy and collect the ms
concat_ms_map_path, timeslice_map_path, raw_ms_per_image_map_path, \ concat_ms_map_path, timeslice_map_path, ms_per_image_map_path, \
processed_ms_dir = self._long_baseline(input_mapfile, processed_ms_dir = self._long_baseline(input_mapfile,
target_mapfile, add_beam_tables, output_ms_mapfile) target_mapfile, add_beam_tables, output_ms_mapfile)
...@@ -210,7 +210,7 @@ class msss_imager_pipeline(control): ...@@ -210,7 +210,7 @@ class msss_imager_pipeline(control):
@xml_node @xml_node
def _finalize(self, awimager_output_map, processed_ms_dir, def _finalize(self, awimager_output_map, processed_ms_dir,
raw_ms_per_image_map, sourcelist_map, minbaseline, ms_per_image_map, sourcelist_map, minbaseline,
maxbaseline, target_mapfile, maxbaseline, target_mapfile,
output_ms_mapfile, sourcedb_map, skip = False): output_ms_mapfile, sourcedb_map, skip = False):
""" """
...@@ -229,7 +229,7 @@ class msss_imager_pipeline(control): ...@@ -229,7 +229,7 @@ class msss_imager_pipeline(control):
else: else:
placed_image_mapfile = self.run_task("imager_finalize", placed_image_mapfile = self.run_task("imager_finalize",
target_mapfile, awimager_output_map = awimager_output_map, target_mapfile, awimager_output_map = awimager_output_map,
raw_ms_per_image_map = raw_ms_per_image_map, ms_per_image_map = ms_per_image_map,
sourcelist_map = sourcelist_map, sourcelist_map = sourcelist_map,
sourcedb_map = sourcedb_map, sourcedb_map = sourcedb_map,
minbaseline = minbaseline, minbaseline = minbaseline,
...@@ -251,7 +251,7 @@ class msss_imager_pipeline(control): ...@@ -251,7 +251,7 @@ class msss_imager_pipeline(control):
the time slices into a large virtual measurement set the time slices into a large virtual measurement set
""" """
# Create the dir where found and processed ms are placed # Create the dir where found and processed ms are placed
# raw_ms_per_image_map_path contains all the original ms locations: # ms_per_image_map_path contains all the original ms locations:
# this list contains possible missing files # this list contains possible missing files
processed_ms_dir = os.path.join(self.scratch_directory, "subbands") processed_ms_dir = os.path.join(self.scratch_directory, "subbands")
...@@ -265,8 +265,8 @@ class msss_imager_pipeline(control): ...@@ -265,8 +265,8 @@ class msss_imager_pipeline(control):
output_mapfile = self._write_datamap_to_file(None, "prepare_output") output_mapfile = self._write_datamap_to_file(None, "prepare_output")
time_slices_mapfile = self._write_datamap_to_file(None, time_slices_mapfile = self._write_datamap_to_file(None,
"prepare_time_slices") "prepare_time_slices")
raw_ms_per_image_mapfile = self._write_datamap_to_file(None, ms_per_image_mapfile = self._write_datamap_to_file(None,
"raw_ms_per_image") "ms_per_image")
# get some parameters from the imaging pipeline parset: # get some parameters from the imaging pipeline parset:
subbandgroups_per_ms = self.parset.getInt("LongBaseline.subbandgroups_per_ms") subbandgroups_per_ms = self.parset.getInt("LongBaseline.subbandgroups_per_ms")
...@@ -280,7 +280,7 @@ class msss_imager_pipeline(control): ...@@ -280,7 +280,7 @@ class msss_imager_pipeline(control):
subbands_per_subbandgroup = subbands_per_subbandgroup, subbands_per_subbandgroup = subbands_per_subbandgroup,
mapfile = output_mapfile, mapfile = output_mapfile,
slices_mapfile = time_slices_mapfile, slices_mapfile = time_slices_mapfile,
raw_ms_per_image_mapfile = raw_ms_per_image_mapfile, ms_per_image_mapfile = ms_per_image_mapfile,
working_directory = self.scratch_directory, working_directory = self.scratch_directory,
processed_ms_dir = processed_ms_dir, processed_ms_dir = processed_ms_dir,
add_beam_tables = add_beam_tables, add_beam_tables = add_beam_tables,
...@@ -299,15 +299,15 @@ class msss_imager_pipeline(control): ...@@ -299,15 +299,15 @@ class msss_imager_pipeline(control):
'slices_mapfile') 'slices_mapfile')
self.logger.error(error_msg) self.logger.error(error_msg)
raise PipelineException(error_msg) raise PipelineException(error_msg)
if not ('raw_ms_per_image_mapfile' in output_keys): if not ('ms_per_image_mapfile' in output_keys):
error_msg = "The imager_prepare master script did not"\ error_msg = "The imager_prepare master script did not"\
"return correct data. missing: {0}".format( "return correct data. missing: {0}".format(
'raw_ms_per_image_mapfile') 'ms_per_image_mapfile')
self.logger.error(error_msg) self.logger.error(error_msg)
raise PipelineException(error_msg) raise PipelineException(error_msg)
# Return the mapfiles paths with processed data # Return the mapfiles paths with processed data
return output_mapfile, outputs["slices_mapfile"], raw_ms_per_image_mapfile, \ return output_mapfile, outputs["slices_mapfile"], ms_per_image_mapfile, \
processed_ms_dir processed_ms_dir
......
...@@ -101,9 +101,9 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn): ...@@ -101,9 +101,9 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
'--slices-mapfile', '--slices-mapfile',
help="Path to mapfile containing the produced subband groups" help="Path to mapfile containing the produced subband groups"
), ),
'raw_ms_per_image_mapfile': ingredient.StringField( 'ms_per_image_mapfile': ingredient.StringField(
'--raw-ms-per-image-mapfile', '--ms-per-image-mapfile',
help="Path to mapfile containing the raw ms for each produced" help="Path to mapfile containing the ms for each produced"
"image" "image"
), ),
'processed_ms_dir': ingredient.StringField( 'processed_ms_dir': ingredient.StringField(
...@@ -129,8 +129,8 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn): ...@@ -129,8 +129,8 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
'slices_mapfile': ingredient.FileField( 'slices_mapfile': ingredient.FileField(
help="Path to mapfile containing the produced subband groups"), help="Path to mapfile containing the produced subband groups"),
'raw_ms_per_image_mapfile': ingredient.FileField( 'ms_per_image_mapfile': ingredient.FileField(
help="Path to mapfile containing the raw ms for each produced" help="Path to mapfile containing the ms for each produced"
"image") "image")
} }
...@@ -167,7 +167,8 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn): ...@@ -167,7 +167,8 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
n_subband_groups = len(output_map) n_subband_groups = len(output_map)
output_map.iterator = final_output_map.iterator = DataMap.SkipIterator output_map.iterator = final_output_map.iterator = DataMap.SkipIterator
for idx_sb_group, (output_item, final_item) in enumerate(zip(output_map, final_output_map)): for idx_sb_group, (output_item, final_item) in enumerate(zip(output_map,
final_output_map)):
#create the input files for this node #create the input files for this node
self.logger.debug("Creating input data subset for processing" self.logger.debug("Creating input data subset for processing"
"on: {0}".format(output_item.host)) "on: {0}".format(output_item.host))
...@@ -220,8 +221,19 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn): ...@@ -220,8 +221,19 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
concat_ms = copy.deepcopy(output_map) concat_ms = copy.deepcopy(output_map)
slices = [] slices = []
finished_runs = 0 finished_runs = 0
#scan the return dict for completed key # If we have a skipped item, add the item to the slices with skip set
for (item, job) in zip(concat_ms, jobs): jobs_idx = 0
for item in concat_ms:
# If this is an item that is skipped via the skip parameter in
# the parset, append a skipped
if item.skip:
slices.append(tuple([item.host, [], True]))
continue
# we cannot use the skip iterator so we need to manually get the
# current job from the list
job = jobs[jobs_idx]
# only save the slices if the node has completed succesfull # only save the slices if the node has completed succesfull
if job.results["returncode"] == 0: if job.results["returncode"] == 0:
finished_runs += 1 finished_runs += 1
...@@ -230,11 +242,14 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn): ...@@ -230,11 +242,14 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
else: else:
# Set the dataproduct to skipped!! # Set the dataproduct to skipped!!
item.skip = True item.skip = True
slices.append(tuple([item.host, ["/Failed"], True])) slices.append(tuple([item.host, [], True]))
msg = "Failed run on {0}. NOT Created: {1} ".format( msg = "Failed run on {0}. NOT Created: {1} ".format(
item.host, item.file) item.host, item.file)
self.logger.warn(msg) self.logger.warn(msg)
# we have a non skipped workitem, increase the job idx
jobs_idx += 1
if finished_runs == 0: if finished_runs == 0:
self.logger.error("None of the started compute node finished:" self.logger.error("None of the started compute node finished:"
"The current recipe produced no output, aborting") "The current recipe produced no output, aborting")
...@@ -252,15 +267,15 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn): ...@@ -252,15 +267,15 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
self.inputs['slices_mapfile'])) self.inputs['slices_mapfile']))
#map with actual input mss. #map with actual input mss.
self._store_data_map(self.inputs["raw_ms_per_image_mapfile"], self._store_data_map(self.inputs["ms_per_image_mapfile"],
DataMap(paths_to_image_mapfiles), DataMap(paths_to_image_mapfiles),
"mapfile containing (raw) input ms per image:") "mapfile containing input ms per image:")
# Set the return values # Set the return values
self.outputs['mapfile'] = output_ms_mapfile_path self.outputs['mapfile'] = output_ms_mapfile_path
self.outputs['slices_mapfile'] = self.inputs['slices_mapfile'] self.outputs['slices_mapfile'] = self.inputs['slices_mapfile']
self.outputs['raw_ms_per_image_mapfile'] = \ self.outputs['ms_per_image_mapfile'] = \
self.inputs["raw_ms_per_image_mapfile"] self.inputs["ms_per_image_mapfile"]
return 0 return 0
...@@ -301,7 +316,7 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn): ...@@ -301,7 +316,7 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
""" """
# The output_map contains a number of path/node pairs. The final data # The output_map contains a number of path/node pairs. The final data
# dataproduct of the prepare phase: The 'input' for each of these pairs # dataproduct of the prepare phase: The 'input' for each of these pairs
# is a number of raw measurement sets: The number of time slices times # is a number of measurement sets: The number of time slices times
# the number of subbands collected into each of these time slices. # the number of subbands collected into each of these time slices.
# The total length of the input map should match this. # The total length of the input map should match this.
if len(input_map) != len(output_map) * \ if len(input_map) != len(output_map) * \
......
...@@ -43,7 +43,7 @@ class long_baseline(LOFARnodeTCP): ...@@ -43,7 +43,7 @@ class long_baseline(LOFARnodeTCP):
""" """
def run(self, environment, parset, working_dir, processed_ms_dir, def run(self, environment, parset, working_dir, processed_ms_dir,
ndppp_executable, output_measurement_set, ndppp_executable, output_measurement_set,
subbandgroups_per_ms, subbands_per_subbandgroup, raw_ms_mapfile, subbandgroups_per_ms, subbands_per_subbandgroup, ms_mapfile,
asciistat_executable, statplot_executable, msselect_executable, asciistat_executable, statplot_executable, msselect_executable,
rficonsole_executable, add_beam_tables, final_output_path): rficonsole_executable, add_beam_tables, final_output_path):
""" """
...@@ -52,7 +52,7 @@ class long_baseline(LOFARnodeTCP): ...@@ -52,7 +52,7 @@ class long_baseline(LOFARnodeTCP):
self.environment.update(environment) self.environment.update(environment)
with log_time(self.logger): with log_time(self.logger):
input_map = DataMap.load(raw_ms_mapfile) input_map = DataMap.load(ms_mapfile)
#****************************************************************** #******************************************************************
# I. Create the directories used in this recipe # I. Create the directories used in this recipe
create_directory(processed_ms_dir) create_directory(processed_ms_dir)
...@@ -71,14 +71,14 @@ class long_baseline(LOFARnodeTCP): ...@@ -71,14 +71,14 @@ class long_baseline(LOFARnodeTCP):
#****************************************************************** #******************************************************************
# 1. Copy the input files # 1. Copy the input files
copied_ms_map = self._copy_input_files( processed_ms_map = self._copy_input_files(
processed_ms_dir, input_map) processed_ms_dir, input_map)
#****************************************************************** #******************************************************************
# 2. run dppp: collect frequencies into larger group # 2. run dppp: collect frequencies into larger group
time_slices_path_list = \ time_slices_path_list = \
self._run_dppp(working_dir, time_slice_dir, self._run_dppp(working_dir, time_slice_dir,
subbandgroups_per_ms, copied_ms_map, subbands_per_subbandgroup, subbandgroups_per_ms, processed_ms_map, subbands_per_subbandgroup,
processed_ms_dir, parset, ndppp_executable) processed_ms_dir, parset, ndppp_executable)
# If no timeslices were created, bail out with exit status 1 # If no timeslices were created, bail out with exit status 1
...@@ -138,6 +138,10 @@ class long_baseline(LOFARnodeTCP): ...@@ -138,6 +138,10 @@ class long_baseline(LOFARnodeTCP):
self._deep_copy_to_output_location(output_measurement_set, self._deep_copy_to_output_location(output_measurement_set,
final_output_path) final_output_path)
# Write the actually used ms for the created dataset to the input
# mapfile
processed_ms_map.save(ms_mapfile)
#****************************************************************** #******************************************************************
...@@ -167,46 +171,49 @@ class long_baseline(LOFARnodeTCP): ...@@ -167,46 +171,49 @@ class long_baseline(LOFARnodeTCP):
This function collects all the file in the input map in the This function collects all the file in the input map in the
processed_ms_dir Return value is a set of missing files processed_ms_dir Return value is a set of missing files
""" """
copied_ms_map = copy.deepcopy(input_map) processed_ms_map = copy.deepcopy(input_map)
# loop all measurement sets # loop all measurement sets
for input_item, copied_item in zip(input_map, copied_ms_map): for input_item, copied_item in zip(input_map, processed_ms_map):
# fill the copied item with the correct data # fill the copied item with the correct data
copied_item.host = self.host copied_item.host = self.host
copied_item.file = os.path.join( copied_item.file = os.path.join(
processed_ms_dir, os.path.basename(input_item.file)) processed_ms_dir, os.path.basename(input_item.file))
stderrdata = None
# If we have to skip this ms # If we have to skip this ms
if input_item.skip == True: if input_item.skip == True:
exit_status = 1 # exit_status = 1
stderrdata = "SKIPPED_FILE"
# skip the copy if machine is the same (execution on localhost)
# make sure data is in the correct directory. for now: working_dir/[jobname]/subbands else:
#if input_item.host == "localhost": # skip the copy if machine is the same (execution on localhost)
# continue # make sure data is in the correct directory.
# construct copy command # for now: working_dir/[jobname]/subbands
command = ["rsync", "-r", "{0}:{1}".format( # construct copy command
input_item.host, input_item.file), command = ["rsync", "-r", "{0}:{1}".format(
"{0}".format(processed_ms_dir)] input_item.host, input_item.file),
if input_item.host == "localhost": "{0}".format(processed_ms_dir)]
command = ["cp", "-r", "{0}".format(input_item.file), if input_item.host == "localhost":
"{0}".format(processed_ms_dir)] command = ["cp", "-r", "{0}".format(input_item.file),
"{0}".format(processed_ms_dir)]
self.logger.debug("executing: " + " ".join(command))
self.logger.debug("executing: " + " ".join(command))
# Spawn a subprocess and connect the pipes
# The copy step is performed 720 at once in that case which might # Spawn a subprocess and connect the pipes
# saturate the cluster. # The copy step is performed 720 at once in that case which might
copy_process = subprocess.Popen( # saturate the cluster.
command, copy_process = subprocess.Popen(
stdin = subprocess.PIPE, command,
stdout = subprocess.PIPE, stdin = subprocess.PIPE,
stderr = subprocess.PIPE) stdout = subprocess.PIPE,
stderr = subprocess.PIPE)
# Wait for finish of copy inside the loop: enforce single tread
# copy # Wait for finish of copy inside the loop: enforce single tread
(stdoutdata, stderrdata) = copy_process.communicate() # copy
(stdoutdata, stderrdata) = copy_process.communicate()
exit_status = copy_process.returncode
exit_status = copy_process.returncode
# if copy failed log the missing file and update the skip fields # if copy failed log the missing file and update the skip fields
if exit_status != 0: if exit_status != 0:
...@@ -218,7 +225,7 @@ class long_baseline(LOFARnodeTCP): ...@@ -218,7 +225,7 @@ class long_baseline(LOFARnodeTCP):
self.logger.debug(stdoutdata) self.logger.debug(stdoutdata)
return copied_ms_map return processed_ms_map
def _dppp_call(self, working_dir, ndppp, cmd, environment): def _dppp_call(self, working_dir, ndppp, cmd, environment):
...@@ -233,7 +240,7 @@ class long_baseline(LOFARnodeTCP): ...@@ -233,7 +240,7 @@ class long_baseline(LOFARnodeTCP):
logger, cleanup = None) logger, cleanup = None)
def _run_dppp(self, working_dir, time_slice_dir_path, slices_per_image, def _run_dppp(self, working_dir, time_slice_dir_path, slices_per_image,
copied_ms_map, subbands_per_image, collected_ms_dir_name, parset, processed_ms_map, subbands_per_image, collected_ms_dir_name, parset,
ndppp): ndppp):
""" """
Run NDPPP: Run NDPPP:
...@@ -247,9 +254,6 @@ class long_baseline(LOFARnodeTCP): ...@@ -247,9 +254,6 @@ class long_baseline(LOFARnodeTCP):
end_slice_range = (idx_time_slice + 1) * subbands_per_image end_slice_range = (idx_time_slice + 1) * subbands_per_image
# Get the subset of ms that are part of the current timeslice, # Get the subset of ms that are part of the current timeslice,
# cast to datamap # cast to datamap
input_map_subgroup = DataMap(
copied_ms_map[start_slice_range:end_slice_range])
output_ms_name = "time_slice_{0}.dppp.ms".format(idx_time_slice) output_ms_name = "time_slice_{0}.dppp.ms".format(idx_time_slice)
# construct time slice name # construct time slice name
...@@ -258,7 +262,21 @@ class long_baseline(LOFARnodeTCP): ...@@ -258,7 +262,21 @@ class long_baseline(LOFARnodeTCP):
# convert the datamap to a file list: Do not remove skipped files: # convert the datamap to a file list: Do not remove skipped files:
# ndppp needs the incorrect files there to allow filling with zeros # ndppp needs the incorrect files there to allow filling with zeros
ndppp_input_ms = [item.file for item in input_map_subgroup]
ndppp_input_ms = []
for item in processed_ms_map[start_slice_range:end_slice_range]:
if item.skip:
ndppp_input_ms.append("SKIPPEDSUBBAND")
# We need an entry in the list: ndppp will add zeros to
# pad missing subbands
else:
ndppp_input_ms.append(item.file)
# if none of the input files was valid, skip the creation of the
# timeslice all together, it will not show up in the timeslice
# mapfile
if len(ndppp_input_ms) == 0:
continue
# Join into a single list of paths. # Join into a single list of paths.
msin = "['{0}']".format("', '".join(ndppp_input_ms)) msin = "['{0}']".format("', '".join(ndppp_input_ms))
...@@ -300,11 +318,9 @@ class long_baseline(LOFARnodeTCP): ...@@ -300,11 +318,9 @@ class long_baseline(LOFARnodeTCP):
time_slice_path_list.append(time_slice_path) time_slice_path_list.append(time_slice_path)
# On error the current timeslice should be skipped # On error the current timeslice should be skipped
except subprocess.CalledProcessError, exception:
self.logger.warning(str(exception))
continue
except Exception, exception: except Exception, exception:
for item in processed_ms_map[start_slice_range:end_slice_range]:
item.skip = True
self.logger.warning(str(exception)) self.logger.warning(str(exception))
continue continue
......
...@@ -14,6 +14,8 @@ def load_and_compare_data_sets(ms1, ms2): ...@@ -14,6 +14,8 @@ def load_and_compare_data_sets(ms1, ms2):
# create a target array with the same length as the datacolumn # create a target array with the same length as the datacolumn
div_array = numpy.zeros((n_row, 1, n_complex_vis), dtype=numpy.complex64) div_array = numpy.zeros((n_row, 1, n_complex_vis), dtype=numpy.complex64)
ms1_array = ms1.getcol('DATA') ms1_array = ms1.getcol('DATA')
# TODO: WHy are different collomns compared?
# is this an issue in the test dataset??
ms2_array = ms2.getcol('CORRECTED_DATA') ms2_array = ms2.getcol('CORRECTED_DATA')
div_max = 0 div_max = 0
......
...@@ -32,6 +32,9 @@ ...@@ -32,6 +32,9 @@
<Content Include="mac\CMakeLists.txt" /> <Content Include="mac\CMakeLists.txt" />
<Content Include="recipes\CMakeLists.txt" /> <Content Include="recipes\CMakeLists.txt" />
<Content Include="test\CMakeLists.txt" /> <Content Include="test\CMakeLists.txt" />
<Compile Include="recipes\sip\bin\long_baseline_pipeline.py" />
<Compile Include="recipes\sip\master\long_baseline.py" />
<Compile Include="recipes\sip\nodes\long_baseline.py" />
<Compile Include="test\__init__.py" /> <Compile Include="test\__init__.py" />
<Compile Include="deploy\deprecated\fabfile.py" /> <Compile Include="deploy\deprecated\fabfile.py" />
<Compile Include="deploy\deprecated\start_cluster.py" /> <Compile Include="deploy\deprecated\start_cluster.py" />
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment