-
Wouter Klijn authoredWouter Klijn authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
copier.py 3.93 KiB
# LOFAR IMAGING PIPELINE
#
# Copy from to nodes
# Wouter Klijn, 2012
# klijn@astron.nl
# ------------------------------------------------------------------------------
from __future__ import with_statement
import os
import shutil
from lofarpipe.support.lofarnode import LOFARnodeTCP
from lofarpipe.support.pipelinelogging import log_time
from lofarpipe.support.utilities import create_directory
from lofarpipe.support.group_data import load_data_map, store_data_map
from lofarpipe.support.subprocessgroup import SubProcessGroup
from lofarpipe.support.lofarexceptions import PipelineException
class copier(LOFARnodeTCP):
def run(self, source_mapfile, target_mapfile):
# Time execution of this job
with log_time(self.logger):
try:
source_map = load_data_map(source_mapfile)
target_map = load_data_map(target_mapfile)
except Exception, e:
self.logger.error("An error occured during loading of mapfiles")
self.logger.error("Retrieved exception: {0}".format(e.str()))
raise e
return self._copy_all_sources_to_target(source_map,
target_map)
def _copy_all_sources_to_target(self, source_map, target_map):
#combine the two lists to get the copy pairs
for source_pair, target_pair in zip(source_map, target_map):
source_node, source_path = source_pair
target_node, target_path = target_pair
#assure that target dir exists
create_directory(os.path.dirname(target_path))
# if on same node
if source_node == target_node:
# quick copy
try:
shutil.copyfile(source_path, target_path)
except Exception, e:
self.logger.error("Failed copy file: {0} on node {1} ".format(
source_path, source_node))
self.logger.error(e.str())
raise e
else:
self._copy_single_file_using_rsync(self,
source_node, source_path, target_path)
return 0
def _copy_single_file_using_rsync(self, source_node, source_path,
target_path):
# construct copy command
command = ["rsync", "-r", "{0}:{1}".format(source_node, source_path) ,
"{0}".format(target_path)]
self.logger.debug("executing: " + " ".join(command))
#Spawn a subprocess and connect the pipes
copy_process = subprocess.Popen(
command,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE)
(stdoutdata, stderrdata) = copy_process.communicate()
exit_status = copy_process.returncode
#if copy failed log the missing file
if exit_status != 0:
missing_files.append(path)
message = "Failed to (rsync) copy file: {0} on node {1}".format(
source_path, source_node)
self.logger.warning()
self.logger.warning(stderrdata)
raise PipelineException(message)
self.logger.debug(stdoutdata)
return 0
if __name__ == "__main__":
# If invoked directly, parse command line arguments for logger information
# and pass the rest to the run() method defined above
# --------------------------------------------------------------------------
jobid, jobhost, jobport = sys.argv[1:4]
sys.exit(Copier(jobid, jobhost, jobport).run_with_stored_arguments())