Commit 84b08459 authored by Jorrit Schaap

Task #9189: merged patches from ^/branches/LOFAR-Release-2_16 -c 34833

parent 4261cd29
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 import logging, os, time, xmlrpclib, subprocess, random, unspecifiedSIP
 import socket
+import re
 from lxml import etree
 from cStringIO import StringIO
 from job_group import corr_type, bf_type, img_type, unspec_type, pulp_type
@@ -260,12 +261,13 @@ class IngestPipeline():
                 except Exception:
                     pass
-        except ltacp.LtacpException as exp:
-            if '550 File not found' in exp.value:
-                self.logger.error('Destination directory does not exist. Creating %s in LTA for %s' % (self.PrimaryUri, self.JobId))
-                if ltacp.create_missing_directories(self.PrimaryUri) == 0:
-                    self.logger.info('Created path %s in LTA for %s' % (self.PrimaryUri, self.JobId))
+        except Exception as exp:
+            if isinstance(exp, ltacp.LtacpException):
+                if '550 File not found' in exp.value:
+                    self.logger.error('Destination directory does not exist. Creating %s in LTA for %s' % (self.PrimaryUri, self.JobId))
+                    if ltacp.create_missing_directories(self.PrimaryUri) == 0:
+                        self.logger.info('Created path %s in LTA for %s' % (self.PrimaryUri, self.JobId))
             raise Exception('New style file transfer failed of %s\n%s' % (self.JobId, str(exp)))
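Note: this hunk broadens the except clause so that every transfer failure reaches the final raise, while the LTA directory-creation fallback only runs for LtacpException. A minimal standalone sketch of the pattern; TransferError, do_transfer and create_missing_directories are hypothetical stand-ins, not names from this codebase:

    class TransferError(Exception):
        # hypothetical stand-in for ltacp.LtacpException
        def __init__(self, value):
            super(TransferError, self).__init__(value)
            self.value = value

    def transfer_with_fallback(do_transfer, create_missing_directories):
        try:
            do_transfer()
        except Exception as exp:
            # special-case one exception subtype, but re-raise for every failure
            if isinstance(exp, TransferError) and '550 File not found' in exp.value:
                create_missing_directories()
            raise Exception('file transfer failed\n%s' % (str(exp),))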
@@ -455,8 +457,9 @@ class IngestPipeline():
             self.SIP = sip_dom.toxml("utf-8")
             self.SIP = self.SIP.replace('<stationType>Europe</stationType>','<stationType>International</stationType>')
-            self.SIP = self.SIP.replace('<source>EOR</source>','<source>EoR</source>') #EoR sips sometime contain EOR instead of EoR as source
+            #make sure the source in the SIP is the same as the type of the storageticket
+            self.SIP = re.compile('<source>eor</source>', re.IGNORECASE).sub('<source>%s</source>' % (self.Type,), self.SIP)
         except:
             self.logger.exception('Getting SIP from EoR failed')
             raise
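Note: the new code swaps a literal string replace for a case-insensitive regex, so 'EOR', 'EoR' and 'eor' all get normalized to the storage ticket type. A quick illustration of the re.IGNORECASE substitution; the sample XML and ticket_type value are made up:

    import re

    sip = '<sip><source>EOR</source></sip>'
    ticket_type = 'EoR'  # hypothetical value of self.Type
    sip = re.compile('<source>eor</source>', re.IGNORECASE).sub(
        '<source>%s</source>' % (ticket_type,), sip)
    # sip is now '<sip><source>EoR</source></sip>'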
@@ -533,7 +536,7 @@ class IngestPipeline():
                     break
                 retry += 1
                 if retry < times:
-                    wait_time = random.randint(30, 60) * retry
+                    wait_time = random.randint(5, 30) * retry
                     self.logger.debug('waiting %d seconds before trying %s again' % (wait_time, self.JobId))
                     time.sleep(wait_time)
             if error:
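Note: the retry delay drops from 30-60 seconds to 5-30 seconds per attempt, still scaled linearly by the retry count. A sketch of the resulting jittered linear backoff, assuming the one-based retry counter seen in the hunk:

    import random, time

    def backoff_sleep(retry):
        # retry 1 waits 5-30s, retry 2 waits 10-60s, and so on
        wait_time = random.randint(5, 30) * retry
        time.sleep(wait_time)
        return wait_time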
@@ -631,6 +634,8 @@ if __name__ == '__main__':
         description='Run the ingestpipeline on a single jobfile, or a directory containing many jobfiles.')
     parser.add_option("-u", "--user", dest="user", type="string", default=getpass.getuser(), help="username for to login on <host>, [default: %default]")
     parser.add_option("-p", "--parallel", dest="maxParallelJobs", type="int", default=4, help="number of parellel pipelines to run when processing a directory of jobfiles [default: %default]")
+    parser.add_option("", "--odd", dest="odd", action="store_true", default=None, help="only process dataproducts with an odd subband number, [default: %default]")
+    parser.add_option("", "--even", dest="even", action="store_true", default=None, help="only process dataproducts with an even subband number, [default: %default]")
     (options, args) = parser.parse_args()
     if len(args) != 1:
@@ -655,7 +660,7 @@ if __name__ == '__main__':
         parser = job_parser.parser(logger)
         job = parser.parse(path)
         job['filename'] = path
-        logger.info("Parsed jobfile %s: %s" % (path, str(job)))
+        logger.info("Parsed jobfile %s" % (path,))
         jobPipeline = IngestPipeline(None, job, config.momClient, config.ltaClient, config.ipaddress, config.ltacpport, config.mailCommand, config.momRetry, config.ltaRetry, config.srmRetry, config.srmInit)
         jobPipeline.run()
         exit(0)
@@ -680,7 +685,16 @@ if __name__ == '__main__':
             parser = job_parser.parser(logger)
             job = parser.parse(jobfilepath)
             job['filename'] = jobfilepath
-            logger.info("Parsed jobfile %s: %s" % (jobfilepath, str(job)))
+            logger.info("Parsed jobfile %s" % (jobfilepath,))
+
+            if options.odd or options.even:
+                # parse subband number of dataproduct
+                # only add this job if the subband number odd/even complies with given option
+                subband = next(x for x in job['DataProduct'].split('_') if x.startswith('SB'))
+                subband_nr = int(filter(str.isdigit,subband))
+                is_even = subband_nr % 2 == 0
+                if (options.odd and is_even) or (options.even and not is_even):
+                    continue
             if 'host' in job:
                 host = job['host']
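Note: the odd/even filter pulls the 'SB###' token out of the dataproduct name and skips jobs whose subband parity does not match the flag. A standalone sketch with an invented filename; the hunk's filter(str.isdigit, ...) returns a string in Python 2, while in Python 3 it would return an iterator and need a ''.join(...), so this version uses the join form that works in both:

    def subband_number(dataproduct_name):
        # e.g. 'L123456_SAP000_SB017_uv.MS' -> 17 (example name is hypothetical)
        subband = next(x for x in dataproduct_name.split('_') if x.startswith('SB'))
        return int(''.join(c for c in subband if c.isdigit()))

    def wanted(dataproduct_name, odd=False, even=False):
        is_even = subband_number(dataproduct_name) % 2 == 0
        return not ((odd and is_even) or (even and not is_even))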
@@ -696,6 +710,9 @@ if __name__ == '__main__':
         busyHosts = set(runningPipelinesPerHost.keys())
         freeHosts = set(hostJobQueues.keys()) - busyHosts
+        # sort free host with longest queues first, so their jobs get started first
+        freeHosts = sorted(freeHosts, key=lambda h: len(hostJobQueues[h]), reverse=True)
+
         startedNewJobs = False
         # start new pipeline for one job per free host
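Note: sorting the free hosts by descending queue length means the host with the most pending jobs gets the next free pipeline slot. A small illustration with made-up host and job names:

    hostJobQueues = {'nodeA': ['j1', 'j2', 'j3'], 'nodeB': ['j4'], 'nodeC': ['j5', 'j6']}
    busyHosts = {'nodeC'}

    freeHosts = set(hostJobQueues.keys()) - busyHosts
    freeHosts = sorted(freeHosts, key=lambda h: len(hostJobQueues[h]), reverse=True)
    # freeHosts == ['nodeA', 'nodeB']: the longest queue is served first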
@@ -746,7 +763,7 @@ if __name__ == '__main__':
                 if jobPipeline.finishedSuccessfully:
                     try:
                         if os.path.isdir(path):
-                            path, filename = os.split(jobPipeline.job['filename'])
+                            path, filename = os.path.split(jobPipeline.job['filename'])
                             os.rename(jobPipeline.job['filename'], os.path.join(path, 'done', filename))
                     except Exception as e:
                         logger.error(str(e))
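Note: this fixes a genuine bug; the os module has no split function, so the old os.split call would raise AttributeError on the success path. A minimal demonstration of the corrected move-to-done step, with hypothetical paths and assuming the 'done' subdirectory already exists:

    import os

    def move_to_done(jobfile):
        # split '/jobs/pending/job1.xml' into ('/jobs/pending', 'job1.xml')
        path, filename = os.path.split(jobfile)
        target = os.path.join(path, 'done', filename)
        os.rename(jobfile, target)
        return target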