diff --git a/LTA/LTAIngest/ingest_config.py.in b/LTA/LTAIngest/ingest_config.py.in index e39b43b8a6ed4822dda4f8a9e8aad20c0a8b9bef..3ae0d647c5a1fb650772fd39da38bbdb131493b7 100644 --- a/LTA/LTAIngest/ingest_config.py.in +++ b/LTA/LTAIngest/ingest_config.py.in @@ -21,7 +21,7 @@ ltaClient = None host = socket.gethostname() ## Using netifaces module or something similar would be nicer, but it doesn't want to install in a custom dir ## So we use this hack -if 'lexar' in host: +if 'lexar00' in host: host = host + '.offline.lofar' if 'gridftp01.target.rug.nl' in host: host = 'lotar2.staging.lofar' @@ -120,9 +120,11 @@ momURLlogout = 'https://lcs029.control.lofar:8443/useradministration/user/logout momRetry = 3 srmRetry = 2 -if 'lexar' in host: +if 'lexar001' in host or 'lexar002' in host: srmInit = '/globalhome/ingest/service/bin/init.sh' -if 'lotar' in host: +elif 'lexar003' in host or 'lexar004' in host: + srmInit = '/globalhome/ingest/.grid/.ingest_profile' +elif 'lotar' in host: srmInit = '/home/lofarlocal/ltacp/bin/init.sh' try: diff --git a/LTA/LTAIngest/ingestpipeline.py b/LTA/LTAIngest/ingestpipeline.py index 612b6032ce0eb45d296ce3bece331e10bc91af02..f26ba5b52d52a9b1713fdf93583f08cb1945cdbe 100755 --- a/LTA/LTAIngest/ingestpipeline.py +++ b/LTA/LTAIngest/ingestpipeline.py @@ -74,6 +74,11 @@ class IngestPipeline(): self.ObsId = int(job['ObservationId']) self.HostLocation = job['Location'].split(':')[0] self.Location = job['Location'].split(':')[1] + if 'cep4' in self.HostLocation.lower(): + # lexar003/lexar004 have cep4 lustrefs /data dir mounted on /data, + # so we can read data with ltacp client from localhost + self.HostLocation = 'localhost' + pos = self.Location.find(self.DataProduct) if pos > 0: ## trick to support tar files with different names self.LocationDir = self.Location[:pos] @@ -169,19 +174,27 @@ class IngestPipeline(): self.logger.debug('Starting file transfer') hostname = socket.getfqdn() javacmd = "java" - if "lexar" in hostname: + if "lexar001" in hostname or "lexar002" in hostname: javacmd = "/data/java7/jdk1.7.0_55/bin/java" ltacppath = "/globalhome/%s/ltacp" % ("ingesttest" if self.ltacpport == 8801 else "ingest") - if self.PrimaryUri: - cmd = ["ssh", "-T", "ingest@" +self.HostLocation, "cd %s;%s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.PrimaryUri, self.Source)] + if self.HostLocation == 'localhost': + # copy data with ltacp client from HostLocation localhost, to ltacpserver at localhost + # so no need for ssh + hostname = 'localhost' + cmd = ["cd %s && %s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.PrimaryUri, self.Source)] else: - cmd = ["ssh", "-T", "ingest@" + self.HostLocation, "cd %s;%s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s/%s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.tempPrimary, self.FileName, self.Source)] + # copy data with ltacp from a remote host, so use ssh + if self.PrimaryUri: + cmd = ["ssh", "-T", "ingest@" +self.HostLocation, "cd %s;%s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.PrimaryUri, self.Source)] + else: + cmd = ["ssh", "-T", "ingest@" + self.HostLocation, "cd %s;%s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s/%s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.tempPrimary, self.FileName, self.Source)] + ## SecondaryUri handling not implemented self.logger.debug(cmd) start = time.time() - p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) logs = p.communicate() elapsed = time.time() - start self.logger.debug("File transfer for %s took %d sec" % (self.JobId, elapsed)) @@ -643,7 +656,7 @@ if __name__ == '__main__': job = parser.parse(path) job['filename'] = path logger.info("Parsed jobfile %s: %s" % (path, str(job))) - jobPipeline = IngestPipeline(None, jobToRun, config.momClient, config.ltaClient, config.ipaddress, config.ltacpport, config.mailCommand, config.momRetry, config.ltaRetry, config.srmRetry, config.srmInit) + jobPipeline = IngestPipeline(None, job, config.momClient, config.ltaClient, config.ipaddress, config.ltacpport, config.mailCommand, config.momRetry, config.ltaRetry, config.srmRetry, config.srmInit) jobPipeline.run() exit(0) diff --git a/LTA/LTAIngest/master.py b/LTA/LTAIngest/master.py index 773d5fe66f2338e9fb83ac2313d321597654d54b..55e2b5c1eed86ded94e95af66029fa93e6157a17 100755 --- a/LTA/LTAIngest/master.py +++ b/LTA/LTAIngest/master.py @@ -174,6 +174,7 @@ class jobHandler(Process): class manager(SyncManager): pass manager.register('number') manager.register('get') + manager.register('slave_names') self.manager = manager(address=(self.masterAddress, self.masterPort), authkey=self.masterAuth) self.manager.connect() nr_of_slaves = int(str(self.manager.number())) @@ -182,7 +183,7 @@ class jobHandler(Process): nr_of_slaves = int(str(self.manager.number())) time.sleep(10) #Let's wait a few seconds for any more slaves. Currently all slaves need to connect in 10 seconds. nr_of_slaves = int(str(self.manager.number())) - self.logger.info('Slaves found: %d' % nr_of_slaves) + self.logger.info('Slaves found: %d %s' % (nr_of_slaves, self.manager.slave_names())) os.system('echo "The LTA Ingest has been restarted."|mailx -s "LTA Ingest restarted" ' + self.mailCommand) ## ======= Main loop ====== @@ -204,7 +205,9 @@ class jobHandler(Process): self.update_job_msg.put((job, job['Status'], JobProducing, None)) job['Status'] = JobProducing self.active[job['ExportID']] = job - self.manager.get(None, job['destination']).put(job) ## sends it to the slave with the shortest queue of the possible destinations + job_slave_hosts = job.get('slave_hosts') + slave_for_job = self.manager.get(job_slave_hosts) + slave_for_job.put(job) ## sends it to the slave with the shortest queue of the possible slave_hosts self.logger.debug("Job's started: %s (%i)" % (job['ExportID'], len(self.active))) first = True except Empty: pass @@ -260,6 +263,12 @@ class queueHandler(Process): if job['Status'] == JobScheduled: self.update_job(job, None, JobScheduled, None) job['destination'] = self.job_groups[job['job_group']].get_destination() + + sourcehost = job['Location'].split(':')[0] + self.logger.info("job: %s sourcehost=%s" % (fileName, sourcehost)) + if 'cep4' in sourcehost.lower(): + job['slave_hosts'] = ['lexar003.offline.lofar', 'lexar004.offline.lofar'] + self.scheduled.put(job) # self.talker.put(job) ## Tell MoM we've done something else: @@ -353,27 +362,30 @@ class ltaMaster(): raise Exception('No MoM to listen to!') def add_slave(self, slave): + self.logger.info('Slave \'%s\' added to master' % (slave, )) self.slaves[slave] = Queue() return self.slaves[slave] def slave_size(self): return len(self.slaves) - ##Gives you the shortest slave queue unless you ask for a specific one. - def get_slave(self, source, destination): - if source: ## this code was developed for use on lse nodes/staging area, not really used. - return self.slaves[source] - else: - result = None - length = sys.maxint - for k in self.slaves.keys(): - if destination in k:# subselection of slaves based on destination, bit of a hack right now: choice between: lexar,lotar - size = self.slaves[k].qsize() - if length > size: - result = self.slaves[k] - length = size - self.logger.debug('found slave %s' % k) - return result + def slave_names(self): + return self.slaves.keys() + + ##Gives you the shortest slave queue out of the requested slave_hosts + def get_slave(self, slave_hosts): + if slave_hosts == None: + slave_hosts = self.slaves.keys() + + result = None + length = sys.maxint + for k in slave_hosts: + size = self.slaves[k].qsize() + if length > size: + result = self.slaves[k] + length = size + self.logger.debug('found slave %s' % k) + return result def remove_slave(self, slave): q = self.slaves.pop(slave, None) @@ -391,6 +403,7 @@ class ltaMaster(): class manager(SyncManager): pass manager.register('add_slave', self.add_slave) manager.register('number', self.slave_size) + manager.register('slave_names', self.slave_names) manager.register('get', self.get_slave) manager.register('remove_slave', self.remove_slave) manager.register('slave_done', self.slave_done) diff --git a/LTA/LTAIngest/slave.py b/LTA/LTAIngest/slave.py index 680a77afadbed5543630f48ee8393bb37ac35534..0b2ae2921a4b117bad0d7ff17958b02e32a489f3 100755 --- a/LTA/LTAIngest/slave.py +++ b/LTA/LTAIngest/slave.py @@ -135,8 +135,10 @@ class executer(Process): if (self.job['Status'] == JobProduced) or (self.job['Status'] == JobError): self.talker.put(self.job) self.manager.slave_done(self.job, self.result, pipeline.FileType) - with self.jobs.get_lock(): - self.jobs.value -= 1 + #python 'with' does not work for some reason, so just use acquire/release + self.jobs.get_lock().acquire() + self.jobs.value -= 1 + self.jobs.get_lock().release() self.logger.debug("Slave Pipeline executer finished for %s in %d sec" % (self.job['ExportID'], time.time() - start)) ## ---------------- LTA Slave -------------------------------------------- @@ -198,8 +200,10 @@ class ltaSlave(): except QueueEmpty: job = None if job: - with self.jobs.get_lock(): - self.jobs.value += 1 + #python 'with' does not work for some reason, so just use acquire/release + self.jobs.get_lock().acquire() + self.jobs.value += 1 + self.jobs.get_lock().release() runner = executer(self.logger, self.logdir, job, talker, self.jobs, self.momClient, self.ltaClient, self.host, self.ltacpport, self.mailSlCommand, self.manager, self.pipelineRetry, self.momRetry, self.ltaRetry, self.srmRetry, self.srmInit) runner.start() else: