# ingesttransferserver.py

# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#

"""
"""

import getpass
import logging
import os
import pprint
import socket
import time
from datetime import datetime, timedelta
from threading import Lock, Thread

import psutil

from lofar.messaging.messagebus import FromBus, ToBus
from lofar.messaging.messages import *
from lofar.common import isProductionEnvironment
from lofar.common import dbcredentials
from lofar.common.datetimeutils import totalSeconds
from lofar.common.util import humanreadablesize
from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME
from lofar.lta.ingest.server.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME, DEFAULT_INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.common.config import DEFAULT_BROKER
from lofar.lta.ingest.server.config import MAX_NR_OF_JOBS, MAX_USED_BANDWITH_TO_START_NEW_JOBS, NET_IF_TO_MONITOR
from lofar.lta.ingest.server.config import TRANSFER_TIMEOUT
from lofar.lta.ingest.common.job import *
from lofar.lta.ingest.server.ingestpipeline import IngestPipeline
from lofar.lta.ingest.server.ltaclient import *
from lofar.lta.ingest.server.momclient import *

logger = logging.getLogger(__name__)

def _getBytesSent():
    """Return the bytes_sent counter for the monitored network interface(s).

    When psutil reports every interface listed in NET_IF_TO_MONITOR, the sum
    of their bytes_sent counters is returned. Otherwise the system-wide
    bytes_sent total is returned. If anything goes wrong a warning is logged
    and 0 is returned.
    """
    try:
        per_nic = psutil.net_io_counters(True)
        unknown_nics = [nic for nic in NET_IF_TO_MONITOR if nic not in per_nic]

        if unknown_nics:
            # at least one configured interface is not known to psutil:
            # fall back to the total over all interfaces
            return psutil.net_io_counters(False).bytes_sent

        total_bytes_sent = 0
        for nic in NET_IF_TO_MONITOR:
            total_bytes_sent += per_nic[nic].bytes_sent
        return total_bytes_sent
    except Exception as e:
        logger.warning("Cannot get network interface info: %s", e)
        return 0

class IngestTransferServer:
    """Receive ingest jobs from a message queue and run them as background pipelines.

    Every received job is handed to an IngestPipeline which is created in its
    own thread. New jobs are only picked up while __enoughResourcesAvailable()
    reports that bandwidth, cpu, load, swap, process count and the number of
    parallel transfers are within their limits. While running, a status line
    is logged and sent on the notification bus at most once per 2 seconds.
    """

    def __init__(self,
                 job_queuename = DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
                 notification_busname = DEFAULT_INGEST_NOTIFICATION_BUSNAME,
                 notification_prefix = DEFAULT_INGEST_NOTIFICATION_PREFIX,
                 mom_credentials = None,
                 lta_credentials = None,
                 user = None,
                 broker = None,
                 max_nr_of_parallel_jobs = MAX_NR_OF_JOBS):
        """Store the configuration and initialize the resource bookkeeping.

        :param job_queuename: address of the queue from which jobs are received.
        :param notification_busname: address of the bus on which status events
                                     are sent; also handed to each IngestPipeline.
        :param notification_prefix: prefix for the context of the sent status
                                    events; also handed to each IngestPipeline.
        :param mom_credentials: object with 'user' and 'password' attributes,
                                used to create a MoMClient per job.
        :param lta_credentials: object with 'user' and 'password' attributes,
                                used to create an LTAClient per job.
        :param user: user name handed to each IngestPipeline. Defaults to the
                     user running this process.
        :param broker: broker used for the job queue and the notification bus.
        :param max_nr_of_parallel_jobs: limit for the number of jobs which are
                                        starting, initializing or transferring.
        """
        self.broker = broker
        self.job_queuename = job_queuename
        # fall back to the user running this process when no user was given
        self.user = user if user else getpass.getuser()
        self.mom_credentials = mom_credentials
        self.lta_credentials = lta_credentials
        self.notification_busname = notification_busname
        self.notification_prefix = notification_prefix
        self.event_bus = ToBus(notification_busname, broker = broker)
        self.max_nr_of_parallel_jobs = max_nr_of_parallel_jobs

        # job bookkeeping: job_id -> { 'thread': Thread, 'pipeline': IngestPipeline }
        # the 'pipeline' entry is added by the job's own thread once the pipeline exists
        self.__running_jobs = {}
        # guards __running_jobs, which is used by the main loop and by the job threads
        self.__lock = Lock()

        self.__log_recource_warning = True
        self.__prev_bytes_sent = _getBytesSent()
        self.__prev_bytes_sent_timestamp = datetime.utcnow()
        self.__prev_used_bandwidth = 0.0
        self.__running_jobs_log_timestamp = datetime.utcnow()

    def __start_job(self, job_dict):
        """Start the given job in a background thread, unless it is already running.

        :param job_dict: the parsed job; it has to contain the key 'JobId'.
        """
        job_id = job_dict['JobId']

        if job_id in self.__running_jobs:
            logger.warning('job %s is already running', job_id)
            return

        def threaded_pipeline_func(job):
            logger.info('starting job %s in the background', job_id)
            try:
                ltaClient = LTAClient(self.lta_credentials.user, self.lta_credentials.password)
                with MoMClient(self.mom_credentials.user, self.mom_credentials.password) as momClient:
                    jobPipeline = IngestPipeline(job, momClient, ltaClient,
                                                 notification_busname = self.notification_busname,
                                                 notification_prefix = self.notification_prefix,
                                                 broker = self.broker,
                                                 user = self.user)
                    # register the pipeline, so its status can be inspected from the main loop
                    with self.__lock:
                        self.__running_jobs[job_id]['pipeline'] = jobPipeline

                    # NOTE(review): the call that actually executes the pipeline was not
                    # present in the reviewed source; assumes IngestPipeline.run() -- confirm
                    jobPipeline.run()
            except Exception as e:
                # do not let a failing job die silently in its thread
                logger.exception('job %s failed: %s', job_id, e)

        thread = Thread(target = threaded_pipeline_func, args = [job_dict])
        with self.__lock:
            # register the job before the thread starts,
            # because the thread adds its pipeline to this entry
            self.__running_jobs[job_id] = { 'thread':thread }
            thread.start()

    def __clearFinishedJobs(self):
        """Remove all jobs of which the thread is no longer alive from the bookkeeping."""
        try:
            with self.__lock:
                finished_job_ids = [job_id for job_id, job_thread_dict in self.__running_jobs.items()
                                    if not job_thread_dict['thread'].is_alive()]
                for job_id in finished_job_ids:
                    logger.info('removing finished job %s', job_id)
                    del self.__running_jobs[job_id]
        except Exception as e:
            logger.error('__clearFinishedJobs: %s', e)

    def __enoughResourcesAvailable(self):
        """Check whether this host has enough free resources to start a new job.

        The following is checked, in this order: used outgoing bandwidth, idle
        cpu percentage, short term system load, swap usage, number of processes
        of the current user, number of busy transfers, number of finalizing
        transfers.

        :return: False when one of the limits is exceeded, True otherwise.
                 When the check itself fails with an unexpected error, True is
                 returned only if no job is running at all.
        """
        try:
            # helper func to do conditional warning of unavailable resources
            # NOTE(review): the line which switches the warning off after the first
            # message is commented out, so currently every warning is logged
            def log_recource_warning(message):
                if self.__log_recource_warning:
                    logger.warning("resources: %s", message)
                    #self.__log_recource_warning = False

            now = datetime.utcnow()
            bytes_sent = _getBytesSent()
            elapsed_seconds = totalSeconds(now - self.__prev_bytes_sent_timestamp)

            if bytes_sent < self.__prev_bytes_sent:
                # counter wrapped around 0, just store for next iteration, do not compute anything
                self.__prev_bytes_sent = bytes_sent
                self.__prev_bytes_sent_timestamp = now
            elif elapsed_seconds > 0:    # guard against division by zero
                # compute current speed in bits per second
                speed = 8 * (bytes_sent - self.__prev_bytes_sent) / elapsed_seconds
                # running average for used_bandwidth
                used_bandwidth = 0.5 * speed + 0.5 * self.__prev_used_bandwidth
                logger.debug("resources: current used_bandwidth = %s", humanreadablesize(used_bandwidth, 'bps'))

                # store for next iteration
                self.__prev_bytes_sent = bytes_sent
                self.__prev_bytes_sent_timestamp = now
                self.__prev_used_bandwidth = used_bandwidth

                # only start new jobs if we have some bandwith available
                # note that this is a 'soft' limit.
                # we cannot control the actual bandwith used by the running transfers
                # we can only not start new jobs if we already exceed the MAX_USED_BANDWITH_TO_START_NEW_JOBS
                if used_bandwidth > MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                    log_recource_warning('not enough bandwith available to start new jobs, using %s, max %s' %
                                         (humanreadablesize(used_bandwidth, 'bps'),
                                          humanreadablesize(MAX_USED_BANDWITH_TO_START_NEW_JOBS, 'bps')))
                    return False

            # only start new jobs if we have some cpu time available
            idle_cpu_percentage = psutil.cpu_times_percent().idle
            logger.debug("resources: current idle_cpu_percentage = %s%%", idle_cpu_percentage)
            if idle_cpu_percentage < 5:
                log_recource_warning('not enough cpu power available to start new jobs, cpu_idle %s%%' %
                                     idle_cpu_percentage)
                return False

            # only start new jobs if system load is not too high
            short_load_avg = os.getloadavg()[0]
            cpu_count = psutil.cpu_count()
            allowed_load = 1.5 * cpu_count
            logger.debug("resources: current short term load = %s #cpu's = %s allowed_load = %s", short_load_avg, cpu_count, allowed_load)
            if short_load_avg > allowed_load:
                log_recource_warning('system load too high (%s > %s), cannot start new jobs' %
                                     (short_load_avg,
                                      allowed_load))
                return False

            # only allow 1 job at the time if swapping
            swap_memory_percentage = psutil.swap_memory().percent
            logger.debug("resources: current swap_memory_percentage = %s%%", swap_memory_percentage)
            if swap_memory_percentage > 5 and len(self.__running_jobs) > 0:
                log_recource_warning('system swapping. not enough memory available to start new jobs')
                return False

            # only start new jobs if number of processes is not too high
            # a failure to inspect the processes is logged, and does not block new jobs
            try:
                current_user = getpass.getuser()
                current_user_procs = [p for p in psutil.process_iter() if p.username() == current_user]
                current_num_user_procs = len(current_user_procs)
                allowed_num_user_procs = 64 * cpu_count

                logger.debug("resources: current num_user_procs = %s allowed_num_user_procs = %s", current_num_user_procs, allowed_num_user_procs)

                if current_num_user_procs > allowed_num_user_procs:
                    log_recource_warning('number of processes by %s too high (%s > %s), cannot start new jobs' %
                                        (current_user,
                                        current_num_user_procs,
                                        allowed_num_user_procs))
                    return False
            except Exception as e:
                logger.exception(e)

            # limit total number of parallel transferring jobs to self.max_nr_of_parallel_jobs
            with self.__lock:
                job_thread_dicts = list(self.__running_jobs.values())
                # jobs without a 'pipeline' entry have a thread, but did not create their pipeline yet
                starting_threads = [job_thread_dict['thread'] for job_thread_dict in job_thread_dicts if 'pipeline' not in job_thread_dict]
                pipelines = [job_thread_dict['pipeline'] for job_thread_dict in job_thread_dicts if 'pipeline' in job_thread_dict]
                initializing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_INITIALIZING]
                transferring_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_TRANSFERRING]
                finalizing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINALIZING]
                num_busy_transfers = len(starting_threads) + len(initializing_pipelines) + len(transferring_pipelines)
                num_finalizing_transfers = len(finalizing_pipelines)

                logger.debug("resources: current num_busy_transfers = %s num_finalizing_transfers = %s max_nr_of_parallel_jobs = %s",
                             num_busy_transfers, num_finalizing_transfers, self.max_nr_of_parallel_jobs)

                if num_busy_transfers >= self.max_nr_of_parallel_jobs:
                    # NOTE(review): the first argument was not present in the reviewed source;
                    # num_busy_transfers is the value which is compared against the limit -- confirm
                    log_recource_warning('already running %d parallel jobs (#starting=%d, #transferring=%d) limiting the total number of transferring jobs to %d' %
                                         (num_busy_transfers,
                                          len(initializing_pipelines) + len(starting_threads),
                                          len(transferring_pipelines),
                                          self.max_nr_of_parallel_jobs))
                    return False

                if num_finalizing_transfers >= 2 * self.max_nr_of_parallel_jobs:
                    log_recource_warning('already waiting for %d jobs to finish (updating status/SIP to MoM and LTA). not starting new jobs until some jobs finished...' %
                                        (len(finalizing_pipelines),))
                    return False
        except Exception as e:
            logger.exception("error while checking for available resources: %s", e)
            # unknown error, run 1 job at a time
            return len(self.__running_jobs) == 0

        return True

    def __receiveAndStartJob(self, job_frombus):
        """Wait up to 10 seconds for a message on the job queue, and start the job it contains.

        :param job_frombus: the opened FromBus of the job queue.
        """
        logger.info("enough resources available to start new jobs. waiting for new job on %s", job_frombus.address)
        msg = job_frombus.receive(timeout = 10)

        # presumably receive() returns None when nothing arrived within the timeout -- confirm
        if not msg:
            return

        logger.info("received msg on job queue: %s", msg)

        if isinstance(msg, CommandMessage):
            job_dict = parseJobXml(msg.content)
            logger.info("received job: %s", job_dict)

            self.__start_job(job_dict)

            # allow 1 new recource_warning to be logged
            self.__log_recource_warning = True
        else:
            logger.warning("unexpected message type: %s", msg)

    def __logAndSendStatus(self):
        """Log a one-line summary of the running jobs, and send it as event on the notification bus."""
        with self.__lock:
            job_thread_dicts = list(self.__running_jobs.values())
            starting_threads = [job_thread_dict['thread'] for job_thread_dict in job_thread_dicts if 'pipeline' not in job_thread_dict]
            pipelines = [job_thread_dict['pipeline'] for job_thread_dict in job_thread_dicts if 'pipeline' in job_thread_dict]
            initializing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_INITIALIZING]
            transferring_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_TRANSFERRING]
            finalizing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINALIZING]
            finished_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINISHED]

            status_log_line = "status: running %s jobs: #starting=%d, #transferring=%d, #finalizing=%d, #finished=%d, bandwith used on network interface(s) %s %s (%s), load=%.1f" % (len(self.__running_jobs),
                   len(initializing_pipelines) + len(starting_threads),
                   len(transferring_pipelines),
                   len(finalizing_pipelines),
                   len(finished_pipelines),
                   NET_IF_TO_MONITOR,
                   humanreadablesize(self.__prev_used_bandwidth, 'bps'),
                   humanreadablesize(self.__prev_used_bandwidth / 8, 'Bps'),
                   os.getloadavg()[0])

        logger.info(status_log_line)
        msg = EventMessage(context = self.notification_prefix + 'TransferServiceStatus',
                            content = { 'ingest_server': socket.gethostname(),
                                        'message' : status_log_line })
        msg.ttl = 3600    # remove message from queue's when not picked up within 1 hours
        self.event_bus.send(msg)

    def run(self):
        """Run the main loop: receive and start jobs until interrupted.

        Each iteration starts at most one new job (only if enough resources are
        available), removes finished jobs from the bookkeeping, and logs/sends
        the status at most once per 2 seconds. Errors are logged and the loop
        continues; a KeyboardInterrupt ends the loop.
        """
        with FromBus(address = self.job_queuename, broker = self.broker) as job_frombus, self.event_bus:
            while True:
                try:
                    try:
                        if self.__enoughResourcesAvailable():
                            self.__receiveAndStartJob(job_frombus)
                        else:
                            # wait for resource to become available
                            time.sleep(5)
                    except KeyboardInterrupt:
                        break
                    except Exception as e:
                        logger.error(e)

                    self.__clearFinishedJobs()

                    try:
                        # sleep a little
                        # so jobs have a little time to start consuming resources
                        # this limits the number of jobs that can be started to 600 starts per minute
                        # it does not limit the total number of parallel jobs
                        # that is limited dynamically by __enoughResourcesAvailable
                        # and by the hard limit self.max_nr_of_parallel_jobs
                        time.sleep(0.1)

                        # if already running at high bandwith usages,
                        # we can sleep a little extra depending on how close we are to the MAX_USED_BANDWITH_TO_START_NEW_JOBS
                        # NOTE(review): the durations of these extra sleeps were not present
                        # in the reviewed source; the values below are placeholders -- confirm
                        if self.__prev_used_bandwidth > 0.5 * MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                            time.sleep(0.5)
                            if self.__prev_used_bandwidth > 0.85 * MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                                time.sleep(1.0)

                        if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds = 2):
                            self.__running_jobs_log_timestamp = datetime.utcnow()
                            self.__logAndSendStatus()
                    except KeyboardInterrupt:
                        break
                    except Exception as e:
                        logger.error(e)
                except KeyboardInterrupt:
                    # e.g. an interrupt while clearing the finished jobs
                    break
                except Exception as e:
                    logger.error(e)
def main():
    """Parse the command line options, set up logging and create the IngestTransferServer.

    The process is switched to the UTC timezone, the start-up banner and the
    environment are logged, and the LTA and MoM credentials are looked up by
    name via DBCredentials before the server is constructed.
    """
    # make sure we run in UTC timezone
    os.environ['TZ'] = 'UTC'

    from optparse import OptionParser
    from lofar.messaging import setQpidLogLevel
    from lofar.common.util import waitForInterrupt

    # Check the invocation arguments
    parser = OptionParser("%prog [options]",
                          description = 'runs the ingest transfer server which picks up as many jobs as it can handle from the given --ingest_job_queuename and tranfers the dataproducts to the LTA, updates the LTA catalogue, and updates MoM')
    parser.add_option('-q', '--broker', dest = 'broker', type = 'string',
                      default = DEFAULT_BROKER,
                      help = 'Address of the qpid broker, default: %default')
    parser.add_option("--ingest_job_queuename", dest = "ingest_job_queuename", type = "string",
                      default = DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
                      help = "Name of the job queue. [default: %default]")
    parser.add_option("-p", "--max_nr_of_parallel_jobs", dest = "max_nr_of_parallel_jobs", type = "int",
                      default = MAX_NR_OF_JOBS,
                      help = "Maximum number of jobs to run in parallel. [default: %default]")
    parser.add_option('--ingest_notification_busname', dest = 'ingest_notification_busname', type = 'string', default = DEFAULT_INGEST_NOTIFICATION_BUSNAME, help = 'Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default')
    parser.add_option("--ingest_notification_prefix", dest = "ingest_notification_prefix", type = "string", default = DEFAULT_INGEST_NOTIFICATION_PREFIX, help = "The prefix for all notifications of this publisher, [default: %default]")
    parser.add_option("-u", "--user", dest = "user", type = "string", default = getpass.getuser(), help = "username for to login on data source host, [default: %default]")
    parser.add_option("-l", "--lta_credentials", dest = "lta_credentials", type = "string",
                      default = 'LTA' if isProductionEnvironment() else 'LTA_test',
                      help = "Name of lofar credentials for lta user/pass (see ~/.lofar/dbcredentials) [default=%default]")
    parser.add_option("-m", "--mom_credentials", dest = "mom_credentials", type = "string",
                      default = 'MoM_site' if isProductionEnvironment() else 'MoM_site_test',
                      help = "Name of credentials for MoM user/pass (see ~/.lofar/dbcredentials) [default=%default]")
    parser.add_option('-V', '--verbose', dest = 'verbose', action = 'store_true', help = 'verbose logging')
    (options, args) = parser.parse_args()

    setQpidLogLevel(logging.INFO)
    logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s',
                        level = logging.DEBUG if options.verbose else logging.INFO)

    logger.info('*****************************************')
    logger.info('Started ingest server on host %s', socket.gethostname())
    logger.info('*****************************************')

    # NOTE(review): this writes every environment variable to the log,
    # including any secrets which are passed via the environment
    logger.info("environment:")
    for k in sorted(os.environ):
        logger.info("%s=%s", k, os.environ[k])
    logger.info('*****************************************')

    ltacreds = dbcredentials.DBCredentials().get(options.lta_credentials)
    momcreds = dbcredentials.DBCredentials().get(options.mom_credentials)

    # hand over all parsed options, so --user and the --ingest_notification_* options take effect
    server = IngestTransferServer(job_queuename = options.ingest_job_queuename,
                                  notification_busname = options.ingest_notification_busname,
                                  notification_prefix = options.ingest_notification_prefix,
                                  broker = options.broker,
                                  mom_credentials = momcreds,
                                  lta_credentials = ltacreds,
                                  user = options.user,
                                  max_nr_of_parallel_jobs = options.max_nr_of_parallel_jobs)