diff --git a/LTA/LTAIngest/ltacp.py b/LTA/LTAIngest/ltacp.py index 2985c3b9dd5c327e8678fd953824ece20804a43b..522f773f7494c102afe26639b808908719dfc80e 100644 --- a/LTA/LTAIngest/ltacp.py +++ b/LTA/LTAIngest/ltacp.py @@ -31,7 +31,7 @@ class LtacpException(Exception): def __init__(self, value): self.value = value def __str__(self): - return repr(self.value) + return str(self.value) def getfqdn(): return "10.178.1.2" @@ -61,136 +61,138 @@ def transfer(src_host, dst_surl, src_user=None): - src_filename = os.path.basename(src_path_data) - - if not src_user: - src_user = getpass.getuser() - - dst_turl = convert_surl_to_turl(dst_surl) - logger.info('ltacp %s: initiating transfer of %s:%s to %s' % (src_filename, src_host, src_path_data, dst_surl)) - # for cleanup - started_procs = [] - - #--- - # Server part - #--- - - # we'll randomize ports - # to minimize initial collision, randomize based on path and time - random.seed(hash(src_path_data) ^ hash(time.time())) - - # pick initial random port for data receiver - port_data = str(random.randint(49152, 65535)) - while True: - # start listen for data stream - cmd_data_in = ['nc', '-l', '-q','0', port_data] - logger.info('ltacp %s: listening for data. executing: %s' % (src_filename, ' '.join(cmd_data_in))) - p_data_in = Popen(cmd_data_in, stdout=PIPE, stderr=PIPE) - - time.sleep(0.5) - if p_data_in.poll(): - # nc returned prematurely, pick another port to listen to - o, e = p_data_in.communicate() - logger.info('ltacp %s: nc returned prematurely: %s' % (src_filename, e.strip())) - port_data = str(random.randint(49152, 65535)) - else: - started_procs.append(p_data_in) - break - - - # pick initial random port for md5 receiver - port_md5 = str(random.randint(49152, 65535)) - while True: - # start listen for checksums - cmd_md5_receive = ['nc','-l', '-q','0', port_md5] - logger.info('ltacp %s: listening for md5 checksums. executing: %s' % (src_filename, ' '.join(cmd_md5_receive))) - p_md5_receive = Popen(cmd_md5_receive, stdout=PIPE, stderr=PIPE) - - time.sleep(0.5) - if p_md5_receive.poll(): - # nc returned prematurely, pick another port to listen to - o, e = p_md5_receive.communicate() - logger.info('ltacp %s: nc returned prematurely: %s' % (src_filename, e.strip())) - port_md5 = str(random.randint(49152, 65535)) - else: - started_procs.append(p_md5_receive) - break - - # create fifo paths - local_data_fifo = '/tmp/ltacp_datapipe_'+src_host+'_'+port_data - remote_data_fifo = '/tmp/ltacp_md5_receivepipe_'+port_md5 - - # create local fifo to stream data to globus-url-copy - logger.info('ltacp %s: creating data fifo for globus-url-copy: %s' % (src_filename, local_data_fifo)) - if os.path.exists(local_data_fifo): - os.remove(local_data_fifo) - os.mkfifo(local_data_fifo) - - # create local fifo to stream data to adler32 - local_adler32_fifo = local_data_fifo+'_adler32' - logger.info('ltacp %s: creating data fifo for adler32: %s' % (src_filename, local_adler32_fifo)) - if os.path.exists(local_adler32_fifo): - os.remove(local_adler32_fifo) - os.mkfifo(local_adler32_fifo) - - # start tee incoming data stream to fifo (pipe stream further for checksum) - cmd_tee_data = ['tee', local_data_fifo] - logger.info('ltacp %s: splitting datastream. executing on stdout of data listener: %s' % (src_filename, ' '.join(cmd_tee_data),)) - p_tee_data = Popen(cmd_tee_data, stdin=p_data_in.stdout, stdout=PIPE) - started_procs.append(p_tee_data) - - # start tee incoming data stream to fifo (pipe stream further for checksum) - cmd_tee_checksums = ['tee', local_adler32_fifo] - logger.info('ltacp %s: splitting datastream again. executing: on stdout of 1st data tee: %s' % (src_filename, ' '.join(cmd_tee_checksums),)) - p_tee_checksums = Popen(cmd_tee_checksums, stdin=p_tee_data.stdout, stdout=PIPE) - started_procs.append(p_tee_checksums) - - # start computing md5 checksum of incoming data stream - cmd_md5_local = ['md5sum'] - logger.info('ltacp %s: computing local md5 checksum. executing on stdout of 2nd data tee: %s' % (src_filename, ' '.join(cmd_md5_local))) - p_md5_local = Popen(cmd_md5_local, stdin=p_tee_checksums.stdout, stdout=PIPE, stderr=PIPE) - started_procs.append(p_md5_local) - - # start computing adler checksum of incoming data stream - cmd_a32_local = ['./md5adler/a32', local_adler32_fifo] - logger.info('ltacp %s: computing local adler32 checksum. executing: %s' % (src_filename, ' '.join(cmd_a32_local))) - p_a32_local = Popen(cmd_a32_local, stdout=PIPE, stderr=PIPE) - started_procs.append(p_a32_local) - - # start copy fifo stream to SRM - cmd_data_out = ['globus-url-copy', local_data_fifo, dst_turl] - logger.info('ltacp %s: copying data stream into globus-url-copy. executing: %s' % (src_filename, ' '.join(cmd_data_out))) - p_data_out = Popen(cmd_data_out, stdout=PIPE, stderr=PIPE) - started_procs.append(p_data_out) - - # Check if our side is set up correctly - # and all processes are still waiting for input from client - for p in started_procs: - ret = p.poll() - if ret is not None: - raise LtacpException("process %d exited prematurely with exit code %d" % (src_filename, p.pid, ret)) - - #--- - # Client part - #--- - - # start remote copy on src host: - # 1a) remove any obsolete fifo which might be in the way - # 1b) create fifo - # 2) send tar stream of data/dir + tee to fifo for 3) - # 3) simultaneously to 2), calculate checksum of fifo stream - # 4) break fifo - _removeRemoteFile(src_user, src_host, remote_data_fifo, log_prefix='ltacp %s: ' % src_filename) - cmd_remote_mkfifo = ['ssh '+src_user+'@'+src_host+ - ' \'mkfifo '+remote_data_fifo+ - '\''] - logger.info('ltacp %s: remote creating fifo. executing: %s' % (src_filename, ' '.join(cmd_remote_mkfifo))) - p_remote_mkfifo = Popen(cmd_remote_mkfifo, shell=True, stdout=PIPE, stderr=PIPE) - started_procs.append(p_remote_mkfifo) - + started_procs = {} try: + src_filename = os.path.basename(src_path_data) + + if not src_user: + src_user = getpass.getuser() + + dst_turl = convert_surl_to_turl(dst_surl) + logger.info('ltacp %s: initiating transfer of %s:%s to %s' % (src_filename, src_host, src_path_data, dst_surl)) + + #--- + # Server part + #--- + + # we'll randomize ports + # to minimize initial collision, randomize based on path and time + random.seed(hash(src_path_data) ^ hash(time.time())) + + # pick initial random port for data receiver + port_data = str(random.randint(49152, 65535)) + while True: + # start listen for data stream + cmd_data_in = ['nc', '-l', '-q','0', port_data] + logger.info('ltacp %s: listening for data. executing: %s' % (src_filename, ' '.join(cmd_data_in))) + p_data_in = Popen(cmd_data_in, stdout=PIPE, stderr=PIPE) + + time.sleep(0.5) + if p_data_in.poll(): + # nc returned prematurely, pick another port to listen to + o, e = p_data_in.communicate() + logger.info('ltacp %s: nc returned prematurely: %s' % (src_filename, e.strip())) + port_data = str(random.randint(49152, 65535)) + else: + started_procs[p_data_in] = cmd_data_in + break + + + # pick initial random port for md5 receiver + port_md5 = str(random.randint(49152, 65535)) + while True: + # start listen for checksums + cmd_md5_receive = ['nc','-l', '-q','0', port_md5] + logger.info('ltacp %s: listening for md5 checksums. executing: %s' % (src_filename, ' '.join(cmd_md5_receive))) + p_md5_receive = Popen(cmd_md5_receive, stdout=PIPE, stderr=PIPE) + + time.sleep(0.5) + if p_md5_receive.poll(): + # nc returned prematurely, pick another port to listen to + o, e = p_md5_receive.communicate() + logger.info('ltacp %s: nc returned prematurely: %s' % (src_filename, e.strip())) + port_md5 = str(random.randint(49152, 65535)) + else: + started_procs[p_md5_receive] = cmd_md5_receive + break + + # create fifo paths + local_data_fifo = '/tmp/ltacp_datapipe_'+src_host+'_'+port_data + remote_data_fifo = '/tmp/ltacp_md5_receivepipe_'+port_md5 + + # create local fifo to stream data to globus-url-copy + logger.info('ltacp %s: creating data fifo for globus-url-copy: %s' % (src_filename, local_data_fifo)) + if os.path.exists(local_data_fifo): + os.remove(local_data_fifo) + os.mkfifo(local_data_fifo) + + # create local fifo to stream data to adler32 + local_adler32_fifo = local_data_fifo+'_adler32' + logger.info('ltacp %s: creating data fifo for adler32: %s' % (src_filename, local_adler32_fifo)) + if os.path.exists(local_adler32_fifo): + os.remove(local_adler32_fifo) + os.mkfifo(local_adler32_fifo) + + # start tee incoming data stream to fifo (pipe stream further for checksum) + cmd_tee_data = ['tee', local_data_fifo] + logger.info('ltacp %s: splitting datastream. executing on stdout of data listener: %s' % (src_filename, ' '.join(cmd_tee_data),)) + p_tee_data = Popen(cmd_tee_data, stdin=p_data_in.stdout, stdout=PIPE) + started_procs[p_tee_data] = cmd_tee_data + + # start tee incoming data stream to fifo (pipe stream further for checksum) + cmd_tee_checksums = ['tee', local_adler32_fifo] + logger.info('ltacp %s: splitting datastream again. executing: on stdout of 1st data tee: %s' % (src_filename, ' '.join(cmd_tee_checksums),)) + p_tee_checksums = Popen(cmd_tee_checksums, stdin=p_tee_data.stdout, stdout=PIPE) + started_procs[p_tee_checksums] = cmd_tee_checksums + + # start computing md5 checksum of incoming data stream + cmd_md5_local = ['md5sum'] + logger.info('ltacp %s: computing local md5 checksum. executing on stdout of 2nd data tee: %s' % (src_filename, ' '.join(cmd_md5_local))) + p_md5_local = Popen(cmd_md5_local, stdin=p_tee_checksums.stdout, stdout=PIPE, stderr=PIPE) + started_procs[p_md5_local] = cmd_md5_local + + # start computing adler checksum of incoming data stream + cmd_a32_local = ['./md5adler/a32', local_adler32_fifo] + logger.info('ltacp %s: computing local adler32 checksum. executing: %s' % (src_filename, ' '.join(cmd_a32_local))) + p_a32_local = Popen(cmd_a32_local, stdout=PIPE, stderr=PIPE) + started_procs[p_a32_local] = cmd_a32_local + + # start copy fifo stream to SRM + cmd_data_out = ['globus-url-copy', local_data_fifo, dst_turl] + logger.info('ltacp %s: copying data stream into globus-url-copy. executing: %s' % (src_filename, ' '.join(cmd_data_out))) + p_data_out = Popen(cmd_data_out, stdout=PIPE, stderr=PIPE) + started_procs[p_data_out] = cmd_data_out + + # Check if receiver side is set up correctly + # and all processes are still waiting for input from client + finished_procs = dict((p, cl) for (p, cl) in started_procs.items() if p.poll() is not None) + + if len(finished_procs): + msg = '' + for p, cl in finished_procs.items(): + msg += " process pid:%d exited prematurely with exit code %d. cmdline: %s\n" % (p.pid, ret, cl) + raise LtacpException("ltacp: %s %s local process(es) exited prematurely\n%s" % (src_filename, msg)) + + #--- + # Client part + #--- + + # start remote copy on src host: + # 1a) remove any obsolete fifo which might be in the way + # 1b) create fifo + # 2) send tar stream of data/dir + tee to fifo for 3) + # 3) simultaneously to 2), calculate checksum of fifo stream + # 4) break fifo + _removeRemoteFile(src_user, src_host, remote_data_fifo, log_prefix='ltacp %s: ' % src_filename) + cmd_remote_mkfifo = ['ssh '+src_user+'@'+src_host+ + ' \'mkfifo '+remote_data_fifo+ + '\''] + logger.info('ltacp %s: remote creating fifo. executing: %s' % (src_filename, ' '.join(cmd_remote_mkfifo))) + p_remote_mkfifo = Popen(cmd_remote_mkfifo, shell=True, stdout=PIPE, stderr=PIPE) + started_procs[p_remote_mkfifo] = cmd_remote_mkfifo + # block until fifo is created output_remote_mkfifo = p_remote_mkfifo.communicate() if p_remote_mkfifo.returncode != 0: @@ -204,7 +206,7 @@ def transfer(src_host, '\''] logger.info('ltacp %s: remote starting transfer. executing: %s' % (src_filename, ' '.join(cmd_remote_data))) p_remote_data = Popen(cmd_remote_data, shell=True, stdout=PIPE, stderr=PIPE) - started_procs.append(p_remote_data) + started_procs[p_remote_data] = cmd_remote_data # start computation of checksum on remote fifo stream cmd_remote_checksum = ['ssh '+src_user+'@'+src_host+ @@ -212,7 +214,7 @@ def transfer(src_host, '\''] logger.info('ltacp %s: remote starting computation of md5 checksum. executing: %s' % (src_filename, ' '.join(cmd_remote_checksum))) p_remote_checksum = Popen(cmd_remote_checksum, shell=True, stdout=PIPE, stderr=PIPE) - started_procs.append(p_remote_checksum) + started_procs[p_remote_checksum] = cmd_remote_checksum # waiting for output, comparing checksums, etc. @@ -267,27 +269,30 @@ def transfer(src_host, else: raise LtacpException('SSH (for sending data and checksum) failed: '+output_remote_data[1]+' --- '+output_remote_checksum[1]) - except LtacpException as e: + except Exception as e: # Something went wrong - logger.error('ltacp %s: ! Fatal Error: %s' % (src_filename, str(e))) - - # --- - # wrapping up - # --- - - # remove remote fifo - _removeRemoteFile(src_user, src_host, remote_data_fifo, log_prefix='ltacp %s: ' % src_filename) + logger.error('ltacp %s: %s' % (src_filename, str(e))) + # re-raise the exception to the caller + raise + finally: + # --- + # cleanup + # --- + + # remove remote fifo + _removeRemoteFile(src_user, src_host, remote_data_fifo, log_prefix='ltacp %s: ' % src_filename) + + # remove local data fifo + logger.info('ltacp %s: removing local data fifo for globus-url-copy: %s' % (src_filename, local_data_fifo)) + os.remove(local_data_fifo) - # remove local data fifo - logger.info('ltacp %s: removing local data fifo for globus-url-copy: %s' % (src_filename, local_data_fifo)) - os.remove(local_data_fifo) + logger.debug('ltacp %s: waiting for subprocesses to complete...' % src_filename) - logger.debug('ltacp %s: waiting for subprocesses to complete...' % src_filename) - # cancel any started process - for p in started_procs: - if p.poll() == None: - p.terminate() - logger.info('ltacp %s: terminated hanging process %d' % (src_filename, p.pid)) + # cancel any started hanging process, as they should all be finished by now + for p,cl in started_procs.items(): + if p.poll() == None: + logger.warning('ltacp %s: terminated hanging process pid=%d cmdline=%s' % (src_filename, p.pid, cl)) + p.terminate() logger.info('ltacp %s: successfully completed transfer of %s:%s to %s' % (src_filename, src_host, src_path_data, dst_surl)) return (md5_checksum_local, a32_checksum_local) @@ -368,7 +373,7 @@ def create_missing_directories(surl): if __name__ == '__main__': # transfer test: - transfer(sys.argv[1], sys.argv[2], sys.argv[3]) + transfer(sys.argv[1], sys.argv[2], sys.argv[3], 'ingest') # srmls/srmrm test: #print get_srm_a32_checksum(sys.argv[3])