-
Jorrit Schaap authoredJorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ltacp.py 27.70 KiB
#!/usr/bin/env python
# LTACP Python module for transferring data from a remote node to a remote SRM via localhost
#
# Remote data can be individual files or directories. Directories will be tar-ed.
#
# Between the remote and local host, md5 checksums are used to ensure integrity of the file,
# adler32 is used between localhost and the SRM.
import logging
from subprocess import Popen, PIPE
import socket
import os, sys, getpass
import time
import random
import math
import atexit
from datetime import datetime, timedelta
# TODO: reuse method from LCS.PyCommon.utils
def humanreadablesize(num, suffix='B', base=1000):
""" converts the given size (number) to a human readable string in powers of 'base'"""
try:
for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
if abs(num) < float(base):
return "%3.1f%s%s" % (num, unit, suffix)
num /= float(base)
return "%.2f%s%s" % (num, 'Y', suffix)
except TypeError:
return str(num)
logger = logging.getLogger('Slave')
_ingest_init_script = '/globalhome/ingest/service/bin/init.sh'
if __name__ == '__main__':
log_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)-15s %(levelname)s %(message)s')
formatter.converter = time.gmtime
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)
class LtacpException(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return str(self.value)
def getLocalIPAddress():
host = socket.gethostname()
ipaddress = socket.gethostbyname(host)
return ipaddress
# converts given srm url of an LTA site into a transport url as needed by gridftp. (Sring replacement based on arcane knowledge.)
def convert_surl_to_turl(surl):
sara_nodes = ['fly%d' % i for i in range(1, 10)] # + \
#['wasp%d' % i for i in range(1, 10)] + \
#['by27-%d' % i for i in range(1, 10)] + \
#['bw27-%d' % i for i in range(1, 10)] + \
#['by32-%d' % i for i in range(1, 10)] + \
#['bw32-%d' % i for i in range(1, 10)]
sara_turl = 'gsiftp://%s.grid.sara.nl:2811' % sara_nodes[random.randint(0, len(sara_nodes)-1)]
turl = surl.replace("srm://srm.grid.sara.nl:8443",sara_turl, 1)
turl = turl.replace("srm://srm.grid.sara.nl",sara_turl,1)
turl = turl.replace("srm://lofar-srm.fz-juelich.de:8443", "gsiftp://dcachepool%d.fz-juelich.de:2811" % (random.randint(9, 16),), 1)
turl = turl.replace("srm://lofar-srm.fz-juelich.de", "gsiftp://dcachepool%d.fz-juelich.de:2811" % (random.randint(9, 16),), 1)
turl = turl.replace("srm://srm.target.rug.nl:8444","gsiftp://gridftp02.target.rug.nl/target/gpfs2/lofar/home/srm",1)
turl = turl.replace("srm://srm.target.rug.nl","gsiftp://gridftp02.target.rug.nl/target/gpfs2/lofar/home/srm",1)
return turl
def createNetCatCmd(user, host):
'''helper method to determine the proper call syntax for netcat on host'''
# nc has no version option or other ways to check it's version
# so, just try the variants and pick the first one that does not fail
nc_variants = ['nc --send-only', 'nc -q 0']
for nc_variant in nc_variants:
cmd = ['ssh', '-n', '-x', '%s@%s' % (user, host), nc_variant]
p = Popen(cmd, stdout=PIPE, stderr=PIPE)
out, err = p.communicate()
if 'invalid option' not in err:
return nc_variant
raise LtacpException('could not determine remote netcat version')
class LtaCp:
def __init__(self,
src_host,
src_path_data,
dst_surl,
src_user=None):
self.src_host = src_host
self.src_path_data = src_path_data
self.dst_surl = dst_surl
self.src_user = src_user if src_user else getpass.getuser()
self.logId = os.path.basename(self.src_path_data)
self.started_procs = {}
self.fifos = []
self.ssh_cmd = ['ssh', '-tt', '-n', '-x', '-q', '%s@%s' % (self.src_user, self.src_host)]
self.localIPAddress = getLocalIPAddress()
self.localNetCatCmd = createNetCatCmd(getpass.getuser(), self.localIPAddress)
self.remoteNetCatCmd = createNetCatCmd(self.src_user, self.src_host)
# make sure that all subprocesses and fifo's are cleaned up when the program exits
atexit.register(self.cleanup)
# transfer file/directory from given src to SRM location with given turl
def transfer(self):
# for cleanup
self.started_procs = {}
self.fifos = []
try:
dst_turl = convert_surl_to_turl(self.dst_surl)
logger.info('ltacp %s: initiating transfer of %s:%s to %s' % (self.logId, self.src_host, self.src_path_data, self.dst_surl))
#---
# Server part
#---
# we'll randomize ports
# to minimize initial collision, randomize based on path and time
random.seed(hash(self.src_path_data) ^ hash(time.time()))
p_data_in, port_data = self._ncListen('data')
p_md5_receive, port_md5 = self._ncListen('md5 checksums')
# create fifo paths
self.local_fifo_basename = '/tmp/ltacp_datapipe_%s_%s' % (self.src_host, self.logId)
def createLocalFifo(fifo_postfix):
fifo_path = '%s_%s' % (self.local_fifo_basename, fifo_postfix)
logger.info('ltacp %s: creating data fifo: %s' % (self.logId, fifo_path))
if os.path.exists(fifo_path):
os.remove(fifo_path)
os.mkfifo(fifo_path)
if not os.path.exists(fifo_path):
raise LtacpException("ltacp %s: Could not create fifo: %s" % (self.logId, fifo_path))
self.fifos.append(fifo_path)
return fifo_path
self.local_data_fifo = createLocalFifo('globus_url_copy')
self.local_byte_count_fifo = createLocalFifo('local_byte_count')
self.local_adler32_fifo = createLocalFifo('local_adler32')
# tee incoming data stream to fifo (and pipe stream in tee_proc.stdout)
def teeDataStreams(pipe_in, fifo_out):
cmd_tee = ['tee', fifo_out]
logger.info('ltacp %s: splitting datastream. executing: %s' % (self.logId, ' '.join(cmd_tee),))
tee_proc = Popen(cmd_tee, stdin=pipe_in, stdout=PIPE, stderr=PIPE)
self.started_procs[tee_proc] = cmd_tee
return tee_proc
p_tee_data = teeDataStreams(p_data_in.stdout, self.local_data_fifo)
p_tee_byte_count = teeDataStreams(p_tee_data.stdout, self.local_byte_count_fifo)
p_tee_checksums = teeDataStreams(p_tee_byte_count.stdout, self.local_adler32_fifo)
# start counting number of bytes in incoming data stream
cmd_byte_count = ['wc', '-c', self.local_byte_count_fifo]
logger.info('ltacp %s: computing byte count. executing: %s' % (self.logId, ' '.join(cmd_byte_count)))
p_byte_count = Popen(cmd_byte_count, stdout=PIPE, stderr=PIPE, env=dict(os.environ, LC_ALL="C"))
self.started_procs[p_byte_count] = cmd_byte_count
# start computing md5 checksum of incoming data stream
cmd_md5_local = ['md5sum']
logger.info('ltacp %s: computing local md5 checksum. executing on data pipe: %s' % (self.logId, ' '.join(cmd_md5_local)))
p_md5_local = Popen(cmd_md5_local, stdin=p_tee_checksums.stdout, stdout=PIPE, stderr=PIPE)
self.started_procs[p_md5_local] = cmd_md5_local
# start computing adler checksum of incoming data stream
cmd_a32_local = ['./md5adler/a32', self.local_adler32_fifo]
#cmd_a32_local = ['md5sum', self.local_adler32_fifo]
logger.info('ltacp %s: computing local adler32 checksum. executing: %s' % (self.logId, ' '.join(cmd_a32_local)))
p_a32_local = Popen(cmd_a32_local, stdout=PIPE, stderr=PIPE)
self.started_procs[p_a32_local] = cmd_a32_local
# start copy fifo stream to SRM
guc_options = ['-cd', #create remote directories if missing
'-p 4', #number of parallel ftp connections
'-bs 131072', #buffer size
'-b', # binary
'-nodcau', # turn off data channel authentication for ftp transfers
]
cmd_data_out = ['/bin/bash', '-c', 'source %s; globus-url-copy %s file://%s %s' % (_ingest_init_script, ' '.join(guc_options), self.local_data_fifo, dst_turl)]
logger.info('ltacp %s: copying data stream into globus-url-copy. executing: %s' % (self.logId, ' '.join(cmd_data_out)))
p_data_out = Popen(cmd_data_out, stdout=PIPE, stderr=PIPE)
self.started_procs[p_data_out] = cmd_data_out
# Check if receiver side is set up correctly
# and all processes are still waiting for input from client
finished_procs = dict((p, cl) for (p, cl) in self.started_procs.items() if p.poll() is not None)
if len(finished_procs):
msg = ''
for p, cl in finished_procs.items():
o, e = p.communicate()
msg += " process pid:%d exited prematurely with exit code %d. cmdline: %s\nstdout: %s\nstderr: %s\n" % (p.pid,
p.returncode,
cl,
o,
e)
raise LtacpException("ltacp %s: %d local process(es) exited prematurely\n%s" % (self.logId, len(finished_procs), msg))
#---
# Client part
#---
# start remote copy on src host:
# 1) create fifo
# 2) send tar stream of data/dir + tee to fifo for 3)
# 3) simultaneously to 2), calculate checksum of fifo stream
# 4) break fifo
self.remote_data_fifo = '/tmp/ltacp_md5_pipe_%s_%s' % (self.logId, port_md5)
cmd_remote_mkfifo = self.ssh_cmd + ['mkfifo %s' % (self.remote_data_fifo,)]
logger.info('ltacp %s: remote creating fifo. executing: %s' % (self.logId, ' '.join(cmd_remote_mkfifo)))
p_remote_mkfifo = Popen(cmd_remote_mkfifo, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_mkfifo] = cmd_remote_mkfifo
# block until fifo is created
output_remote_mkfifo = p_remote_mkfifo.communicate()
if p_remote_mkfifo.returncode != 0:
raise LtacpException('ltacp %s: remote fifo creation failed: \nstdout: %s\nstderr: %s' % (self.logId, output_remote_mkfifo[0],output_remote_mkfifo[1]))
# get input datasize
cmd_remote_du = self.ssh_cmd + ['du -b --max-depth=0 %s' % (self.src_path_data,)]
logger.info('ltacp %s: remote getting datasize. executing: %s' % (self.logId, ' '.join(cmd_remote_du)))
p_remote_du = Popen(cmd_remote_du, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_du] = cmd_remote_du
# block until du is finished
output_remote_du = p_remote_du.communicate()
if p_remote_du.returncode != 0:
raise LtacpException('ltacp %s: remote fifo creation failed: \nstdout: %s\nstderr: %s' % (self.logId,
output_remote_du[0],
output_remote_du[1]))
# compute various parameters for progress logging
input_datasize = int(output_remote_du[0].split()[0])
logger.info('ltacp %s: input datasize: %d bytes, %s' % (self.logId, input_datasize, humanreadablesize(input_datasize)))
estimated_tar_size = 512*(input_datasize / 512) + 3*512 #512byte header, 2*512byte ending, 512byte modulo data
tar_record_size = 10240 # 20 * 512 byte blocks
# start sending remote data, tee to fifo
src_path_parent, src_path_child = os.path.split(self.src_path_data)
cmd_remote_data = self.ssh_cmd + ['cd %s && tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent,
src_path_child,
self.remote_data_fifo,
self.remoteNetCatCmd,
self.localIPAddress,
port_data)]
logger.info('ltacp %s: remote starting transfer. executing: %s' % (self.logId, ' '.join(cmd_remote_data)))
p_remote_data = Popen(cmd_remote_data, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_data] = cmd_remote_data
# start computation of checksum on remote fifo stream
cmd_remote_checksum = self.ssh_cmd + ['md5sum %s | %s %s %s' % (self.remote_data_fifo, self.remoteNetCatCmd, self.localIPAddress, port_md5)]
logger.info('ltacp %s: remote starting computation of md5 checksum. executing: %s' % (self.logId, ' '.join(cmd_remote_checksum)))
p_remote_checksum = Popen(cmd_remote_checksum, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_checksum] = cmd_remote_checksum
# timedelta.total_seconds is only available for python >= 2.7
def timedelta_total_seconds(td):
return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6)
# waiting for output, comparing checksums, etc.
logger.info('ltacp %s: transfering...' % self.logId)
transfer_start_time = datetime.utcnow()
prev_progress_time = datetime.utcnow()
prev_bytes_transfered = 0
# wait and poll for progress while all processes are runnning
while len([p for p in self.started_procs.values() if p.poll() is not None]) == 0
try:
# read and process tar stdout lines to create progress messages
nextline = p_remote_data.stdout.readline()
if len(nextline) > 0:
record_nr = int(nextline.split()[-1].strip())
total_bytes_transfered = record_nr * tar_record_size
percentage_done = 100.0*total_bytes_transfered/input_datasize
current_progress_time = datetime.utcnow()
elapsed_secs_since_start = timedelta_total_seconds(current_progress_time - transfer_start_time)
elapsed_secs_since_prev = timedelta_total_seconds(current_progress_time - prev_progress_time)
if percentage_done > 0 and elapsed_secs_since_start > 0 and elapsed_secs_since_prev > 0:
avg_speed = total_bytes_transfered / elapsed_secs_since_start
current_bytes_transfered = total_bytes_transfered - prev_bytes_transfered
current_speed = current_bytes_transfered / elapsed_secs_since_prev
if elapsed_secs_since_prev > 60 or current_bytes_transfered > 0.05*input_datasize:
prev_progress_time = current_progress_time
prev_bytes_transfered = total_bytes_transfered
percentage_to_go = 100.0 - percentage_done
time_to_go = elapsed_secs_since_start * percentage_to_go / percentage_done
logger.info('ltacp %s: transfered %d bytes, %s, %.1f%% at avgSpeed=%s curSpeed=%s to_go=%s' % (self.logId,
total_bytes_transfered,
humanreadablesize(total_bytes_transfered),
percentage_done,
humanreadablesize(avg_speed, 'Bps'),
humanreadablesize(current_speed, 'Bps'),
timedelta(seconds=int(round(time_to_go)))))
time.sleep(0.25)
except KeyboardInterrupt:
self.cleanup()
logger.info('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId)
output_data_out = p_data_out.communicate()
if p_data_out.returncode != 0:
raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId, output_data_out[1]))
logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId)
logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId)
output_remote_data = p_remote_data.communicate()
if p_remote_data.returncode != 0:
raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1]))
logger.debug('ltacp %s: remote data transfer finished...' % self.logId)
logger.info('ltacp %s: waiting for remote md5 checksum computation to finish...' % self.logId)
output_remote_checksum = p_remote_checksum.communicate()
if p_remote_checksum.returncode != 0:
raise LtacpException('ltacp %s: Error in remote md5 checksum computation: %s' % (self.logId, output_remote_checksum[1]))
logger.debug('ltacp %s: remote md5 checksum computation finished.' % self.logId)
logger.debug('ltacp %s: waiting to receive remote md5 checksum...' % self.logId)
output_md5_receive = p_md5_receive.communicate()
if p_md5_receive.returncode != 0:
raise LtacpException('ltacp %s: Error while receiving remote md5 checksum: %s' % (self.logId, output_md5_receive[1]))
logger.debug('ltacp %s: received md5 checksum.' % self.logId)
logger.debug('ltacp %s: waiting for local computation of md5 checksum...' % self.logId)
output_md5_local = p_md5_local.communicate()
if p_md5_local.returncode != 0:
raise LtacpException('ltacp %s: Error while receiving remote md5 checksum: %s' % (self.logId, output_md5_local[1]))
logger.debug('ltacp %s: computed local md5 checksum.' % self.logId)
# compare remote and local md5 checksums
try:
md5_checksum_remote = output_md5_receive[0].split(' ')[0]
md5_checksum_local = output_md5_local[0].split(' ')[0]
if(md5_checksum_remote != md5_checksum_local):
raise LtacpException('md5 checksum reported by client (%s) does not match local checksum of incoming data stream (%s)' % (self.logId, md5_checksum_remote, md5_checksum_local))
logger.info('ltacp %s: remote and local md5 checksums are equal: %s' % (self.logId, md5_checksum_local,))
except Exception as e:
logger.error('ltacp %s: error while parsing md5 checksum outputs: local=%s received=%s' % (self.logId, output_md5_local[0], output_md5_receive[0]))
raise
logger.debug('ltacp %s: waiting for local byte count on datastream...' % self.logId)
output_byte_count = p_byte_count.communicate()
if p_byte_count.returncode != 0:
raise LtacpException('ltacp %s: Error while receiving remote md5 checksum: %s' % (self.logId, output_byte_count[1]))
byte_count = int(output_byte_count[0].split()[0].strip())
logger.info('ltacp %s: byte count of datastream is %d %s' % (self.logId, byte_count, humanreadablesize(byte_count)))
logger.debug('ltacp %s: waiting for local adler32 checksum to complete...' % self.logId)
output_a32_local = p_a32_local.communicate()
if p_a32_local.returncode != 0:
raise LtacpException('ltacp %s: local adler32 checksum computation failed: %s' (self.logId, str(output_a32_local)))
logger.debug('ltacp %s: finished computation of local adler32 checksum' % self.logId)
a32_checksum_local = output_a32_local[0].split()[1]
logger.info('ltacp %s: fetching adler32 checksum from LTA...' % self.logId)
srm_a32_checksum = get_srm_a32_checksum(self.dst_surl)
if not srm_a32_checksum:
raise LtacpException('ltacp %s: Could not get srm adler32 checksum for: %s' % (self.logId, self.dst_surl))
if(srm_a32_checksum != a32_checksum_local):
raise LtacpException('ltacp %s: adler32 checksum reported by srm (%s) does not match original data checksum (%s)' % (self.logId,
srm_a32_checksum,
a32_checksum_local))
logger.info('ltacp %s: adler32 checksums are equal: %s' % (self.logId, a32_checksum_local))
logger.info('ltacp %s: transfer to LTA completed successfully.' % (self.logId))
except Exception as e:
# Something went wrong
logger.error('ltacp %s: Error in transfer: %s' % (self.logId, str(e)))
# re-raise the exception to the caller
raise
finally:
# cleanup
self.cleanup()
logger.info('ltacp %s: successfully completed transfer of %s:%s to %s' % (self.logId, self.src_host, self.src_path_data, self.dst_surl))
return (md5_checksum_local, a32_checksum_local, str(byte_count))
def _ncListen(self, log_name):
# pick initial random port for data receiver
port = str(random.randint(49152, 65535))
while True:
# start listen for data stream
cmd_listen = self.localNetCatCmd.split(' ') + ['-l', port]
logger.info('ltacp %s: listening for %s. executing: %s' % (self.logId, log_name, ' '.join(cmd_listen)))
p_listen = Popen(cmd_listen, stdout=PIPE, stderr=PIPE)
time.sleep(0.5)
if p_listen.poll() is not None:
# nc returned prematurely, pick another port to listen to
o, e = p_listen.communicate()
logger.info('ltacp %s: nc returned prematurely: %s' % (self.logId, e.strip()))
port = str(random.randint(49152, 65535))
else:
self.started_procs[p_listen] = cmd_listen
return (p_listen, port)
def cleanup(self):
logger.debug('ltacp %s: cleaning up' % (self.logId))
if hasattr(self, 'remote_data_fifo') and self.remote_data_fifo:
'''remove a file (or fifo) on a remote host. Test if file exists before deleting.'''
cmd_remote_rm = self.ssh_cmd + ['if [ -e "%s" ] ; then rm %s ; fi ;' % (self.remote_data_fifo, self.remote_data_fifo)]
logger.info('ltacp %s: removing remote fifo. executing: %s' % (self.logId, ' '.join(cmd_remote_rm)))
p_remote_rm = Popen(cmd_remote_rm, stdout=PIPE, stderr=PIPE)
p_remote_rm.communicate()
if p_remote_rm.returncode != 0:
logger.error("Could not remove remote fifo %s@%s:%s\n%s" % (self.src_user, self.src_host, self.remote_data_fifo, p_remote_rm.stderr))
self.remote_data_fifo = None
# remove local fifos
for fifo in self.fifos:
if os.path.exists(fifo):
logger.info('ltacp %s: removing local fifo: %s' % (self.logId, fifo))
os.remove(fifo)
self.fifos = []
# cancel any started running process, as they should all be finished by now
running_procs = dict((p, cl) for (p, cl) in self.started_procs.items() if p.poll() == None)
if len(running_procs):
logger.warning('ltacp %s: terminating %d running subprocesses...' % (self.logId, len(running_procs)))
for p,cl in running_procs.items():
if isinstance(cl, list):
cl = ' '.join(cl)
logger.warning('ltacp %s: terminated running process pid=%d cmdline: %s' % (self.logId, p.pid, cl))
p.terminate()
logger.info('ltacp %s: terminated %d running subprocesses...' % (self.logId, len(running_procs)))
self.started_procs = {}
logger.debug('ltacp %s: finished cleaning up' % (self.logId))
# execute command and return (stdout, stderr, returncode) tuple
def execute(cmd):
logger.info('executing: %s' % ' '.join(cmd))
p_cmd = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = p_cmd.communicate()
return (stdout, stderr, p_cmd.returncode)
# remove file from srm
def srmrm(surl):
return execute(['/bin/bash', '-c', 'source %s; srmrm %s' % (_ingest_init_script, surl)])
# remove (empty) directory from srm
def srmrmdir(surl):
return execute(['/bin/bash', '-c', 'source %s; srmrmdir %s' % (_ingest_init_script, surl)])
# create directory in srm
def srmmkdir(surl):
return execute(['/bin/bash', '-c', 'source %s; srmmkdir -retry_num=0 %s' % (_ingest_init_script, surl)])
# detailed listing
def srmls(surl):
return execute(['/bin/bash', '-c', 'source %s; srmls %s' % (_ingest_init_script, surl)])
# detailed listing
def srmll(surl):
return execute(['/bin/bash', '-c', 'source %s; srmls -l %s' % (_ingest_init_script, surl)])
# get checksum from srm via srmls
def get_srm_a32_checksum(surl):
output, errors, code = srmll(surl)
if code != 0:
return False
if not 'Checksum type:' in output:
return False
if 'Checksum type:' in output:
cstype = output.split('Checksum type:')[1].split()[0].strip()
if cstype.lower() != 'adler32':
return False
if 'Checksum value:' in output:
return output.split('Checksum value:')[1].lstrip().split()[0]
return False
#recursively checks for presence of parent directory and created the missing part of a tree
def create_missing_directories(surl):
parent, child = os.path.split(surl)
missing = []
# determine missing dirs
while parent:
logger.info('checking path: %s' % parent)
o, e, code = srmls(parent)
if code == 0:
logger.info('srmls returned successfully, so this path apparently exists: %s' % parent)
break;
else:
parent, child = os.path.split(parent)
missing.append(child)
# recreate missing dirs
while len(missing) > 0:
parent = parent + '/' + missing.pop()
code = srmmkdir(parent)[2]
if code != 0:
logger.info('failed to create missing directory: %s' % parent)
return code
logger.info('successfully created parent directory: %s' % parent)
return 0
# limited standalone mode for testing:
# usage: ltacp.py <remote-host> <remote-path> <surl>
if __name__ == '__main__':
if len(sys.argv) < 4:
print 'example: ./ltacp.py 10.196.232.11 /home/users/ingest/1M.txt srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/test/eor/1M.txt'
sys.exit()
# transfer test:
cp = LtaCp(sys.argv[1], sys.argv[2], sys.argv[3], 'ingest')
cp.transfer()