#!/usr/bin/env python3

# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
"""
"""
import logging
from datetime import datetime, timedelta
import os
import time
import socket
import getpass
import psutil  # used for network/cpu/memory/load/process resource monitoring

from threading import Thread, Lock

from lofar.messaging.messagebus import FromBus, ToBus

from lofar.messaging.messages import *
from lofar.common import isProductionEnvironment
from lofar.common import dbcredentials
from lofar.common.datetimeutils import totalSeconds
from lofar.common.util import humanreadablesize
from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME
from lofar.lta.ingest.server.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME, DEFAULT_INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.common.config import DEFAULT_BROKER
from lofar.lta.ingest.server.config import MAX_NR_OF_JOBS, MAX_USED_BANDWITH_TO_START_NEW_JOBS, NET_IF_TO_MONITOR

from lofar.lta.ingest.server.config import TRANSFER_TIMEOUT

from lofar.lta.ingest.common.job import *
from lofar.lta.ingest.server.ingestpipeline import IngestPipeline
from lofar.lta.ingest.server.ltaclient import *
from lofar.lta.ingest.server.momclient import *

logger = logging.getLogger(__name__)


def _getBytesSent():
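    """Return the total number of bytes sent on the network interfaces in NET_IF_TO_MONITOR,
    falling back to the total over all interfaces, or 0 when the info cannot be obtained."""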
    try:
        # try to sum the traffic of all interfaces in NET_IF_TO_MONITOR
        counters = psutil.net_io_counters(True)
        if all(interface in counters for interface in NET_IF_TO_MONITOR):
            return sum(counters[interface].bytes_sent for interface in NET_IF_TO_MONITOR)

        # not all interfaces found... return total bytes_sent
        return psutil.net_io_counters(False).bytes_sent
    except Exception as e:
        logger.warning("Cannot get network interface info: %s", e)
        return 0


class IngestTransferServer:
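    """
    Service which receives ingest jobs from the ingest job queue and runs an IngestPipeline
    for each job in a background thread, as long as enough system resources (bandwidth,
    cpu, load, memory, number of processes) are available.

    A minimal usage sketch (mirroring main() below; the credential names are just examples):

        ltacreds = dbcredentials.DBCredentials().get('LTA_test')
        momcreds = dbcredentials.DBCredentials().get('MoM_site_test')
        server = IngestTransferServer(mom_credentials = momcreds, lta_credentials = ltacreds)
        server.run()
    """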
    def __init__(self,
                 job_queuename = DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
                 notification_busname = DEFAULT_INGEST_NOTIFICATION_BUSNAME,
                 notification_prefix = DEFAULT_INGEST_NOTIFICATION_PREFIX,
                 mom_credentials = None,
                 lta_credentials = None,
                 user = None,
                 broker = None,
                 max_nr_of_parallel_jobs = MAX_NR_OF_JOBS):
        self.broker = broker
        self.job_queuename = job_queuename
        self.user = user

        if not self.user:
            self.user = getpass.getuser()

        self.mom_credentials = mom_credentials
        self.lta_credentials = lta_credentials
        self.notification_busname = notification_busname
        self.notification_prefix = notification_prefix
        self.event_bus = ToBus(notification_busname, broker = broker)
        self.max_nr_of_parallel_jobs = max_nr_of_parallel_jobs

        self.__running_jobs = {}
        self.__lock = Lock()

        self.__log_recource_warning = True
        self.__prev_bytes_sent = _getBytesSent()
        self.__prev_bytes_sent_timestamp = datetime.utcnow()
        self.__prev_used_bandwidth = 0.0
        self.__running_jobs_log_timestamp = datetime.utcnow()

    def __start_job(self, job_dict):
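        """Run an IngestPipeline for the given job (a parsed job dict) in a daemon thread.
        The thread, and once created the pipeline, are tracked in self.__running_jobs."""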
        job_id = job_dict['JobId']

        if job_id in self.__running_jobs:
            logger.warning('job %s is already running', job_id)
            return

        def threaded_pipeline_func(job):
            logger.info('starting job %s in the background', job_id)

            ltaClient = LTAClient(self.lta_credentials.user, self.lta_credentials.password)

            with MoMClient(self.mom_credentials.user, self.mom_credentials.password) as momClient:
                jobPipeline = IngestPipeline(job, momClient, ltaClient,
                                             notification_busname = self.notification_busname,
                                             notification_prefix = self.notification_prefix,
                                             broker = self.broker,
                                             user = self.user)

                with self.__lock:
                    self.__running_jobs[job_id]['pipeline'] = jobPipeline

                jobPipeline.run()

        thread = Thread(target = threaded_pipeline_func, args = [job_dict])
        thread.daemon = True

        with self.__lock:
            self.__running_jobs[job_id] = { 'thread': thread }

        thread.start()

    def __clearFinishedJobs(self):
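        """Remove the administration of jobs whose thread has finished from self.__running_jobs."""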

        try:
            finished_job_ids = [job_id for job_id, job_thread_dict in list(self.__running_jobs.items()) if not job_thread_dict['thread'].is_alive()]

            for job_id in finished_job_ids:
                logger.info('removing finished job %s', job_id)
                del self.__running_jobs[job_id]
        except Exception as e:
            logger.error('__clearFinishedJobs: %s', e)

    def __enoughResourcesAvailable(self):
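        """Check whether there is enough bandwidth, cpu, memory and process headroom, and whether
        we are below the parallel job limits, to start a new transfer job. Returns True or False.
        On unexpected errors it falls back to allowing at most one running job."""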
        try:
            # helper func to do conditional warning of unavailable resources
            # this will print at most one such message per started job
            # so the log won't be flooded
            def log_recource_warning(message):
                if self.__log_recource_warning:
                    logger.warning("resources: %s", message)
                    #self.__log_recource_warning = False

            now = datetime.utcnow()
            bytes_sent = _getBytesSent()

            if bytes_sent >= self.__prev_bytes_sent:  # bytes_sent might wrap around zero
                # compute current speed in bits per second
                speed = 8 * (bytes_sent - self.__prev_bytes_sent) / totalSeconds(now - self.__prev_bytes_sent_timestamp)

                # running average for used_bandwidth
                used_bandwidth = 0.5 * speed + 0.5 * self.__prev_used_bandwidth

                logger.debug("resources: current used_bandwidth = %s", humanreadablesize(used_bandwidth, 'bps'))

                # store for next iteration
                self.__prev_bytes_sent = bytes_sent
                self.__prev_bytes_sent_timestamp = now
                self.__prev_used_bandwidth = used_bandwidth

                # only start new jobs if we have some bandwith available
                # note that this is a 'soft' limit.
                # we cannot control the actual bandwith used by the running transfers
                # we can only not start new jobs if we already exceed the MAX_USED_BANDWITH_TO_START_NEW_JOBS
                if used_bandwidth > MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                    log_recource_warning('not enough bandwith available to start new jobs, using %s, max %s' %
                                         (humanreadablesize(used_bandwidth, 'bps'),
                                          humanreadablesize(MAX_USED_BANDWITH_TO_START_NEW_JOBS, 'bps')))
                    return False
            else:
                # wrapped around 0, just store for next iteration, do not compute anything
                self.__prev_bytes_sent = bytes_sent
                self.__prev_bytes_sent_timestamp = now

            # only start new jobs if we have some cpu time available
            idle_cpu_percentage = psutil.cpu_times_percent().idle
            logger.debug("resources: current idle_cpu_percentage = %s%%", idle_cpu_percentage)

            if idle_cpu_percentage < 5:

                log_recource_warning('not enough cpu power available to start new jobs, cpu_idle %s%%' %
                                     idle_cpu_percentage)
                return False

            # only start new jobs if system load is not too high
            short_load_avg = os.getloadavg()[0]
            cpu_count = psutil.cpu_count()
            allowed_load = 1.5 * cpu_count

            logger.debug("resources: current short term load = %s #cpu's = %s allowed_load = %s", short_load_avg, cpu_count, allowed_load)

            if short_load_avg > allowed_load:
                log_recource_warning('system load too high (%s > %s), cannot start new jobs' %
                                     (short_load_avg, allowed_load))
                return False

            # only allow 1 job at the time if swapping
            swap_memory_percentage = psutil.swap_memory().percent
            logger.debug("resources: current swap_memory_percentage = %s%%", swap_memory_percentage)

            if swap_memory_percentage > 5 and len(self.__running_jobs) > 0:
                log_recource_warning('system swapping. not enough memory available to start new jobs')
                return False

            # only start new jobs if number of processes is not too high
            try:
                current_user = getpass.getuser()
                current_user_procs = [p for p in psutil.process_iter() if p.username() == current_user]
                current_num_user_procs = len(current_user_procs)
                allowed_num_user_procs = 64 * cpu_count

                logger.debug("resources: current num_user_procs = %s allowed_num_user_procs = %s", current_num_user_procs, allowed_num_user_procs)

                if current_num_user_procs > allowed_num_user_procs:
                    log_recource_warning('number of processes by %s too high (%s > %s), cannot start new jobs' %
                                         (current_user,
                                          current_num_user_procs,
                                          allowed_num_user_procs))
                    return False
            except Exception as e:
                logger.exception(e)
                pass

            # limit total number of parallel transferring jobs to self.max_nr_of_parallel_jobs
            with self.__lock:
                starting_threads = [job_thread_dict['thread'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' not in job_thread_dict]
                pipelines = [job_thread_dict['pipeline'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' in job_thread_dict]

                initializing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_INITIALIZING]
                transferring_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_TRANSFERRING]
                finalizing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINALIZING]

                num_busy_transfers = len(starting_threads) + len(initializing_pipelines) + len(transferring_pipelines)
                num_finalizing_transfers = len(finalizing_pipelines)

                logger.debug("resources: current num_busy_transfers = %s num_finalizing_transfers = %s max_nr_of_parallel_jobs = %s",
                             num_busy_transfers, num_finalizing_transfers, self.max_nr_of_parallel_jobs)

                if num_busy_transfers >= self.max_nr_of_parallel_jobs:
                    log_recource_warning('already running %d parallel jobs (#starting=%d, #transferring=%d) limiting the total number of transferring jobs to %d' %
                                         (len(self.__running_jobs),
                                          len(initializing_pipelines) + len(starting_threads),
                                          len(transferring_pipelines),
                                          self.max_nr_of_parallel_jobs))
                    return False

                if num_finalizing_transfers >= 2 * self.max_nr_of_parallel_jobs:
                    log_recource_warning('already waiting for %d jobs to finish (updating status/SIP to MoM and LTA). not starting new jobs until some jobs finished...' %
                                         (len(finalizing_pipelines),))
                    return False
        except Exception as e:
            logger.error(e)
            # unknown error, run 1 job at a time
            return len(self.__running_jobs) == 0

        return True

    def run(self):
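        """Main loop: wait for job messages on the job queue, start a transfer for each received job
        whenever enough resources are available, clear finished jobs, and periodically publish a
        TransferServiceStatus event on the notification bus."""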
        log_recource_warning = True

        with FromBus(address = self.job_queuename, broker = self.broker) as job_frombus, self.event_bus as _1:
            while True:
                try:
                    try:
                        if self.__enoughResourcesAvailable():
                            logger.info("enough resources available to start new jobs. waiting for new job on %s", job_frombus.address)
                            msg = job_frombus.receive(timeout = 10)

                            if msg:
                                logger.info("received msg on job queue: %s", msg)

                                if isinstance(msg, CommandMessage):
                                    job_dict = parseJobXml(msg.content)
                                    logger.info("received job: %s", job_dict)
                                    self.__start_job(job_dict)

                                    # allow 1 new recource_warning to be logged
                                    self.__log_recource_warning = True
                                else:
                                    logger.warning("unexpected message type: %s", msg)
                        else:
                            # wait for resources to become available
                            time.sleep(5)
                    except KeyboardInterrupt:
                        break
                    except Exception as e:
                        logger.error(e)

                    self.__clearFinishedJobs()

                    try:
                        # sleep a little
                        # so jobs have a little time to start consuming resources
                        # this limits the number of jobs that can be started to 1000 starts per minute
                        # it does not limit the total number of parallel jobs
                        # that is limited dynamically by __enoughResourcesAvailable
                        # and by the hard limit self.max_nr_of_parallel_jobs
                        time.sleep(0.1)

                        # if already running at high bandwith usage,
                        # we can sleep a little extra depending on how close we are to the MAX_USED_BANDWITH_TO_START_NEW_JOBS
                        if self.__prev_used_bandwidth > 0.5 * MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                            time.sleep(0.5)

                        if self.__prev_used_bandwidth > 0.85 * MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                            time.sleep(1.0)

                        if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds = 2):
                            self.__running_jobs_log_timestamp = datetime.utcnow()

                            with self.__lock:
                                starting_threads = [job_thread_dict['thread'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' not in job_thread_dict]
                                pipelines = [job_thread_dict['pipeline'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' in job_thread_dict]

                                initializing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_INITIALIZING]
                                transferring_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_TRANSFERRING]
                                finalizing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINALIZING]
                                finished_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINISHED]

                                status_log_line = "status: running %s jobs: #starting=%d, #transferring=%d, #finalizing=%d, #finished=%d, bandwith used on network interface(s) %s %s (%s), load=%.1f" % (
                                    len(self.__running_jobs),
                                    len(initializing_pipelines) + len(starting_threads),
                                    len(transferring_pipelines),
                                    len(finalizing_pipelines),
                                    len(finished_pipelines),
                                    NET_IF_TO_MONITOR,
                                    humanreadablesize(self.__prev_used_bandwidth, 'bps'),
                                    humanreadablesize(self.__prev_used_bandwidth / 8, 'Bps'),
                                    os.getloadavg()[0])

                                msg = EventMessage(context = self.notification_prefix + 'TransferServiceStatus',
                                                   content = { 'ingest_server': socket.gethostname(),
                                                               'message' : status_log_line })
                                msg.ttl = 3600  # remove message from queues when not picked up within 1 hour
                                self.event_bus.send(msg)
                    except KeyboardInterrupt:
                        break
                    except Exception as e:
                        logger.error(e)
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(e)


def main():
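    """Parse the command line arguments, set up logging and credentials, and run an IngestTransferServer."""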
    # make sure we run in UTC timezone
    import os
    os.environ['TZ'] = 'UTC'

    from optparse import OptionParser
    from lofar.messaging import setQpidLogLevel
    from lofar.common.util import waitForInterrupt

    # Check the invocation arguments
    parser = OptionParser("%prog [options]",
                          description = 'runs the ingest transfer server which picks up as many jobs as it can handle from the given --ingest_job_queuename and transfers the dataproducts to the LTA, updates the LTA catalogue, and updates MoM')
    parser.add_option('-q', '--broker', dest = 'broker', type = 'string',
                      default = DEFAULT_BROKER,
                      help = 'Address of the qpid broker, default: %default')
    parser.add_option("--ingest_job_queuename", dest = "ingest_job_queuename", type = "string",
                      default = DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
                      help = "Name of the job queue. [default: %default]")
    parser.add_option("-p", "--max_nr_of_parallel_jobs", dest = "max_nr_of_parallel_jobs", type = "int",
                      default = MAX_NR_OF_JOBS,
                      help = "Maximum number of parallel transfer jobs. [default: %default]")
    parser.add_option('--ingest_notification_busname', dest = 'ingest_notification_busname', type = 'string',
                      default = DEFAULT_INGEST_NOTIFICATION_BUSNAME,
                      help = 'Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default')
    parser.add_option("--ingest_notification_prefix", dest = "ingest_notification_prefix", type = "string",
                      default = DEFAULT_INGEST_NOTIFICATION_PREFIX,
                      help = "The prefix for all notifications of this publisher, [default: %default]")
    parser.add_option("-u", "--user", dest = "user", type = "string",
                      default = getpass.getuser(),
                      help = "username to login with on the data source host, [default: %default]")
    parser.add_option("-l", "--lta_credentials", dest = "lta_credentials", type = "string",
                      default = 'LTA' if isProductionEnvironment() else 'LTA_test',
                      help = "Name of lofar credentials for lta user/pass (see ~/.lofar/dbcredentials) [default=%default]")
    parser.add_option("-m", "--mom_credentials", dest = "mom_credentials", type = "string",
                      default = 'MoM_site' if isProductionEnvironment() else 'MoM_site_test',
                      help = "Name of credentials for MoM user/pass (see ~/.lofar/dbcredentials) [default=%default]")
    parser.add_option('-V', '--verbose', dest = 'verbose', action = 'store_true', help = 'verbose logging')

    (options, args) = parser.parse_args()

    setQpidLogLevel(logging.INFO)
    logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s',
                        level = logging.DEBUG if options.verbose else logging.INFO)

    logger.info('*****************************************')
    logger.info('Started ingest server on host %s', socket.gethostname())
    logger.info('*****************************************')
    logger.info("environment:")
    for k in sorted(os.environ):
        logger.info("%s=%s", k, os.environ[k])
    logger.info('*****************************************')

    ltacreds = dbcredentials.DBCredentials().get(options.lta_credentials)
    momcreds = dbcredentials.DBCredentials().get(options.mom_credentials)

    server = IngestTransferServer(job_queuename = options.ingest_job_queuename,
                                  broker = options.broker,
                                  mom_credentials = momcreds,
                                  lta_credentials = ltacreds,
                                  max_nr_of_parallel_jobs = options.max_nr_of_parallel_jobs)
    server.run()

__all__ = ['main']


if __name__ == '__main__':
    main()