diff --git a/LTA/LTAIngest/ingestpipeline.py b/LTA/LTAIngest/ingestpipeline.py index 7033a95cefb0c348e3cbb9e5743f4f0bcc71c6c2..8a073bbb539d35f16ace9ac59e631ac16ddd230b 100755 --- a/LTA/LTAIngest/ingestpipeline.py +++ b/LTA/LTAIngest/ingestpipeline.py @@ -19,7 +19,7 @@ def humanreadablesize(num, suffix='B'): return "%.1f%s%s" % (num, 'Y', suffix) except TypeError: return str(num) - + IngestStarted = 10 ## 20 not used IngestSIPComplete = 30 @@ -183,11 +183,17 @@ class IngestPipeline(): if self.HostLocation == 'localhost': # copy data with ltacp client from HostLocation localhost, to ltacpserver at localhost # so no need for ssh + use_shell = ("lexar003" in hostname or "lexar004" in hostname) hostname = 'localhost' cmd = ["cd %s && %s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.PrimaryUri, self.Source)] else: - if self.HostLocation.startswith('locus') and not self.HostLocation.endswith('offline.lofar'): - self.HostLocation += '.offline.lofar' + if "lexar003" in hostname or "lexar004" in hostname: + #lexar003/004 are not aware of cep2, and cep2 is not aware of lexar003/004, + #so make sure we can handle the proper hostnames and ips + use_shell=True + if self.HostLocation.startswith('locus') and not self.HostLocation.endswith('cep2.lofar'): + hostname = '10.178.1.3' if "lexar003" in hostname else '10.178.1.4' + self.HostLocation += '.cep2.lofar' # copy data with ltacp from a remote host, so use ssh if self.PrimaryUri: @@ -196,9 +202,9 @@ class IngestPipeline(): cmd = ["ssh", "-T", "ingest@" + self.HostLocation, "cd %s;%s -Xmx256m -cp %s/qpid-properties/lexar001.offline.lofar:%s/ltacp.jar nl.astron.ltacp.client.LtaCp %s %s %s/%s %s" % (self.LocationDir, javacmd, ltacppath, ltacppath, hostname, self.ltacpport, self.tempPrimary, self.FileName, self.Source)] ## SecondaryUri handling not implemented - self.logger.debug(cmd) + self.logger.debug(' '.join(cmd)) start = time.time() - p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=use_shell) logs = p.communicate() elapsed = time.time() - start self.logger.debug("File transfer for %s took %d sec" % (self.JobId, elapsed)) @@ -381,7 +387,7 @@ class IngestPipeline(): if storageTickets[0].text != str(self.ticket): self.logger.error("CheckSIPContent for %s storageTicket %s does not match expected %s" % (self.JobId, storageTickets[0].text, self.ticket)) return False - + self.logger.debug("CheckSIPContent OK for %s" % (self.JobId,)) return True except Exception as e: @@ -536,7 +542,7 @@ class IngestPipeline(): else: self.logger.debug(errortext + ' ran without a problem on %s' % self.JobId) error = '' - break + break retry += 1 if retry < times: wait_time = random.randint(5, 30) * retry @@ -581,7 +587,7 @@ class IngestPipeline(): try: if self.ticket: self.RetryRun(self.SendStatus, self.ltaRetry, 'Setting LTA status', IngestFailed) - except Exception as e: + except Exception as e: os.system('echo "Received unknown exception in SendStatus for %s to %s while handling another error:\n%s\n\nCheck LTA catalog and SRM!\n%s"|mailx -s "Warning: LTA catalog status update failed" ' % (self.JobId, IngestFailed, self._hidePassword(str(e)), self.PrimaryUri) + self.mailCommand) self.logger.error('Sent Mail: LTA catalog status update failed to ' + self.mailCommand) self.logger.exception('SendStatus IngestFailed failed') @@ -611,7 +617,7 @@ class IngestPipeline(): if self.ticket: self.RetryRun(self.SendStatus, self.ltaRetry, 'Setting LTA status', IngestFailed) raise - + def _hidePassword(self, message): ''' helper function which hides the password in the ltaClient url in the message ''' @@ -621,7 +627,7 @@ class IngestPipeline(): return message.replace(':'+password, ':HIDDENPASSWORD') except Exception as e: return message - + #----------------------------------------------------------------- selfstarter - if __name__ == '__main__':