Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
copier.py 3.86 KiB
#                                                        LOFAR IMAGING PIPELINE
#
#                                                        Copy from to nodes
#                                                            Wouter Klijn, 2012
#                                                               klijn@astron.nl
# -----------------------------------------------------------------------------
from __future__ import with_statement
import os
import sys
import subprocess
import errno

from lofarpipe.support.lofarnode import LOFARnodeTCP
from lofarpipe.support.pipelinelogging import log_time
from lofarpipe.support.utilities import create_directory
from lofarpipe.support.group_data import load_data_map
from lofarpipe.support.lofarexceptions import PipelineException


class copier(LOFARnodeTCP):
    """
    Node script for copying files between nodes. See master script for full public interface
    """
    def run(self, source_node, source_path, target_path, globalfs):
        self.globalfs = globalfs
        # Time execution of this job
        with log_time(self.logger):
            return self._copy_single_file_using_rsync(
                source_node, source_path, target_path)

    def _copy_single_file_using_rsync(self, source_node, source_path,
                                      target_path):
        # assure that target dir exists (rsync creates it but..
        # an error in the python code will throw a nicer error
        message = "No write acces to target path: {0}".format(
                                    os.path.dirname(target_path))
        # If not existing try to create dir catch no permission
        try:
            create_directory(os.path.dirname(target_path))
        except OSError, e:
            if e.errno == 13:  # No permision
                self.logger.error(message)
                raise IOError(message)
            else:
                raise e

        #check if the target_path is writable for the current proc
        if not os.access(os.path.dirname(target_path), os.W_OK):
            self.logger.error(message)
            raise IOError(message)


        # construct copy command: Copy to the dir
        # if process runs on local host use a simple copy command.
        if self.globalfs:
            command = ["lfs", "cp", "-r","{0}".format(source_path),"{0}".format(target_path)]
        elif source_node=="localhost":
            command = ["cp", "-r","{0}".format(source_path),"{0}".format(target_path)]
        else:
            command = ["rsync", "-r", 
                   "{0}:{1}/".format(source_node, source_path),
                   "{0}".format(target_path)]

        self.logger.debug("executing: " + " ".join(command))
        #Spawn a subprocess and connect the pipes
        copy_process = subprocess.Popen(
                        command,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)

        (stdoutdata, stderrdata) = copy_process.communicate()
        exit_status = copy_process.returncode
        #if copy failed log the missing file
        if  exit_status != 0:
            message = \
            "Failed to (rsync) copy file, command: \n {0}".format(" ".join(
                                            command))
            self.logger.warn(message)
            self.logger.error(stderrdata)
            return 1

        self.logger.debug(stdoutdata)

        # return the target path to signal success
        self.outputs["target"] = target_path
        return 0

if __name__ == "__main__":
    #  If invoked directly, parse command line arguments for logger information
    #                       and pass the rest to the run() method defined above
    # -------------------------------------------------------------------------
    jobid, jobhost, jobport = sys.argv[1:4]
    sys.exit(copier(jobid, jobhost, jobport).run_with_stored_arguments())