diff --git a/LTA/LTAIngest/ingestpipeline.py b/LTA/LTAIngest/ingestpipeline.py index dda52f54430327f6b730ac59e05f897ec9616ea6..16e1da5471e7fafa788fda14bfe8c7cd211c0fe8 100755 --- a/LTA/LTAIngest/ingestpipeline.py +++ b/LTA/LTAIngest/ingestpipeline.py @@ -5,19 +5,20 @@ from lxml import etree from cStringIO import StringIO from job_group import corr_type, bf_type, img_type, unspec_type, pulp_type import ltacp - -# TODO: reuse method from LCS.PyCommon.utils -def humanreadablesize(num, suffix='B', base=1000): - """ converts the given size (number) to a human readable string in powers of 'base'""" - try: - for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']: - if abs(num) < float(base): - return "%3.1f%s%s" % (num, unit, suffix) - num /= float(base) - return "%.2f%s%s" % (num, 'Y', suffix) - except TypeError: - return str(num) - +import getpass + +def humanreadablesize(num, suffix='B'): + """ converts the given size (number) to a human readable string in powers of 1024 + """ + try: + for unit in ['','K','M','G','T','P','E','Z']: + if abs(num) < 1024.0: + return "%3.1f%s%s" % (num, unit, suffix) + num /= 1024.0 + return "%.1f%s%s" % (num, 'Y', suffix) + except TypeError: + return str(num) + IngestStarted = 10 ## 20 not used IngestSIPComplete = 30 @@ -178,7 +179,7 @@ class IngestPipeline(): ## SecondaryUri handling not implemented self.logger.debug(cmd) start = time.time() - p = subprocess.Popen(cmd, stdin=open('/dev/null'), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) logs = p.communicate() elapsed = time.time() - start self.logger.debug("File transfer for %s took %d sec" % (self.JobId, elapsed)) @@ -214,7 +215,17 @@ class IngestPipeline(): try: start = time.time() - cp = ltacp.LtaCp(self.HostLocation, + host = self.HostLocation + + # eor dawn nodes are not known in dns + # convert to ip address + for i in range(1, 33): + host = host.replace('node%d.intra.dawn.rug.nl' % (i+100,), '10.196.232.%d' % (i+10,)) + + self.logger.info(os.path.join(self.LocationDir, self.Source)) + self.logger.info(self.PrimaryUri) + + cp = ltacp.LtaCp(host, os.path.join(self.LocationDir, self.Source), self.PrimaryUri) @@ -260,6 +271,7 @@ class IngestPipeline(): uris = '' try: start = time.time() + self.logger.debug("SendChecksums for %s: project=%s ticket=%s size=%s md5=%s a32=%s uris=%s" % (self.JobId, self.Project, self.ticket, self.FileSize,self.MD5Checksum,self.Adler32Checksum, uris)) result = self.ltaClient.SendChecksums(self.Project, self.ticket, self.FileSize, {'MD5':self.MD5Checksum,'Adler32':self.Adler32Checksum}, uris) self.logger.debug("SendChecksums for %s took %ds" % (self.JobId, time.time() - start)) except xmlrpclib.Fault as err: @@ -366,10 +378,78 @@ class IngestPipeline(): raise self.logger.debug('SIP received for %s from MoM with size %d (%s): %s' % (self.JobId, len(self.SIP), humanreadablesize(len(self.SIP)), self.SIP[0:256])) elif self.Type.lower() == "eor": - # TODO: fetch SIP from EoR, fill in checksums - self.SIP = unspecifiedSIP.makeSIP(self.Project, self.ObsId, self.ArchiveId, self.ticket, self.FileName, self.FileSize, self.MD5Checksum, self.Adler32Checksum, self.Type) - self.logger.warning('Generated unspecified SIP for %s: %s' % (self.JobId, self.SIP)) - self.FileType = unspec_type + try: + sip_host = job['SIPLocation'].split(':')[0] + for i in range(1, 43): + sip_host = sip_host.replace('node%d.intra.dawn.rug.nl' % (i+100,), '10.196.232.%d' % (i+10,)) + sip_path = job['SIPLocation'].split(':')[1] + cmd = ['ssh', '-tt', '-n', '-x', '-q', '%s@%s' % (getpass.getuser(), sip_host), 'cat %s' % sip_path] + self.logger.debug("GetSIP for %s with mom2DPId %s - StorageTicket %s - FileName %s - Uri %s - cmd %s" % (self.JobId, self.ArchiveId, self.ticket, self.FileName, self.PrimaryUri, ' ' .join(cmd))) + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = p.communicate() + if p.returncode != 0: + raise PipelineError('GetSIP error getting EoR SIP for %s: %s' % (self.JobId, err), 'GetSip') + + self.SIP = out + + with open('eor_sip1.xml', 'w') as f: + f.write(self.SIP) + + # parse sip xml and add filesize, storageticket and checkums + from xml.dom import minidom + sip_dom = minidom.parseString(self.SIP) + dp_node = sip_dom.getElementsByTagName('dataProduct')[0] + + for elem in dp_node.getElementsByTagName('checksum'): + dp_node.removeChild(elem) + + for elem in dp_node.getElementsByTagName('size'): + dp_node.removeChild(elem) + + for elem in dp_node.getElementsByTagName('storageTicket'): + dp_node.removeChild(elem) + + sip_namespace = "http://www.astron.nl/SIP-Lofar" + storageticket_node = sip_dom.createElementNS(sip_namespace, 'storageTicket') + storageticket_node.appendChild(sip_dom.createTextNode(str(self.ticket))) + + size_node = sip_dom.createElementNS(sip_namespace, 'size') + size_node.appendChild(sip_dom.createTextNode(str(self.FileSize))) + + checksum_md5_algo_node = sip_dom.createElementNS(sip_namespace, 'algorithm') + checksum_md5_algo_node.appendChild(sip_dom.createTextNode('MD5')) + checksum_md5_value_node = sip_dom.createElementNS(sip_namespace, 'value') + checksum_md5_value_node.appendChild(sip_dom.createTextNode(str(self.MD5Checksum))) + checksum_md5_node = sip_dom.createElementNS(sip_namespace, 'checksum') + checksum_md5_node.appendChild(checksum_md5_algo_node) + checksum_md5_node.appendChild(checksum_md5_value_node) + + checksum_a32_algo_node = sip_dom.createElementNS(sip_namespace, 'algorithm') + checksum_a32_algo_node.appendChild(sip_dom.createTextNode('Adler32')) + checksum_a32_value_node = sip_dom.createElementNS(sip_namespace, 'value') + checksum_a32_value_node.appendChild(sip_dom.createTextNode(str(self.Adler32Checksum))) + checksum_a32_node = sip_dom.createElementNS(sip_namespace, 'checksum') + checksum_a32_node.appendChild(checksum_a32_algo_node) + checksum_a32_node.appendChild(checksum_a32_value_node) + + filesize_node = sip_dom.createElementNS(sip_namespace, 'size') + filesize_node.appendChild(sip_dom.createTextNode(str(self.FileSize))) + + dp_node.insertBefore(checksum_a32_node, dp_node.getElementsByTagName('fileName')[0]) + dp_node.insertBefore(checksum_md5_node, checksum_a32_node) + dp_node.insertBefore(filesize_node, checksum_md5_node) + dp_node.insertBefore(storageticket_node, filesize_node) + + self.SIP = sip_dom.toxml("utf-8") + self.SIP = self.SIP.replace('<stationType>Europe</stationType>','<stationType>International</stationType>') + + with open('eor_sip2.xml', 'w') as f: + f.write(self.SIP) + + except: + self.logger.exception('Getting SIP from EoR failed') + raise + self.logger.debug('SIP received for %s from EoR with size %d (%s): \n%s' % (self.JobId, len(self.SIP), humanreadablesize(len(self.SIP)), self.SIP)) else: self.SIP = unspecifiedSIP.makeSIP(self.Project, self.ObsId, self.ArchiveId, self.ticket, self.FileName, self.FileSize, self.MD5Checksum, self.Adler32Checksum, self.Type) self.FileType = unspec_type @@ -415,7 +495,7 @@ class IngestPipeline(): ## SecondaryUri handling not implemented self.logger.debug(cmd) start = time.time() - p = subprocess.Popen(cmd, stdin=open('/dev/null'), stdout=subprocess.PIPE) + p = subprocess.Popen(cmd, stdout=subprocess.PIPE) log = p.communicate()[0].split('\n') self.logger.debug("RollBack for %s took %ds" % (self.JobId, time.time() - start)) self.logger.debug(log) @@ -540,7 +620,6 @@ if __name__ == '__main__': logger.info(str(job)) - import getpass if getpass.getuser() == 'ingest': import ingest_config as config else: