#!/usr/bin/env python3
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
"""
"""
import logging
from datetime import datetime, timedelta
import os
import time
import socket
import getpass
import pprint
from threading import Thread, RLock
from lofar.messaging import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME, BusListener, AbstractMessageHandler
from lofar.messaging import LofarMessage, CommandMessage, EventMessage
from lofar.common import isProductionEnvironment
from lofar.common import dbcredentials
from lofar.common.metrics import start_metrics_server
from lofar.common.datetimeutils import totalSeconds
from lofar.common.util import humanreadablesize
from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT, INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.server.config import MAX_NR_OF_JOBS, MAX_USED_BANDWITH_TO_START_NEW_JOBS, NET_IF_TO_MONITOR
from lofar.lta.ingest.server.config import TRANSFER_TIMEOUT, PROMETHEUS_PORT
from lofar.lta.ingest.common.job import *
from lofar.lta.ingest.server.ingestpipeline import IngestPipeline
from lofar.lta.ingest.client.rpc import IngestRPC
from lofar.lta.ingest.server.ltaclient import *
import psutil
from lofar.common.util import waitForInterrupt
from prometheus_client import Gauge, Counter
logger = logging.getLogger(__name__)
def _getBytesSent():
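    """Return the number of bytes sent over the network interfaces in NET_IF_TO_MONITOR,
    or the host-wide total if not all of those interfaces are present. Returns 0 on error."""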
try:
        # try to sum the traffic of all interfaces in NET_IF_TO_MONITOR
counters = psutil.net_io_counters(True)
if all(interface in counters for interface in NET_IF_TO_MONITOR):
return sum(counters[interface].bytes_sent for interface in NET_IF_TO_MONITOR)
# not all interfaces found... return total bytes_sent
return psutil.net_io_counters(False).bytes_sent
except Exception as e:
logger.warning("Cannot get network interface info: %s", e)
return 0
# the total number of bytes sent is monotonically increasing, but prometheus_client
# only supports callback-based values via Gauge.set_function, so expose it as a Gauge
metric_bytes_sent = Gauge("ingest_eth_bytes_sent", "Number of bytes sent over the network interfaces towards the LTA")
metric_bytes_sent.set_function(_getBytesSent)
class IngestTransferServer:
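    """Picks up ingest jobs from the messaging bus and transfers them to the LTA,
    running up to max_nr_of_parallel_jobs IngestPipelines in background threads,
    as long as enough system resources are available."""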
metric_is_alive = Counter("alive", "Gets incremented periodically while the service is alive")
metric_not_enough_resources_reasons = Counter("ingest_not_enough_resources", "Which resources were too tight when a pipeline needed to be started", labelnames=["resource"])
def __init__(self,
exchange = DEFAULT_BUSNAME,
mom_credentials = None,
lta_credentials = None,
user = None,
broker = DEFAULT_BROKER,
max_nr_of_parallel_jobs = MAX_NR_OF_JOBS):
self.user = user
if not self.user:
self.user = getpass.getuser()
self.mom_credentials = mom_credentials
self.lta_credentials = lta_credentials
self.event_bus = ToBus(exchange=exchange, broker = broker)
self.max_nr_of_parallel_jobs = max_nr_of_parallel_jobs
self.incoming_jobs_listener = BusListener(IngestJobsForTransferHandler, {'transfer_server': self},
exchange=exchange, broker=broker,
routing_key="%s.#" % DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT)
self.__running_thread = None
self.__running_jobs = {}
self.__is_running = False
self.__lock = RLock()
self.__prev_bytes_sent = _getBytesSent()
self.__prev_bytes_sent_timestamp = datetime.utcnow()
self.__prev_used_bandwidth = 0.0
self.__running_jobs_log_timestamp = datetime.utcnow()
# register Prometheus metrics
metric_nr_running_jobs = Gauge("ingest_running_jobs", "Number of jobs currently running")
metric_nr_running_jobs.set_function(lambda: len(self.__running_jobs))
def nr_pipelines_with_status(pipeline_status: int):
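            """Return the number of running pipelines that have the given status."""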
with self.__lock:
pipelines = [job_thread_dict['pipeline'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' in job_thread_dict]
return len([pipeline for pipeline in pipelines if pipeline.status == pipeline_status])
metric_nr_running_pipelines = Gauge("ingest_running_pipelines", "Number of pipelines currently running", labelnames=["status"])
metric_nr_running_pipelines.labels(status="initializing").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_INITIALIZING))
metric_nr_running_pipelines.labels(status="transferring").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_TRANSFERRING))
metric_nr_running_pipelines.labels(status="finalizing").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_FINALIZING))
metric_nr_running_pipelines.labels(status="finished").set_function(lambda: nr_pipelines_with_status(IngestPipeline.STATUS_FINISHED))
def start_job(self, job_dict):
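        """Start an IngestPipeline for the given (parsed) job dict in a background thread.
        Raises a ResourceWarning if resources are too tight, or a ValueError if the job is already running."""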
if not self.enoughResourcesAvailable():
raise ResourceWarning("Not enough resources available to start new job: %s" % job_dict)
job_id = job_dict['JobId']
if job_id in self.__running_jobs:
raise ValueError('job %s is already running. Discarding this new job copy, and keeping the current one running...' % job_id)
def threaded_pipeline_func(job):
logger.info('starting job %s in the background', job_id)
ltaClient = LTAClient(self.lta_credentials.user, self.lta_credentials.password)
with ltaClient:
jobPipeline = IngestPipeline(job, ltaClient,
exchange = self.event_bus.exchange,
broker = self.event_bus.broker,
user = self.user)
with self.__lock:
self.__running_jobs[job_id]['pipeline'] = jobPipeline
jobPipeline.run()
with self.__lock:
thread = Thread(target = threaded_pipeline_func,
args = [job_dict],
name="transfer_thread_%s"%(job_id,))
thread.daemon = True
self.__running_jobs[job_id] = { 'thread':thread }
thread.start()
def __clearFinishedJobs(self):
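        """Remove the bookkeeping entries of jobs whose transfer thread has finished."""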
try:
with self.__lock:
finished_job_ids = [job_id for job_id, job_thread_dict in list(self.__running_jobs.items()) if not job_thread_dict['thread'].is_alive()]
for job_id in finished_job_ids:
logger.info('removing finished job %s', job_id)
del self.__running_jobs[job_id]
except Exception as e:
logger.error('__clearFinishedJobs: %s', e)
def enoughResourcesAvailable(self):
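        """Check whether there are enough resources to start a new job: enough spare bandwidth,
        enough idle cpu time, a low enough system load, (almost) no swapping, not too many
        processes, and fewer running transfers than self.max_nr_of_parallel_jobs."""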
try:
now = datetime.utcnow()
bytes_sent = _getBytesSent()
if bytes_sent >= self.__prev_bytes_sent: # bytes_sent might wrap around zero
# compute current speed in Gbps
speed = 8 * (bytes_sent - self.__prev_bytes_sent) / totalSeconds(now - self.__prev_bytes_sent_timestamp)
# running average for used_bandwidth
used_bandwidth = 0.5 * speed + 0.5 * self.__prev_used_bandwidth
logger.debug("resources: current used_bandwidth = %s", humanreadablesize(used_bandwidth, 'bps'))
# store for next iteration
self.__prev_bytes_sent = bytes_sent
self.__prev_bytes_sent_timestamp = now
self.__prev_used_bandwidth = used_bandwidth
                # only start new jobs if we have some bandwidth available.
                # note that this is a 'soft' limit:
                # we cannot control the actual bandwidth used by the running transfers,
                # we can only refrain from starting new jobs once MAX_USED_BANDWITH_TO_START_NEW_JOBS is exceeded
if used_bandwidth > MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                    logger.warning('resources: not enough bandwidth available to start new jobs, using %s, max %s' %
(humanreadablesize(used_bandwidth, 'bps'),
humanreadablesize(MAX_USED_BANDWITH_TO_START_NEW_JOBS, 'bps')))
self.metric_not_enough_resources_reasons.labels(resource="bandwidth").inc()
return False
else:
# wrapped around 0, just store for next iteration, do not compute anything
self.__prev_bytes_sent = bytes_sent
self.__prev_bytes_sent_timestamp = now
# only start new jobs if we have some cpu time available
idle_cpu_percentage = psutil.cpu_times_percent().idle
logger.debug("resources: current idle_cpu_percentage = %s%%", idle_cpu_percentage)
if idle_cpu_percentage < 5:
logger.warning('resources: not enough cpu power available to start new jobs, cpu_idle %s%%' %
idle_cpu_percentage)
self.metric_not_enough_resources_reasons.labels(resource="cpu").inc()
return False
# only start new jobs if system load is not too high
short_load_avg = os.getloadavg()[0]
cpu_count = psutil.cpu_count()
allowed_load = 1.5 * cpu_count
logger.debug("resources: current short term load = %s #cpu's = %s allowed_load = %s", short_load_avg, cpu_count, allowed_load)
if short_load_avg > allowed_load:
logger.warning('resources: system load too high (%s > %s), cannot start new jobs' %
(short_load_avg,
allowed_load))
self.metric_not_enough_resources_reasons.labels(resource="load").inc()
return False
            # only allow 1 job at a time when swapping
swap_memory_percentage = psutil.swap_memory().percent
logger.debug("resources: current swap_memory_percentage = %s%%", swap_memory_percentage)
if swap_memory_percentage > 5 and len(self.__running_jobs) > 0:
logger.warning('resources: system swapping. not enough memory available to start new jobs')
self.metric_not_enough_resources_reasons.labels(resource="memory").inc()
return False
# only start new jobs if number of processes is not too high
try:
current_user = getpass.getuser()
current_user_procs = [p for p in psutil.process_iter() if p.username() == current_user]
current_num_user_procs = len(current_user_procs)
allowed_num_user_procs = 64 * cpu_count
logger.debug("resources: current num_user_procs = %s allowed_num_user_procs = %s", current_num_user_procs, allowed_num_user_procs)
if current_num_user_procs > allowed_num_user_procs:
logger.warning('resources: number of processes by %s too high (%s > %s), cannot start new jobs' %
(current_user,
current_num_user_procs,
allowed_num_user_procs))
self.metric_not_enough_resources_reasons.labels(resource="process_count").inc()
return False
            except Exception as e:
                logger.exception(e)
# limit total number of parallel transferring jobs to self.max_nr_of_parallel_jobs
with self.__lock:
starting_threads = [job_thread_dict['thread'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' not in job_thread_dict]
pipelines = [job_thread_dict['pipeline'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' in job_thread_dict]
initializing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_INITIALIZING]
transferring_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_TRANSFERRING]
finalizing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINALIZING]
num_busy_transfers = len(starting_threads) + len(initializing_pipelines) + len(transferring_pipelines)
num_finalizing_transfers = len(finalizing_pipelines)
logger.debug("resources: current num_busy_transfers = %s num_finalizing_transfers = %s max_nr_of_parallel_jobs = %s",
num_busy_transfers, num_finalizing_transfers, self.max_nr_of_parallel_jobs)
if num_busy_transfers >= self.max_nr_of_parallel_jobs:
                    logger.warning('resources: already running %d parallel jobs (#starting=%d, #transferring=%d), limiting the total number of transferring jobs to %d' %
(len(self.__running_jobs),
len(initializing_pipelines) + len(starting_threads),
len(transferring_pipelines),
self.max_nr_of_parallel_jobs))
self.metric_not_enough_resources_reasons.labels(resource="parallel_jobs").inc()
return False
if num_finalizing_transfers >= 2 * self.max_nr_of_parallel_jobs:
                    logger.warning('resources: already waiting for %d jobs to finish (updating status/SIP in MoM and the LTA), not starting new jobs until some have finished...' %
(len(finalizing_pipelines),))
self.metric_not_enough_resources_reasons.labels(resource="parallel_jobs").inc()
return False
except Exception as e:
logger.exception("error while checking for available resources: %s", e)
num_running_jobs = len(self.__running_jobs)
if num_running_jobs <= 4:
logger.info("running %d jobs, assuming we can run 1 more: ", num_running_jobs)
return True
else:
logger.warning("already running %d jobs, assuming for safety we cannot run more jobs...", num_running_jobs)
self.metric_not_enough_resources_reasons.labels(resource="error_fallback").inc()
return False
return True
def __enter__(self):
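        """Context manager entry: run the server in a background thread."""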
self.__running_thread = Thread(target=self.run, daemon=True)
self.__running_thread.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.__running_thread is not None and self.__running_thread.is_alive():
self.quit()
self.__running_thread.join()
self.__running_thread = None
@property
def is_running(self) -> bool:
with self.__lock:
return self.__is_running
def quit(self):
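        """Request the run() loop to stop."""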
with self.__lock:
self.__is_running = False
def run(self):
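        """Run the event bus and job listener, periodically clear finished jobs,
        and log/publish a status overview until quit() is called."""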
with self.__lock:
self.__is_running = True
with self.event_bus, self.incoming_jobs_listener:
while self.is_running:
try:
                    self.metric_is_alive.inc()
self.__clearFinishedJobs()
with self.__lock:
starting_threads = [job_thread_dict['thread'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' not in job_thread_dict]
pipelines = [job_thread_dict['pipeline'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' in job_thread_dict]
initializing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_INITIALIZING]
transferring_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_TRANSFERRING]
finalizing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINALIZING]
finished_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINISHED]
log_interval = 5 if self.__running_jobs else 60
if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds=log_interval):
                            status_log_line = "status: running %s jobs: #starting=%d, #transferring=%d, #finalizing=%d, #finished=%d, bandwidth used on network interface(s) %s %s (%s), load=%.1f" % (len(self.__running_jobs),
len(initializing_pipelines) + len(starting_threads),
len(transferring_pipelines),
len(finalizing_pipelines),
len(finished_pipelines),
NET_IF_TO_MONITOR,
humanreadablesize(self.__prev_used_bandwidth, 'bps'),
humanreadablesize(self.__prev_used_bandwidth / 8, 'Bps'),
os.getloadavg()[0])
logger.info(status_log_line)
self.__running_jobs_log_timestamp = datetime.utcnow()
msg = EventMessage(subject = "%s.%s" % (INGEST_NOTIFICATION_PREFIX, 'TransferServiceStatus'),
content = { 'ingest_server': socket.gethostname(),
'message' : status_log_line })
                            msg.ttl = 3600  # remove the message from the queues when not picked up within 1 hour
self.event_bus.send(msg)
time.sleep(5)
except KeyboardInterrupt:
break
except Exception as e:
logger.error(e)
class IngestJobsForTransferHandler(AbstractMessageHandler):
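    """Handles incoming ingest job messages: waits until enough resources are available,
    then parses each job and hands it over to the IngestTransferServer."""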
def __init__(self, transfer_server: IngestTransferServer):
self._transfer_server = transfer_server
super(IngestJobsForTransferHandler, self).__init__()
def before_receive_message(self):
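        """Postpone receiving the next job message until enough resources are available to start it."""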
while not self._transfer_server.enoughResourcesAvailable():
logger.info("Waiting for resources to become available before receiving a new job...")
time.sleep(10)
def handle_message(self, msg: LofarMessage):
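        """Parse the job XML from the given CommandMessage and start the transfer of the job."""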
if not isinstance(msg, CommandMessage):
raise ValueError("%s: Ignoring non-CommandMessage: %s" % (self.__class__.__name__, msg))
job = parseJobXml(msg.content)
if job and job.get('JobId'):
self._transfer_server.start_job(job)
        # sleep a little, so jobs have some time to start consuming resources.
        # this limits the rate at which jobs can be started to at most 10 per second (600 per minute);
        # it does not limit the total number of parallel jobs, which is limited
        # dynamically by enoughResourcesAvailable and by the hard limit self.max_nr_of_parallel_jobs
        time.sleep(0.1)
def main():
    # make sure we run in the UTC timezone
    os.environ['TZ'] = 'UTC'
from optparse import OptionParser, OptionGroup
# Check the invocation arguments
parser = OptionParser("%prog [options]",
                          description = 'runs the ingest transfer server, which picks up as many jobs as it can handle from the ingest job queue, transfers the dataproducts to the LTA, updates the LTA catalogue, and updates MoM')
parser.add_option('-b', '--broker', dest = 'broker', type = 'string',
default = DEFAULT_BROKER,
help = 'Address of the qpid broker, default: %default')
parser.add_option("-p", "--max_nr_of_parallel_jobs", dest = "max_nr_of_parallel_jobs", type = "int",
default = MAX_NR_OF_JOBS,
help = "Name of the job queue. [default: %default]")
parser.add_option('-e', '--exchange', dest = 'exchange', type = 'string', default = DEFAULT_BUSNAME, help = 'Name of the common bus exchange on the broker, default: %default')
parser.add_option("-u", "--user", dest = "user", type = "string", default = getpass.getuser(), help = "username for to login on data source host, [default: %default]")
parser.add_option("-l", "--lta_credentials", dest = "lta_credentials", type = "string",
default = 'LTA' if isProductionEnvironment() else 'LTA_test',
help = "Name of lofar credentials for lta user/pass (see ~/.lofar/dbcredentials) [default=%default]")
parser.add_option("-m", "--mom_credentials", dest = "mom_credentials", type = "string",
default = 'MoM_site' if isProductionEnvironment() else 'MoM_site_test',
help = "Name of credentials for MoM user/pass (see ~/.lofar/dbcredentials) [default=%default]")
parser.add_option('-V', '--verbose', dest = 'verbose', action = 'store_true', help = 'verbose logging')
group = OptionGroup(parser, "Monitoring options")
parser.add_option_group(group)
group.add_option(
"--prometheus_port",
dest="prometheus_port",
type="int",
default=PROMETHEUS_PORT,
help="Port to open for Prometheus end point, default: %default",
)
(options, args) = parser.parse_args()
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
logger.info('*****************************************')
logger.info('Started ingest server on host %s', socket.gethostname())
logger.info('*****************************************')
logger.info("environment:")
for k in sorted(os.environ):
logger.info("%s=%s", k, os.environ[k])
logger.info('*****************************************')
ltacreds = dbcredentials.DBCredentials().get(options.lta_credentials)
momcreds = dbcredentials.DBCredentials().get(options.mom_credentials)
# initialise prometheus end point
start_metrics_server(options.prometheus_port)
with IngestTransferServer(exchange = options.exchange, broker = options.broker,
mom_credentials = momcreds, lta_credentials = ltacreds,
max_nr_of_parallel_jobs = options.max_nr_of_parallel_jobs):
waitForInterrupt()
if __name__ == '__main__':
main()
__all__ = ['main']