Skip to content
Snippets Groups Projects
Commit 9654e2cb authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8725: code flow, fail early with proper exception messages.

parent f3e36519
No related branches found
No related tags found
No related merge requests found
......@@ -23,7 +23,7 @@ if __name__ == '__main__':
formatter.converter = time.gmtime
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)
class LtacpException(Exception):
......@@ -142,7 +142,7 @@ class LtaCp:
# start tee incoming data stream to fifo (pipe stream further for checksum)
cmd_tee_checksums = ['tee', self.local_adler32_fifo]
logger.info('ltacp %s: splitting datastream again. executing: on stdout of 1st data tee: %s' % (self.logId, ' '.join(cmd_tee_checksums),))
logger.info('ltacp %s: splitting datastream again. executing on stdout of 1st data tee: %s' % (self.logId, ' '.join(cmd_tee_checksums),))
p_tee_checksums = Popen(cmd_tee_checksums, stdin=p_tee_data.stdout, stdout=PIPE)
self.started_procs[p_tee_checksums] = cmd_tee_checksums
......@@ -217,59 +217,65 @@ class LtaCp:
# waiting for output, comparing checksums, etc.
logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId)
output_remote_data = p_remote_data.communicate()
if p_remote_data.returncode != 0:
raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1]))
logger.debug('ltacp %s: remote data transfer finished...' % self.logId)
logger.info('ltacp %s: waiting for remote md5 checksum transfer to finish...' % self.logId)
output_remote_checksum = p_remote_checksum.communicate()
logger.debug('ltacp %s: remote md5 checksum transfer finished.' % self.logId)
if p_remote_data.returncode == 0 and p_remote_checksum.returncode == 0:
logger.debug('ltacp %s: waiting for remote md5 checksum...' % self.logId)
output_md5_remote = p_md5_receive.communicate()
logger.debug('ltacp %s: waiting for local md5 checksum...' % self.logId)
output_md5_local = p_md5_local.communicate()
if p_md5_receive.returncode == 0 and p_md5_local.returncode == 0:
md5_checksum_remote = output_md5_remote[0].split()[0]
md5_checksum_local = output_md5_local[0].split()[0]
if(md5_checksum_remote == md5_checksum_local):
logger.info('ltacp %s: remote and local md5 checksums are equal: %s' % (self.logId, md5_checksum_local,))
else:
raise LtacpException('md5 checksum reported by client (%s) does not match local checksum of incoming data stream (%s)' % (self.logId, md5_checksum_remote, md5_checksum_local))
logger.debug('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId)
output_data_out = p_data_out.communicate()
if p_data_out.returncode == 0:
logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId)
logger.debug('ltacp %s: waiting for local adler32 checksum to complete...' % self.logId)
output_a32_local = p_a32_local.communicate()
a32_checksum_local = output_a32_local[0].split()[1]
if p_a32_local.returncode != 0:
raise LtacpException('local adler32 checksum computation failed: '+str(output_a32_local))
logger.debug('ltacp %s: fetching adler32 checksum from LTA...' % self.logId)
srm_a32_checksum = get_srm_a32_checksum(self.dst_surl)
if not srm_a32_checksum:
raise LtacpException('Could not get srm adler32 checksum for: '+self.dst_surl)
if(srm_a32_checksum != a32_checksum_local):
raise LtacpException('adler32 checksum reported by srm ('+srm_a32_checksum+') does not match original data checksum ('+a32_checksum_local+')')
logger.info('ltacp %s: adler32 checksums are equal: %s' % (self.logId, a32_checksum_local))
logger.info('ltacp %s: transfer to LTA completed successfully.' % (self.logId))
else:
raise LtacpException('Transfer to SRM failed: '+output_data_out[1])
else:
raise LtacpException('MD5 checksum comparison failed, remote error: '+output_md5_remote[1]+' --- '+output_md5_local[1])
else:
raise LtacpException('SSH (for sending data and checksum) failed: '+output_remote_data[1]+' --- '+output_remote_checksum[1])
logger.info('ltacp %s: waiting for remote md5 checksum computation to finish...' % self.logId)
output_remote_checksum = p_remote_checksum.communicate()
if p_remote_checksum.returncode != 0:
raise LtacpException('ltacp %s: Error in remote md5 checksum computation: %s' % (self.logId, output_remote_checksum[1]))
logger.debug('ltacp %s: remote md5 checksum computation finished.' % self.logId)
logger.debug('ltacp %s: waiting to receive remote md5 checksum...' % self.logId)
output_md5_receive = p_md5_receive.communicate()
if p_md5_receive.returncode != 0:
raise LtacpException('ltacp %s: Error while receiving remote md5 checksum: %s' % (self.logId, output_md5_receive[1]))
logger.debug('ltacp %s: received md5 checksum.' % self.logId)
logger.debug('ltacp %s: waiting for local computation of md5 checksum...' % self.logId)
output_md5_local = p_md5_local.communicate()
if p_md5_local.returncode != 0:
raise LtacpException('ltacp %s: Error while receiving remote md5 checksum: %s' % (self.logId, output_md5_local[1]))
logger.debug('ltacp %s: computed local md5 checksum.' % self.logId)
# compare remote and local md5 checksums
md5_checksum_remote = output_md5_receive[0].split()[0]
md5_checksum_local = output_md5_local[0].split()[0]
if(md5_checksum_remote != md5_checksum_local):
raise LtacpException('md5 checksum reported by client (%s) does not match local checksum of incoming data stream (%s)' % (self.logId, md5_checksum_remote, md5_checksum_local))
logger.info('ltacp %s: remote and local md5 checksums are equal: %s' % (self.logId, md5_checksum_local,))
logger.debug('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId)
output_data_out = p_data_out.communicate()
if p_data_out.returncode != 0:
raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId,output_data_out[1]))
logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId)
logger.debug('ltacp %s: waiting for local adler32 checksum to complete...' % self.logId)
output_a32_local = p_a32_local.communicate()
if p_a32_local.returncode != 0:
raise LtacpException('local adler32 checksum computation failed: '+str(output_a32_local))
logger.debug('ltacp %s: finished computation of local adler32 checksum' % self.logId)
a32_checksum_local = output_a32_local[0].split()[1]
logger.debug('ltacp %s: fetching adler32 checksum from LTA...' % self.logId)
srm_a32_checksum = get_srm_a32_checksum(self.dst_surl)
if not srm_a32_checksum:
raise LtacpException('ltacp %s: Could not get srm adler32 checksum for: %s' % (self.logId, self.dst_surl))
if(srm_a32_checksum != a32_checksum_local):
raise LtacpException('ltacp %s: adler32 checksum reported by srm (%s) does not match original data checksum (%s)' % (self.logId,
srm_a32_checksum,
a32_checksum_local))
logger.info('ltacp %s: adler32 checksums are equal: %s' % (self.logId, a32_checksum_local))
logger.info('ltacp %s: transfer to LTA completed successfully.' % (self.logId))
except Exception as e:
# Something went wrong
logger.error('ltacp %s: %s' % (self.logId, str(e)))
logger.error('ltacp %s: Error in transfer: %s' % (self.logId, str(e)))
# re-raise the exception to the caller
raise
finally:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment