# LOFAR IMAGING PIPELINE
# Prepare phase node
# Wouter Klijn
# 2012
# klijn@astron.nl
# -----------------------------------------------------------------------------
from __future__ import with_statement
import sys
import shutil
import os
import subprocess
import copy
import pyrap.tables as pt   # order of pyrap import influences the type conversion binding
from lofarpipe.support.pipelinelogging import CatchLog4CPlus
from lofarpipe.support.pipelinelogging import log_time
from lofarpipe.support.utilities import patch_parset
from lofarpipe.support.utilities import catch_segfaults
from lofarpipe.support.lofarnode import LOFARnodeTCP
from lofarpipe.support.utilities import create_directory
from lofarpipe.support.data_map import DataMap
from lofarpipe.support.subprocessgroup import SubProcessGroup


# Some constant settings for the recipe
_time_slice_dir_name = "time_slices"


class imager_prepare(LOFARnodeTCP):
    """
    Steps perform on the node:
    
    0. Create directories and assure that they are empty.
    1. Collect the Measurement Sets (MSs): copy to the current node.
    2. Start dppp: Combines the data from subgroups into single timeslice.
    3. Flag rfi.
    4. Add addImagingColumns to the casa ms.
    5. Concatenate the time slice measurment sets, to a single virtual ms.
    6. Filter bad stations. Find station with repeated bad measurement and
       remove these completely from the dataset.

    **Members:**
    """
    def run(self, environment, parset, working_dir, processed_ms_dir,
            ndppp_executable, output_measurement_set,
            time_slices_per_image, subbands_per_group, raw_ms_mapfile,
            asciistat_executable, statplot_executable, msselect_executable,
            rficonsole_executable, add_beam_tables):
        """
        Entry point for the node recipe
        """
        self.environment.update(environment)
        with log_time(self.logger):
            input_map = DataMap.load(raw_ms_mapfile)

            #******************************************************************
            # 0. Create the directories used in this recipe
            create_directory(processed_ms_dir)

            # assure an empty time slice directory: stale data
            # is problematic for dppp
            time_slice_dir = os.path.join(working_dir, _time_slice_dir_name)
            create_directory(time_slice_dir)
            for root, dirs, files in os.walk(time_slice_dir):
                for file_to_remove in files:
                    os.unlink(os.path.join(root, file_to_remove))
                for dir_to_remove in dirs:
                    shutil.rmtree(os.path.join(root, dir_to_remove))
            self.logger.debug("Created directory: {0}".format(time_slice_dir))
            self.logger.debug("and assured it is empty")

            #******************************************************************
            # 1. Copy the input files
            copied_ms_map = self._copy_input_files(
                            processed_ms_dir, input_map)

            #******************************************************************
            # 2. run dppp: collect frequencies into larger group
            time_slices_path_list = \
                self._run_dppp(working_dir, time_slice_dir,
                    time_slices_per_image, copied_ms_map, subbands_per_group,
                    processed_ms_dir, parset, ndppp_executable)

            # If no timeslices were created, bail out with exit status 1
            if len(time_slices_path_list) == 0:
                self.logger.error("No timeslices were created.")
                self.logger.error("Exiting with error state 1")
                return 1

            self.logger.debug(
                    "Produced time slices: {0}".format(time_slices_path_list))
            #******************************************************************
            # 3. run rficonsole: flag data points which are corrupted
            self._run_rficonsole(rficonsole_executable, time_slice_dir,
                                 time_slices_path_list)

            #******************************************************************
            # 4. Add imaging columns to each timeslice: downstream imaging
            # steps fail if these columns are not present
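            # pyrap's addImagingColumns adds the MODEL_DATA, CORRECTED_DATA
            # and IMAGING_WEIGHT columns to the ms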
            for time_slice_path in time_slices_path_list:
                pt.addImagingColumns(time_slice_path)
                self.logger.debug(
                    "Added imaging columns to time_slice: {0}".format(
                        time_slice_path))

            #*****************************************************************
            # 5. Filter bad stations
            time_slice_filtered_path_list = self._filter_bad_stations(
                time_slices_path_list, asciistat_executable,
                statplot_executable, msselect_executable)

            #*****************************************************************
            # Add beam tables
            if add_beam_tables:
                self.add_beam_tables(time_slice_filtered_path_list)

            #******************************************************************
            # 6. Perform the (virtual) concatenation of the timeslices
            self._concat_timeslices(time_slice_filtered_path_list,
                                    output_measurement_set)

            #******************************************************************
            # return
            self.outputs["time_slices"] = \
                time_slices_path_list

        return 0

    def add_beam_tables(self, time_slices_path_list):
        beamtable_proc_group = SubProcessGroup(self.logger)
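        # makebeamtables is assumed to be on the PATH (no explicit
        # executable path is passed in); it adds the LOFAR beam subtables
        # to each ms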
        for ms_path in time_slices_path_list:
            self.logger.debug("makebeamtables start")
            cmd_string = "makebeamtables ms={0} overwrite=true".format(ms_path)
            self.logger.debug(cmd_string)
            beamtable_proc_group.run(cmd_string)

        if beamtable_proc_group.wait_for_finish() is not None:
            raise Exception("a makebeamtables run failed!")

        self.logger.debug("makebeamtables finished")

    def _copy_input_files(self, processed_ms_dir, input_map):
        """
        Collect all the measurement sets in a single directory:
        The measurement sets are located on different nodes on the cluster.
        This function collects all the file in the input map in the
        processed_ms_dir Return value is a set of missing files
        """
        copied_ms_map = copy.deepcopy(input_map)
        # loop all measurement sets
        for input_item, copied_item in zip(input_map, copied_ms_map):
            # fill the copied item with the correct data
            copied_item.host = self.host
            copied_item.file = os.path.join(
                    processed_ms_dir, os.path.basename(input_item.file))
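            # e.g. an input item /data/L42_SB000_uv.MS (illustrative name)
            # will be copied to <processed_ms_dir>/L42_SB000_uv.MS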

            # if the input ms is already marked skip, mark the copy as
            # skipped as well and do not attempt the transfer
            if input_item.skip:
                copied_item.skip = True
                continue

            # construct the copy command: rsync from the remote host, or a
            # plain cp when the data is already on this machine
            command = ["rsync", "-r", "{0}:{1}".format(
                            input_item.host, input_item.file),
                               "{0}".format(processed_ms_dir)]
            if input_item.host == "localhost":
                command = ["cp", "-r", "{0}".format(input_item.file),
                                       "{0}".format(processed_ms_dir)]

            self.logger.debug("executing: " + " ".join(command))

            # Spawn a subprocess and connect the pipes. Copies are performed
            # sequentially; firing them all at once might saturate the
            # cluster network.
            copy_process = subprocess.Popen(
                        command,
                        stdin = subprocess.PIPE,
                        stdout = subprocess.PIPE,
                        stderr = subprocess.PIPE)

            # Wait for the copy to finish inside the loop: enforce a
            # single-threaded copy
            (stdoutdata, stderrdata) = copy_process.communicate()

            exit_status = copy_process.returncode

            # if copy failed log the missing file and update the skip fields
            if exit_status != 0:
                input_item.skip = True
                copied_item.skip = True
                self.logger.warning(
                            "Failed loading file: {0}".format(input_item.file))
                self.logger.warning(stderrdata)

            self.logger.debug(stdoutdata)

        return copied_ms_map


    def _dppp_call(self, working_dir, ndppp, cmd, environment):
        """
        Muckable function running the dppp executable.
        Wraps dppp with catchLog4CPLus and catch_segfaults
        """
        with CatchLog4CPlus(working_dir, self.logger.name +
             "." + os.path.basename("imager_prepare_ndppp"),
                  os.path.basename(ndppp)) as logger:
            catch_segfaults(cmd, working_dir, environment,
                                  logger, cleanup = None)

    def _run_dppp(self, working_dir, time_slice_dir_path, slices_per_image,
                  copied_ms_map, subbands_per_image, collected_ms_dir_name, parset,
                  ndppp):
        """
        Run NDPPP:
        Create dir for grouped measurements, assure clean workspace
        Call with log for cplus and catch segfaults. Pparameters are
        supplied in parset
        """
        time_slice_path_list = []
        for idx_time_slice in range(slices_per_image):
            start_slice_range = idx_time_slice * subbands_per_image
            end_slice_range = (idx_time_slice + 1) * subbands_per_image
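            # e.g. with subbands_per_image = 10, time slice 2 covers the
            # copied input items 20 through 29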
            # Get the subset of ms that are part of the current timeslice,
            # cast to datamap
            input_map_subgroup = DataMap(
                            copied_ms_map[start_slice_range:end_slice_range])

            output_ms_name = "time_slice_{0}.dppp.ms".format(idx_time_slice)

            # construct time slice name
            time_slice_path = os.path.join(time_slice_dir_path,
                                         output_ms_name)

            # convert the datamap to a file list. Do not remove skipped
            # files: ndppp needs them listed so it can fill the gaps with
            # zeros
            ndppp_input_ms = [item.file for item in input_map_subgroup]

            # Join into a single list of paths.
            msin = "['{0}']".format("', '".join(ndppp_input_ms))
            # Update the parset with computed parameters
            patch_dictionary = {'uselogger': 'True',  # enables log4cplus
                                'msin': msin,
                                'msout': time_slice_path}
            # Write the updated parset next to the output ms. patch_parset
            # creates a temporary file that must always be cleaned up.
            nddd_parset_path = time_slice_path + ".ndppp.par"
            temp_parset_filename = None
            try:
                temp_parset_filename = patch_parset(parset, patch_dictionary)
                shutil.copyfile(temp_parset_filename, nddd_parset_path)
                self.logger.debug(
                    "Wrote a ndppp parset with runtime variables:"
                    " {0}".format(nddd_parset_path))

            except Exception, exception:
                self.logger.error("failed loading and updating the " +
                                  "parset: {0}".format(parset))
                raise exception

            # remove the temp file created by patch_parset
            finally:
                if temp_parset_filename is not None:
                    os.unlink(temp_parset_filename)

            # run ndppp
            cmd = [ndppp, nddd_parset_path]

            try:
                # Actual dppp call to externals (allows mucking)
                self._dppp_call(working_dir, ndppp, cmd, self.environment)
                # append the created timeslice on successful run
                time_slice_path_list.append(time_slice_path)

            # On error (including a CalledProcessError from the spawned
            # executable) the current timeslice is skipped
            except Exception, exception:
                self.logger.warning(str(exception))
                continue

        return time_slice_path_list

    def _concat_timeslices(self, group_measurements_collected,
                                    output_file_path):
        """
        Msconcat to combine the time slices in a single ms:
        It is a virtual ms, a ms with symbolic links to actual data is created!
        """
        pt.msconcat(group_measurements_collected,
                               output_file_path, concatTime = True)
        self.logger.debug("Concatenated the files: {0} into the single measure"
            "mentset: {1}".format(
                ", ".join(group_measurements_collected), output_file_path))

    def _run_rficonsole(self, rficonsole_executable, time_slice_dir,
                        time_slices):
        """
        _run_rficonsole runs the rficonsole application on the supplied
        timeslices in time_slices.

        """

        # create a temporary working directory shared by all rficonsole runs
        rfi_temp_dir = os.path.join(time_slice_dir, "rfi_temp_dir")
        create_directory(rfi_temp_dir)

        try:
            rfi_console_proc_group = SubProcessGroup(self.logger)
            for time_slice in time_slices:
                # Each rficonsole run needs its own working space for temp
                # files
                temp_slice_path = os.path.join(rfi_temp_dir,
                    os.path.basename(time_slice))
                create_directory(temp_slice_path)

                # construct the rficonsole command
                command = [rficonsole_executable, "-indirect-read",
                            time_slice]
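                # '-indirect-read' is understood to make rficonsole reorder
                # the data through scratch files rather than reading the ms
                # directly, hence the private working directory per run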
                self.logger.info("executing rficonsole command: {0}".format(
                            " ".join(command)))

                # Add the command to the process group
                rfi_console_proc_group.run(command, cwd = temp_slice_path)

            # wait for all to finish
            if rfi_console_proc_group.wait_for_finish() is not None:
                raise Exception("a rficonsole run failed!")

        finally:
            shutil.rmtree(rfi_temp_dir)

    def _filter_bad_stations(self, time_slice_path_list,
            asciistat_executable, statplot_executable, msselect_executable):
        """
        A Collection of scripts for finding and filtering of bad stations:

        1. First a number of statistics with regards to the spread of the data
           is collected using the asciistat_executable.
        2. Secondly these statistics are consumed by the statplot_executable
           which produces a set of bad stations.
        3. In the final step the bad stations are removed from the dataset 
           using ms select

        REF: http://www.lofar.org/wiki/lib/exe/fetch.php?media=msss:pandeymartinez-week9-v1p2.pdf
        """
        # run asciistat to collect statistics about the ms
        self.logger.info("Filtering bad stations")
        self.logger.debug("Collecting statistical properties of input data")
        asciistat_output = []
        asciistat_proc_group = SubProcessGroup(self.logger)
        for ms in time_slice_path_list:
            output_dir = ms + ".filter_temp"
            create_directory(output_dir)
            asciistat_output.append((ms, output_dir))

            cmd_string = "{0} -i {1} -r {2}".format(asciistat_executable,
                            ms, output_dir)
            asciistat_proc_group.run(cmd_string)

        if asciistat_proc_group.wait_for_finish() is not None:
            raise Exception("an asciistat run failed!")

        # Determine the station to remove
        self.logger.debug("Select bad stations depending on collected stats")
        asciiplot_output = []
        asciiplot_proc_group = SubProcessGroup(self.logger)
        for (ms, output_dir) in asciistat_output:
            ms_stats = os.path.join(
                            output_dir, os.path.split(ms)[1] + ".stats")

            cmd_string = "{0} -i {1} -o {2}".format(statplot_executable,
                                                     ms_stats, ms_stats)
            asciiplot_output.append((ms, ms_stats))
            asciiplot_proc_group.run(cmd_string)

        if asciiplot_proc_group.wait_for_finish() is not None:
            raise Exception("an asciiplot run failed!")

        # remove the bad stations
        self.logger.debug("Use ms select to remove bad stations")
        msselect_output = {}
        msselect_proc_group = SubProcessGroup(self.logger)
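        # expected layout of a statplot .tab line, inferred from the
        # parsing below: "<nr> <station-name> ... <bad: True|False>"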
        for ms, ms_stats in asciiplot_output:
            # parse the .tab file containing the bad stations
            station_to_filter = []
            with open(ms_stats + ".tab") as file_pointer:
                for line in file_pointer:
                    # skip header lines
                    if line[0] == "#":
                        continue

                    entries = line.split()
                    # if the current station is bad (the last entry on the
                    # line)
                    if entries[-1] == "True":
                        # add the name of the station
                        station_to_filter.append(entries[1])

            # if this measurement contains no stations to filter, do not
            # run msselect and provide the original ms as output
            if len(station_to_filter) == 0:
                msselect_output[ms] = ms
                continue

            ms_output_path = ms + ".filtered"
            msselect_output[ms] = ms_output_path

            # use msselect to remove the stations from the ms
            msselect_baseline = "!{0}".format(",".join(station_to_filter))
            cmd_string = "{0} in={1} out={2} baseline={3} deep={4}".format(
                            msselect_executable, ms, ms_output_path,
                            msselect_baseline, "False")
            msselect_proc_group.run(cmd_string)

        if msselect_proc_group.wait_for_finish() is not None:
            raise Exception("an msselect run failed!")

        filtered_list_of_ms = []
        # The order of the inputs needs to be preserved when producing the
        # filtered output!
        for input_ms in time_slice_path_list:
            filtered_list_of_ms.append(msselect_output[input_ms])

        return filtered_list_of_ms


if __name__ == "__main__":
    _jobid, _jobhost, _jobport = sys.argv[1:4]
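    # run_with_stored_arguments() retrieves the remaining arguments for
    # run() from the master via the given host and port (the usual
    # lofarpipe node-script contract)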
    sys.exit(
        imager_prepare(_jobid, _jobhost, _jobport).run_with_stored_arguments())