#!/usr/bin/env python

# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#

"""
"""

import qpid.messaging
import logging
from datetime import datetime, timedelta
import os
import time
import socket
import getpass
from threading import Thread
from lofar.messaging.messagebus import FromBus
from lofar.messaging.messages import *
from lofar.common import isProductionEnvironment
from lofar.common import dbcredentials
from lofar.common.datetimeutils import totalSeconds
from lofar.common.util import humanreadablesize
from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME
from lofar.lta.ingest.server.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME, DEFAULT_INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.common.config import DEFAULT_BROKER
from lofar.lta.ingest.server.config import MAX_NR_OF_JOBS, MAX_USED_BANDWITH_TO_START_NEW_JOBS, NET_IF_TO_MONITOR
from lofar.lta.ingest.server.config import TRANSFER_TIMEOUT
from lofar.lta.ingest.common.job import *
from lofar.lta.ingest.server.ingestpipeline import IngestPipeline
from lofar.lta.ingest.server.ltaclient import *
from lofar.lta.ingest.server.momclient import *

try:
    import psutil
except ImportError as e:
    print(str(e))
    print('Please install python package psutil: pip install psutil')
    exit(1)

logger = logging.getLogger(__name__)

def _getBytesSent():
    """get the total number of bytes sent on the monitored network interface,
    falling back to the machine-wide total if that interface is not known"""
    try:
        return psutil.net_io_counters(pernic=True).get(NET_IF_TO_MONITOR, psutil.net_io_counters(pernic=False)).bytes_sent
    except Exception:
        return 0


class IngestTransferServer:
    def __init__(self,
                 job_queuename=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
                 notification_busname=DEFAULT_INGEST_NOTIFICATION_BUSNAME,
                 notification_prefix=DEFAULT_INGEST_NOTIFICATION_PREFIX,
                 mom_credentials=None,
                 lta_credentials=None,
                 user=None,
                 broker=None,
                 max_nr_of_parallel_jobs=MAX_NR_OF_JOBS):
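        """Create an IngestTransferServer which consumes job messages from the
        given job_queuename and runs at most max_nr_of_parallel_jobs transfers
        in parallel. mom_credentials and lta_credentials are the credential
        objects for the MoM and LTA services, as returned by
        dbcredentials.DBCredentials().get(...)."""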

        self.broker                   = broker
        self.user                     = user
        if not self.user:
            self.user = getpass.getuser()

        self.mom_credentials          = mom_credentials
        self.lta_credentials          = lta_credentials
        self.notification_busname     = notification_busname
        self.notification_prefix      = notification_prefix
        self.max_nr_of_parallel_jobs  = max_nr_of_parallel_jobs

        self.job_frombus = FromBus(address=job_queuename, broker=broker)
        self.__running_jobs = {}
        self.__log_resource_warning = True
        self.__prev_bytes_sent = _getBytesSent()
        self.__prev_bytes_sent_timestamp = datetime.utcnow()
        self.__prev_used_bandwidth = 0.0
        self.__running_jobs_log_timestamp = datetime.utcnow()

    def __start_job(self, job_dict):
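        """start the transfer of the given job in a background IngestPipeline
        thread and register the thread in the admin of running jobs"""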
        job_id = job_dict['JobId']

        if job_id in self.__running_jobs:
            logger.warning('job %s is already running', job_id)
            return

        def threaded_pipeline_func(job):
            logger.info('starting job %s in the background', job_id)
            ltaClient = LTAClient(self.lta_credentials.user, self.lta_credentials.password)
            with MoMClient(self.mom_credentials.user, self.mom_credentials.password) as momClient:
                jobPipeline = IngestPipeline(job, momClient, ltaClient,
                                             notification_busname=self.notification_busname,
                                             notification_prefix=self.notification_prefix,
                                             broker=self.broker,
                                             user=self.user)
                jobPipeline.run()

        # run the pipeline in a background daemon thread,
        # so a hanging transfer cannot block server shutdown
        thread = Thread(target=threaded_pipeline_func, args=[job_dict])
        thread.daemon = True
        self.__running_jobs[job_id] = thread
        thread.start()

    def __clearFinishedJobs(self):
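        """remove the jobs whose pipeline thread has finished from the admin of running jobs"""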
        try:
            finished_job_ids = [job_id for job_id, job in self.__running_jobs.items() if not job.is_alive()]
            for job_id in finished_job_ids:
                logger.info('removing finished job %s', job_id)
                del self.__running_jobs[job_id]
        except Exception as e:
            logger.error('__clearFinishedJobs: %s', e)

    def __enoughResourcesAvailable(self):
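        """check whether a new job can be started: the used bandwidth, idle cpu
        time, system load, swap usage, number of processes and the number of
        already running jobs must all be within their limits"""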
        try:
            # helper func for conditional logging of unavailable resources
            # at most one such warning is logged per started job, so the log is not flooded
            def log_resource_warning(message):
                if self.__log_resource_warning:
                    logger.warning(message)
                    self.__log_resource_warning = False

            now = datetime.utcnow()
            bytes_sent = _getBytesSent()

            if bytes_sent >= self.__prev_bytes_sent: # bytes_sent might wrap around zero
                # compute current speed in bits per second
                speed = 8 * (bytes_sent - self.__prev_bytes_sent) / totalSeconds(now - self.__prev_bytes_sent_timestamp)

                # running average for the used bandwidth
                used_bandwidth = 0.5 * speed + 0.5 * self.__prev_used_bandwidth

                # store for the next iteration
                self.__prev_bytes_sent = bytes_sent
                self.__prev_bytes_sent_timestamp = now
                self.__prev_used_bandwidth = used_bandwidth

                # only start new jobs if we have some bandwidth available
                # note that this is a 'soft' limit:
                # we cannot control the actual bandwidth used by the running transfers,
                # we can only refrain from starting new jobs once MAX_USED_BANDWITH_TO_START_NEW_JOBS is exceeded
                if used_bandwidth > MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                    log_resource_warning('not enough bandwidth available to start new jobs, using %s, max %s' %
                                         (humanreadablesize(used_bandwidth, 'bps'),
                                          humanreadablesize(MAX_USED_BANDWITH_TO_START_NEW_JOBS, 'bps')))
                    return False
            else:
                # the counter wrapped around zero; just store for the next iteration, do not compute anything
                self.__prev_bytes_sent = bytes_sent
                self.__prev_bytes_sent_timestamp = now

            # only start new jobs if we have some cpu time available
            idle_cpu_percentage = psutil.cpu_times_percent().idle
            if idle_cpu_percentage < 5:
                log_resource_warning('not enough cpu power available to start new jobs, cpu_idle %s%%' %
                                     idle_cpu_percentage)
                return False

            # only start new jobs if the system load is not too high
            short_term_load = os.getloadavg()[0]
            if short_term_load > psutil.cpu_count():
                log_resource_warning('system load too high (%s > %s), cannot start new jobs' %
                                     (short_term_load,
                                      psutil.cpu_count()))
                return False

            # only allow 1 job at a time when the system is swapping
            if psutil.swap_memory().percent > 5 and len(self.__running_jobs) > 0:
                log_resource_warning('system is swapping. not enough memory available to start new jobs')
                return False

            # only start new jobs if the number of processes owned by the current user is not too high
            try:
                current_user = getpass.getuser()
                current_user_procs = [p for p in psutil.process_iter() if p.username() == current_user]
                if len(current_user_procs) > 64 * psutil.cpu_count():
                    log_resource_warning('number of processes by %s too high (%s > %s), cannot start new jobs' %
                                         (current_user,
                                          len(current_user_procs),
                                          64 * psutil.cpu_count()))
                    return False
            except Exception:
                pass

            # limit the total number of parallel jobs to self.max_nr_of_parallel_jobs
            if len(self.__running_jobs) >= self.max_nr_of_parallel_jobs:
                log_resource_warning('already running %s parallel jobs. limiting the total number of jobs to %s' %
                                     (len(self.__running_jobs),
                                      self.max_nr_of_parallel_jobs))
                return False

        except Exception as e:
            logger.error(e)
            # unknown error; play it safe and allow only 1 job at a time
            return len(self.__running_jobs) == 0

        return True

    def run(self):
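        """run the server's main loop: whenever enough resources are available,
        receive the next job from the job queue and start it, until interrupted"""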
        while True:
            try:
                # remove the finished jobs from the admin before checking the resources
                self.__clearFinishedJobs()

                try:
                    if self.__enoughResourcesAvailable():
                        with self.job_frombus:
                            msg = self.job_frombus.receive(timeout=60)
                            if msg:
                                logger.debug("received msg on job queue: %s", msg)
                                self.job_frombus.ack(msg)

                                if isinstance(msg, CommandMessage):
                                    job_dict = parseJobXml(msg.content)
                                    logger.info("received job: %s", job_dict)

                                    self.__start_job(job_dict)

                                    # allow 1 new resource warning to be logged
                                    self.__log_resource_warning = True
                                else:
                                    logger.warning("unexpected message type: %s", msg)
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(e)

                try:
                    # sleep a little, so jobs have some time to start consuming resources.
                    # this limits the number of job starts to about 600 per minute;
                    # it does not limit the total number of parallel jobs,
                    # which is limited dynamically by __enoughResourcesAvailable
                    # and by the hard limit self.max_nr_of_parallel_jobs
                    time.sleep(0.1)

                    # if already running at high bandwidth usage,
                    # sleep a little extra depending on how close we are to MAX_USED_BANDWITH_TO_START_NEW_JOBS
                    if self.__prev_used_bandwidth > 0.5 * MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                        time.sleep(0.5)
                        if self.__prev_used_bandwidth > 0.8 * MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                            time.sleep(2.5)

                    if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds=10):
                        self.__running_jobs_log_timestamp = datetime.utcnow()
                        logger.info("running %s jobs, bandwidth used on network interface %s %s (%s)",
                                    len(self.__running_jobs),
                                    NET_IF_TO_MONITOR,
                                    humanreadablesize(self.__prev_used_bandwidth, 'bps'),
                                    humanreadablesize(self.__prev_used_bandwidth / 8, 'Bps'))
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(e)

            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.error(e)

def main():
    # make sure we run in UTC timezone
    import os
    os.environ['TZ'] = 'UTC'

    from optparse import OptionParser
    from lofar.messaging import setQpidLogLevel
    from lofar.common.util import waitForInterrupt

    # Check the invocation arguments
    parser = OptionParser("%prog [options]",
                          description='runs the ingest transfer server, which picks up as many jobs as it can handle from the given --ingest_job_queuename, transfers the dataproducts to the LTA, updates the LTA catalogue, and updates MoM')
    parser.add_option('-q', '--broker', dest='broker', type='string',
                      default=DEFAULT_BROKER,
                      help='Address of the qpid broker, default: %default')
    parser.add_option("--ingest_job_queuename", dest="ingest_job_queuename", type="string",
                      default=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
                      help="Name of the job queue. [default: %default]")
    parser.add_option("-p", "--max_nr_of_parallel_jobs", dest="max_nr_of_parallel_jobs", type="int",
                      default=MAX_NR_OF_JOBS,
                      help="Name of the job queue. [default: %default]")
    parser.add_option('--ingest_notification_busname', dest='ingest_notification_busname', type='string', default=DEFAULT_INGEST_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default')
    parser.add_option("--ingest_notification_prefix", dest="ingest_notification_prefix", type="string", default=DEFAULT_INGEST_NOTIFICATION_PREFIX, help="The prefix for all notifications of this publisher, [default: %default]")
    parser.add_option("-u", "--user", dest="user", type="string", default=getpass.getuser(), help="username for to login on data source host, [default: %default]")
    parser.add_option("-l", "--lta_credentials", dest="lta_credentials", type="string",
                      default='LTA' if isProductionEnvironment() else 'LTA_test',
                      help="Name of lofar credentials for lta user/pass (see ~/.lofar/dbcredentials) [default=%default]")
    parser.add_option("-m", "--mom_credentials", dest="mom_credentials", type="string",
                      default='MoM_site' if isProductionEnvironment() else 'MoM_site_test',
                      help="Name of credentials for MoM user/pass (see ~/.lofar/dbcredentials) [default=%default]")
    parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
    (options, args) = parser.parse_args()

    setQpidLogLevel(logging.INFO)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.DEBUG if options.verbose else logging.INFO)

    logger.info('*****************************************')
    logger.info('Started ingest server on host %s', socket.gethostname())
    logger.info('*****************************************')

    ltacreds = dbcredentials.DBCredentials().get(options.lta_credentials)
    momcreds = dbcredentials.DBCredentials().get(options.mom_credentials)

    server = IngestTransferServer(job_queuename=options.ingest_job_queuename,
                                  broker=options.broker,
                                  mom_credentials=momcreds,
                                  lta_credentials=ltacreds,
                                  max_nr_of_parallel_jobs=options.max_nr_of_parallel_jobs)
    server.run()

if __name__ == '__main__':
    main()