import logging
import os
import sys
import time
import subprocess
import random
import socket
import re
import getpass
from prometheus_client import Gauge, Counter, Histogram
from prometheus_client.utils import INF
from lofar.lta.ingest.common.job import *
from lofar.lta.ingest.server.sip import validateSIPAgainstSchema, addIngestInfoToSIP
from lofar.lta.ingest.server.ltacp import *
from lofar.lta.ingest.server.unspecifiedSIP import makeSIP
from lofar.lta.ingest.server.ltaclient import *
from lofar.lta.ingest.client.rpc import IngestTMSSRPC
from lofar.common.util import humanreadablesize
from lofar.common import isProductionEnvironment
from lofar.common.metrics import metric_track_duration
from lofar.common.subprocess_utils import communicate_returning_strings
from lofar.messaging import EventMessage, ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.server.config import GLOBUS_TIMEOUT
from lofar.common.dbcredentials import DBCredentials
logger = logging.getLogger(__name__)
#---------------------- Prometheus Metrics ----------------------------------------

metric_nr_transfers_in_progress = Gauge("ingest_transfers_in_progress", "Count how many transfers are currently in progress", labelnames=["site"])
metric_transfer_durations = Histogram("ingest_transfer_durations", "How long transfers are taking", labelnames=["site"],
    buckets=(1.0, 10.0, 60.0, 5 * 60.0, 10 * 60.0, 30 * 60.0, 3600.0, 2 * 3600.0, 4 * 3600.0, 8 * 3600.0, 12 * 3600.0, 24 * 3600.0, INF))
metric_nr_transfer_exceptions = Counter("ingest_transfer_exceptions", "Number of exceptions raised during transfer", labelnames=["site", "reason"])
metric_nr_bytes_transferred = Counter("ingest_bytes_ingested", "Number of payload bytes ingested into the LTA (sum of ingested file sizes)", labelnames=["site"])
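# NOTE: exposing these metrics assumes a prometheus_client HTTP endpoint is started elsewhere in the ingest service.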

#---------------------- Custom Exception ----------------------------------------

PipelineJobFailedError      = 1
PipelineNoSourceError       = 2
PipelineAlreadyInLTAError   = 3
PipelineNoProjectInLTAError = 4

class PipelineError(Exception):
    def __init__(self, message, type=PipelineJobFailedError):
        super().__init__(message)
        self.type = type
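
# Example (see TransferFile below): a missing source is reported with a specific error type:
#   raise PipelineError('source path does not exist', PipelineNoSourceError)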

#---------------------- IngestPipeline ------------------------------------------

class IngestPipeline():
    STATUS_INITIALIZING = 1
    STATUS_TRANSFERRING = 2
    STATUS_FINALIZING   = 3
    STATUS_FINISHED     = 4

    def __init__(self, job, ltaClient,
                 exchange=DEFAULT_BUSNAME,
                 broker=DEFAULT_BROKER,
                 user=getpass.getuser(),
                 globus_timeout=GLOBUS_TIMEOUT,
                 minimal_SIP=False):
        self.status              = IngestPipeline.STATUS_INITIALIZING

        self.hostname            = socket.gethostname()
        self.job                 = job
        self.ltaClient           = ltaClient
        self.user                = user

        if not self.user:
            self.user=getpass.getuser()

        self.globus_timeout      = globus_timeout
        self.minimal_SIP         = minimal_SIP

        self.event_bus           = ToBus(exchange, broker=broker, connection_log_level=logging.DEBUG)
        self.ingest_tmss_rpc     = IngestTMSSRPC.create(exchange, broker, timeout=300)
        self.Project             = job['Project']
        self.DataProduct         = job['DataProduct']
        self.FileName            = job['FileName']
        self.JobId               = job['JobId']
        self.ArchiveId           = int(job['ArchiveId'])
        self.ObsId               = int(job['ObservationId'])
        self.ExportID            = job['ExportID']
        self.Type                = job["Type"]
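        # job['Location'] has the form '<host>:<path>'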
        self.HostLocation        = job['Location'].partition(':')[0]
        self.Location            = job['Location'].partition(':')[2]
        self.FileSize            = '0'
        self.MD5Checksum         = ''
        self.Adler32Checksum     = ''
        self.ChecksumResult      = False
        self.SIP                 = ''
        self.ticket              = ''
        self.PrimaryUri          = ''
        self.SecondaryUri        = ''
        self.lta_site            = ''

    @metric_track_duration(prefix="ingest_")
    def GetStorageTicket(self):
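        """Request a storage ticket for this dataproduct from the LTA catalogue.
        On success this fills in self.ticket, the primary/secondary SRM URIs and self.lta_site."""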
        do_check_already_in_lta=isProductionEnvironment()
        result = self.ltaClient.GetStorageTicket(self.Project, self.FileName, self.FileSize, self.ArchiveId, self.JobId, self.ObsId, do_check_already_in_lta, self.Type)

        error = result.get('error')
        if error:
            if 'StorageTicket with ID "%i"' % (self.ArchiveId) in error:
                if 'existing_ticket_id' in result and 'existing_ticket_state' in result:
                    logger.warning("Got a Tier 1 GetStorageTicket error for an incomplete storage ticket %s with status %s" % (result['existing_ticket_id'],result['existing_ticket_state']))
                    if result['existing_ticket_state'] < IngestSuccessful:
                        try:
                            self.ticket                = result['existing_ticket_id']
                            logger.warning("trying to repair status of StorageTicket %s" % self.ticket)

                            self.SendStatusToLTA(IngestFailed)
                        except Exception as e:
                            logger.exception('ResettingStatus IngestFailed failed for %s' % self.ticket)
                        raise Exception('Had to reset state for %s' % self.ticket)
                    else:
                        raise PipelineError('GetStorageTicket error: Dataproduct already in LTA for %s' % (self.JobId), PipelineAlreadyInLTAError)
                else:
                    raise Exception("GetStorageTicket error I can't interpret: %s" % result)

            if 'no storage resources defined for project' in error or "project does not exists" in error:
                raise PipelineError('GetStorageTicket error for project not known in LTA: %s' % error, PipelineNoProjectInLTAError)

            raise Exception('GetStorageTicket error: %s' % error)
        else:
            self.ticket            = result.get('ticket')
            self.PrimaryUri        = result.get('primary_uri_rnd')
            self.SecondaryUri      = result.get('secondary_uri_rnd')

            if 'sara' in self.PrimaryUri:
                self.lta_site = 'sara'
            elif 'juelich' in self.PrimaryUri:
                self.lta_site = 'juelich'
            elif 'psnc' in self.PrimaryUri:
                self.lta_site = 'poznan'

    @metric_track_duration(prefix="ingest_")
    def TransferFile(self):
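        """Transfer the dataproduct to its primary LTA location with ltacp,
        publishing JobProgress notifications and recording the md5/adler32 checksums and file size."""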
        try:
            logger.info('Starting file transfer for %s ' % self.JobId)
            start = time.time()
            self.status = IngestPipeline.STATUS_TRANSFERRING

            self.__sendNotification('JobProgress',
                                    message='transfer starting',
                                    percentage_done=0.0,
                                    total_bytes_transfered=0)

            if 'cep4' in self.HostLocation.lower() or 'cpu' in self.HostLocation.lower():
                self.HostLocation = 'localhost'

            def progress_callback(percentage_done, current_speed, total_bytes_transfered):
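                # round the reported percentage to one decimal and cap it at 100%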
                self.__sendNotification('JobProgress',
                                        percentage_done=min(100.0, round(10.0*percentage_done)/10.0),
                                        current_speed=current_speed,
                                        total_bytes_transfered=total_bytes_transfered)

            if (os.path.splitext(self.Location)[-1] == '.h5' and
                os.path.splitext(os.path.basename(self.Location))[0].endswith('_bf')):
                logger.info('dataproduct is a beamformed h5 file. adding raw file to the transfer')
                self.Location = [self.Location, self.Location.replace('.h5', '.raw')]

            if self.DataProduct not in self.Location and 'Source' in self.job:
                # old hack, is needed to support dynspec / pulsar archiving scripts
                self.Location = os.path.join(self.Location, self.job['Source'])

            with metric_transfer_durations.labels(site=self.lta_site).time(), \
                 metric_nr_transfers_in_progress.labels(site=self.lta_site).track_inprogress():
                cp = LtaCp(self.Location,
                           self.PrimaryUri,
                           globus_timeout=self.globus_timeout,
                           progress_callback=progress_callback)
                transfer_result = cp.transfer(force=True)
            self.status = IngestPipeline.STATUS_FINALIZING

            if not transfer_result:
                msg = 'error while transferring %s with ltacp' % (self.JobId)
                logger.error(msg)
                raise Exception(msg)

            self.MD5Checksum = transfer_result[0]
            self.Adler32Checksum = transfer_result[1]
            self.FileSize = transfer_result[2]

            if self.MD5Checksum and self.Adler32Checksum and self.FileSize:
                logger.debug('valid checksums found for %s with filesize %sB (%s). md5: %s adler32: %s', self.JobId,
                                                                                                         self.FileSize,
                                                                                                         humanreadablesize(int(self.FileSize), 'B'),
                                                                                                         self.MD5Checksum,
                                                                                                         self.Adler32Checksum)
            else:
                msg = 'no valid checksums found for %s with filesize %sB (%s). md5: %s adler32: %s' % (self.JobId,
                                                                                                       self.FileSize,
                                                                                                       humanreadablesize(int(self.FileSize), 'B'),
                                                                                                       self.MD5Checksum,
                                                                                                       self.Adler32Checksum)
                logger.error(msg)
                raise Exception(msg)

            try:
                self.__sendNotification('JobProgress',
                                        message='transfer finished',
                                        percentage_done=100.0,
                                        total_bytes_transfered=int(self.FileSize))

                metric_nr_bytes_transferred.labels(site=self.lta_site).inc(int(self.FileSize))
            except (ValueError, TypeError) as e:
                logger.warning('could not report transferred bytes for %s: %s', self.JobId, e)

            elapsed = time.time() - start

            try:
                if int(self.FileSize) > 0:
                    avgSpeed = float(self.FileSize) / elapsed
                    logger.info("Finished file transfer for %s in %d sec with an average speed of %s for %s including ltacp overhead" % (self.JobId, elapsed, humanreadablesize(avgSpeed, 'Bps'), humanreadablesize(float(self.FileSize), 'B')))
                else:
                    logger.info('Finished file transfer of %s in %s' % (self.JobId, elapsed))
            except Exception:
                logger.info('Finished file transfer of %s in %s' % (self.JobId, elapsed))

        except Exception as exp:
            if isinstance(exp, LtacpException):
                if '550 File not found' in exp.value:
                    logger.error('Destination directory does not exist. Creating %s in LTA for %s' % (self.PrimaryUri, self.JobId))
                    metric_nr_transfer_exceptions.labels(site=self.lta_site,reason="target_directory_not_found").inc()

                    if create_missing_directories(self.PrimaryUri) == 0:
                        logger.info('Created path %s in LTA for %s' % (self.PrimaryUri, self.JobId))
                elif 'source path' in exp.value and 'does not exist' in exp.value:
                    metric_nr_transfer_exceptions.labels(site=self.lta_site,reason="source_path_not_found").inc()
                    raise PipelineError(exp.value, PipelineNoSourceError)
                else:
                    metric_nr_transfer_exceptions.labels(site=self.lta_site,reason="other").inc()

            raise Exception('transfer failed for %s: %s' % (self.JobId, str(exp)))

    @metric_track_duration(prefix="ingest_")
    def SendChecksumsToLTA(self):
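        """Report the file size and md5/adler32 checksums of the transferred file to the LTA catalogue,
        which answers with the final storage URIs."""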
        result = self.ltaClient.SendChecksums(self.JobId, self.Project, self.ticket, self.FileSize, self.PrimaryUri, self.SecondaryUri, self.MD5Checksum, self.Adler32Checksum)
        if not result.get('error'):
            #store final uri's
            self.PrimaryUri   = result['primary_uri']
            self.SecondaryUri = result.get('secondary_uri')

    @metric_track_duration(prefix="ingest_")
    def SendStatusToLTA(self, lta_state_id):
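        """Update the state (e.g. IngestSuccessful or IngestFailed) of this job's storage ticket in the LTA catalogue."""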
        if self.ticket:
            self.ltaClient.UpdateUriState(self.JobId, self.Project, self.ticket, self.PrimaryUri, lta_state_id)

    @metric_track_duration(prefix="ingest_")
    def CheckForValidSIP(self):
        """Check up front that a valid SIP is available for this dataproduct, so the transfer is not started in vain."""
        if self.Type == "TMSS":
            # no need to check for valid SIP's in TMSS. It adds a significant load to the server.
            # If TMSS is able to return a SIP, it's guaranteed to be valid as TMSS does the validation already.
            # If TMSS is not able to return a SIP, that is caused by invalid observation/pipeline feedback, and the SIP won't validate anyway.
            pass
        elif 'SIPLocation' in self.job: # job file might know where the sip is when it is not a MoM job
            try:
                sip_host = self.job['SIPLocation'].split(':')[0]
                sip_path = self.job['SIPLocation'].split(':')[1]

                cmd = ['ssh', '-tt', '-n', '-x', '-q', '%s@%s' % (self.user, sip_host), 'cat %s' % sip_path]
                logger.info("GetSIP for %s at SIPLocation %s - cmd %s" % (self.JobId, self.job['SIPLocation'], ' ' .join(cmd)))
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                out, err = communicate_returning_strings(p)
                if p.returncode != 0:
                    raise PipelineError('GetSIP error getting EoR SIP for %s: %s' % (self.JobId, out + err))

                tmp_SIP = out

                tmp_SIP = addIngestInfoToSIP(tmp_SIP, self.ticket, self.FileSize, self.MD5Checksum, self.Adler32Checksum)

                tmp_SIP = tmp_SIP.replace('<stationType>Europe</stationType>','<stationType>International</stationType>')

                #make sure the source in the SIP is the same as the type of the storageticket
                tmp_SIP = re.compile('<source>eor</source>', re.IGNORECASE).sub('<source>%s</source>' % (self.Type,), tmp_SIP)

                if not validateSIPAgainstSchema(tmp_SIP):
                    logger.error('CheckForValidSIP: Invalid SIP:\n%s', tmp_SIP)
                    raise Exception('SIP for %s does not validate against schema' % self.JobId)

            except:
                logger.exception('CheckForValidSIP: Getting SIP from SIPLocation %s failed', self.job['SIPLocation'])
                raise

        logger.info('SIP for %s is valid, can proceed with transfer' % (self.JobId,))

    @metric_track_duration(prefix="ingest_")
    def GetSIP(self):
        """Fetch or construct the SIP (Submission Information Package) for this dataproduct and store it in self.SIP."""
        try:
            if self.Type == "TMSS":
                # TMSS works differently than MoM
                # an Ingest-Export is a subtask with input dataproducts which should be ingested...
                # and output dataproducts which are ingested
                # so, for this transfered input dataproduct get the corresponding output,
                # and store the archive/transfer info with the output dataproduct

                output_dataproduct_id = self.ingest_tmss_rpc.get_output_dataproduct_id(subtask_id=self.job.get('TMSSIngestSubtaskId'),
                                                                                       input_dataproduct_id=self.job['TMSSInputDataproductId'])
                self.ingest_tmss_rpc.store_archive_information(dataproduct_id=output_dataproduct_id,
                                                               storage_ticket=self.ticket,
                                                               size=int(self.FileSize),
                                                               filepath=self.PrimaryUri,
                                                               md5_hash=self.MD5Checksum,
                                                               adler32_hash=self.Adler32Checksum)

                # get the SIP for the output dataproduct which is/should_be the same as for the input,
                # but enriched with the archive information which is needed by the LTA.
                self.SIP = self.ingest_tmss_rpc.get_SIP(output_dataproduct_id)
            elif 'SIPLocation' in self.job: # job file might know where the sip is when it is not a MoM job
                try:
                    sip_host = self.job['SIPLocation'].split(':')[0]
                    sip_path = self.job['SIPLocation'].split(':')[1]
                    cmd = ['ssh', '-tt', '-n', '-x', '-q', '%s@%s' % (self.user, sip_host), 'cat %s' % sip_path]
                    logger.info("GetSIP for %s at SIPLocation %s - cmd %s" % (self.JobId, self.job['SIPLocation'], ' ' .join(cmd)))
                    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    out, err = communicate_returning_strings(p)
                    if p.returncode != 0:
                        raise PipelineError('GetSIP error getting EoR SIP for %s: %s' % (self.JobId, out + err))
                    self.SIP = out
                    self.SIP = addIngestInfoToSIP(self.SIP, self.ticket, self.FileSize, self.MD5Checksum, self.Adler32Checksum)
                    self.SIP = self.SIP.replace('<stationType>Europe</stationType>','<stationType>International</stationType>')
                    #make sure the source in the SIP is the same as the type of the storageticket
                    self.SIP = re.compile('<source>eor</source>', re.IGNORECASE).sub('<source>%s</source>' % (self.Type,), self.SIP)
                    if not validateSIPAgainstSchema(self.SIP):
                        logger.error('Invalid SIP:\n%s', self.SIP)
                        raise Exception('SIP for %s does not validate against schema' % self.JobId)
                except:
                    logger.exception('Getting SIP from SIPLocation %s failed', self.job['SIPLocation'])
                    raise
                logger.info('SIP received for %s from SIPLocation %s with size %d (%s): \n%s' % (self.JobId,
                                                                                                self.job['SIPLocation'],
                                                                                                len(self.SIP),
                                                                                                humanreadablesize(len(self.SIP)),
                                                                                                self.SIP[0:1024]))
            else:
                self.SIP = makeSIP(self.Project, self.ObsId, self.ArchiveId, self.ticket, self.FileName, self.FileSize, self.MD5Checksum, self.Adler32Checksum, self.Type)
                self.FileType = FILE_TYPE_UNSPECIFIED
        except Exception as e:
            if self.minimal_SIP:
                logger.info('making minimal SIP for %s', self.JobId)
                self.SIP = makeSIP(self.Project, self.ObsId, self.ArchiveId, self.ticket, self.FileName, self.FileSize, self.MD5Checksum, self.Adler32Checksum, self.Type)
                logger.info('minimal SIP for %s: \n%s', self.JobId, self.SIP)
                self.FileType = FILE_TYPE_UNSPECIFIED
            else:
                raise

    @metric_track_duration(prefix="ingest_")
    def SendSIPToLTA(self):
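        """Upload the SIP for this dataproduct to the LTA catalogue."""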
        try:
            self.ltaClient.SendSIP(self.JobId, self.SIP, self.ticket)
        except Exception as e:
            logger.error('SendSIPToLTA exception: %s', e)
            raise PipelineError(str(e), PipelineJobFailedError)

    @metric_track_duration(prefix="ingest_")
    def RollBack(self):
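        """Undo a (partially) failed transfer: remove any transferred file(s) from LTA storage
        and, for TMSS jobs, delete the stored archive information again."""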
        try:
            logger.info('rolling back file transfer for %s', self.JobId)
            start     = time.time()

            if self.PrimaryUri:
                srmrm(self.PrimaryUri, log_prefix=self.JobId, timeout=300)

            if self.SecondaryUri:
                srmrm(self.SecondaryUri, log_prefix=self.JobId, timeout=300)

            if self.Type == "TMSS":
                # delete archive info for the output_dataproduct in TMSS
                output_dataproduct_id = self.ingest_tmss_rpc.get_output_dataproduct_id(subtask_id=self.job.get('TMSSIngestSubtaskId'),
                                                                                       input_dataproduct_id=self.job['TMSSInputDataproductId'])
                self.ingest_tmss_rpc.delete_hashes_and_archive_information(output_dataproduct_id)
            logger.debug("rollBack for %s took %ds", self.JobId, time.time() - start)
        except Exception as e:
            logger.exception('rollback failed for %s: %s', self.JobId, e)

    def __sendNotification(self, subject, message='', **kwargs):
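        """Publish an ingest notification event (e.g. JobStarted, JobProgress, JobFinished) on the event bus."""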
        try:
            contentDict = { 'job_id': self.JobId,
                            'export_id': self.job.get('job_group_id', self.job.get('TMSSIngestSubtaskId')),
                            'archive_id': self.ArchiveId,
                            'project': self.Project,
                            'type': self.Type,
                            'ingest_server': self.hostname,
                            'dataproduct': self.DataProduct,
                            'srm_url': self.PrimaryUri }
            if 'ObservationId' in self.job and self.Type.lower()=='tmss':
                contentDict['tmss_producing_subtask_id'] = self.job['ObservationId']
            if 'TMSSInputDataproductId' in self.job and self.Type.lower()=='tmss':
                contentDict['input_dataproduct_id'] = self.job['TMSSInputDataproductId']

            if self.lta_site:
                contentDict['lta_site'] = self.lta_site

            if message:
                contentDict['message'] = message

            for k, v in list(kwargs.items()):
                contentDict[k] = v

            msg = EventMessage(subject="%s.%s" % (INGEST_NOTIFICATION_PREFIX, subject), content=contentDict)
            msg.ttl = 48 * 3600  # remove message from queues when not picked up within 48 hours
            logger.info('Sending notification %s: %s' % (subject, str(contentDict).replace('\n', ' ')))
            self.event_bus.send(msg)
        except Exception as e:
            logger.error(str(e))

    def run(self):
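        """Run the full ingest pipeline for this job and report the outcome to the LTA and on the event bus."""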
        with self.event_bus, self.ingest_tmss_rpc:
            try:
                logger.info("starting ingestpipeline for %s" % self.JobId)
                start = time.time()
                self.__sendNotification('JobStarted')
                self.GetStorageTicket()
                self.CheckForValidSIP()
                self.TransferFile()
                self.SendChecksumsToLTA()
                self.GetSIP()
                self.SendSIPToLTA()
                self.SendStatusToLTA(IngestSuccessful)

                elapsed = time.time() - start
                avgSpeed = 0
                try:
                    avgSpeed = float(self.FileSize) / elapsed
                    logger.info("Ingest Pipeline finished for %s in %d sec with average speed of %s for %s including all overhead",
                                self.JobId, elapsed, humanreadablesize(avgSpeed, 'Bps'), humanreadablesize(float(self.FileSize), 'B'))
                except Exception:
                    logger.info("Ingest Pipeline finished for %s in %d sec", self.JobId, elapsed)

                self.__sendNotification('JobFinished',
                                        average_speed=avgSpeed,
                                        total_bytes_transfered=int(self.FileSize))

            except PipelineError as pe:
                logger.log(logging.WARNING if pe.type == PipelineAlreadyInLTAError else logging.ERROR,
                           'Encountered PipelineError for %s : %s', self.JobId, str(pe))
                if pe.type == PipelineNoSourceError:
                    self.__sendNotification('JobTransferFailed', 'data not transfered because it was not on disk')
                elif pe.type == PipelineAlreadyInLTAError:
                    self.__sendNotification('JobFinished', 'data was already in the LTA',
                                            average_speed=0,
                                            total_bytes_transfered=0)
                else:
                    self.RollBack()

                    # by default the error_message for the notification is the exception
                    error_message = str(pe)
                    self.__sendNotification('JobTransferFailed', error_message)

                try:
                    if pe.type != PipelineAlreadyInLTAError:
                        self.SendStatusToLTA(IngestFailed)
                except Exception as e:
                    logger.error('SendStatusToLTA failed for %s: %s', self.JobId, e)

            except Exception as e:
                logger.error('Encountered unexpected error for %s: %s', self.JobId, e)

                # by default the error_message for the notification is the exception
                error_message = str(e)
                # for known messages in the exception, make a nice readable error_message
                if 'ltacp' in error_message and ('file listing failed' in error_message or 'du failed' in error_message):
                    error_message = 'dataproduct %s not found at location %s:%s' % (self.DataProduct, self.HostLocation, self.Location)
                elif 'does not validate against schema' in error_message:
                    error_message = 'invalid SIP does not validate against schema'

                try:
                    self.RollBack()
                except Exception as rbe:
                    logger.error('RollBack failed for %s: %s', self.JobId, rbe)
                try:
                    self.SendStatusToLTA(IngestFailed)
                except Exception as sse:
                    logger.error('SendStatusToLTA failed for %s: %s', self.JobId, sse)
                try:
                    self.__sendNotification('JobTransferFailed', error_message)
                except Exception as sne:
                    logger.error('sendNotification failed for %s: %s', self.JobId, sne)
            finally:
                self.status = IngestPipeline.STATUS_FINISHED

def main():
    from optparse import OptionParser, OptionGroup
    from lofar.common import dbcredentials

    # Check the invocation arguments
    parser = OptionParser("%prog [options] <path_to_jobfile.xml>",
                          description='Run the ingestpipeline on a single jobfile.')
    parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default')
    parser.add_option('--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the qpid broker on which the ingest notifications are published, default: %default')
    parser.add_option("-u", "--user", dest="user", type="string", default=getpass.getuser(), help="username for to login on <host>, [default: %default]")
    parser.add_option('-s', '--minimal-SIP', dest='minimal_SIP', action='store_true', help='create and upload a minimal SIP to the LTA catalogue when the normal SIP is not accepted.')
    parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
    parser.add_option('-t', '--timeout', dest='globus_timeout', type='int', default=GLOBUS_TIMEOUT, help='number of seconds (default=%default) to wait for globus-url-copy to finish after the transfer is done (while lta-site is computing checksums)')
    parser.add_option("-l", "--lta_credentials", dest="lta_credentials", type="string",
                      default='LTA' if isProductionEnvironment() else 'LTA_test',
                      help="Name of lofar credentials for lta user/pass (see ~/.lofar/dbcredentials) [default=%default]")
    (options, args) = parser.parse_args()

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.DEBUG if options.verbose else logging.INFO)

    if len(args) != 1:
        parser.print_help()
        sys.exit(1)

    try:
        path = args[0]

        if os.path.isfile(path):
            job = parseJobXmlFile(path)
            job['filename'] = path
            logger.info("Parsed jobfile %s: %s", path, job)

            ltacreds = dbcredentials.DBCredentials().get(options.lta_credentials)
            ltaClient = LTAClient(ltacreds.user, ltacreds.password)

            jobPipeline = IngestPipeline(job, ltaClient,
                                         exchange=options.busname,
                                         broker=options.broker,
                                         user=options.user,
                                         globus_timeout=options.globus_timeout,
                                         minimal_SIP=options.minimal_SIP)
            jobPipeline.run()
            sys.exit(0)
        else:
            logger.info("No such file %s", path)
            sys.exit(1)
    except Exception as e:
        logger.error(e)
        sys.exit(1)

if __name__ == '__main__':
    main()