diff --git a/.gitattributes b/.gitattributes
index c95f587ca88c695648058014dc2bffbadd048a4f..fbc52086093a55b507f5aa4c97ce160989254d1a 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1519,6 +1519,9 @@ CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py eol=lf
 CEP/Pipeline/recipes/sip/nodes/long_baseline.py eol=lf
 CEP/Pipeline/recipes/sip/nodes/new_bbs.py eol=lf
 CEP/Pipeline/recipes/sip/nodes/rficonsole.py eol=lf
+CEP/Pipeline/recipes/sip/nodes/selfcal_awimager.py eol=lf
+CEP/Pipeline/recipes/sip/nodes/selfcal_bbs.py eol=lf
+CEP/Pipeline/recipes/sip/nodes/selfcal_finalize.py eol=lf
 CEP/Pipeline/recipes/sip/nodes/setupparmdb.py eol=lf
 CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py eol=lf
 CEP/Pipeline/recipes/sip/nodes/vdsmaker.py eol=lf
diff --git a/CEP/Pipeline/recipes/sip/nodes/get_metadata.py b/CEP/Pipeline/recipes/sip/nodes/get_metadata.py
index 3e258235aa9342c4cf3c4d5804f7dc311da053b1..bfd9f04ff53bd285b3d57854c44cc9732ead20d6 100644
--- a/CEP/Pipeline/recipes/sip/nodes/get_metadata.py
+++ b/CEP/Pipeline/recipes/sip/nodes/get_metadata.py
@@ -25,24 +25,14 @@ class get_metadata(LOFARnodeTCP):
             self.logger.error("Dataset %s does not exist" % (infile))
             return 1
 
-#        # Get the product metadata. If data product type was not specified,
-#        # derive it from the input filename's extension.
-#        if not product_type:
-#            ext = os.path.splitext(infile)[1]
-#            if ext == ".MS": product_type = "Correlated"
-#            elif ext == ".INST": product_type = "InstrumentModel"
-#            elif ext == ".IM": product_type = "SkyImage"
-#        if not product_type:
-#            self.logger.error("File %s has unknown product type" % infile)
-#            return 1
-
         self.logger.debug("Product type: %s" % product_type)
         if product_type == "Correlated":
-            self.outputs = metadata.Correlated(infile).data()
+            self.outputs = metadata.Correlated(self.logger, infile).data()
         elif product_type == "InstrumentModel":
-            self.outputs = metadata.InstrumentModel(infile).data()
+            self.outputs = metadata.InstrumentModel(self.logger,
+                                                    infile).data()
         elif product_type == "SkyImage":
-            self.outputs = metadata.SkyImage(infile).data()
+            self.outputs = metadata.SkyImage(self.logger, infile).data()
         else:
             self.logger.error("Unknown product type: %s" % product_type)
             return 1
diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_bbs.py b/CEP/Pipeline/recipes/sip/nodes/imager_bbs.py
index 416874c12e176b0c7765d7624b4933b43b321603..839cc05ae88f5d7f82ead457b5d40653fbdfbf6c 100644
--- a/CEP/Pipeline/recipes/sip/nodes/imager_bbs.py
+++ b/CEP/Pipeline/recipes/sip/nodes/imager_bbs.py
@@ -62,6 +62,9 @@ class imager_bbs(LOFARnodeTCP):
             if bbs_process_group.wait_for_finish() != None:
                 self.logger.error(
                         "Failed bbs run detected Aborting")
+                return 1  # If bbs failed we need to abort: the concat
+                          # would be corrupt
+
         except OSError, exception:
             self.logger.error("Failed to execute bbs: {0}".format(str(
                 exception)))
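The early return added to imager_bbs is the behavioural change of this hunk: previously a failed calibration run was logged but the recipe still fell through to the later concatenation, yielding a corrupt concatenated measurement set. A minimal sketch of the fail-fast pattern, using only the SubProcessGroup calls visible in this file (run() to schedule a command, wait_for_finish() returning None on success); the helper name is illustrative, not part of the recipe:

    from lofarpipe.support.subprocessgroup import SubProcessGroup

    def run_group_or_abort(logger, commands):
        # Schedule all bbs invocations in parallel.
        process_group = SubProcessGroup(logger)
        for command in commands:
            process_group.run(command)
        # wait_for_finish() returns None only if every process exited
        # cleanly; anything else means downstream steps must not run.
        if process_group.wait_for_finish() != None:
            logger.error("Failed bbs run detected: aborting")
            return 1
        return 0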
diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_create_dbs.py b/CEP/Pipeline/recipes/sip/nodes/imager_create_dbs.py
index b9b1c92b28b3b2de5e54382aecd0678f372f3e6f..e258f8c279fc47beeee0e06edf418d645c6a58d2 100644
--- a/CEP/Pipeline/recipes/sip/nodes/imager_create_dbs.py
+++ b/CEP/Pipeline/recipes/sip/nodes/imager_create_dbs.py
@@ -51,24 +51,27 @@ class imager_create_dbs(LOFARnodeTCP):
        There is a single sourcedb for a concatenated measurement set/ image
     3. Each individual timeslice needs a place to collect parameters: This
        is done in the paramdb.
-    4. Assign the outputs of the script
+    4. Add the created databases as meta information to the measurement set
+
+    5. Assign the outputs of the script
     """
     def run(self, concatenated_measurement_set, sourcedb_target_path,
             monet_db_hostname, monet_db_port, monet_db_name, monet_db_user,
             monet_db_password, assoc_theta, parmdb_executable, slice_paths,
             parmdb_suffix, environment, working_directory, makesourcedb_path,
-            source_list_path_extern):
+            source_list_path_extern, major_cycle):
 
         self.logger.info("Starting imager_create_dbs Node")
         self.environment.update(environment)
 
         #*******************************************************************
         # 1. get a sourcelist: from gsm or from file
-        source_list, append = self._create_source_list(source_list_path_extern,
-            sourcedb_target_path, concatenated_measurement_set,
-            monet_db_hostname, monet_db_port, monet_db_name, monet_db_user,
-            monet_db_password, assoc_theta)
+        source_list, append = self._create_source_list(
+            source_list_path_extern, sourcedb_target_path,
+            concatenated_measurement_set, monet_db_hostname,
+            monet_db_port, monet_db_name, monet_db_user,
+            monet_db_password, assoc_theta)
 
         #*******************************************************************
         # 2convert it to a sourcedb (casa table)
@@ -86,8 +89,14 @@ class imager_create_dbs(LOFARnodeTCP):
             self.logger.error("failed creating paramdb for slices")
             return 1
 
+        # *******************************************************************
+        # 4. Add the created databases to the measurement set as meta data
+        self._add_dbs_to_ms(concatenated_measurement_set, sourcedb_target_path,
+                            parmdbs, major_cycle)
+
+
         #*******************************************************************
-        # 4. Assign the outputs
+        # 5. Assign the outputs
         self.outputs["sourcedb"] = sourcedb_target_path
         self.outputs["parmdbs"] = parmdbs
         return 0
@@ -118,7 +127,8 @@ class imager_create_dbs(LOFARnodeTCP):
             append = False
         else:
             source_list = source_list_path_extern
-            append = True
+            append = False  # Nicolas: should this be True or False?
+            # later steps should not contain the original bootstrapping input
 
         return source_list, append
 
@@ -460,7 +470,44 @@ class imager_create_dbs(LOFARnodeTCP):
 
         return None
 
-
+    def _add_dbs_to_ms(self, concatenated_measurement_set, sourcedb_target_path,
+                       parmdbs_path, major_cycle):
+        """
+        Add the sourcedb and instrument table (parmdb) created in this recipe
+        to the local measurement set.
+        """
+        self.logger.info("Adding sourcemodel and instrument model to output ms.")
+        # Create the base meta information directory
+        meta_directory = concatenated_measurement_set + "_selfcal_information"
+        if not os.path.exists(meta_directory):
+            os.makedirs(meta_directory)
+
+        # Cycle dir
+        cycle_directory = os.path.join(meta_directory,
+                                       "cycle_" + str(major_cycle))
+        if not os.path.exists(cycle_directory):
+            os.makedirs(cycle_directory)
+
+        # Copy the actual data. parmdbs_path is a list!
+        sourcedb_directory = os.path.join(cycle_directory,
+            os.path.basename(sourcedb_target_path))
+        if os.path.exists(sourcedb_directory):
+            shutil.rmtree(sourcedb_directory)  # delete dir to assure copy succeeds
+        shutil.copytree(sourcedb_target_path, sourcedb_directory)
+
+        # parmdbs_path is a list!
+        for parmdb_entry in parmdbs_path:
+            try:
+                parmdb_directory = os.path.join(cycle_directory,
+                    os.path.basename(parmdb_entry))
+                # delete dir to assure copy succeeds
+                if os.path.exists(parmdb_directory):
+                    shutil.rmtree(parmdb_directory)
+                shutil.copytree(parmdb_entry, parmdb_directory)
+            except Exception:
+                self.logger.warn("Failed copying parmdb:")
+                self.logger.warn(parmdb_entry)
+                continue  # slices might be missing, not an exit error
 
 
 if __name__ == "__main__":
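The new _add_dbs_to_ms records the calibration products of every self-calibration cycle next to the measurement set, under <ms>_selfcal_information/cycle_<n>/. The rmtree-before-copytree step is what makes reruns safe: shutil.copytree raises an error when the destination already exists. The same idiom as a stand-alone sketch (refresh_copy is a hypothetical helper, not part of the recipe):

    import os
    import shutil

    def refresh_copy(source_dir, destination_root):
        # Copy source_dir below destination_root, replacing a stale copy
        # left behind by an earlier (re)run of the same cycle.
        destination = os.path.join(destination_root,
                                   os.path.basename(source_dir))
        if os.path.exists(destination):
            shutil.rmtree(destination)  # copytree refuses existing targets
        shutil.copytree(source_dir, destination)
        return destination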
diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_finalize.py b/CEP/Pipeline/recipes/sip/nodes/imager_finalize.py
index a06849c3dbdadb36aba4ca6d5c901b9c40eeb9cd..5d4be9c094ad92cf407c9a335a66c28e5bcd0e12 100644
--- a/CEP/Pipeline/recipes/sip/nodes/imager_finalize.py
+++ b/CEP/Pipeline/recipes/sip/nodes/imager_finalize.py
@@ -34,13 +34,13 @@ class imager_finalize(LOFARnodeTCP):
         5. Export sourcelist to msss server, copy the sourcelist to hdf5 location
         6. Return the outputs
     """
-    def run(self, awimager_output, raw_ms_per_image, sourcelist, target,
+    def run(self, awimager_output, ms_per_image, sourcelist, target,
             output_image, minbaseline, maxbaseline, processed_ms_dir,
             fillrootimagegroup_exec, environment, sourcedb):
         self.environment.update(environment)
         """
         :param awimager_output: Path to the casa image produced by awimager
-        :param raw_ms_per_image: The X (90) measurements set scheduled to
+        :param ms_per_image: The X (90) measurement sets scheduled to
             create the image
         :param sourcelist: list of sources found in the image
         :param target: <unused>
@@ -55,7 +55,7 @@ class imager_finalize(LOFARnodeTCP):
         :rtype: self.outputs['image'] path to the produced hdf5 image
         """
         with log_time(self.logger):
-            raw_ms_per_image_map = DataMap.load(raw_ms_per_image)
+            ms_per_image_map = DataMap.load(ms_per_image)
 
             # *****************************************************************
             # 1. add image info
@@ -64,22 +64,22 @@
             # TODO: BUG!! the meta data might contain files that were copied
             # but failed in imager_bbs
             processed_ms_paths = []
-            for item in raw_ms_per_image_map:
+            for item in ms_per_image_map:
                 path = item.file
-                raw_ms_file_name = os.path.split(path)[1]
-                #if the raw ms is in the processed dir (additional check)
-                if (raw_ms_file_name in file_list):
+                ms_file_name = os.path.split(path)[1]
+                #if the ms is in the processed dir (additional check)
+                if (ms_file_name in file_list):
                     # save the path
                     processed_ms_paths.append(os.path.join(processed_ms_dir,
-                                                            raw_ms_file_name))
+                                                           ms_file_name))
 
             #add the information the image
             try:
                 addimg.addImagingInfo(awimager_output, processed_ms_paths,
                     sourcedb, minbaseline, maxbaseline)
             except Exception, error:
-                self.logger.error("addImagingInfo Threw Exception:")
-                self.logger.error(error)
+                self.logger.warn("addImagingInfo threw an exception:")
+                self.logger.warn(error)
                 # Catch raising of already done error: allows for rerunning
                 # of the recipe
                 if "addImagingInfo already done" in str(error):
@@ -143,8 +143,8 @@ class imager_finalize(LOFARnodeTCP):
             (stdoutdata, stderrdata) = proc.communicate()
 
             exit_status = proc.returncode
-            self.logger.error(stdoutdata)
-            self.logger.error(stderrdata)
+            self.logger.info(stdoutdata)
+            self.logger.info(stderrdata)
 
             #if copy failed log the missing file
             if exit_status != 0:
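The except block around addimg.addImagingInfo doubles as a rerun guard: attaching imaging info twice raises an error whose text contains "addImagingInfo already done", and the recipe treats that one failure as benign so a restarted pipeline can pass through this step. A sketch of the idiom in isolation (assuming the module is imported as addimg, as used in this recipe; the substring match mirrors the check above):

    def add_imaging_info_once(logger, image, ms_paths, sourcedb,
                              minbaseline, maxbaseline):
        try:
            addimg.addImagingInfo(image, ms_paths, sourcedb,
                                  minbaseline, maxbaseline)
        except Exception, error:
            # A second run on the same image is expected and harmless.
            if "addImagingInfo already done" in str(error):
                logger.info("imaging info already present, continuing")
            else:
                raise  # everything else is a real failure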
diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py
index 80acc58def0958122d1e7c16f603903ecc0d4032..6d75d26b47cdc028be6513f2cab2c05d773db552 100644
--- a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py
+++ b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py
@@ -10,7 +10,8 @@ import shutil
 import os
 import subprocess
 import copy
-import pyrap.tables as pt # order of pyrap import influences the type conversion binding
+import pyrap.tables as pt  # order of pyrap import influences the type
+                           # conversion binding
 from lofarpipe.support.pipelinelogging import CatchLog4CPlus
 from lofarpipe.support.pipelinelogging import log_time
 from lofarpipe.support.utilities import patch_parset
@@ -19,7 +20,7 @@ from lofarpipe.support.lofarnode import LOFARnodeTCP
 from lofarpipe.support.utilities import create_directory
 from lofarpipe.support.data_map import DataMap
 from lofarpipe.support.subprocessgroup import SubProcessGroup
-
+from lofarpipe.recipes.helpers.data_quality import run_rficonsole, filter_bad_stations
 
 # Some constant settings for the recipe
 _time_slice_dir_name = "time_slices"
@@ -29,31 +30,30 @@ class imager_prepare(LOFARnodeTCP):
     """
     Steps perform on the node:
 
-    0. Create directories and assure that they are empty.
-    1. Collect the Measurement Sets (MSs): copy to the current node.
-    2. Start dppp: Combines the data from subgroups into single timeslice.
-    3. Flag rfi.
-    4. Add addImagingColumns to the casa ms.
-    5. Concatenate the time slice measurment sets, to a single virtual ms.
+    1. Create directories and assure that they are empty.
+    2. Collect the Measurement Sets (MSs): copy to the current node.
+    3. Start dppp: Combines the data from subgroups into single timeslice.
+    4. Flag rfi (toggled by parameter).
+    5. Add addImagingColumns to the casa ms.
     6. Filter bad stations. Find station with repeated bad measurement and
        remove these completely from the dataset.
-
-    **Members:**
+    7. Add measurement set tables.
+    8. Perform the (virtual) concatenation of the timeslices.
     """
     def run(self, environment, parset, working_dir, processed_ms_dir,
-            ndppp_executable, output_measurement_set,
-            time_slices_per_image, subbands_per_group, raw_ms_mapfile,
+            ndppp_executable, output_measurement_set,
+            time_slices_per_image, subbands_per_group, input_ms_mapfile,
             asciistat_executable, statplot_executable, msselect_executable,
-            rficonsole_executable, add_beam_tables):
+            rficonsole_executable, do_rficonsole, add_beam_tables):
         """
         Entry point for the node recipe
         """
         self.environment.update(environment)
         with log_time(self.logger):
-            input_map = DataMap.load(raw_ms_mapfile)
+            input_map = DataMap.load(input_ms_mapfile)
 
             #******************************************************************
-            # I. Create the directories used in this recipe
+            # 1. Create the directories used in this recipe
             create_directory(processed_ms_dir)
 
             # time slice dir_to_remove: assure empty directory: Stale data
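Two of the new run() arguments change the node's contract: do_rficonsole makes RFI flagging optional, and input_ms_mapfile is no longer read-only — entries whose copy or DPPP run fails are marked skip = True and the updated map is written back over the input mapfile (step 8 below), so the master recipe sees which measurement sets were actually used. A minimal sketch of that round trip, using only DataMap operations that appear in this diff:

    from lofarpipe.support.data_map import DataMap

    def mark_failed_entries(mapfile_path, failed_files):
        # failed_files: paths whose processing failed on this node
        data_map = DataMap.load(mapfile_path)
        for item in data_map:
            if item.file in failed_files:
                item.skip = True         # exclude from later recipes
        data_map.save(mapfile_path)      # persist the verdict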
@@ -69,17 +69,20 @@
             self.logger.debug("and assured it is empty")
 
             #******************************************************************
-            # 1. Copy the input files
-            copied_ms_map = self._copy_input_files(
+            # 2. Copy the input files
+            # processed_ms_map will be the map containing all the 'valid'
+            # input ms
+            processed_ms_map = self._copy_input_files(
                 processed_ms_dir, input_map)
 
             #******************************************************************
-            # 2. run dppp: collect frequencies into larger group
+            # 3. run dppp: collect frequencies into larger groups
             time_slices_path_list = \
                 self._run_dppp(working_dir, time_slice_dir,
-                    time_slices_per_image, copied_ms_map, subbands_per_group,
+                    time_slices_per_image, processed_ms_map, subbands_per_group,
                     processed_ms_dir, parset, ndppp_executable)
 
+            # If no timeslices were created, bail out with exit status 1
             if len(time_slices_path_list) == 0:
                 self.logger.error("No timeslices were created.")
@@ -88,13 +91,16 @@
             self.logger.debug(
                 "Produced time slices: {0}".format(time_slices_path_list))
 
+            #***********************************************************
-            # 3. run rfi_concole: flag datapoints which are corrupted
-            self._run_rficonsole(rficonsole_executable, time_slice_dir,
-                time_slices_path_list)
+            # 4. run rficonsole: flag datapoints which are corrupted
+            if do_rficonsole:
+                run_rficonsole(rficonsole_executable, time_slice_dir,
+                               time_slices_path_list, self.logger,
+                               self.resourceMonitor)
 
             #******************************************************************
-            # 4. Add imaging columns to each timeslice
+            # 5. Add imaging columns to each timeslice
             # ndppp_executable fails if not present
             for time_slice_path in time_slices_path_list:
                 pt.addImagingColumns(time_slice_path)
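The rficonsole and bad-station logic now lives in the shared lofarpipe.recipes.helpers.data_quality module, so the imaging and self-calibration pipelines use one implementation; this recipe only decides whether to call it. A hedged sketch of the toggle as a free function (the wrapper is illustrative; the run_rficonsole signature is taken from the call above):

    from lofarpipe.recipes.helpers.data_quality import run_rficonsole

    def maybe_flag_rfi(logger, monitor, executable, slice_dir, slices,
                       do_rficonsole):
        if not do_rficonsole:
            logger.info("RFI flagging disabled by parameter; skipping")
            return
        run_rficonsole(executable, slice_dir, slices, logger, monitor)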
@@ -103,29 +109,34 @@
                     time_slice_path))
 
             #*****************************************************************
-            # 5. Filter bad stations
-            time_slice_filtered_path_list = self._filter_bad_stations(
+            # 6. Filter bad stations
+            time_slice_filtered_path_list = filter_bad_stations(
                 time_slices_path_list, asciistat_executable,
-                statplot_executable, msselect_executable)
+                statplot_executable, msselect_executable,
+                self.logger, self.resourceMonitor)
 
             #*****************************************************************
-            # Add measurmenttables
+            # 7. Add measurement set tables
             if add_beam_tables:
-                self.add_beam_tables(time_slice_filtered_path_list)
+                self._add_beam_tables(time_slice_filtered_path_list)
 
             #******************************************************************
-            # 6. Perform the (virtual) concatenation of the timeslices
+            # 8. Perform the (virtual) concatenation of the timeslices
             self._concat_timeslices(time_slice_filtered_path_list,
                 output_measurement_set)
 
-            #******************************************************************
+            # *****************************************************************
+            # Write the actually used ms for the created dataset to the input
+            # mapfile
+            processed_ms_map.save(input_ms_mapfile)
+
             # return
             self.outputs["time_slices"] = \
                 time_slices_path_list
 
         return 0
 
-    def add_beam_tables(self, time_slices_path_list):
+    def _add_beam_tables(self, time_slices_path_list):
         beamtable_proc_group = SubProcessGroup(self.logger)
         for ms_path in time_slices_path_list:
             self.logger.debug("makebeamtables start")
             cmd_string = "{0} ms={1}".format(
             beamtable_proc_group.run(cmd_string)
 
         if beamtable_proc_group.wait_for_finish() != None:
+            # TODO: Exception on error: make time_slices_path_list a mapfile
             raise Exception("an makebeamtables run failed!")
 
         self.logger.debug("makebeamtables finished")
 
@@ -145,58 +157,60 @@
         This function collects all the file in the input map in the
         processed_ms_dir Return value is a set of missing files
         """
-        copied_ms_map = copy.deepcopy(input_map)
+        processed_ms_map = copy.deepcopy(input_map)
         # loop all measurement sets
-        for input_item, copied_item in zip(input_map, copied_ms_map):
+        for input_item, processed_item in zip(input_map, processed_ms_map):
             # fill the copied item with the correct data
-            copied_item.host = self.host
-            copied_item.file = os.path.join(
+            processed_item.host = self.host
+            processed_item.file = os.path.join(
                 processed_ms_dir, os.path.basename(input_item.file))
 
+            stderrdata = None
             # If we have to skip this ms
             if input_item.skip == True:
-                exit_status = 1  #
-
-            # skip the copy if machine is the same (execution on localhost)
-            # make sure data is in the correct directory. for now: working_dir/[jobname]/subbands
-            #if input_item.host == "localhost":
-            #    continue
-            # construct copy command
-            command = ["rsync", "-r", "{0}:{1}".format(
-                            input_item.host, input_item.file),
-                       "{0}".format(processed_ms_dir)]
-            if input_item.host == "localhost":
-                command = ["cp", "-r", "{0}".format(input_item.file),
-                           "{0}".format(processed_ms_dir)]
-
-            self.logger.debug("executing: " + " ".join(command))
-
-            # Spawn a subprocess and connect the pipes
-            # The copy step is performed 720 at once in that case which might
-            # saturate the cluster.
-            copy_process = subprocess.Popen(
-                        command,
-                        stdin = subprocess.PIPE,
-                        stdout = subprocess.PIPE,
-                        stderr = subprocess.PIPE)
-
-            # Wait for finish of copy inside the loop: enforce single tread
-            # copy
-            (stdoutdata, stderrdata) = copy_process.communicate()
-
-            exit_status = copy_process.returncode
+                exit_status = 1
+                stderrdata = "SKIPPED_FILE"
+
+            else:
+                # use cp for the copy if the machine is the same (localhost);
+                # make sure data is in the correct directory. For now:
+                # working_dir/[jobname]/subbands
+                # construct copy command
+                command = ["rsync", "-r", "{0}:{1}".format(
+                                input_item.host, input_item.file),
+                           "{0}".format(processed_ms_dir)]
+                if input_item.host == "localhost":
+                    command = ["cp", "-r", "{0}".format(input_item.file),
+                               "{0}".format(processed_ms_dir)]
+
+                self.logger.debug("executing: " + " ".join(command))
+
+                # Spawn a subprocess and connect the pipes
+                # The copy step is performed 720 at once in that case which
+                # might saturate the cluster.
+                copy_process = subprocess.Popen(
+                            command,
+                            stdin = subprocess.PIPE,
+                            stdout = subprocess.PIPE,
+                            stderr = subprocess.PIPE)
+
+                # Wait for the copy to finish inside the loop: enforce a
+                # single-threaded copy
+                (stdoutdata, stderrdata) = copy_process.communicate()
+
+                exit_status = copy_process.returncode
 
             # if copy failed log the missing file and update the skip fields
             if exit_status != 0:
                 input_item.skip = True
-                copied_item.skip = True
+                processed_item.skip = True
                 self.logger.warning(
                     "Failed loading file: {0}".format(input_item.file))
                 self.logger.warning(stderrdata)
                 self.logger.debug(stdoutdata)
 
-        return copied_ms_map
+        return processed_ms_map
 
     def _dppp_call(self, working_dir, ndppp, cmd, environment):
         """
         Muckable function running the dppp executable.
         Wraps dppp with catchLog4CPLus and catch_segfaults
         """
+        # TODO: the cpu limit is static at this location
+        environment['OMP_NUM_THREADS'] = str(8)
+        self.logger.debug("Using %s threads for ndppp" % 8)
         with CatchLog4CPlus(working_dir, self.logger.name +
              "." + os.path.basename("imager_prepare_ndppp"),
                   os.path.basename(ndppp)) as logger:
@@ -211,8 +228,8 @@
                 logger, cleanup = None, usageStats=self.resourceMonitor)
 
     def _run_dppp(self, working_dir, time_slice_dir_path, slices_per_image,
-            copied_ms_map, subbands_per_image, collected_ms_dir_name, parset,
-            ndppp):
+            processed_ms_map, subbands_per_image, collected_ms_dir_name,
+            parset, ndppp):
         """
         Run NDPPP:
         Create dir for grouped measurements, assure clean workspace
@@ -223,27 +240,63 @@
         for idx_time_slice in range(slices_per_image):
             start_slice_range = idx_time_slice * subbands_per_image
             end_slice_range = (idx_time_slice + 1) * subbands_per_image
-            # Get the subset of ms that are part of the current timeslice,
-            # cast to datamap
-            input_map_subgroup = DataMap(
-                copied_ms_map[start_slice_range:end_slice_range])
-
             output_ms_name = "time_slice_{0}.dppp.ms".format(idx_time_slice)
 
             # construct time slice name
             time_slice_path = os.path.join(time_slice_dir_path,
                                            output_ms_name)
 
-            # convert the datamap to a file list: Do not remove skipped files:
-            # ndppp needs the incorrect files there to allow filling with zeros
-            ndppp_input_ms = [item.file for item in input_map_subgroup]
-
-            # Join into a single list of paths.
+            # convert the datamap to a file list: add an invalid entry for
+            # skipped files: ndppp needs the incorrect files there to allow
+            # filling with zeros
+            ndppp_input_ms = []
+            nchan_known = False
+            for item in processed_ms_map[start_slice_range:end_slice_range]:
+                if item.skip:
+                    ndppp_input_ms.append("SKIPPEDSUBBAND")
+                else:
+                    # From the first non-skipped file get the nchan
+                    if not nchan_known:
+                        try:
+                            # Automatically average the number of channels in
+                            # the output to 1
+                            # open the dataset; assume same nchan for all sb
+                            table = pt.table(item.file)
+
+                            # get the data column, get description, get the
+                            # shape, first index returns the number of channels
+                            nchan_input = str(
+                                pt.tablecolumn(table,
+                                    'DATA').getdesc()["shape"][0])
+                            nchan_known = True
+
+                        # corrupt input measurement set
+                        except Exception:
+                            item.skip = True
+                            ndppp_input_ms.append("SKIPPEDSUBBAND")
+                            continue
+
+                    ndppp_input_ms.append(item.file)
+
+            # if none of the input files was valid, skip the creation of the
+            # timeslice altogether; it will not show up in the timeslice
+            # mapfile
+            if not nchan_known:
+                continue
+
+            # TODO/FIXME: dependency on the step name!!!!
+            ndppp_nchan_key = "avg1.freqstep"
+
+            # Join into a single string list of paths.
             msin = "['{0}']".format("', '".join(ndppp_input_ms))
+
             # Update the parset with computed parameters
             patch_dictionary = {'uselogger': 'True',  # enables log4cplus
                                 'msin': msin,
-                                'msout': time_slice_path}
+                                'msout': time_slice_path,
+                                ndppp_nchan_key: nchan_input}
+
+            nddd_parset_path = time_slice_path + ".ndppp.par"
             try:
                 temp_parset_filename = patch_parset(parset, patch_dictionary)
@@ -278,11 +331,10 @@
                 time_slice_path_list.append(time_slice_path)
 
             # On error the current timeslice should be skipped
-            except subprocess.CalledProcessError, exception:
-                self.logger.warning(str(exception))
-                continue
-
+            # and the input ms should have the skip set
             except Exception, exception:
+                for item in processed_ms_map[start_slice_range:end_slice_range]:
+                    item.skip = True
                 self.logger.warning(str(exception))
                 continue
@@ -300,138 +352,6 @@
                 "mentset: {1}".format(
                     ", ".join(group_measurements_collected),
                     output_file_path))
 
-    def _run_rficonsole(self, rficonsole_executable, time_slice_dir,
-                        time_slices):
-        """
-        _run_rficonsole runs the rficonsole application on the supplied
-        timeslices in time_slices.
-
-        """
-
-        # loop all measurement sets
-        rfi_temp_dir = os.path.join(time_slice_dir, "rfi_temp_dir")
-        create_directory(rfi_temp_dir)
-
-        try:
-            rfi_console_proc_group = SubProcessGroup(self.logger,
-                                                     self.resourceMonitor)
-            for time_slice in time_slices:
-                # Each rfi console needs own working space for temp files
-                temp_slice_path = os.path.join(rfi_temp_dir,
-                    os.path.basename(time_slice))
-                create_directory(temp_slice_path)
-
-                # construct copy command
-                self.logger.info(time_slice)
-                command = [rficonsole_executable, "-indirect-read",
-                            time_slice]
-                self.logger.info("executing rficonsole command: {0}".format(
-                            " ".join(command)))
-
-                # Add the command to the process group
-                rfi_console_proc_group.run(command, cwd = temp_slice_path)
-
-
-            # wait for all to finish
-            if rfi_console_proc_group.wait_for_finish() != None:
-                raise Exception("an rfi_console_proc_group run failed!")
-
-        finally:
-            shutil.rmtree(rfi_temp_dir)
-
-    def _filter_bad_stations(self, time_slice_path_list,
-            asciistat_executable, statplot_executable, msselect_executable):
-        """
-        A Collection of scripts for finding and filtering of bad stations:
-
-        1.
First a number of statistics with regards to the spread of the data - is collected using the asciistat_executable. - 2. Secondly these statistics are consumed by the statplot_executable - which produces a set of bad stations. - 3. In the final step the bad stations are removed from the dataset - using ms select - - REF: http://www.lofar.org/wiki/lib/exe/fetch.php?media=msss:pandeymartinez-week9-v1p2.pdf - """ - # run asciistat to collect statistics about the ms - self.logger.info("Filtering bad stations") - self.logger.debug("Collecting statistical properties of input data") - asciistat_output = [] - asciistat_proc_group = SubProcessGroup(self.logger) - for ms in time_slice_path_list: - output_dir = ms + ".filter_temp" - create_directory(output_dir) - asciistat_output.append((ms, output_dir)) - - cmd_string = "{0} -i {1} -r {2}".format(asciistat_executable, - ms, output_dir) - asciistat_proc_group.run(cmd_string) - - if asciistat_proc_group.wait_for_finish() != None: - raise Exception("an ASCIIStats run failed!") - - # Determine the station to remove - self.logger.debug("Select bad stations depending on collected stats") - asciiplot_output = [] - asciiplot_proc_group = SubProcessGroup(self.logger) - for (ms, output_dir) in asciistat_output: - ms_stats = os.path.join( - output_dir, os.path.split(ms)[1] + ".stats") - - cmd_string = "{0} -i {1} -o {2}".format(statplot_executable, - ms_stats, ms_stats) - asciiplot_output.append((ms, ms_stats)) - asciiplot_proc_group.run(cmd_string) - - if asciiplot_proc_group.wait_for_finish() != None: - raise Exception("an ASCIIplot run failed!") - - # remove the bad stations - self.logger.debug("Use ms select to remove bad stations") - msselect_output = {} - msselect_proc_group = SubProcessGroup(self.logger) - for ms, ms_stats in asciiplot_output: - # parse the .tab file containing the bad stations - station_to_filter = [] - file_pointer = open(ms_stats + ".tab") - - for line in file_pointer.readlines(): - # skip headed line - if line[0] == "#": - continue - - entries = line.split() - # if the current station is bad (the last entry on the line) - if entries[-1] == "True": - # add the name of station - station_to_filter.append(entries[1]) - - # if this measurement does not contain baselines to skip do not - # filter and provide the original ms as output - if len(station_to_filter) == 0: - msselect_output[ms] = ms - continue - - ms_output_path = ms + ".filtered" - msselect_output[ms] = ms_output_path - - # use msselect to remove the stations from the ms - msselect_baseline = "!{0}".format(",".join(station_to_filter)) - cmd_string = "{0} in={1} out={2} baseline={3} deep={4}".format( - msselect_executable, ms, ms_output_path, - msselect_baseline, "False") - msselect_proc_group.run(cmd_string) - - if msselect_proc_group.wait_for_finish() != None: - raise Exception("an MSselect run failed!") - - filtered_list_of_ms = [] - # The order of the inputs needs to be preserved when producing the - # filtered output! 
- for input_ms in time_slice_path_list: - filtered_list_of_ms.append(msselect_output[input_ms]) - - return filtered_list_of_ms if __name__ == "__main__": diff --git a/CEP/Pipeline/recipes/sip/nodes/selfcal_awimager.py b/CEP/Pipeline/recipes/sip/nodes/selfcal_awimager.py new file mode 100644 index 0000000000000000000000000000000000000000..db7267f7c3f03daac89d6ca2fe0ea83969834741 --- /dev/null +++ b/CEP/Pipeline/recipes/sip/nodes/selfcal_awimager.py @@ -0,0 +1,825 @@ +# LOFAR AUTOMATIC IMAGING PIPELINE +# awimager +# The awimager recipe creates based an image of the field of view. Based on +# nine concatenated and measurementsets each spanning 10 subbands +# The recipe contains two parts: The call to awimager +# and secondairy some functionality that calculates settings (for awimager) +# based on the information present in the measurement set +# The calculated parameters are: +# 1: The cellsize +# 2: The npixels in a each of the two dimension of the image +# 3. What columns use to determine the maximum baseline +# 4. The number of projection planes +# Wouter Klijn 2012 +# klijn@astron.nl +# Nicolas Vilchez, 2014 +# vilchez@astron.nl +# ----------------------------------------------------------------------------- +from __future__ import with_statement +import sys +import shutil +import os.path +import math +import pyfits + +from lofarpipe.support.lofarnode import LOFARnodeTCP +from lofarpipe.support.pipelinelogging import CatchLog4CPlus +from lofarpipe.support.pipelinelogging import log_time +from lofarpipe.support.utilities import patch_parset +from lofarpipe.support.utilities import get_parset +from lofarpipe.support.utilities import catch_segfaults +from lofarpipe.support.lofarexceptions import PipelineException +import pyrap.tables as pt # @UnresolvedImport +from subprocess import CalledProcessError +from lofarpipe.support.utilities import create_directory +import pyrap.images as pim # @UnresolvedImport +from lofarpipe.support.parset import Parset +import lofar.parmdb # @UnresolvedImport +import numpy as np + + +class selfcal_awimager(LOFARnodeTCP): + def run(self, executable, environment, parset, working_directory, + output_image, concatenated_measurement_set, sourcedb_path, + mask_patch_size, autogenerate_parameters, specify_fov, fov, + major_cycle, nr_cycles, perform_self_cal): + """ + :param executable: Path to awimager executable + :param environment: environment for catch_segfaults (executable runner) + :param parset: parameters for the awimager, + :param working_directory: directory the place temporary files + :param output_image: location and filesname to story the output images + the multiple images are appended with type extentions + :param concatenated_measurement_set: Input measurement set + :param sourcedb_path: Path the the sourcedb used to create the image + mask + :param mask_patch_size: Scaling of the patch around the source in the + mask + :param autogenerate_parameters: Turns on the autogeneration of: + cellsize, npix, wprojplanes, wmax, fov + :param fov: if autogenerate_parameters is false calculate + imageparameter (cellsize, npix, wprojplanes, wmax) relative to this + fov + :param major_cycle: number of the self calibration cycle to determine + the imaging parameters: cellsize, npix, wprojplanes, wmax, fov + :param nr_cycles: The requested number of self cal cycles + :param perform_self_cal: Bool used to control the selfcal functionality + or the old semi-automatic functionality + :rtype: self.outputs["image"] The path to the output image + """ + 
self.logger.info("Start selfcal_awimager node run:") + log4_cplus_name = "selfcal_awimager" + self.environment.update(environment) + + with log_time(self.logger): + # Read the parameters as specified in the parset + parset_object = get_parset(parset) + + # ************************************************************* + # 1. Calculate awimager parameters that depend on measurement set + # and the parset + if perform_self_cal: + # Calculate awimager parameters that depend on measurement set + # and the parset + self.logger.info( + "Calculating selfcalibration parameters ") + cell_size, npix, w_max, w_proj_planes, \ + UVmin, UVmax, robust, threshold =\ + self._get_selfcal_parameters( + concatenated_measurement_set, + parset, major_cycle, nr_cycles) + + self._save_selfcal_info(concatenated_measurement_set, + major_cycle, npix, UVmin, UVmax) + + else: + self.logger.info( + "Calculating parameters.. ( NOT selfcalibration)") + cell_size, npix, w_max, w_proj_planes = \ + self._get_imaging_parameters( + concatenated_measurement_set, + parset, + autogenerate_parameters, + specify_fov, + fov) + + self.logger.info("Using autogenerated parameters; ") + self.logger.info( + "Calculated parameters: cell_size: {0}, npix: {1}".format( + cell_size, npix)) + + self.logger.info("w_max: {0}, w_proj_planes: {1} ".format( + w_max, w_proj_planes)) + + # **************************************************************** + # 2. Get the target image location from the mapfile for the parset. + # Create target dir if it not exists + image_path_head = os.path.dirname(output_image) + create_directory(image_path_head) + self.logger.debug("Created directory to place awimager output" + " files: {0}".format(image_path_head)) + + # **************************************************************** + # 3. Create the mask + #mask_file_path = self._create_mask(npix, cell_size, output_image, + # concatenated_measurement_set, executable, + # working_directory, log4_cplus_name, sourcedb_path, + # mask_patch_size, image_path_head) + # ***************************************************************** + # 4. 
Update the parset with calculated parameters, and output image + patch_dictionary = {'uselogger': 'True', # enables log4cpluscd log + 'ms': str(concatenated_measurement_set), + 'cellsize': str(cell_size), + 'npix': str(npix), + 'wmax': str(w_max), + 'wprojplanes': str(w_proj_planes), + 'image': str(output_image), + 'maxsupport': str(npix) + # 'mask':str(mask_file_path), #TODO REINTRODUCE + # MASK, excluded to speed up in this debug stage + } + + # Add some aditional keys from the self calibration method + if perform_self_cal: + self_cal_patch_dict = { + 'weight': 'briggs', + 'padding': str(1.18), + 'niter' : str(1000000), + 'operation' : 'mfclark', + 'timewindow' : '300', + 'fits' : '', + 'threshold' : str(threshold), + 'robust' : str(robust), + 'UVmin' : str(UVmin), + 'UVmax' : str(UVmax), + 'maxbaseline' : str(10000000), + 'select' : str("sumsqr(UVW[:2])<1e12"), + } + patch_dictionary.update(self_cal_patch_dict) + + # save the parset at the target dir for the image + calculated_parset_path = os.path.join(image_path_head, + "parset.par") + + try: + temp_parset_filename = patch_parset(parset, patch_dictionary) + # Copy tmp file to the final location + shutil.copyfile(temp_parset_filename, calculated_parset_path) + self.logger.debug("Wrote parset for awimager run: {0}".format( + calculated_parset_path)) + finally: + # remove temp file + os.remove(temp_parset_filename) + + # ***************************************************************** + # 5. Run the awimager with the parameterset + + # TODO: FIXME: manually Limit number of threads used. + omp_num_threads = 8 + self.environment['OMP_NUM_THREADS'] = str(omp_num_threads) + self.logger.debug("Using %s threads for swimager" % omp_num_threads) + + cmd = [executable, calculated_parset_path] + self.logger.debug("Parset used for awimager run:") + self.logger.debug(cmd) + try: + with CatchLog4CPlus(working_directory, + self.logger.name + "." + + os.path.basename(log4_cplus_name), + os.path.basename(executable) + ) as logger: + catch_segfaults(cmd, working_directory, self.environment, + logger, usageStats=self.resourceMonitor) + + # Thrown by catch_segfault + except CalledProcessError, exception: + self.logger.error(str(exception)) + return 1 + + except Exception, exception: + self.logger.error(str(exception)) + return 1 + + # ********************************************************************* + # 6. Return output + # Append static .restored: This might change but prob. not + # The actual output image has this extention always, default of + # awimager + self.outputs["image"] = output_image + ".restored" + return 0 + + def _get_imaging_parameters(self, measurement_set, parset, + autogenerate_parameters, specify_fov, fov): + """ + (1) calculate and format some parameters that are determined runtime. + Based on values in the measurementset and input parameter (set): + + a. <string> The cellsize + b. <int> The npixels in a each of the two dimension of the image + c. <string> The largest baseline in the ms smaller then the maxbaseline + d. <string> The number of projection planes + + The calculation of these parameters is done in three steps: + + 1. Calculate intermediate results based on the ms. + 2. The calculation of the actual target values using intermediate + result + """ + # ********************************************************************* + # 1. 
Get partial solutions from the parameter set + # Get the parset and a number of raw parameters from this parset + parset_object = get_parset(parset) + baseline_limit = parset_object.getInt('maxbaseline') + + # Get the longest baseline + max_baseline = pt.taql( + 'CALC sqrt(max([select sumsqr(UVW[:2]) from ' + \ + '{0} where sumsqr(UVW[:2]) <{1} giving as memory]))'.format(\ + measurement_set, baseline_limit * + baseline_limit))[0] # ask ger van diepen for details if ness. + # Calculate the wave_length + table_ms = pt.table(measurement_set) + table_spectral_window = pt.table( + table_ms.getkeyword("SPECTRAL_WINDOW")) + freq = table_spectral_window.getcell("REF_FREQUENCY", 0) + + table_spectral_window.close() + wave_length = pt.taql('CALC C()') / freq + wave_length = wave_length[0] + + # Calculate the cell_size from the ms + arc_sec_in_degree = 3600 + arc_sec_in_rad = (180.0 / math.pi) * arc_sec_in_degree + cell_size = (1.0 / 3) * (wave_length / float(max_baseline))\ + * arc_sec_in_rad + + # Calculate the number of pixels in x and y dim + # fov and diameter depending on the antenna name + fov_from_ms, station_diameter = self._get_fov_and_station_diameter( + measurement_set) + + # use fov for to calculate a semi 'user' specified npix and cellsize + # The npix thus depends on the ms cellsize and fov + # Do not use use supplied Fov if autogenerating + if not autogenerate_parameters and specify_fov: + if fov == 0.0: + raise PipelineException("fov set to 0.0: invalid value.") + + # else use full resolution (calculate the fov) + else: + self.logger.info("Using fov calculated on measurement data: " + + str(fov_from_ms)) + fov = fov_from_ms + + # ******************************************************************** + # 2. Calculate the ms based output variables + # 'optimal' npix based on measurement set calculations or user specified + npix = (arc_sec_in_degree * fov) / cell_size + + # Get the closest power of two larger then the calculated pixel size + npix = self._nearest_ceiled_power2(npix) + + # Get the max w with baseline < 10000 + w_max = pt.taql('CALC max([select UVW[2] from ' + \ + '{0} where sumsqr(UVW[:2]) <{1} giving as memory])'.format( + measurement_set, baseline_limit * baseline_limit))[0] + + # Calculate number of projection planes + w_proj_planes = min(257, math.floor((max_baseline * wave_length) / + (station_diameter ** 2))) + w_proj_planes = int(round(w_proj_planes)) + + # MAximum number of proj planes set to 1024: George Heald, Ger van + # Diepen if this exception occurs + maxsupport = max(1024, npix) + if w_proj_planes > maxsupport: + raise Exception("The number of projections planes for the current" + + "measurement set is to large.") + + # ********************************************************************* + # 3. 
if the npix from the parset is different to the ms calculations, + # calculate a sizeconverter value (to be applied to the cellsize) + if npix < 256: + self.logger.warn("Using a image size smaller then 256x256:" + " This leads to problematic imaging in some instances!!") + + # If we are not autocalculating based on ms or fov, use the npix + # and cell_size specified in the parset + # keep the wmax and w_proj_planes + if (not autogenerate_parameters and not specify_fov): + npix = parset_object.getString('npix') + cell_size_formatted = parset_object.getString('cellsize') + else: + cell_size_formatted = str( + int(round(cell_size))) + 'arcsec' + + self.logger.info("Using the following awimager parameters:" + " cell_size: {0}, npix: {1},".format( + cell_size_formatted, npix) + + " w_max: {0}, w_proj_planes: {1}".format(w_max, w_proj_planes)) + + return cell_size_formatted, str(npix), str(w_max), str(w_proj_planes) + + + # Awimager parameters for selfcal process (depends with major cycle) + # nicolas: THis function needs a lot more documentation: + # THis is the function that does the magic. + # For each step everything must be cristal clear what is happening. + # I will need to support this function + # this function is full with magic numbers + # Instead of: + # variable_a = 3.14 * 3600 * 5 + # use: + # pi = math.pi + # second_in_hour = 3600 + # factorx = 5 # Factor controlling x, value based on manual optimalization + def _get_selfcal_parameters(self, measurement_set, parset, major_cycle, + nr_cycles): + """ + 0. modify the nof cycle to have a final step at the same resolution + as the previous last cycle + 1. Determine target coordinates especially declinaison, because + for low dec (<35 deg) UVmin = 0.1 to excluse very short baseline + 2. Determine the frequency and the wavelenght + 3. Determine the longuest baseline and the best resolution avaible + 4. Estimate all imaging parameters + 5. Calculate number of projection planes + 6. Pixelsize must be a string number : number +arcsec + + # Nicolas Vilchez, 2014 + # vilchez@astron.nl + """ + + + # ******************************************************************** + #0. modify the nof cycle to have a final step at the same resolution + #as the previous last cycle + + if major_cycle < nr_cycles-1: + nr_cycles = nr_cycles-1 + + scaling_factor = float(major_cycle) / float(nr_cycles - 1) + + # ******************************************************************** + #1. Determine Target coordinates for UVmin + tabtarget = pt.table(measurement_set) + tabfield = pt.table(tabtarget.getkeyword('FIELD')) + coords = tabfield.getcell('REFERENCE_DIR',0) + target = coords[0] * 180.0 / math.pi # Why + + UVmin=0 + if target[1] <= 35: # WHy? + UVmin = 0.1 + + ra_target = target[0] + 360.0 # Why + dec_target = target[1] + + # ******************************************************************** + # 2. Determine the frequency and the wavelenght + tabfreq = pt.table(measurement_set) + table_spectral_window = pt.table(tabfreq.getkeyword("SPECTRAL_WINDOW")) + frequency = table_spectral_window.getcell('REF_FREQUENCY', 0) + + wavelenght = 3.0E8 / frequency # Why + + # ******************************************************************** + # 3. 
Determine the longuest baseline and the best resolution avaible + + tabbaseline = pt.table(measurement_set, readonly=False, ack=True) + posbaseline = tabbaseline.getcol('UVW') + maxBaseline = max(posbaseline[:, 0] ** 2 + + posbaseline[:, 1] ** 2) ** 0.5 + + bestBeamresol = round((wavelenght / maxBaseline) * + (180.0 / math.pi) * 3600.0, 0) + + # Beam resolution limitation to 10arcsec to avoid too large images + if bestBeamresol < 10.0: + bestBeamresol = 10.0 + + # ******************************************************************** + # 4. Estimate all imaging parameters + + # estimate fov + # fov = 5 degree, except for High HBA Observation => 1.5 degree + if frequency > 1.9E8: + fov = 1.5 + else: + fov = 5.0 + + # we need 4 pixel/beam to have enough sampling + pixPerBeam = 4.0 + + # best resolution pixel size (i.e final pixel size for selfcal) + bestPixelResol = round(bestBeamresol / pixPerBeam, 2) + + # factor to estimate the starting resolution (9 times in this case) + badResolFactor = 9 + + pixsize = round((badResolFactor * bestPixelResol) - + (badResolFactor * bestPixelResol - bestPixelResol) * + scaling_factor , 3) + + + # number of pixel must be a multiple of 2 !! + nbpixel = int(fov * 3600.0 / pixsize) + if nbpixel % 2 ==1: + nbpixel = nbpixel + 1 + + robust = 0 #round(1.0 - (3.0 * scaling_factor), 2) + + UVmax = round((wavelenght) / + (pixPerBeam * pixsize / 3600.0 * math.pi / 180.0 ) / + (1E3 * wavelenght), 3) + + wmax = round(UVmax * (wavelenght) * 1E3, 3) + + # ******************************************************************** + # 5. Calculate number of projection planes + # Need to compute station diameter (the fov is fixed to 5 degree) + # using wouter's function, to compute the w_proj_planes + # fov and diameter depending on the antenna name + fov_from_ms, station_diameter = self._get_fov_and_station_diameter( + measurement_set) + + w_proj_planes = min(257, math.floor((maxBaseline * wavelenght) / + (station_diameter ** 2))) + w_proj_planes = int(round(w_proj_planes)) + + # MAximum number of proj planes set to 1024: George Heald, Ger van + # Diepen if this exception occurs + maxsupport = max(1024, nbpixel) + if w_proj_planes > maxsupport: + raise Exception("The number of projections planes for the current" + + "measurement set is to large.") + + # Warnings on pixel size + if nbpixel < 256: + self.logger.warn("Using a image size smaller then 256x256: This " + + "leads to problematic imaging in some instances!!") + + + # ******************************************************************** + # 6. Pixelsize must be a string number : number +arcsec + # conversion at this step + pixsize = str(pixsize)+'arcsec' + + # ******************************************************************** + # 7. Threshold determination from the previous cycle + if major_cycle == 0: + threshold = '0.075Jy' + else: + fits_image_path_list = measurement_set.split('concat.ms') + fits_image_path = fits_image_path_list[0] +\ + 'awimage_cycle_%s/image.fits'%(major_cycle-1) + + + # open a FITS file + fitsImage = pyfits.open(fits_image_path) + scidata = fitsImage[0].data + + dataRange = range(fitsImage[0].shape[2]) + sortedData = range(fitsImage[0].shape[2] ** 2) + + # FIXME We have the sneaking suspicion that this takes very long + # due to bad coding style... 
(double for loop with compute in inner loop) + for i in dataRange: + for j in dataRange: + sortedData[i * fitsImage[0].shape[2] + j] = scidata[0,0,i,j] + + sortedData = sorted(sortedData) + + # Percent of faintest data to use to determine 5sigma value : use 5% + dataPercent = int(fitsImage[0].shape[2] * 0.05) + + fiveSigmaData = sum(sortedData[0:dataPercent]) / dataPercent + threshold = (abs(fiveSigmaData) / 5.0) * (2.335 / 2.0) * 15 + + return pixsize, str(nbpixel), str(wmax), str(w_proj_planes), \ + str(UVmin), str(UVmax), str(robust), str(threshold) + + + + def _get_fov_and_station_diameter(self, measurement_set): + """ + _field_of_view calculates the fov, which is dependend on the + station type, location and mode: + For details see: + (1) http://www.astron.nl/radio-observatory/astronomers/lofar-imaging-capabilities-sensitivity/lofar-imaging-capabilities/lofar + + """ + # Open the ms + table_ms = pt.table(measurement_set) + + # Get antenna name and observation mode + antenna = pt.table(table_ms.getkeyword("ANTENNA")) + antenna_name = antenna.getcell('NAME', 0) + antenna.close() + + observation = pt.table(table_ms.getkeyword("OBSERVATION")) + antenna_set = observation.getcell('LOFAR_ANTENNA_SET', 0) + observation.close() + + # static parameters for the station diameters ref (1) + hba_core_diameter = 30.8 + hba_remote_diameter = 41.1 + lba_inner = 32.3 + lba_outer = 81.3 + + # use measurement set information to assertain antenna diameter + station_diameter = None + if antenna_name.count('HBA'): + if antenna_name.count('CS'): + station_diameter = hba_core_diameter + elif antenna_name.count('RS'): + station_diameter = hba_remote_diameter + elif antenna_name.count('LBA'): + if antenna_set.count('INNER'): + station_diameter = lba_inner + elif antenna_set.count('OUTER'): + station_diameter = lba_outer + + # raise exception if the antenna is not of a supported type + if station_diameter == None: + self.logger.error( + 'Unknown antenna type for antenna: {0} , {1}'.format(\ + antenna_name, antenna_set)) + raise PipelineException( + "Unknown antenna type encountered in Measurement set") + + # Get the wavelength + spectral_window_table = pt.table(table_ms.getkeyword( + "SPECTRAL_WINDOW")) + freq = float(spectral_window_table.getcell("REF_FREQUENCY", 0)) + wave_length = pt.taql('CALC C()') / freq + spectral_window_table.close() + + # Now calculate the FOV see ref (1) + # alpha_one is a magic parameter: The value 1.3 is representative for a + # WSRT dish, where it depends on the dish illumination + alpha_one = 1.3 + + # alpha_one is in radians so transform to degrees for output + fwhm = alpha_one * (wave_length / station_diameter) * (180 / math.pi) + fov = fwhm / 2.0 + table_ms.close() + + return fov, station_diameter + + def _create_mask(self, npix, cell_size, output_image, + concatenated_measurement_set, executable, + working_directory, log4_cplus_name, sourcedb_path, + mask_patch_size, image_path_directory): + """ + (3) create a casa image containing an mask blocking out the + sources in the provided sourcedb. + + It expects: + + a. the ms for which the mask will be created, it is used to de + termine some image details: (eg. pointing) + b. parameters for running within the catchsegfault framework + c. and the size of the mask_pach. + To create a mask, first a empty measurement set is created using + awimager: ready to be filled with mask data + + This function is a wrapper around some functionality written by: + fdg@mpa-garching.mpg.de + + steps: + 1. 
Create a parset with image paramters used by: + 2. awimager run. Creating an empty casa image. + 3. Fill the casa image with mask data + + """ + # ******************************************************************** + # 1. Create the parset used to make a mask + mask_file_path = output_image + ".mask" + + mask_patch_dictionary = {"npix": str(npix), + "cellsize": str(cell_size), + "image": str(mask_file_path), + "ms": str(concatenated_measurement_set), + "operation": "empty", + "stokes": "'I'" + } + mask_parset = Parset.fromDict(mask_patch_dictionary) + mask_parset_path = os.path.join(image_path_directory, "mask.par") + mask_parset.writeFile(mask_parset_path) + self.logger.debug( + "Write parset for awimager mask creation: {0}".format( + mask_parset_path)) + + # ********************************************************************* + # 2. Create an empty mask using awimager + cmd = [executable, mask_parset_path] + self.logger.info(" ".join(cmd)) + try: + with CatchLog4CPlus(working_directory, + self.logger.name + "." + os.path.basename(log4_cplus_name), + os.path.basename(executable) + ) as logger: + catch_segfaults(cmd, working_directory, self.environment, + logger) + # Thrown by catch_segfault + except CalledProcessError, exception: + self.logger.error(str(exception)) + return 1 + except Exception, exception: + self.logger.error(str(exception)) + return 1 + + # ******************************************************************** + # 3. create the actual mask + self.logger.debug("Started mask creation using mask_patch_size:" + " {0}".format(mask_patch_size)) + + self._msss_mask(mask_file_path, sourcedb_path, mask_patch_size) + self.logger.debug("Fished mask creation") + return mask_file_path + + def _msss_mask(self, mask_file_path, sourcedb_path, mask_patch_size = 1.0): + """ + Fill casa image with a mask based on skymodel(sourcedb) + Bugs: fdg@mpa-garching.mpg.de + + pipeline implementation klijn@astron.nl + version 0.32 + + Edited by JDS, 2012-03-16: + - Properly convert maj/minor axes to half length + - Handle empty fields in sky model by setting them to 0 + - Fix off-by-one error at mask boundary + + FIXED BUG + - if a source is outside the mask, the script ignores it + - if a source is on the border, the script draws only the inner part + - can handle skymodels with different headers + + KNOWN BUG + - not works with single line skymodels, workaround: add a fake + source outside the field + - mask patched display large amounts of aliasing. A possible + sollution would + be normalizing to pixel centre. ( int(normalize_x * npix) / + npix + (0.5 /npix)) + ideally the patch would increment in pixel radiuses + + Version 0.3 (Wouter Klijn, klijn@astron.nl) + - Usage of sourcedb instead of txt document as 'source' of sources + This allows input from different source sources + Version 0.31 (Wouter Klijn, klijn@astron.nl) + - Adaptable patch size (patch size needs specification) + - Patch size and geometry is broken: needs some astronomer magic to + fix it, problem with afine transformation prol. + Version 0.32 (Wouter Klijn, klijn@astron.nl) + - Renaming of variable names to python convention + """ + # increment in maj/minor axes [arcsec] + pad = 500. 
+ + # open mask + mask = pim.image(mask_file_path, overwrite = True) + mask_data = mask.getdata() + xlen, ylen = mask.shape()[2:] + freq, stokes, null, null = mask.toworld([0, 0, 0, 0]) + + # Open the sourcedb: + table = pt.table(sourcedb_path + "::SOURCES") + pdb = lofar.parmdb.parmdb(sourcedb_path) + + # Get the data of interest + source_list = table.getcol("SOURCENAME") + source_type_list = table.getcol("SOURCETYPE") + # All date in the format valuetype:sourcename + all_values_dict = pdb.getDefValues() + + # Loop the sources + for source, source_type in zip(source_list, source_type_list): + if source_type == 1: + type_string = "Gaussian" + else: + type_string = "Point" + self.logger.info("processing: {0} ({1})".format(source, + type_string)) + + # Get de right_ascension and declination (already in radians) + right_ascension = all_values_dict["Ra:" + source][0, 0] + declination = all_values_dict["Dec:" + source][0, 0] + if source_type == 1: + # Get the raw values from the db + maj_raw = all_values_dict["MajorAxis:" + source][0, 0] + min_raw = all_values_dict["MinorAxis:" + source][0, 0] + pa_raw = all_values_dict["Orientation:" + source][0, 0] + # convert to radians (conversion is copy paste JDS) + # major radius (+pad) in rad + maj = (((maj_raw + pad)) / 3600.) * np.pi / 180. + # minor radius (+pad) in rad + minor = (((min_raw + pad)) / 3600.) * np.pi / 180. + pix_asc = pa_raw * np.pi / 180. + # wenss writes always 'GAUSSIAN' even for point sources + # -> set to wenss beam+pad + if maj == 0 or minor == 0: + maj = ((54. + pad) / 3600.) * np.pi / 180. + minor = ((54. + pad) / 3600.) * np.pi / 180. + # set to wenss beam+pad + elif source_type == 0: + maj = (((54. + pad) / 2.) / 3600.) * np.pi / 180. + minor = (((54. + pad) / 2.) / 3600.) * np.pi / 180. + pix_asc = 0. + else: + self.logger.info( + "WARNING: unknown source source_type ({0})," + "ignoring: ".format(source_type)) + continue + + # define a small square around the source to look for it + null, null, border_y1, border_x1 = mask.topixel( + [freq, stokes, declination - maj, + right_ascension - maj / np.cos(declination - maj)]) + null, null, border_y2, border_x2 = mask.topixel( + [freq, stokes, declination + maj, + right_ascension + maj / np.cos(declination + maj)]) + xmin = np.int(np.floor(np.min([border_x1, border_x2]))) + xmax = np.int(np.ceil(np.max([border_x1, border_x2]))) + ymin = np.int(np.floor(np.min([border_y1, border_y2]))) + ymax = np.int(np.ceil(np.max([border_y1, border_y2]))) + + if xmin > xlen or ymin > ylen or xmax < 0 or ymax < 0: + self.logger.info( + "WARNING: source {0} falls outside the mask," + " ignoring: ".format(source)) + continue + + if xmax > xlen or ymax > ylen or xmin < 0 or ymin < 0: + self.logger.info( + "WARNING: source {0} falls across map edge".format(source)) + + for pixel_x in xrange(xmin, xmax): + for pixel_y in xrange(ymin, ymax): + # skip pixels outside the mask field + if pixel_x >= xlen or pixel_y >= ylen or\ + pixel_x < 0 or pixel_y < 0: + continue + # get pixel right_ascension and declination in rad + null, null, pix_dec, pix_ra = mask.toworld( + [0, 0, pixel_y, pixel_x]) + # Translate and rotate coords. 
+ translated_pixel_x = (pix_ra - right_ascension) * np.sin( + pix_asc) + (pix_dec - declination) * np.cos(pix_asc) + # to align with ellipse + translate_pixel_y = -(pix_ra - right_ascension) * np.cos( + pix_asc) + (pix_dec - declination) * np.sin(pix_asc) + if (((translated_pixel_x ** 2) / (maj ** 2)) + + ((translate_pixel_y ** 2) / (minor ** 2))) < \ + mask_patch_size: + mask_data[0, 0, pixel_y, pixel_x] = 1 + null = null + mask.putdata(mask_data) + table.close() + + # some helper functions + def _nearest_ceiled_power2(self, value): + """ + Return int value of the nearest Ceiled power of 2 for the + suplied argument + + """ + return int(pow(2, math.ceil(math.log(value, 2)))) + + + def _save_selfcal_info(self, concatenated_measurement_set, + major_cycle, npix, UVmin, UVmax): + """ + The selfcal team requested meta information to be added to + measurement set that allows the reproduction of intermediate + steps. + """ + self.logger.info("Save-ing selfcal parameters to file:") + meta_file = os.path.join( + concatenated_measurement_set + "_selfcal_information", + "uvcut_and_npix.txt") + self.logger.info(meta_file) + + # check if we have the output file? Add the header + if not os.path.exists(meta_file): + meta_file_pt = open(meta_file, 'w') + meta_file_pt.write("#cycle_nr npix uvmin(klambda) uvmax(klambda)\n") + meta_file_pt.close() + + meta_file_pt = open(meta_file, 'a') + + # Create the actual string with info + meta_info_str = " ".join([str(major_cycle), + str(npix), + str(UVmin), + str(UVmax)]) + meta_file_pt.write(meta_info_str + "\n") + meta_file_pt.close() + + +if __name__ == "__main__": + _JOBID, _JOBHOST, _JOBPORT = sys.argv[1:4] + sys.exit(selfcal_awimager( + _JOBID, _JOBHOST, _JOBPORT).run_with_stored_arguments()) + diff --git a/CEP/Pipeline/recipes/sip/nodes/selfcal_bbs.py b/CEP/Pipeline/recipes/sip/nodes/selfcal_bbs.py new file mode 100644 index 0000000000000000000000000000000000000000..418879892245900ada1146930549893cc98b2c4d --- /dev/null +++ b/CEP/Pipeline/recipes/sip/nodes/selfcal_bbs.py @@ -0,0 +1,118 @@ +# LOFAR AUTOMATIC IMAGING PIPELINE +# selfcal_bbs +# Wouter Klijn 2012 +# klijn@astron.nl +# Nicolas Vilchez, 2014 +# vilchez@astron.nl +# ----------------------------------------------------------------------------- +from __future__ import with_statement +import sys +import os + +import pyrap.tables as pt + +from lofarpipe.support.lofarnode import LOFARnodeTCP +from lofarpipe.support.group_data import load_data_map +from lofarpipe.support.subprocessgroup import SubProcessGroup +from lofarpipe.support.data_map import MultiDataMap + +class selfcal_bbs(LOFARnodeTCP): + """ + imager_bbs node performs a bbs run for each of measuremt sets supplied in + the mapfile at ms_list_path. Calibration is done on the sources in + the sourcedb in the mapfile sky_list_path. Solutions are stored in the + parmdb_list_path + + 1. Load the mapfiles + 2. For each measurement set to calibrate start a subprocess + 3. Check if the processes finished correctly + 4. (added by Nicolas vilchez) concat in time the final MS + 5. (added by N.Vilchez) copy time slives directory to a new one + """ + + def run(self, bbs_executable, parset, ms_list_path, parmdb_list_path, + sky_list_path, concat_ms_path, major_cycle): + """ + selfcal_bbs functionality. Called by framework performing all the work + """ + self.logger.debug("Starting selfcal_bbs Node") + # ********************************************************************* + # 1. 
+        # Read the mapfiles into data maps: the master recipe added a single
+        # path to a mapfile, which allows usage of the default data methods
+        # (load_data_map)
+        # TODO: Datamap
+        ms_map = MultiDataMap.load(ms_list_path)
+        parmdb_map = MultiDataMap.load(parmdb_list_path)
+        sky_list = MultiDataMap.load(sky_list_path)
+        source_db = sky_list[0].file[0]  # the sourcedb is the first file entry
+
+        try:
+            bbs_process_group = SubProcessGroup(self.logger,
+                                                self.resourceMonitor)
+            # *****************************************************************
+            # 2. Start the bbs executable with the data
+            # The data is located in multimaps; we need the first entry
+            # TODO: This is not 'nice' usage of the multimap
+            for (measurement_set, parmdb) in zip(ms_map[0].file,
+                                                 parmdb_map[0].file):
+                command = [
+                    bbs_executable,
+                    "--sourcedb={0}".format(source_db),
+                    "--parmdb={0}".format(parmdb),
+                    measurement_set,
+                    parset]
+                self.logger.info("Executing bbs command: {0}".format(
+                    " ".join(command)))
+                bbs_process_group.run(command)
+
+            # *****************************************************************
+            # 3. Check the status of the processes
+            if bbs_process_group.wait_for_finish() is not None:
+                self.logger.error("Failed bbs run detected. Aborting.")
+                return 1
+
+        except OSError, exception:
+            self.logger.error("Failed to execute bbs: {0}".format(str(
+                exception)))
+            return 1
+
+        # *********************************************************************
+        # 4. Concatenate the calibrated measurement sets in time using
+        # msconcat (pyrap.tables module) (added by N. Vilchez)
+        # This step has to be performed at this location, because the bbs run
+        # might add columns that are not present in the original ms and are
+        # therefore missing from the concat produced in the prepare phase
+        # (redmine issue #6021)
+        pt.msconcat(ms_map[0].file, concat_ms_path, concatTime=True)
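+        # For example (hypothetical names): with ms_map[0].file set to
+        # ['ts0.MS', 'ts1.MS'], the call above produces a single measurement
+        # set at concat_ms_path covering the time ranges of both inputs.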
+
+        # *********************************************************************
+        # 5. Copy the time slices directory to a new, cycle-specific one
+        # This is done for debugging purposes only: the copy is not used by
+        # any later step; the actual selfcal steps are done in place
+        # (added by N. Vilchez)
+        # The save location is created relative to the concat.ms;
+        # we could also use self.scratch_directory from the toplevel recipe,
+        # but this would need an additional ingredient
+        # This is a 'debugging' step and should never cause a failure of
+        # the pipeline
+        try:
+            working_dir = os.path.dirname(concat_ms_path)
+            time_slice_dir = os.path.join(working_dir, 'time_slices')
+            time_slice_copy_dir = os.path.join(working_dir,
+                'time_slices_cycle_{0}'.format(major_cycle))
+
+            # Use shutil instead of shelling out to 'cp -r': it fails
+            # loudly and does not depend on shell path quoting
+            shutil.copytree(time_slice_dir, time_slice_copy_dir)
+        except Exception:
+            self.logger.warn(
+                "Debug copy of temporary files failed: continuing operations")
+
+        return 0
+
+
+if __name__ == "__main__":
+    _JOBID, _JOBHOST, _JOBPORT = sys.argv[1:4]
+    sys.exit(selfcal_bbs(_JOBID, _JOBHOST, _JOBPORT).run_with_stored_arguments())
diff --git a/CEP/Pipeline/recipes/sip/nodes/selfcal_finalize.py b/CEP/Pipeline/recipes/sip/nodes/selfcal_finalize.py
new file mode 100644
index 0000000000000000000000000000000000000000..a76465e9fabf12d79e7a6491d9a4edb2b0d171ed
--- /dev/null
+++ b/CEP/Pipeline/recipes/sip/nodes/selfcal_finalize.py
@@ -0,0 +1,196 @@
+# LOFAR IMAGING PIPELINE
+#
+# selfcal_finalize
+# Wouter Klijn 2012
+# klijn@astron.nl
+# ------------------------------------------------------------------------------
+from __future__ import with_statement
+import sys
+import subprocess
+import os
+import tempfile
+import shutil
+
+from lofarpipe.support.lofarnode import LOFARnodeTCP
+from lofarpipe.support.utilities import log_time, create_directory
+import lofar.addImagingInfo as addimg
+import pyrap.images as pim
+from lofarpipe.support.utilities import catch_segfaults
+from lofarpipe.support.data_map import DataMap
+from lofarpipe.support.pipelinelogging import CatchLog4CPlus
+from lofarpipe.support.subprocessgroup import SubProcessGroup
+
+class selfcal_finalize(LOFARnodeTCP):
+    """
+    This script performs the following functions:
+
+    1. Add the image info to the casa image:
+       addimg.addImagingInfo(imageName, msNames, sourcedbName, minbl, maxbl)
+    2. Convert the image to hdf5 and fits format
+    3. Fill the HDF5 root group
+    4. Move the selfcal meta data to the correct directory in the ms
+    5. Deep-copy the ms to the output location
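+
+    (Step 5 uses msselect with deep=True, so that all columns are copied
+    into a fully independent measurement set.)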
+    """
+    def run(self, awimager_output, ms_per_image, sourcelist, target,
+            output_image, minbaseline, maxbaseline, processed_ms_dir,
+            fillrootimagegroup_exec, environment, sourcedb, concat_ms,
+            correlated_output_location, msselect_executable):
+        """
+        :param awimager_output: Path to the casa image produced by awimager
+        :param ms_per_image: The measurement sets scheduled to create
+            the image
+        :param sourcelist: List of sources found in the image
+        :param target: <unused>
+        :param minbaseline: Minimum baseline used for the image
+        :param maxbaseline: Largest/maximum baseline used for the image
+        :param processed_ms_dir: The measurement sets actually used to
+            create the image
+        :param fillrootimagegroup_exec: Executable used to add image data to
+            the hdf5 image
+
+        :rtype: self.outputs['hdf5'] set to "succes" to signal node success
+        :rtype: self.outputs['image'] path to the produced hdf5 image
+        """
+        self.environment.update(environment)
+        with log_time(self.logger):
+            ms_per_image_map = DataMap.load(ms_per_image)
+
+            # *****************************************************************
+            # 1. Add the image info
+            # Collect the paths of the processed measurement sets
+            processed_ms_paths = []
+            ms_per_image_map.iterator = DataMap.SkipIterator
+            for item in ms_per_image_map:
+                processed_ms_paths.append(item.file)
+
+            # Add the imaging information to the image
+            try:
+                self.logger.debug("Start addImagingInfo")
+                addimg.addImagingInfo(awimager_output, processed_ms_paths,
+                                      sourcedb, minbaseline, maxbaseline)
+
+            except Exception, error:
+                self.logger.warn("addImagingInfo threw an exception:")
+                self.logger.warn(error)
+                # Catch the 'already done' error: this allows rerunning of
+                # the recipe. The majority of the tables is updated correctly.
+                if "addImagingInfo already done" in str(error):
+                    self.logger.warn("addImagingInfo already done, continue")
+                else:
+                    raise
+
+            # ***************************************************************
+            # 2. Convert to hdf5 image format
+            pim_image = pim.image(awimager_output)
+            try:
+                self.logger.info("Saving image in HDF5 format to: {0}".format(
+                    output_image))
+                # Create the output directory
+                output_directory = os.path.dirname(output_image)
+                create_directory(output_directory)
+                # Save the image
+                pim_image.saveas(output_image, hdf5=True)
+
+            except Exception, error:
+                self.logger.error(
+                    "Exception raised inside pyrap.images: {0}".format(
+                        str(error)))
+                raise
+
+            # Convert to fits
+            # Create the target location
+            fits_output = output_image + ".fits"
+            # To allow reruns, a possible earlier version needs to be removed:
+            # image2fits fails otherwise!
+            if os.path.exists(fits_output):
+                os.unlink(fits_output)
+
+            # Create the temp dir before the try block, so the finally
+            # clause can always remove it
+            temp_dir = tempfile.mkdtemp()
+            try:
+                self.logger.debug("Start convert to fits")
+                with CatchLog4CPlus(temp_dir,
+                        self.logger.name + '.' +
+                        os.path.basename(awimager_output),
+                        "image2fits") as logger:
+                    catch_segfaults(["image2fits", '-in', awimager_output,
+                                     '-out', fits_output],
+                                    temp_dir, self.environment, logger)
+            except Exception, excp:
+                self.logger.error(str(excp))
+                return 1
+            finally:
+                shutil.rmtree(temp_dir)
+
+            # ****************************************************************
+            # 3. Fill the HDF5 root group
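+            # The child process below is run with all three standard streams
+            # connected to pipes; communicate() drains stdout and stderr to
+            # completion, which avoids the deadlock that can occur when a
+            # child fills a pipe buffer that is never read.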
+            command = [fillrootimagegroup_exec, output_image]
+            self.logger.info(" ".join(command))
+            # Spawn a subprocess and connect the pipes
+            proc = subprocess.Popen(
+                command,
+                stdin=subprocess.PIPE,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE)
+
+            (stdoutdata, stderrdata) = proc.communicate()
+            exit_status = proc.returncode
+
+            # If the fill failed, log the subprocess output
+            if exit_status != 0:
+                self.logger.error("Error using the fillRootImageGroup command"
+                                  ". Exit status: {0}".format(exit_status))
+                self.logger.error(stdoutdata)
+                self.logger.error(stderrdata)
+
+                return 1
+
+            # *****************************************************************
+            # 4. Move the meta information to the correct directory, next to
+            #    the concat.ms
+            self.logger.info("Saving selfcal parameters to file:")
+            meta_dir = concat_ms + "_selfcal_information"
+            meta_dir_target = os.path.join(concat_ms, "selfcal_information")
+            if os.path.exists(meta_dir) and os.path.exists(concat_ms):
+                self.logger.info(
+                    "Copy meta information to the output measurement set")
+
+                # Clear possible old data; this allows rerunning of the
+                # pipeline if needed.
+                if os.path.exists(meta_dir_target):
+                    shutil.rmtree(meta_dir_target)
+                shutil.copytree(meta_dir, meta_dir_target)
+
+            # *****************************************************************
+            # 5. Copy the measurement set to the output directory:
+            # use msselect to make a deep copy of all the data in the
+            # measurement set
+            cmd_string = "{0} in={1} out={2} baseline=* deep=True".format(
+                msselect_executable, concat_ms, correlated_output_location)
+            msselect_proc_group = SubProcessGroup(self.logger)
+            msselect_proc_group.run(cmd_string)
+            if msselect_proc_group.wait_for_finish() is not None:
+                self.logger.error(
+                    "Failed to copy the measurement set to the output dir")
+                raise Exception("An msselect run failed!")
+
+            self.outputs["hdf5"] = "succes"
+            self.outputs["image"] = output_image
+            self.outputs["correlated"] = correlated_output_location
+
+            return 0
+
+
+if __name__ == "__main__":
+    _JOBID, _JOBHOST, _JOBPORT = sys.argv[1:4]
+    sys.exit(selfcal_finalize(_JOBID, _JOBHOST,
+                              _JOBPORT).run_with_stored_arguments())