diff --git a/LTA/LTAIngest/ingestpipeline.py b/LTA/LTAIngest/ingestpipeline.py index f26ba5b52d52a9b1713fdf93583f08cb1945cdbe..df8ea0b528e5237c17c1bf5613e88f67d46b02ed 100755 --- a/LTA/LTAIngest/ingestpipeline.py +++ b/LTA/LTAIngest/ingestpipeline.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import logging, os, time, xmlrpclib, subprocess, random, unspecifiedSIP import socket +import re from lxml import etree from cStringIO import StringIO from job_group import corr_type, bf_type, img_type, unspec_type, pulp_type @@ -260,12 +261,13 @@ class IngestPipeline(): except Exception: pass - except ltacp.LtacpException as exp: - if '550 File not found' in exp.value: - self.logger.error('Destination directory does not exist. Creating %s in LTA for %s' % (self.PrimaryUri, self.JobId)) + except Exception as exp: + if isinstance(exp, ltacp.LtacpException): + if '550 File not found' in exp.value: + self.logger.error('Destination directory does not exist. Creating %s in LTA for %s' % (self.PrimaryUri, self.JobId)) - if ltacp.create_missing_directories(self.PrimaryUri) == 0: - self.logger.info('Created path %s in LTA for %s' % (self.PrimaryUri, self.JobId)) + if ltacp.create_missing_directories(self.PrimaryUri) == 0: + self.logger.info('Created path %s in LTA for %s' % (self.PrimaryUri, self.JobId)) raise Exception('New style file transfer failed of %s\n%s' % (self.JobId, str(exp))) @@ -455,8 +457,9 @@ class IngestPipeline(): self.SIP = sip_dom.toxml("utf-8") self.SIP = self.SIP.replace('<stationType>Europe</stationType>','<stationType>International</stationType>') - self.SIP = self.SIP.replace('<source>EOR</source>','<source>EoR</source>') #EoR sips sometime contain EOR instead of EoR as source + #make sure the source in the SIP is the same as the type of the storageticket + self.SIP = re.compile('<source>eor</source>', re.IGNORECASE).sub('<source>%s</source>' % (self.Type,), self.SIP) except: self.logger.exception('Getting SIP from EoR failed') raise @@ -533,7 +536,7 @@ class 
IngestPipeline(): break retry += 1 if retry < times: - wait_time = random.randint(30, 60) * retry + wait_time = random.randint(5, 30) * retry self.logger.debug('waiting %d seconds before trying %s again' % (wait_time, self.JobId)) time.sleep(wait_time) if error: @@ -631,6 +634,8 @@ if __name__ == '__main__': description='Run the ingestpipeline on a single jobfile, or a directory containing many jobfiles.') parser.add_option("-u", "--user", dest="user", type="string", default=getpass.getuser(), help="username for to login on <host>, [default: %default]") parser.add_option("-p", "--parallel", dest="maxParallelJobs", type="int", default=4, help="number of parellel pipelines to run when processing a directory of jobfiles [default: %default]") + parser.add_option("", "--odd", dest="odd", action="store_true", default=None, help="only process dataproducts with an odd subband number, [default: %default]") + parser.add_option("", "--even", dest="even", action="store_true", default=None, help="only process dataproducts with an even subband number, [default: %default]") (options, args) = parser.parse_args() if len(args) != 1: @@ -655,7 +660,7 @@ if __name__ == '__main__': parser = job_parser.parser(logger) job = parser.parse(path) job['filename'] = path - logger.info("Parsed jobfile %s: %s" % (path, str(job))) + logger.info("Parsed jobfile %s" % (path,)) jobPipeline = IngestPipeline(None, job, config.momClient, config.ltaClient, config.ipaddress, config.ltacpport, config.mailCommand, config.momRetry, config.ltaRetry, config.srmRetry, config.srmInit) jobPipeline.run() exit(0) @@ -680,7 +685,16 @@ if __name__ == '__main__': parser = job_parser.parser(logger) job = parser.parse(jobfilepath) job['filename'] = jobfilepath - logger.info("Parsed jobfile %s: %s" % (jobfilepath, str(job))) + logger.info("Parsed jobfile %s" % (jobfilepath,)) + + if options.odd or options.even: + # parse subband number of dataproduct + # only add this job if the subband number odd/even complies with 
given option + subband = next(x for x in job['DataProduct'].split('_') if x.startswith('SB')) + subband_nr = int(filter(str.isdigit,subband)) + is_even = subband_nr % 2 == 0 + if (options.odd and is_even) or (options.even and not is_even): + continue if 'host' in job: host = job['host'] @@ -696,6 +710,9 @@ if __name__ == '__main__': busyHosts = set(runningPipelinesPerHost.keys()) freeHosts = set(hostJobQueues.keys()) - busyHosts + # sort free hosts with the longest queues first, so their jobs get started first + freeHosts = sorted(freeHosts, key=lambda h: len(hostJobQueues[h]), reverse=True) + startedNewJobs = False # start new pipeline for one job per free host @@ -746,7 +763,7 @@ if __name__ == '__main__': if jobPipeline.finishedSuccessfully: try: if os.path.isdir(path): - path, filename = os.split(jobPipeline.job['filename']) + path, filename = os.path.split(jobPipeline.job['filename']) os.rename(jobPipeline.job['filename'], os.path.join(path, 'done', filename)) except Exception as e: logger.error(str(e))