Commit 298a0033 authored by Jorrit Schaap

Task #9192 and #9189 and #8392: merged ingest-cep4 branch via trunk into release 2_16 branch for RA/CEP4 rollout for ScipSup commissioning
parents cce7fd2e b72f5834
@@ -21,7 +21,7 @@ ltaClient = None
 host = socket.gethostname()
 ## Using netifaces module or something similar would be nicer, but it doesn't want to install in a custom dir
 ## So we use this hack
-if 'lexar' in host:
+if 'lexar00' in host:
     host = host + '.offline.lofar'
 if 'gridftp01.target.rug.nl' in host:
     host = 'lotar2.staging.lofar'
@@ -120,9 +120,11 @@ momURLlogout = 'https://lcs029.control.lofar:8443/useradministration/user/logout
 momRetry = 3
 srmRetry = 2
-if 'lexar' in host:
+if 'lexar001' in host or 'lexar002' in host:
     srmInit = '/globalhome/ingest/service/bin/init.sh'
-if 'lotar' in host:
+elif 'lexar003' in host or 'lexar004' in host:
+    srmInit = '/globalhome/ingest/.grid/.ingest_profile'
+elif 'lotar' in host:
     srmInit = '/home/lofarlocal/ltacp/bin/init.sh'
 try:
...
@@ -74,6 +74,11 @@ class IngestPipeline():
         self.ObsId = int(job['ObservationId'])
         self.HostLocation = job['Location'].split(':')[0]
         self.Location = job['Location'].split(':')[1]
+        if 'cep4' in self.HostLocation.lower():
+            # lexar003/lexar004 have cep4 lustrefs /data dir mounted on /data,
+            # so we can read data with ltacp client from localhost
+            self.HostLocation = 'localhost'
         pos = self.Location.find(self.DataProduct)
         if pos > 0: ## trick to support tar files with different names
             self.LocationDir = self.Location[:pos]
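
Note: the CEP4 shortcut above amounts to splitting the job's 'host:path' Location and treating CEP4 sources as local reads. A minimal standalone sketch of that idea; the helper name and the example path are illustrative and not part of the pipeline code:

```python
def resolve_transfer_source(location):
    """Split a job 'Location' of the form 'host:path' and decide whether the
    data can be read locally (CEP4 filesystem mounted on the lexar nodes) or
    has to be fetched from a remote host over ssh. Illustrative helper only."""
    host, path = location.split(':', 1)
    if 'cep4' in host.lower():
        # the CEP4 filesystem is mounted locally, so skip ssh
        host = 'localhost'
    return host, path

print(resolve_transfer_source('CEP4:/data/projects/L123456/L123456_SB000.MS'))
# -> ('localhost', '/data/projects/L123456/L123456_SB000.MS')
```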
@@ -169,19 +174,27 @@ class IngestPipeline():
         self.logger.debug('Starting file transfer')
         hostname = socket.getfqdn()
         javacmd = "java"
-        if "lexar" in hostname:
+        if "lexar001" in hostname or "lexar002" in hostname:
             javacmd = "/data/java7/jdk1.7.0_55/bin/java"
         ltacppath = "/globalhome/%s/ltacp" % ("ingesttest" if self.ltacpport == 8801 else "ingest")
-        if self.PrimaryUri:
-            cmd = ["ssh", "-T", "ingest@" +self.HostLocation, "cd %s;%s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.PrimaryUri, self.Source)]
-        else:
-            cmd = ["ssh", "-T", "ingest@" + self.HostLocation, "cd %s;%s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s/%s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.tempPrimary, self.FileName, self.Source)]
+        if self.HostLocation == 'localhost':
+            # copy data with ltacp client from HostLocation localhost, to ltacpserver at localhost
+            # so no need for ssh
+            hostname = 'localhost'
+            cmd = ["cd %s && %s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.PrimaryUri, self.Source)]
+        else:
+            # copy data with ltacp from a remote host, so use ssh
+            if self.PrimaryUri:
+                cmd = ["ssh", "-T", "ingest@" +self.HostLocation, "cd %s;%s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.PrimaryUri, self.Source)]
+            else:
+                cmd = ["ssh", "-T", "ingest@" + self.HostLocation, "cd %s;%s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s/%s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.tempPrimary, self.FileName, self.Source)]
         ## SecondaryUri handling not implemented
         self.logger.debug(cmd)
         start = time.time()
-        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
         logs = p.communicate()
         elapsed = time.time() - start
         self.logger.debug("File transfer for %s took %d sec" % (self.JobId, elapsed))
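
A note on the shell=True change above, based on standard subprocess semantics rather than the ingest code itself: a single command string that relies on shell features such as "cd ... &&" must go through the shell, whereas an argument vector like the ssh invocation should be passed as a list without shell=True, because with shell=True only the first list element is interpreted as the command and the remaining items become arguments to the shell itself. A small self-contained illustration:

```python
import subprocess

# A single command string that needs shell features ('cd ... &&') must be run
# through the shell:
cmd_str = "cd /tmp && echo transferring from $(hostname)"
p = subprocess.Popen(cmd_str, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
out, err = p.communicate()
print(out)

# An argument vector such as an ssh invocation should be passed as a list
# *without* shell=True; with shell=True only the first list element would be
# treated as the command.
cmd_list = ["echo", "remote", "transfer"]
p = subprocess.Popen(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
print(out)
```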
@@ -643,7 +656,7 @@ if __name__ == '__main__':
         job = parser.parse(path)
         job['filename'] = path
         logger.info("Parsed jobfile %s: %s" % (path, str(job)))
-        jobPipeline = IngestPipeline(None, jobToRun, config.momClient, config.ltaClient, config.ipaddress, config.ltacpport, config.mailCommand, config.momRetry, config.ltaRetry, config.srmRetry, config.srmInit)
+        jobPipeline = IngestPipeline(None, job, config.momClient, config.ltaClient, config.ipaddress, config.ltacpport, config.mailCommand, config.momRetry, config.ltaRetry, config.srmRetry, config.srmInit)
         jobPipeline.run()
         exit(0)
...
@@ -174,6 +174,7 @@ class jobHandler(Process):
         class manager(SyncManager): pass
         manager.register('number')
         manager.register('get')
+        manager.register('slave_names')
         self.manager = manager(address=(self.masterAddress, self.masterPort), authkey=self.masterAuth)
         self.manager.connect()
         nr_of_slaves = int(str(self.manager.number()))
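
For reference, the master/client plumbing used here is multiprocessing.managers.SyncManager: the client registers the names of remote callables and gets proxies after connect(). A minimal client-side sketch with placeholder address and authkey (not the production values):

```python
from multiprocessing.managers import SyncManager

class IngestManagerClient(SyncManager):
    pass

# On the client side only the names are registered (no callables); after
# connect() each name yields a proxy to the callable exposed by the master.
IngestManagerClient.register('number')
IngestManagerClient.register('get')
IngestManagerClient.register('slave_names')

def connect_to_master(address=('localhost', 50000), authkey=b'ingest'):
    # address and authkey are placeholders, not the production settings
    manager = IngestManagerClient(address=address, authkey=authkey)
    manager.connect()
    return manager

# usage (assuming a master is serving on that address):
# manager = connect_to_master()
# nr_of_slaves = int(str(manager.number()))   # proxies print their value, hence int(str(...))
# print(nr_of_slaves, manager.slave_names())
```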
@@ -182,7 +183,7 @@ class jobHandler(Process):
         nr_of_slaves = int(str(self.manager.number()))
         time.sleep(10) #Let's wait a few seconds for any more slaves. Currently all slaves need to connect in 10 seconds.
         nr_of_slaves = int(str(self.manager.number()))
-        self.logger.info('Slaves found: %d' % nr_of_slaves)
+        self.logger.info('Slaves found: %d %s' % (nr_of_slaves, self.manager.slave_names()))
         os.system('echo "The LTA Ingest has been restarted."|mailx -s "LTA Ingest restarted" ' + self.mailCommand)
         ## ======= Main loop ======
@@ -204,7 +205,9 @@ class jobHandler(Process):
                     self.update_job_msg.put((job, job['Status'], JobProducing, None))
                     job['Status'] = JobProducing
                     self.active[job['ExportID']] = job
-                    self.manager.get(None, job['destination']).put(job) ## sends it to the slave with the shortest queue of the possible destinations
+                    job_slave_hosts = job.get('slave_hosts')
+                    slave_for_job = self.manager.get(job_slave_hosts)
+                    slave_for_job.put(job) ## sends it to the slave with the shortest queue of the possible slave_hosts
                     self.logger.debug("Job's started: %s (%i)" % (job['ExportID'], len(self.active)))
                     first = True
                 except Empty: pass
@@ -260,6 +263,12 @@ class queueHandler(Process):
                 if job['Status'] == JobScheduled:
                     self.update_job(job, None, JobScheduled, None)
                     job['destination'] = self.job_groups[job['job_group']].get_destination()
+                    sourcehost = job['Location'].split(':')[0]
+                    self.logger.info("job: %s sourcehost=%s" % (fileName, sourcehost))
+                    if 'cep4' in sourcehost.lower():
+                        job['slave_hosts'] = ['lexar003.offline.lofar', 'lexar004.offline.lofar']
                     self.scheduled.put(job)
                     # self.talker.put(job) ## Tell MoM we've done something
                 else:
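
A small illustration of the routing rule introduced above: jobs whose data lives on CEP4 are pinned to the lexar nodes that mount the CEP4 filesystem. The job dict contents below are made up for the example; only the hostnames come from the diff:

```python
def assign_slave_hosts(job):
    """Illustrative version of the routing rule: CEP4 jobs may only be handled
    by lexar003/lexar004. Mutates and returns the job dict."""
    sourcehost = job['Location'].split(':')[0]
    if 'cep4' in sourcehost.lower():
        job['slave_hosts'] = ['lexar003.offline.lofar', 'lexar004.offline.lofar']
    return job

job = {'ExportID': 'X_12345',  # hypothetical id
       'Location': 'CEP4:/data/projects/L123456/L123456_SB000.MS'}
print(assign_slave_hosts(job)['slave_hosts'])
```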
@@ -353,21 +362,24 @@ class ltaMaster():
             raise Exception('No MoM to listen to!')

     def add_slave(self, slave):
+        self.logger.info('Slave \'%s\' added to master' % (slave, ))
         self.slaves[slave] = Queue()
         return self.slaves[slave]

     def slave_size(self):
         return len(self.slaves)

-    ##Gives you the shortest slave queue unless you ask for a specific one.
-    def get_slave(self, source, destination):
-        if source: ## this code was developed for use on lse nodes/staging area, not really used.
-            return self.slaves[source]
-        else:
-            result = None
-            length = sys.maxint
-            for k in self.slaves.keys():
-                if destination in k:# subselection of slaves based on destination, bit of a hack right now: choice between: lexar,lotar
-                    size = self.slaves[k].qsize()
-                    if length > size:
-                        result = self.slaves[k]
+    def slave_names(self):
+        return self.slaves.keys()
+
+    ##Gives you the shortest slave queue out of the requested slave_hosts
+    def get_slave(self, slave_hosts):
+        if slave_hosts == None:
+            slave_hosts = self.slaves.keys()
+        result = None
+        length = sys.maxint
+        for k in slave_hosts:
+            size = self.slaves[k].qsize()
+            if length > size:
+                result = self.slaves[k]
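
The reworked get_slave() is a shortest-queue pick over the candidate slave queues. A self-contained sketch of the same idea (Python 3 has no sys.maxint, so float('inf') stands in, and plain queue.Queue objects stand in for the per-slave multiprocessing queues):

```python
import queue

def get_slave(slaves, slave_hosts=None):
    """Return the slave queue with the fewest pending jobs, restricted to the
    requested slave_hosts if given (mirrors ltaMaster.get_slave above)."""
    if slave_hosts is None:
        slave_hosts = list(slaves.keys())
    result = None
    length = float('inf')  # the Python 2 code above uses sys.maxint
    for name in slave_hosts:
        size = slaves[name].qsize()
        if size < length:
            result = slaves[name]
            length = size
    return result

# toy usage: lexar003 already has one pending job, so lexar004 is chosen
slaves = {'lexar003': queue.Queue(), 'lexar004': queue.Queue()}
slaves['lexar003'].put('job1')
print(get_slave(slaves, ['lexar003', 'lexar004']) is slaves['lexar004'])  # True
```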
@@ -391,6 +403,7 @@ class ltaMaster():
         class manager(SyncManager): pass
         manager.register('add_slave', self.add_slave)
         manager.register('number', self.slave_size)
+        manager.register('slave_names', self.slave_names)
         manager.register('get', self.get_slave)
         manager.register('remove_slave', self.remove_slave)
         manager.register('slave_done', self.slave_done)
...
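
On the master side, the counterpart of the client registration is a SyncManager that exposes real callables and serves them. A stripped-down sketch with placeholder address/authkey and a toy state object instead of ltaMaster:

```python
from multiprocessing.managers import SyncManager

class IngestMasterState(object):
    """Toy stand-in for the ltaMaster bookkeeping used in this sketch."""
    def __init__(self):
        self.slaves = {}

    def slave_size(self):
        return len(self.slaves)

    def slave_names(self):
        return list(self.slaves.keys())

def run_master(address=('', 50000), authkey=b'ingest'):
    # address/authkey are placeholders, not the production settings
    state = IngestMasterState()

    class Manager(SyncManager):
        pass

    # expose callables under the same names the jobHandler client registers
    Manager.register('number', state.slave_size)
    Manager.register('slave_names', state.slave_names)

    manager = Manager(address=address, authkey=authkey)
    server = manager.get_server()
    server.serve_forever()  # blocks; the real master runs this alongside its other processes
```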
@@ -135,8 +135,10 @@ class executer(Process):
         if (self.job['Status'] == JobProduced) or (self.job['Status'] == JobError):
             self.talker.put(self.job)
             self.manager.slave_done(self.job, self.result, pipeline.FileType)
-        with self.jobs.get_lock():
-            self.jobs.value -= 1
+        #python 'with' does not work for some reason, so just use acquire/release
+        self.jobs.get_lock().acquire()
+        self.jobs.value -= 1
+        self.jobs.get_lock().release()
         self.logger.debug("Slave Pipeline executer finished for %s in %d sec" % (self.job['ExportID'], time.time() - start))

 ## ---------------- LTA Slave --------------------------------------------
@@ -198,8 +200,10 @@ class ltaSlave():
             except QueueEmpty:
                 job = None
             if job:
-                with self.jobs.get_lock():
-                    self.jobs.value += 1
+                #python 'with' does not work for some reason, so just use acquire/release
+                self.jobs.get_lock().acquire()
+                self.jobs.value += 1
+                self.jobs.get_lock().release()
                 runner = executer(self.logger, self.logdir, job, talker, self.jobs, self.momClient, self.ltaClient, self.host, self.ltacpport, self.mailSlCommand, self.manager, self.pipelineRetry, self.momRetry, self.ltaRetry, self.srmRetry, self.srmInit)
                 runner.start()
             else:
...
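
The last two hunks swap a "with self.jobs.get_lock():" block for explicit acquire/release around the shared job counter. A self-contained sketch of that shared-counter pattern (on current Python the context-manager form locks just as well, so the explicit calls below simply mirror the change):

```python
from multiprocessing import Process, Value

def worker(jobs):
    # explicit acquire/release around the shared counter, mirroring the change above
    jobs.get_lock().acquire()
    jobs.value += 1
    jobs.get_lock().release()
    # on current Python the context-manager form achieves the same locking:
    # with jobs.get_lock():
    #     jobs.value += 1

if __name__ == '__main__':
    jobs = Value('i', 0)  # shared integer counter, like self.jobs in the slave
    procs = [Process(target=worker, args=(jobs,)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(jobs.value)  # 4
```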