Skip to content
Snippets Groups Projects
Commit dba7bd10 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8598: convert dawn hostnames to ip addresses. fetch sip from...

Task #8598: convert dawn hostnames to ip addresses. fetch sip from SIPLocation. Add ingest fields to SIP.
parent c5645b9d
No related branches found
No related tags found
No related merge requests found
...@@ -5,19 +5,20 @@ from lxml import etree ...@@ -5,19 +5,20 @@ from lxml import etree
from cStringIO import StringIO from cStringIO import StringIO
from job_group import corr_type, bf_type, img_type, unspec_type, pulp_type from job_group import corr_type, bf_type, img_type, unspec_type, pulp_type
import ltacp import ltacp
import getpass
# TODO: reuse method from LCS.PyCommon.utils
def humanreadablesize(num, suffix='B', base=1000): def humanreadablesize(num, suffix='B'):
""" converts the given size (number) to a human readable string in powers of 'base'""" """ converts the given size (number) to a human readable string in powers of 1024
try: """
for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']: try:
if abs(num) < float(base): for unit in ['','K','M','G','T','P','E','Z']:
return "%3.1f%s%s" % (num, unit, suffix) if abs(num) < 1024.0:
num /= float(base) return "%3.1f%s%s" % (num, unit, suffix)
return "%.2f%s%s" % (num, 'Y', suffix) num /= 1024.0
except TypeError: return "%.1f%s%s" % (num, 'Y', suffix)
return str(num) except TypeError:
return str(num)
IngestStarted = 10 IngestStarted = 10
## 20 not used ## 20 not used
IngestSIPComplete = 30 IngestSIPComplete = 30
...@@ -178,7 +179,7 @@ class IngestPipeline(): ...@@ -178,7 +179,7 @@ class IngestPipeline():
## SecondaryUri handling not implemented ## SecondaryUri handling not implemented
self.logger.debug(cmd) self.logger.debug(cmd)
start = time.time() start = time.time()
p = subprocess.Popen(cmd, stdin=open('/dev/null'), stdout=subprocess.PIPE, stderr=subprocess.PIPE) p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logs = p.communicate() logs = p.communicate()
elapsed = time.time() - start elapsed = time.time() - start
self.logger.debug("File transfer for %s took %d sec" % (self.JobId, elapsed)) self.logger.debug("File transfer for %s took %d sec" % (self.JobId, elapsed))
...@@ -214,7 +215,17 @@ class IngestPipeline(): ...@@ -214,7 +215,17 @@ class IngestPipeline():
try: try:
start = time.time() start = time.time()
cp = ltacp.LtaCp(self.HostLocation, host = self.HostLocation
# eor dawn nodes are not known in dns
# convert to ip address
for i in range(1, 33):
host = host.replace('node%d.intra.dawn.rug.nl' % (i+100,), '10.196.232.%d' % (i+10,))
self.logger.info(os.path.join(self.LocationDir, self.Source))
self.logger.info(self.PrimaryUri)
cp = ltacp.LtaCp(host,
os.path.join(self.LocationDir, self.Source), os.path.join(self.LocationDir, self.Source),
self.PrimaryUri) self.PrimaryUri)
...@@ -260,6 +271,7 @@ class IngestPipeline(): ...@@ -260,6 +271,7 @@ class IngestPipeline():
uris = '' uris = ''
try: try:
start = time.time() start = time.time()
self.logger.debug("SendChecksums for %s: project=%s ticket=%s size=%s md5=%s a32=%s uris=%s" % (self.JobId, self.Project, self.ticket, self.FileSize,self.MD5Checksum,self.Adler32Checksum, uris))
result = self.ltaClient.SendChecksums(self.Project, self.ticket, self.FileSize, {'MD5':self.MD5Checksum,'Adler32':self.Adler32Checksum}, uris) result = self.ltaClient.SendChecksums(self.Project, self.ticket, self.FileSize, {'MD5':self.MD5Checksum,'Adler32':self.Adler32Checksum}, uris)
self.logger.debug("SendChecksums for %s took %ds" % (self.JobId, time.time() - start)) self.logger.debug("SendChecksums for %s took %ds" % (self.JobId, time.time() - start))
except xmlrpclib.Fault as err: except xmlrpclib.Fault as err:
...@@ -366,10 +378,78 @@ class IngestPipeline(): ...@@ -366,10 +378,78 @@ class IngestPipeline():
raise raise
self.logger.debug('SIP received for %s from MoM with size %d (%s): %s' % (self.JobId, len(self.SIP), humanreadablesize(len(self.SIP)), self.SIP[0:256])) self.logger.debug('SIP received for %s from MoM with size %d (%s): %s' % (self.JobId, len(self.SIP), humanreadablesize(len(self.SIP)), self.SIP[0:256]))
elif self.Type.lower() == "eor": elif self.Type.lower() == "eor":
# TODO: fetch SIP from EoR, fill in checksums try:
self.SIP = unspecifiedSIP.makeSIP(self.Project, self.ObsId, self.ArchiveId, self.ticket, self.FileName, self.FileSize, self.MD5Checksum, self.Adler32Checksum, self.Type) sip_host = job['SIPLocation'].split(':')[0]
self.logger.warning('Generated unspecified SIP for %s: %s' % (self.JobId, self.SIP)) for i in range(1, 43):
self.FileType = unspec_type sip_host = sip_host.replace('node%d.intra.dawn.rug.nl' % (i+100,), '10.196.232.%d' % (i+10,))
sip_path = job['SIPLocation'].split(':')[1]
cmd = ['ssh', '-tt', '-n', '-x', '-q', '%s@%s' % (getpass.getuser(), sip_host), 'cat %s' % sip_path]
self.logger.debug("GetSIP for %s with mom2DPId %s - StorageTicket %s - FileName %s - Uri %s - cmd %s" % (self.JobId, self.ArchiveId, self.ticket, self.FileName, self.PrimaryUri, ' ' .join(cmd)))
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if p.returncode != 0:
raise PipelineError('GetSIP error getting EoR SIP for %s: %s' % (self.JobId, err), 'GetSip')
self.SIP = out
with open('eor_sip1.xml', 'w') as f:
f.write(self.SIP)
# parse sip xml and add filesize, storageticket and checkums
from xml.dom import minidom
sip_dom = minidom.parseString(self.SIP)
dp_node = sip_dom.getElementsByTagName('dataProduct')[0]
for elem in dp_node.getElementsByTagName('checksum'):
dp_node.removeChild(elem)
for elem in dp_node.getElementsByTagName('size'):
dp_node.removeChild(elem)
for elem in dp_node.getElementsByTagName('storageTicket'):
dp_node.removeChild(elem)
sip_namespace = "http://www.astron.nl/SIP-Lofar"
storageticket_node = sip_dom.createElementNS(sip_namespace, 'storageTicket')
storageticket_node.appendChild(sip_dom.createTextNode(str(self.ticket)))
size_node = sip_dom.createElementNS(sip_namespace, 'size')
size_node.appendChild(sip_dom.createTextNode(str(self.FileSize)))
checksum_md5_algo_node = sip_dom.createElementNS(sip_namespace, 'algorithm')
checksum_md5_algo_node.appendChild(sip_dom.createTextNode('MD5'))
checksum_md5_value_node = sip_dom.createElementNS(sip_namespace, 'value')
checksum_md5_value_node.appendChild(sip_dom.createTextNode(str(self.MD5Checksum)))
checksum_md5_node = sip_dom.createElementNS(sip_namespace, 'checksum')
checksum_md5_node.appendChild(checksum_md5_algo_node)
checksum_md5_node.appendChild(checksum_md5_value_node)
checksum_a32_algo_node = sip_dom.createElementNS(sip_namespace, 'algorithm')
checksum_a32_algo_node.appendChild(sip_dom.createTextNode('Adler32'))
checksum_a32_value_node = sip_dom.createElementNS(sip_namespace, 'value')
checksum_a32_value_node.appendChild(sip_dom.createTextNode(str(self.Adler32Checksum)))
checksum_a32_node = sip_dom.createElementNS(sip_namespace, 'checksum')
checksum_a32_node.appendChild(checksum_a32_algo_node)
checksum_a32_node.appendChild(checksum_a32_value_node)
filesize_node = sip_dom.createElementNS(sip_namespace, 'size')
filesize_node.appendChild(sip_dom.createTextNode(str(self.FileSize)))
dp_node.insertBefore(checksum_a32_node, dp_node.getElementsByTagName('fileName')[0])
dp_node.insertBefore(checksum_md5_node, checksum_a32_node)
dp_node.insertBefore(filesize_node, checksum_md5_node)
dp_node.insertBefore(storageticket_node, filesize_node)
self.SIP = sip_dom.toxml("utf-8")
self.SIP = self.SIP.replace('<stationType>Europe</stationType>','<stationType>International</stationType>')
with open('eor_sip2.xml', 'w') as f:
f.write(self.SIP)
except:
self.logger.exception('Getting SIP from EoR failed')
raise
self.logger.debug('SIP received for %s from EoR with size %d (%s): \n%s' % (self.JobId, len(self.SIP), humanreadablesize(len(self.SIP)), self.SIP))
else: else:
self.SIP = unspecifiedSIP.makeSIP(self.Project, self.ObsId, self.ArchiveId, self.ticket, self.FileName, self.FileSize, self.MD5Checksum, self.Adler32Checksum, self.Type) self.SIP = unspecifiedSIP.makeSIP(self.Project, self.ObsId, self.ArchiveId, self.ticket, self.FileName, self.FileSize, self.MD5Checksum, self.Adler32Checksum, self.Type)
self.FileType = unspec_type self.FileType = unspec_type
...@@ -415,7 +495,7 @@ class IngestPipeline(): ...@@ -415,7 +495,7 @@ class IngestPipeline():
## SecondaryUri handling not implemented ## SecondaryUri handling not implemented
self.logger.debug(cmd) self.logger.debug(cmd)
start = time.time() start = time.time()
p = subprocess.Popen(cmd, stdin=open('/dev/null'), stdout=subprocess.PIPE) p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
log = p.communicate()[0].split('\n') log = p.communicate()[0].split('\n')
self.logger.debug("RollBack for %s took %ds" % (self.JobId, time.time() - start)) self.logger.debug("RollBack for %s took %ds" % (self.JobId, time.time() - start))
self.logger.debug(log) self.logger.debug(log)
...@@ -540,7 +620,6 @@ if __name__ == '__main__': ...@@ -540,7 +620,6 @@ if __name__ == '__main__':
logger.info(str(job)) logger.info(str(job))
import getpass
if getpass.getuser() == 'ingest': if getpass.getuser() == 'ingest':
import ingest_config as config import ingest_config as config
else: else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment