diff --git a/CEP/Pipeline/recipes/sip/bin/long_baseline_pipeline.py b/CEP/Pipeline/recipes/sip/bin/long_baseline_pipeline.py
index 7dcc5600358151311284a430f495c7dc13ceb465..c44160a860035580515a72ed9dde48f281864a00 100644
--- a/CEP/Pipeline/recipes/sip/bin/long_baseline_pipeline.py
+++ b/CEP/Pipeline/recipes/sip/bin/long_baseline_pipeline.py
@@ -134,7 +134,7 @@ class msss_imager_pipeline(control):
 
         # ******************************************************************
         # (1) prepare phase: copy and collect the ms
-        concat_ms_map_path, timeslice_map_path, raw_ms_per_image_map_path, \
+        concat_ms_map_path, timeslice_map_path, ms_per_image_map_path, \
             processed_ms_dir = self._long_baseline(input_mapfile,
                     target_mapfile, add_beam_tables, output_ms_mapfile)
 
@@ -210,7 +210,7 @@ class msss_imager_pipeline(control):
 
     @xml_node
     def _finalize(self, awimager_output_map, processed_ms_dir,
-                  raw_ms_per_image_map, sourcelist_map, minbaseline,
+                  ms_per_image_map, sourcelist_map, minbaseline,
                   maxbaseline, target_mapfile, output_ms_mapfile,
                   sourcedb_map, skip = False):
         """
@@ -229,7 +229,7 @@ class msss_imager_pipeline(control):
         else:
             placed_image_mapfile = self.run_task("imager_finalize",
                 target_mapfile, awimager_output_map = awimager_output_map,
-                    raw_ms_per_image_map = raw_ms_per_image_map,
+                    ms_per_image_map = ms_per_image_map,
                     sourcelist_map = sourcelist_map,
                     sourcedb_map = sourcedb_map,
                     minbaseline = minbaseline,
@@ -251,7 +251,7 @@ class msss_imager_pipeline(control):
         the time slices into a large virtual measurement set
         """
         # Create the dir where found and processed ms are placed
-        # raw_ms_per_image_map_path contains all the original ms locations:
+        # ms_per_image_map_path contains all the original ms locations:
         # this list contains possible missing files
         processed_ms_dir = os.path.join(self.scratch_directory, "subbands")
 
@@ -265,8 +265,8 @@ class msss_imager_pipeline(control):
         output_mapfile = self._write_datamap_to_file(None, "prepare_output")
         time_slices_mapfile = self._write_datamap_to_file(None,
                                                          "prepare_time_slices")
-        raw_ms_per_image_mapfile = self._write_datamap_to_file(None,
-                                                         "raw_ms_per_image")
+        ms_per_image_mapfile = self._write_datamap_to_file(None,
+                                                         "ms_per_image")
 
         # get some parameters from the imaging pipeline parset:
         subbandgroups_per_ms = self.parset.getInt("LongBaseline.subbandgroups_per_ms")
@@ -280,7 +280,7 @@ class msss_imager_pipeline(control):
                 subbands_per_subbandgroup = subbands_per_subbandgroup,
                 mapfile = output_mapfile,
                 slices_mapfile = time_slices_mapfile,
-                raw_ms_per_image_mapfile = raw_ms_per_image_mapfile,
+                ms_per_image_mapfile = ms_per_image_mapfile,
                 working_directory = self.scratch_directory,
                 processed_ms_dir = processed_ms_dir,
                 add_beam_tables = add_beam_tables,
@@ -299,15 +299,15 @@ class msss_imager_pipeline(control):
                         'slices_mapfile')
             self.logger.error(error_msg)
             raise PipelineException(error_msg)
-        if not ('raw_ms_per_image_mapfile' in output_keys):
+        if not ('ms_per_image_mapfile' in output_keys):
             error_msg = "The imager_prepare master script did not"\
                         "return correct data. missing: {0}".format(
-                            'raw_ms_per_image_mapfile')
+                            'ms_per_image_mapfile')
             self.logger.error(error_msg)
             raise PipelineException(error_msg)
 
         # Return the mapfiles paths with processed data
-        return output_mapfile, outputs["slices_mapfile"], raw_ms_per_image_mapfile, \
+        return output_mapfile, outputs["slices_mapfile"], ms_per_image_mapfile, \
             processed_ms_dir
 
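The mapfiles passed around above are DataMap files from lofarpipe.support.data_map, the same API the master and node recipes below use through DataMap.load() and .save(). A minimal sketch of that contract, with hypothetical hosts and paths, assuming the DataProduct(host, file, skip) constructor from that module (the item.host/item.file/item.skip attributes are the ones this patch already relies on):

    from lofarpipe.support.data_map import DataMap, DataProduct

    # one entry per measurement set: host, path and a skip flag
    ms_map = DataMap([
        DataProduct('locus001', '/data/scratch/obs/L12345_SB000.MS', False),
        DataProduct('locus002', '/data/scratch/obs/L12345_SB001.MS', True),
    ])
    ms_map.save('ms_per_image.mapfile')   # hypothetical mapfile path
    for item in DataMap.load('ms_per_image.mapfile'):
        print item.host, item.file, item.skip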
diff --git a/CEP/Pipeline/recipes/sip/master/long_baseline.py b/CEP/Pipeline/recipes/sip/master/long_baseline.py
index 4d9f5b7ee75889b978dc596a9b65646dfdb9c53d..f61764bc78ba0740108235c66aa2919aab20a808 100644
--- a/CEP/Pipeline/recipes/sip/master/long_baseline.py
+++ b/CEP/Pipeline/recipes/sip/master/long_baseline.py
@@ -101,9 +101,9 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
             '--slices-mapfile',
             help="Path to mapfile containing the produced subband groups"
         ),
-        'raw_ms_per_image_mapfile': ingredient.StringField(
-            '--raw-ms-per-image-mapfile',
-            help="Path to mapfile containing the raw ms for each produced"
+        'ms_per_image_mapfile': ingredient.StringField(
+            '--ms-per-image-mapfile',
+            help="Path to mapfile containing the ms for each produced "
                  "image"
         ),
         'processed_ms_dir': ingredient.StringField(
@@ -129,8 +129,8 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
 
         'slices_mapfile': ingredient.FileField(
             help="Path to mapfile containing the produced subband groups"),
-        'raw_ms_per_image_mapfile': ingredient.FileField(
-            help="Path to mapfile containing the raw ms for each produced"
+        'ms_per_image_mapfile': ingredient.FileField(
+            help="Path to mapfile containing the ms for each produced "
                  "image")
     }
 
@@ -167,7 +167,8 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
         n_subband_groups = len(output_map)
 
         output_map.iterator = final_output_map.iterator = DataMap.SkipIterator
-        for idx_sb_group, (output_item, final_item) in enumerate(zip(output_map, final_output_map)):
+        for idx_sb_group, (output_item, final_item) in enumerate(zip(output_map,
+                                                            final_output_map)):
             #create the input files for this node
             self.logger.debug("Creating input data subset for processing"
                               "on: {0}".format(output_item.host))
@@ -220,8 +221,19 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
         concat_ms = copy.deepcopy(output_map)
         slices = []
         finished_runs = 0
-        #scan the return dict for completed key
-        for (item, job) in zip(concat_ms, jobs):
+        # If we have a skipped item, add the item to the slices with skip set
+        jobs_idx = 0
+        for item in concat_ms:
+            # If this item is skipped via the skip parameter in the parset,
+            # append a skipped entry to the slices list
+            if item.skip:
+                slices.append(tuple([item.host, [], True]))
+                continue
+
+            # we cannot use the skip iterator so we need to manually get the
+            # current job from the list
+            job = jobs[jobs_idx]
+
             # only save the slices if the node has completed succesfull
             if job.results["returncode"] == 0:
                 finished_runs += 1
@@ -230,11 +242,14 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
             else:
                 # Set the dataproduct to skipped!!
                 item.skip = True
-                slices.append(tuple([item.host, ["/Failed"], True]))
+                slices.append(tuple([item.host, [], True]))
                 msg = "Failed run on {0}. NOT Created: {1} ".format(
                     item.host, item.file)
                 self.logger.warn(msg)
 
+            # we have a non-skipped work item: advance the job index
+            jobs_idx += 1
+
         if finished_runs == 0:
             self.logger.error("None of the started compute node finished:"
                 "The current recipe produced no output, aborting")
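Because jobs are started only for the non-skipped items, the jobs list above is shorter than concat_ms and can no longer be zipped against it; the manual jobs_idx counter keeps the two aligned. A standalone sketch of the pattern with toy data (illustration only, not part of the patch):

    items = [('locus001', False), ('locus002', True), ('locus003', False)]
    jobs = ['job_a', 'job_b']   # one job per non-skipped item

    jobs_idx = 0
    for host, skip in items:
        if skip:
            continue            # no job was started for this item
        print '{0} ran on {1}'.format(jobs[jobs_idx], host)
        jobs_idx += 1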
@@ -252,15 +267,15 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
                 self.inputs['slices_mapfile']))
 
         #map with actual input mss.
-        self._store_data_map(self.inputs["raw_ms_per_image_mapfile"],
+        self._store_data_map(self.inputs["ms_per_image_mapfile"],
             DataMap(paths_to_image_mapfiles),
-                "mapfile containing (raw) input ms per image:")
+                "mapfile containing input ms per image:")
 
         # Set the return values
         self.outputs['mapfile'] = output_ms_mapfile_path
         self.outputs['slices_mapfile'] = self.inputs['slices_mapfile']
-        self.outputs['raw_ms_per_image_mapfile'] = \
-            self.inputs["raw_ms_per_image_mapfile"]
+        self.outputs['ms_per_image_mapfile'] = \
+            self.inputs["ms_per_image_mapfile"]
 
         return 0
 
@@ -301,7 +316,7 @@ class long_baseline(BaseRecipe, RemoteCommandRecipeMixIn):
         """
         # The output_map contains a number of path/node pairs. The final data
         # dataproduct of the prepare phase: The 'input' for each of these pairs
-        # is a number of raw measurement sets: The number of time slices times
+        # is a number of measurement sets: The number of time slices times
         # the number of subbands collected into each of these time slices.
         # The total length of the input map should match this.
         if len(input_map) != len(output_map) * \
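The length check referenced in the comment above multiplies out as follows; the numbers here are hypothetical, purely to illustrate the expected bookkeeping:

    n_subband_groups = 2     # len(output_map)
    slices_per_image = 3     # time slices collected per produced group
    subbands_per_image = 4   # subbands collected into each time slice
    # the input map must then contain 2 * 3 * 4 = 24 measurement sets
    assert n_subband_groups * slices_per_image * subbands_per_image == 24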
diff --git a/CEP/Pipeline/recipes/sip/nodes/long_baseline.py b/CEP/Pipeline/recipes/sip/nodes/long_baseline.py
index 66caee64fbeb2cb6864abc1f3cd023aaf065443c..cea1e3c7752e639ae6f12cd68feb3268ecb4b4a1 100644
--- a/CEP/Pipeline/recipes/sip/nodes/long_baseline.py
+++ b/CEP/Pipeline/recipes/sip/nodes/long_baseline.py
@@ -43,7 +43,7 @@ class long_baseline(LOFARnodeTCP):
     """
     def run(self, environment, parset, working_dir, processed_ms_dir,
             ndppp_executable, output_measurement_set,
-            subbandgroups_per_ms, subbands_per_subbandgroup, raw_ms_mapfile,
+            subbandgroups_per_ms, subbands_per_subbandgroup, ms_mapfile,
             asciistat_executable, statplot_executable, msselect_executable,
             rficonsole_executable, add_beam_tables, final_output_path):
         """
@@ -52,7 +52,7 @@ class long_baseline(LOFARnodeTCP):
         self.environment.update(environment)
         with log_time(self.logger):
-            input_map = DataMap.load(raw_ms_mapfile)
+            input_map = DataMap.load(ms_mapfile)
 
             #******************************************************************
             # I. Create the directories used in this recipe
             create_directory(processed_ms_dir)
@@ -71,14 +71,14 @@ class long_baseline(LOFARnodeTCP):
 
             #******************************************************************
             # 1. Copy the input files
-            copied_ms_map = self._copy_input_files(
+            processed_ms_map = self._copy_input_files(
                             processed_ms_dir, input_map)
 
             #******************************************************************
             # 2. run dppp: collect frequencies into larger group
             time_slices_path_list = \
                 self._run_dppp(working_dir, time_slice_dir,
-                    subbandgroups_per_ms, copied_ms_map, subbands_per_subbandgroup,
+                    subbandgroups_per_ms, processed_ms_map, subbands_per_subbandgroup,
                     processed_ms_dir, parset, ndppp_executable)
 
             # If no timeslices were created, bail out with exit status 1
@@ -138,6 +138,10 @@ class long_baseline(LOFARnodeTCP):
             self._deep_copy_to_output_location(output_measurement_set,
                                                final_output_path)
 
+            # Write the ms actually used for the created dataset back to
+            # the input mapfile
+            processed_ms_map.save(ms_mapfile)
+
 
 #******************************************************************
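The processed_ms_map.save(ms_mapfile) call added above overwrites the mapfile the node received, so the master can afterwards read back which measurement sets were actually used, including skip flags set for data that never arrived. A sketch of that round trip (hypothetical path, assuming the DataMap API already used in this patch):

    import os
    from lofarpipe.support.data_map import DataMap

    ms_map = DataMap.load('ms_per_image.mapfile')   # hypothetical mapfile
    for item in ms_map:
        # mark entries whose data is not present on this node
        item.skip = item.skip or not os.path.exists(item.file)
    ms_map.save('ms_per_image.mapfile')             # master sees updated flags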
@@ -167,46 +171,49 @@ class long_baseline(LOFARnodeTCP):
         This function collects all the file in the input map in the
         processed_ms_dir Return value is a set of missing files
         """
-        copied_ms_map = copy.deepcopy(input_map)
+        processed_ms_map = copy.deepcopy(input_map)
+
         # loop all measurement sets
-        for input_item, copied_item in zip(input_map, copied_ms_map):
+        for input_item, copied_item in zip(input_map, processed_ms_map):
             # fill the copied item with the correct data
             copied_item.host = self.host
             copied_item.file = os.path.join(
                     processed_ms_dir, os.path.basename(input_item.file))
 
+            stderrdata = None
             # If we have to skip this ms
             if input_item.skip == True:
-                exit_status = 1  #
-
-            # skip the copy if machine is the same (execution on localhost)
-            # make sure data is in the correct directory. for now: working_dir/[jobname]/subbands
-            #if input_item.host == "localhost":
-            #    continue
-            # construct copy command
-            command = ["rsync", "-r", "{0}:{1}".format(
-                            input_item.host, input_item.file),
-                       "{0}".format(processed_ms_dir)]
-            if input_item.host == "localhost":
-                command = ["cp", "-r", "{0}".format(input_item.file),
-                           "{0}".format(processed_ms_dir)]
-
-            self.logger.debug("executing: " + " ".join(command))
-
-            # Spawn a subprocess and connect the pipes
-            # The copy step is performed 720 at once in that case which might
-            # saturate the cluster.
-            copy_process = subprocess.Popen(
-                        command,
-                        stdin = subprocess.PIPE,
-                        stdout = subprocess.PIPE,
-                        stderr = subprocess.PIPE)
-
-            # Wait for finish of copy inside the loop: enforce single tread
-            # copy
-            (stdoutdata, stderrdata) = copy_process.communicate()
-
-            exit_status = copy_process.returncode
+                exit_status = 1
+                stderrdata = "SKIPPED_FILE"
+
+            else:
+                # skip the copy if machine is the same (execution on localhost)
+                # make sure data is in the correct directory.
+                # for now: working_dir/[jobname]/subbands
+                # construct copy command
+                command = ["rsync", "-r", "{0}:{1}".format(
+                                input_item.host, input_item.file),
+                           "{0}".format(processed_ms_dir)]
+                if input_item.host == "localhost":
+                    command = ["cp", "-r", "{0}".format(input_item.file),
+                               "{0}".format(processed_ms_dir)]
+
+                self.logger.debug("executing: " + " ".join(command))
+
+                # Spawn a subprocess and connect the pipes
+                # The copy step is performed 720 at once in that case which might
+                # saturate the cluster.
+                copy_process = subprocess.Popen(
+                            command,
+                            stdin = subprocess.PIPE,
+                            stdout = subprocess.PIPE,
+                            stderr = subprocess.PIPE)
+
+                # Wait for finish of copy inside the loop: enforce single-threaded
+                # copy
+                (stdoutdata, stderrdata) = copy_process.communicate()
+
+                exit_status = copy_process.returncode
 
             # if copy failed log the missing file and update the skip fields
             if exit_status != 0:
@@ -218,7 +225,7 @@ class long_baseline(LOFARnodeTCP):
 
                 self.logger.debug(stdoutdata)
 
-        return copied_ms_map
+        return processed_ms_map
 
 
     def _dppp_call(self, working_dir, ndppp, cmd, environment):
@@ -233,7 +240,7 @@ class long_baseline(LOFARnodeTCP):
                     logger, cleanup = None)
 
     def _run_dppp(self, working_dir, time_slice_dir_path, slices_per_image,
-            copied_ms_map, subbands_per_image, collected_ms_dir_name, parset,
+            processed_ms_map, subbands_per_image, collected_ms_dir_name, parset,
             ndppp):
         """
         Run NDPPP:
@@ -247,9 +254,6 @@ class long_baseline(LOFARnodeTCP):
             end_slice_range = (idx_time_slice + 1) * subbands_per_image
             # Get the subset of ms that are part of the current timeslice,
             # cast to datamap
-            input_map_subgroup = DataMap(
-                copied_ms_map[start_slice_range:end_slice_range])
-
             output_ms_name = "time_slice_{0}.dppp.ms".format(idx_time_slice)
 
             # construct time slice name
@@ -258,7 +262,21 @@ class long_baseline(LOFARnodeTCP):
 
             # convert the datamap to a file list: Do not remove skipped files:
             # ndppp needs the incorrect files there to allow filling with zeros
-            ndppp_input_ms = [item.file for item in input_map_subgroup]
+
+            ndppp_input_ms = []
+            for item in processed_ms_map[start_slice_range:end_slice_range]:
+                if item.skip:
+                    ndppp_input_ms.append("SKIPPEDSUBBAND")
+                    # We need an entry in the list: ndppp will add zeros to
+                    # pad missing subbands
+                else:
+                    ndppp_input_ms.append(item.file)
+
+            # if none of the input files was valid, skip the creation of the
+            # timeslice altogether: it will not show up in the timeslice
+            # mapfile
+            if not any(f != "SKIPPEDSUBBAND" for f in ndppp_input_ms):
+                continue
 
             # Join into a single list of paths.
             msin = "['{0}']".format("', '".join(ndppp_input_ms))
@@ -300,11 +318,10 @@ class long_baseline(LOFARnodeTCP):
                 time_slice_path_list.append(time_slice_path)
 
             # On error the current timeslice should be skipped
-            except subprocess.CalledProcessError, exception:
-                self.logger.warning(str(exception))
-                continue
-            except Exception, exception:
+            except Exception, exception:
+                for item in processed_ms_map[start_slice_range:end_slice_range]:
+                    item.skip = True
                 self.logger.warning(str(exception))
                 continue
 
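A standalone sketch (toy paths, illustration only) of the msin padding built in _run_dppp above: a skipped subband keeps a placeholder position in the NDPPP input list, so the missing frequencies are zero-padded instead of shifting the later subbands down:

    subbands = [('/data/ts0_SB000.MS', False),
                ('/data/ts0_SB001.MS', True),    # skipped subband
                ('/data/ts0_SB002.MS', False)]

    ndppp_input_ms = []
    for path, skip in subbands:
        ndppp_input_ms.append("SKIPPEDSUBBAND" if skip else path)

    msin = "['{0}']".format("', '".join(ndppp_input_ms))
    print msin  # ['/data/ts0_SB000.MS', 'SKIPPEDSUBBAND', '/data/ts0_SB002.MS']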
diff --git a/CEP/Pipeline/test/regression_tests/long_baseline_pipeline_test.py b/CEP/Pipeline/test/regression_tests/long_baseline_pipeline_test.py
index 5e12e482a8e567b12edd168a1aea9839f453d37c..df873f32d297044ab5cb87e673b0cf48bee761d7 100644
--- a/CEP/Pipeline/test/regression_tests/long_baseline_pipeline_test.py
+++ b/CEP/Pipeline/test/regression_tests/long_baseline_pipeline_test.py
@@ -1,376 +1,61 @@
-import math
+import pyrap.tables as pt
+import numpy
 import sys
 
-def validate_image_equality(image_1_path, image_2_path, max_delta):
-    import pyrap.images as pim
+def load_and_compare_data_sets(ms1, ms2):
+    # open the two datasets
+    ms1 = pt.table(ms1)
+    ms2 = pt.table(ms2)
 
-    # get the difference between the two images
-    print "comparing images from paths:"
-    print image_1_path
-    print image_2_path
-    im = pim.image('"{0}" - "{1}"'.format(image_1_path, image_2_path))
-    im.saveas("difference.IM2")
-    # get the stats of the image
-    stats_dict = im.statistics()
-    return_value = compare_image_statistics(stats_dict, max_delta)
+    # get the number of rows in the dataset
+    n_row = len(ms1.getcol('CORRECTED_DATA'))
+    n_complex_vis = 4
 
-    if not return_value:
-        print "\n\n\n"
-        print "*"*30
-        print "Statistics of the produced image:"
-        im = pim.image("{0}".format(image_1_path))
-        stats_dict_single_image = im.statistics()
-        print stats_dict_single_image
-        print "\n\n\n"
-        print "Statistics of the compare image:"
-        im = pim.image("{0}".format(image_2_path))
-        stats_dict_single_image = im.statistics()
-        print stats_dict_single_image
-        print "\n\n\n"
-        print "difference between produced image and the baseline image:"
-        print "maximum delta: {0}".format(max_delta)
-        print stats_dict
-        print "*"*30
+    # create a target array with the same length as the datacolumn
+    div_array = numpy.zeros((n_row, 1, n_complex_vis), dtype=numpy.complex64)
+    ms1_array = ms1.getcol('CORRECTED_DATA')
+    ms2_array = ms2.getcol('CORRECTED_DATA')
 
-    return return_value
+    div_max = 0
+    for idx in xrange(n_row):
+        for idy in xrange(n_complex_vis):
+            div_value = ms1_array[idx][0][idy] - ms2_array[idx][0][idy]
+            if numpy.abs(div_value) > numpy.abs(div_max):
+                div_max = div_value
 
-def _test_against_maxdelta(value, max_delta, name):
-    if math.fabs(value) > max_delta:
-        print "Dif found: '{0}' difference >{2}<is larger then " \
-              "the maximum accepted delta: {1}".format(name, max_delta, value)
-        return True
-    return False
-
-def compare_image_statistics(stats_dict, max_delta=0.0001):
-
-    return_value = False
-    found_incorrect_datapoint = False
-    for name, value in stats_dict.items():
-
-        if name == "rms":
-            found_incorrect_datapoint = _test_against_maxdelta(
-                float(value[0]), max_delta * 300, name)
-        elif name == "medabsdevmed":
-            found_incorrect_datapoint = _test_against_maxdelta(
-                float(value[0]), max_delta * 200, name)
-        elif name == "minpos":
-            pass
-            # this min location might move 100 points while still being the same image
-        elif name == "min":
-            found_incorrect_datapoint = _test_against_maxdelta(
-                float(value[0]), max_delta * 2000, name)
-        elif name == "maxpos":
-            pass
-            # this max location might move 100 points while still being the same image
-        elif name == "max":
-            found_incorrect_datapoint = _test_against_maxdelta(
-                float(value[0]), max_delta * 1500, name)
-        elif name == "sum":
-            found_incorrect_datapoint = _test_against_maxdelta(
-                float(value[0]), max_delta * 200000, name)
-        elif name == "quartile":
-            found_incorrect_datapoint = _test_against_maxdelta(
-                float(value[0]), max_delta * 4000, name)
-        elif name == "sumsq":
-            # tested with sum already
-            pass
-
-        elif name == "median":
-            found_incorrect_datapoint = _test_against_maxdelta(
-                float(value[0]), max_delta, name)
-        elif name == "npts":
-            pass # cannot be tested..
-        elif name == "sigma":
-            found_incorrect_datapoint = _test_against_maxdelta(
-                float(value[0]), max_delta * 300, name)
-        elif name == "mean":
-            found_incorrect_datapoint = _test_against_maxdelta(
-                float(value[0]), max_delta * 3, name)
-
-        # if we found an incorrect datapoint in this run or with previous
-        # results: results in true value if any comparison failed
-        return_value = return_value or found_incorrect_datapoint
-
-    return not return_value
-
-
-
-# from here sourcelist compare functions
-def validate_source_list_files(source_list_1_path, source_list_2_path, max_delta):
-    # read the sourcelist files
-    fp = open(source_list_1_path)
-    sourcelist1 = fp.read()
-    fp.close()
-
-    fp = open(source_list_2_path)
-    sourcelist2 = fp.read()
-    fp.close()
-
-    # convert to dataarrays
-    sourcelist_data_1 = convert_sourcelist_as_string_to_data_array(sourcelist1)
-    sourcelist_data_2 = convert_sourcelist_as_string_to_data_array(sourcelist2)
-
-    return compare_sourcelist_data_arrays(sourcelist_data_1, sourcelist_data_2, max_delta)
-
-
-def convert_sourcelist_as_string_to_data_array(source_list_as_string):
-    #split in lines
-    source_list_lines = source_list_as_string.split("\n")
-    entries_array = []
-
-    #get the format line
-    format_line_entrie = source_list_lines[0]
-
-    # get the format entries
-    entries_array.append([format_line_entrie.split(",")[0].split("=")[1].strip()])
-    for entry in format_line_entrie.split(',')[1:]:
-        entries_array.append([entry.strip()])
-
-    # scan all the lines for the actual data
-
-    for line in sorted(source_list_lines[2:]): # try sorting based on name (should work :P)
-        # if empty
-        if line == "":
-            continue
-        # add the data entries
-        for idx, entrie in enumerate(line.split(",")):
-            entries_array[idx].append(entrie.strip())
-
-    return entries_array
-
-def easyprint_data_arrays(data_array1, data_array2):
-    print "All data as red from the sourcelists:"
-    for (first_array, second_array) in zip(data_array1, data_array2):
-        print first_array
-        print second_array
-
-def compare_sourcelist_data_arrays(data_array1, data_array2, max_delta=0.0001):
-    """
-    Ugly function to compare two sourcelists.
-    It needs major refactoring, but for a proof of concept it works
-    """
-    print "######################################################"
-    found_incorrect_datapoint = False
-    for (first_array, second_array) in zip(data_array1, data_array2):
-
-        # first check if the format string is the same, we have a major fail if this happens
-        if first_array[0] != second_array[0]:
-            print "******************* problem:"
-            print "format strings not equal: {0} != {1}".format(first_array[0], second_array[0])
-            found_incorrect_datapoint = True
-
-        # Hard check on equality of the name of the found sources
-        if first_array[0] == "Name":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                if entrie1 != entrie2:
-                    print "The sourcelist entrie names are not the same: \n{0} !=\n {1}".format(entrie1, entrie2)
-                    found_incorrect_datapoint = True
-
-        # Hard check on equality of the type of the found sources
-        elif first_array[0] == "Type":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                if entrie1 != entrie2:
-                    print "The sourcelist entrie types are not the same: {0} != {1}".format(entrie1, entrie2)
-                    found_incorrect_datapoint = True
-
-        # soft check on the Ra: convert to float and compare the values
-        elif first_array[0] == "Ra":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                entrie1_as_array = entrie1.split(":")
-                entrie1_as_float = float(entrie1_as_array[0]) * 3600 + float(entrie1_as_array[1]) * 60 + float(entrie1_as_array[2])# float("".join(entrie1.split(":")))
-                entrie2_as_array = entrie2.split(":")
-                entrie2_as_float = float(entrie2_as_array[0]) * 3600 + float(entrie2_as_array[1]) * 60 + float(entrie2_as_array[2])
-                if not math.fabs(entrie1_as_float - entrie2_as_float) < (max_delta * 10000) :
-                    print "we have a problem Ra's are not the same within max_delta: {0} != {1} max_delta_ra = {2}".format(
-                            entrie1, entrie2, max_delta * 10000)
-                    found_incorrect_datapoint = True
-        elif first_array[0] == "Dec":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                entrie1_as_array = entrie1.strip("+").split(".")
-                entrie1_as_float = float(entrie1_as_array[0]) * 3600 + float(entrie1_as_array[1]) * 60 + \
-                    float("{0}.{1}".format(entrie1_as_array[2], entrie1_as_array[3]))
-                entrie2_as_array = entrie2.strip("+").split(".")
-                entrie2_as_float = float(entrie2_as_array[0]) * 3600 + float(entrie2_as_array[1]) * 60 + \
-                    float("{0}.{1}".format(entrie2_as_array[2], entrie2_as_array[3]))
-                if not math.fabs(entrie1_as_float - entrie2_as_float) < (max_delta * 10000) :
-                    print "Dec's are not the same within max_delta: {0} != {1} max_delta_ra = {2}".format(
-                            entrie1, entrie2, max_delta * 10000)
-                    found_incorrect_datapoint = True
-
-        elif first_array[0] == "I":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                entrie1_as_float = float(entrie1)
-                entrie2_as_float = float(entrie2)
-                if not math.fabs(entrie1_as_float - entrie2_as_float) < (max_delta * 2000):
-                    print "I's are not the same within max_delta {0} != {1} max_delta_I = {2} ".format(
-                            entrie1_as_float, entrie2_as_float, max_delta * 1000)
-                    found_incorrect_datapoint = True
-
-
-        elif first_array[0] == "Q":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                entrie1_as_float = float(entrie1)
-                entrie2_as_float = float(entrie2)
-                if not math.fabs(entrie1_as_float - entrie2_as_float) < (max_delta * 1000):
-                    print "Q's are not the same within max_delta {0} != {1} max_delta_I = {2} ".format(
-                            entrie1_as_float, entrie2_as_float, max_delta * 1000)
-                    found_incorrect_datapoint = True
-        elif first_array[0] == "U":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                entrie1_as_float = float(entrie1)
-                entrie2_as_float = float(entrie2)
-                if not math.fabs(entrie1_as_float - entrie2_as_float) < (max_delta * 1000):
-                    print "Q's are not the same within max_delta {0} != {1} max_delta_I = {2} ".format(
-                            entrie1_as_float, entrie2_as_float, max_delta * 1000)
-                    found_incorrect_datapoint = True
-
-        elif first_array[0] == "V":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                entrie1_as_float = float(entrie1)
-                entrie2_as_float = float(entrie2)
-                if not math.fabs(entrie1_as_float - entrie2_as_float) < (max_delta * 1000):
-                    print "V's are not the same within max_delta {0} != {1} max_delta_I = {2} ".format(
-                            entrie1_as_float, entrie2_as_float, max_delta * 1000)
-                    found_incorrect_datapoint = True
-
-        elif first_array[0] == "MajorAxis":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                entrie1_as_float = float(entrie1)
-                entrie2_as_float = float(entrie2)
-                if not math.fabs(entrie1_as_float - entrie2_as_float) < (max_delta * 60000):
-                    print "MajorAxis's are not the same within max_delta {0} != {1} max_delta_I = {2} ".format(
-                            entrie1_as_float, entrie2_as_float, max_delta * 50000)
-                    found_incorrect_datapoint = True
-
-        elif first_array[0] == "MinorAxis":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                entrie1_as_float = float(entrie1)
-                entrie2_as_float = float(entrie2)
-                if not math.fabs(entrie1_as_float - entrie2_as_float) < (max_delta * 30000):
-                    print "MinorAxis's are not the same within max_delta {0} != {1} max_delta_I = {2} ".format(
-                            entrie1_as_float, entrie2_as_float, max_delta * 30000)
-                    found_incorrect_datapoint = True
-
-        elif first_array[0] == "Orientation":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                entrie1_as_float = float(entrie1)
-                entrie2_as_float = float(entrie2)
-                if not math.fabs(entrie1_as_float - entrie2_as_float) < (max_delta * 70000):
-                    print "Orientation's are not the same within max_delta {0} != {1} max_delta_I = {2} ".format(
-                            entrie1_as_float, entrie2_as_float, max_delta * 10000)
-                    found_incorrect_datapoint = True
-
-        elif first_array[0].split("=")[0].strip() == "ReferenceFrequency":
-            for (entrie1, entrie2) in zip(first_array[1:], second_array[1:]):
-                entrie1_as_float = float(entrie1)
-                entrie2_as_float = float(entrie2)
-                if not math.fabs(entrie1_as_float - entrie2_as_float) < (max_delta * 10000000):
-                    print "Orientation's are not the same within max_delta {0} != {1} max_delta_I = {2} ".format(
-                            entrie1_as_float, entrie2_as_float, max_delta * 10000000)
-                    found_incorrect_datapoint = True
-        elif first_array[0].split("=")[0].strip() == "SpectralIndex":
-            # Not known yet what will be in the spectral index: therefore do not test it
-            pass
-        else:
-            print "unknown format line entrie found: delta fails"
-            print first_array[0]
-            found_incorrect_datapoint = True
-
-    if found_incorrect_datapoint:
-        print "######################################################"
-        print "compared the following data arrays:"
-        easyprint_data_arrays(data_array1, data_array2)
-        print "######################################################"
-
-
-    # return inverse of found_incorrect_datapoint to signal delta test success
-    return not found_incorrect_datapoint
-
-
-# Test data:
-source_list_as_string = """
-format = Name, Type, Ra, Dec, I, Q, U, V, MajorAxis, MinorAxis, Orientation, ReferenceFrequency='6.82495e+07', SpectralIndex='[]'
-
-/data/scratch/klijn/out/awimage_cycle_0/image.restored_w0_i3_s3_g3, GAUSSIAN, 14:58:34.711, +71.42.19.636, 3.145e+01, 0.0, 0.0, 0.0, 1.79857e+02, 1.49783e+02, 1.24446e+02, 6.82495e+07, [0.000e+00]
-/data/scratch/klijn/out/awimage_cycle_0/image.restored_w0_i2_s2_g2, GAUSSIAN, 15:09:52.818, +70.48.01.625, 2.321e+01, 0.0, 0.0, 0.0, 2.23966e+02, 1.09786e+02, 1.32842e+02, 6.82495e+07, [0.000e+00]
-/data/scratch/klijn/out/awimage_cycle_0/image.restored_w0_i4_s4_g4, GAUSSIAN, 14:53:10.634, +69.29.31.920, 1.566e+01, 0.0, 0.0, 0.0, 1.25136e+02, 4.72783e+01, 6.49083e+01, 6.82495e+07, [0.000e+00]
-/data/scratch/klijn/out/awimage_cycle_0/image.restored_w0_i0_s0_g0, POINT, 15:20:15.370, +72.27.35.077, 1.151e+01, 0.0, 0.0, 0.0, 0.00000e+00, 0.00000e+00, 0.00000e+00, 6.82495e+07, [0.000e+00]
-/data/scratch/klijn/out/awimage_cycle_0/image.restored_w0_i1_s1_g1, POINT, 15:15:15.623, +66.54.31.670, 4.138e+00, 0.0, 0.0, 0.0, 0.00000e+00, 0.00000e+00, 0.00000e+00, 6.82495e+07, [0.000e+00]
-
-"""
-
-source_list_as_string2 = """
-format = Name, Type, Ra, Dec, I, Q, U, V, MajorAxis, MinorAxis, Orientation, ReferenceFrequency='6.82495e+07', SpectralIndex='[]'
-
-/data/scratch/klijn/out/awimage_cycle_0/image.restored_w0_i3_s3_g3, GAUSSIAN, 14:58:34.711, +71.42.19.636, 3.146e+01, 0.0, 0.0, 0.0, 1.79857e+02, 1.49783e+02, 1.24446e+02, 6.82496e+07, [0.000e+00]
-/data/scratch/klijn/out/awimage_cycle_0/image.restored_w0_i2_s2_g2, GAUSSIAN, 15:09:52.818, +70.48.01.625, 2.321e+01, 0.0, 0.0, 0.0, 2.23966e+02, 1.09786e+02, 1.32842e+02, 6.82495e+07, [0.000e+00]
-/data/scratch/klijn/out/awimage_cycle_0/image.restored_w0_i4_s4_g4, GAUSSIAN, 14:53:10.634, +69.29.31.920, 1.566e+01, 0.0, 0.0, 0.0, 1.25136e+02, 4.72783e+01, 6.49083e+01, 6.82495e+07, [0.000e+00]
-/data/scratch/klijn/out/awimage_cycle_0/image.restored_w0_i0_s0_g0, POINT, 15:20:15.370, +72.27.35.077, 1.151e+01, 0.0, 0.0, 0.0, 0.00000e+00, 0.00000e+00, 0.00000e+00, 6.82495e+07, [0.000e+00]
-/data/scratch/klijn/out/awimage_cycle_0/image.restored_w0_i1_s1_g1, POINT, 15:15:15.623, +66.54.31.670, 4.138e+00, 0.0, 0.0, 0.0, 0.00000e+00, 0.00000e+00, 0.00000e+00, 6.82495e+07, [0.000e+00]
-
-"""
-#entries_array = convert_sourcelist_as_string_to_data_array(source_list_as_string)
-#entries_array2 = convert_sourcelist_as_string_to_data_array(source_list_as_string2)
-
-#print compare_sourcelist_data_arrays(entries_array, entries_array2, 0.0001)
-
-image_data = {'rms': [ 0.], 'medabsdevmed':[ 0.], 'minpos': [0, 0, 0, 0]
-              , 'min':[ 0.], 'max': [ 0.],
-              'quartile': [ 0.], 'sumsq': [ 0.], 'median': [ 0.], 'npts':[ 65536.],
-              'maxpos': [0, 0, 0, 0], 'sigma': [ 0.], 'mean': [ 0.]}
-
-
-              #{'rms': array([ 0.52093363]), 'medabsdevmed': array([ 0.27387491]), 'minpos': array([156, 221, 0, 0],
-              #dtype=int32), 'min': array([-2.26162958]), 'max': array([ 24.01361465]), 'sum': array([ 1355.46549538]),
-              #'quartile': array([ 0.54873329]), 'sumsq': array([ 17784.62525496]), 'median': array([ 0.00240479]),
-              # 'npts': array([ 65536.]), 'maxpos': array([148, 199, 0, 0], dtype=int32),
-              # 'sigma': array([ 0.52052685]), 'mean': array([ 0.02068276])}
-
-image_data = {'rms': [ 0.52093363], 'medabsdevmed': [ 0.27387491], 'minpos': [[156, 221, 0, 0], "int32"],
-              'min': [-2.26162958], 'max': [ 24.01361465], 'sum': [ 1355.46549538],
-              'quartile' : [ 0.54873329], 'sumsq': [ 17784.62525496], 'median': [ 0.00240479],
-              'npts': [ 65536.], 'maxpos':[ [148, 199, 0, 0], "int32"],
-              'sigma': [ 0.52052685], 'mean': [ 0.02068276]}
-
-# print compare_image_statistics(image_data)
+            div_array[idx][0][idy] = div_value
 
+    print "maximum difference between measurement sets: {0}".format(div_max)
+    # Use a delta of about float precision
+    if numpy.abs(div_max) > 1e-6:
+        print "The measurement sets contain different values"
+        print "failed delta test!"
+        return False
 
+    return True
 
 if __name__ == "__main__":
-    source_list_1, image_1, source_list_2, image_2, max_delta = None, None, None, None, None
+    ms_1, ms_2 = None, None
 
     # Parse parameters from command line
     error = False
-    print sys.argv[1:5]
+    print sys.argv
     try:
-        image_1, source_list_1, fist_1, image_2, source_list_2, fits_2 = sys.argv[1:7]
-    except:
-        print "Sourcelist comparison has been disabled! Arguments must still be provided"
-        print "usage: python {0} source_list_1_path "\
-              " image_1_path source_list_2_path image_2_path (max_delta type=float)".format(sys.argv[0])
+        ms_1, ms_2 = sys.argv[1:3]
+    except Exception, e:
+        print e
+        print "usage: python {0} ms1 "\
+              " ms2 ".format(sys.argv[0])
+        print "The long-baseline pipeline is deterministic and should produce the same ms"
         sys.exit(1)
 
-    max_delta = None
-    try:
-        max_delta = float(sys.argv[5])
-    except:
-        max_delta = 0.0001
-
-    sys.exit(0)
-    # todo: Add delta test when we have validate test data
-    #print "using max delta: {0}".format(max_delta)
-
-    #if not error:
-    #    image_equality = validate_image_equality(image_1, image_2, max_delta)
-    #    # sourcelist comparison is still unstable default to true
-    #    sourcelist_equality = True #validate_source_list_files(source_list_1, source_list_2, max_delta)
-    #    if not (image_equality and sourcelist_equality):
-    #        print "Regression test failed: exiting with exitstatus 1"
-    #        print "   image_equality: {0}".format(image_equality)
-    #        print "   sourcelist_equality: {0}".format(sourcelist_equality)
-    #        sys.exit(1)
-
-    #    print "Regression test Succeed!!"
-    #    sys.exit(0)
+    if not error:
+        print "regression test:"
+        data_equality = load_and_compare_data_sets(ms_1, ms_2)
+        if not data_equality:
+            print "Regression test failed: exiting with exitstatus 1"
+            sys.exit(1)
+        print "Regression test Succeed!!"
+        sys.exit(0)
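The element-wise loop in load_and_compare_data_sets can also be written as a vectorized numpy reduction; a shorter sketch of the same check, using the same CORRECTED_DATA column and 1e-6 tolerance:

    import numpy
    import pyrap.tables as pt

    def max_visibility_difference(ms1_path, ms2_path):
        data1 = pt.table(ms1_path).getcol('CORRECTED_DATA')
        data2 = pt.table(ms2_path).getcol('CORRECTED_DATA')
        return numpy.abs(data1 - data2).max()

    # the data sets pass when the largest deviation stays within
    # single-precision accuracy, e.g.:
    # max_visibility_difference('ms1.MS', 'ms2.MS') < 1e-6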
diff --git a/CEP/Pipeline/test/regression_tests/target_pipeline.py b/CEP/Pipeline/test/regression_tests/target_pipeline.py
index 4979b9dfb7c69440d51a798fc71b25d0edeaee0e..35bedbbc18f3f23b11e88a64c6dda0d436da240d 100644
--- a/CEP/Pipeline/test/regression_tests/target_pipeline.py
+++ b/CEP/Pipeline/test/regression_tests/target_pipeline.py
@@ -14,6 +14,8 @@ def load_and_compare_data_sets(ms1, ms2):
     # create a target array with the same length as the datacolumn
     div_array = numpy.zeros((n_row, 1, n_complex_vis), dtype=numpy.complex64)
     ms1_array = ms1.getcol('DATA')
+    # TODO: Why are different columns compared (DATA vs CORRECTED_DATA)?
+    # Is this an issue in the test dataset?
     ms2_array = ms2.getcol('CORRECTED_DATA')
 
     div_max = 0
diff --git a/CEP/Pipeline/visual_studio/Pipeline.pyproj b/CEP/Pipeline/visual_studio/Pipeline.pyproj
index a6154cd9fe665465132d0c80f1547c1c4b8d1fc3..6e7a812531ac203ce8a5a46e64f66ffe80ed5e4d 100644
--- a/CEP/Pipeline/visual_studio/Pipeline.pyproj
+++ b/CEP/Pipeline/visual_studio/Pipeline.pyproj
@@ -32,6 +32,9 @@
     <Content Include="mac\CMakeLists.txt" />
     <Content Include="recipes\CMakeLists.txt" />
     <Content Include="test\CMakeLists.txt" />
+    <Compile Include="recipes\sip\bin\long_baseline_pipeline.py" />
+    <Compile Include="recipes\sip\master\long_baseline.py" />
+    <Compile Include="recipes\sip\nodes\long_baseline.py" />
     <Compile Include="test\__init__.py" />
     <Compile Include="deploy\deprecated\fabfile.py" />
     <Compile Include="deploy\deprecated\start_cluster.py" />