#!/usr/bin/env python
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
"""
"""
import qpid.messaging
import logging
from datetime import datetime, timedelta
import os
import time
import socket
import getpass
from threading import Thread
from lofar.messaging.messagebus import FromBus
from lofar.messaging.messages import *
from lofar.common import isProductionEnvironment
from lofar.common import dbcredentials
from lofar.common.datetimeutils import totalSeconds
from lofar.common.util import humanreadablesize
from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME
from lofar.lta.ingest.server.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME, DEFAULT_INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.common.config import DEFAULT_BROKER
from lofar.lta.ingest.server.config import MAX_NR_OF_JOBS, MAX_USED_BANDWITH_TO_START_NEW_JOBS, NET_IF_TO_MONITOR

from lofar.lta.ingest.server.config import TRANSFER_TIMEOUT

from lofar.lta.ingest.common.job import *
from lofar.lta.ingest.server.ingestpipeline import IngestPipeline
from lofar.lta.ingest.server.ltaclient import *
from lofar.lta.ingest.server.momclient import *

try:
    import psutil
except ImportError as e:
    print(str(e))
    print('Please install python package psutil: pip install psutil')
    exit(1)

logger = logging.getLogger(__name__)

def _getBytesSent():
    """Return the total number of bytes sent on the monitored network interface,
    falling back to the system-wide totals, or 0 on error."""
    try:
        return psutil.net_io_counters(True).get(NET_IF_TO_MONITOR, psutil.net_io_counters(False)).bytes_sent
    except Exception:
        return 0

class IngestTransferServer:
    """Watches the ingest job queue and runs an IngestPipeline for each received job
    in a background thread, as long as enough system resources are available."""

    def __init__(self,
                 job_queuename=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
                 notification_busname=DEFAULT_INGEST_NOTIFICATION_BUSNAME,
                 notification_prefix=DEFAULT_INGEST_NOTIFICATION_PREFIX,
                 mom_credentials=None,
                 lta_credentials=None,
                 user=None,
                 broker=None,
                 max_nr_of_parallel_jobs=MAX_NR_OF_JOBS):
        self.broker = broker
        self.user = user
        if not self.user:
            self.user = getpass.getuser()
        self.mom_credentials = mom_credentials
        self.lta_credentials = lta_credentials
        self.notification_busname = notification_busname
        self.notification_prefix = notification_prefix
        self.max_nr_of_parallel_jobs = max_nr_of_parallel_jobs
        self.job_frombus = FromBus(address=job_queuename, broker=broker)

        self.__running_jobs = {}
        self.__log_recource_warning = True
        self.__prev_bytes_sent = _getBytesSent()
        self.__prev_bytes_sent_timestamp = datetime.utcnow()
        self.__prev_used_bandwidth = 0.0
        self.__running_jobs_log_timestamp = datetime.utcnow()

    def __start_job(self, job_dict):
        job_id = job_dict['JobId']

        if job_id in self.__running_jobs:
            logger.warning('job %s is already running', job_id)
            return

        def threaded_pipeline_func(job):
            logger.info('starting job %s in the background', job_id)
            ltaClient = LTAClient(self.lta_credentials.user, self.lta_credentials.password)
            with MoMClient(self.mom_credentials.user, self.mom_credentials.password) as momClient:
                jobPipeline = IngestPipeline(job, momClient, ltaClient,
                                             notification_busname=self.notification_busname,
                                             notification_prefix=self.notification_prefix,
                                             broker=self.broker,
                                             user=self.user)
                jobPipeline.run()

        thread = Thread(target=threaded_pipeline_func, args=[job_dict])
        thread.daemon = True
        self.__running_jobs[job_id] = thread
        thread.start()

    def __clearFinishedJobs(self):
        try:
            finished_job_ids = [job_id for job_id, job in self.__running_jobs.items() if not job.is_alive()]

            for job_id in finished_job_ids:
                logger.info('removing finished job %s', job_id)
                del self.__running_jobs[job_id]
        except Exception as e:
            logger.error('__clearFinishedJobs: %s', e)

    def __enoughResourcesAvailable(self):
        try:
            # helper func to do conditional warning of unavailable resources
            # this will print at most one such message per started job
            # so the log won't be flooded
            def log_recource_warning(message):
                if self.__log_recource_warning:
                    logger.warn(message)
                    self.__log_recource_warning = False

            import pprint

            now = datetime.utcnow()
            bytes_sent = _getBytesSent()

            if bytes_sent >= self.__prev_bytes_sent:  # bytes_sent might wrap around zero
                # compute current speed in bits per second
                speed = 8 * (bytes_sent - self.__prev_bytes_sent) / totalSeconds(now - self.__prev_bytes_sent_timestamp)

                # running average for used_bandwidth
                used_bandwidth = 0.5 * speed + 0.5 * self.__prev_used_bandwidth

                # store for next iteration
                self.__prev_bytes_sent = bytes_sent
                self.__prev_bytes_sent_timestamp = now
                self.__prev_used_bandwidth = used_bandwidth

                # only start new jobs if we have some bandwidth available
                # note that this is a 'soft' limit.
                # we cannot control the actual bandwidth used by the running transfers
                # we can only refrain from starting new jobs if we already exceed MAX_USED_BANDWITH_TO_START_NEW_JOBS
                if used_bandwidth > MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                    log_recource_warning('not enough bandwidth available to start new jobs, using %s, max %s' %
                                         (humanreadablesize(used_bandwidth, 'bps'),
                                          humanreadablesize(MAX_USED_BANDWITH_TO_START_NEW_JOBS, 'bps')))
                    return False
            else:
                # wrapped around 0, just store for next iteration, do not compute anything
                self.__prev_bytes_sent = bytes_sent
                self.__prev_bytes_sent_timestamp = now

            # only start new jobs if we have some cpu time available
            if psutil.cpu_times_percent().idle < 5:
                log_recource_warning('not enough cpu power available to start new jobs, cpu_idle %s%%' %
                                     psutil.cpu_times_percent().idle)
                return False

            # only start new jobs if the system load is not too high
            if os.getloadavg()[0] > psutil.cpu_count():
                log_recource_warning('system load too high (%s > %s), cannot start new jobs' %
                                     (os.getloadavg()[0],
                                      psutil.cpu_count()))
                return False

            # only allow 1 job at a time if swapping
            if psutil.swap_memory().percent > 5 and len(self.__running_jobs) > 0:
                log_recource_warning('system swapping. not enough memory available to start new jobs')
                return False

            # only start new jobs if the number of processes is not too high
            try:
                current_user = getpass.getuser()
                current_user_procs = [p for p in psutil.process_iter() if p.username() == current_user]
                if len(current_user_procs) > 64 * psutil.cpu_count():
                    log_recource_warning('number of processes by %s too high (%s > %s), cannot start new jobs' %
                                         (current_user,
                                          len(current_user_procs),
                                          64 * psutil.cpu_count()))
                    return False
            except:
                pass

            # limit total number of parallel jobs to self.max_nr_of_parallel_jobs
            if len(self.__running_jobs) >= self.max_nr_of_parallel_jobs:
                log_recource_warning('already running %s parallel jobs. limiting the total number of jobs to %s' %
                                     (len(self.__running_jobs),
                                      self.max_nr_of_parallel_jobs))
                return False
        except Exception as e:
            logger.error(e)
            # unknown error, run 1 job at a time
            return len(self.__running_jobs) == 0

        return True

    def run(self):
        log_recource_warning = True

        while True:
            try:
                try:
                    if self.__enoughResourcesAvailable():
                        with self.job_frombus:
                            msg = self.job_frombus.receive(timeout=60)

                            if msg:
                                logger.debug("received msg on job queue: %s", msg)
                                self.job_frombus.ack(msg)

                                if isinstance(msg, CommandMessage):
                                    job_dict = parseJobXml(msg.content)
                                    logger.info("received job: %s", job_dict)
                                    self.__start_job(job_dict)

                                    # allow 1 new resource warning to be logged
                                    self.__log_recource_warning = True
                                else:
                                    logger.warn("unexpected message type: %s", msg)
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(e)

                self.__clearFinishedJobs()

                try:
                    # sleep a little
                    # so jobs have a little time to start consuming resources
                    # this limits the number of jobs that can be started to at most ~600 starts per minute (one per 0.1 sec sleep)
                    # it does not limit the total number of parallel jobs
                    # that is limited dynamically by __enoughResourcesAvailable
                    # and by the hard limit self.max_nr_of_parallel_jobs
                    time.sleep(0.1)

                    # if already running at high bandwidth usage,
                    # we can sleep a little extra depending on how close we are to MAX_USED_BANDWITH_TO_START_NEW_JOBS
                    if self.__prev_used_bandwidth > 0.5 * MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                        time.sleep(0.5)

                    if self.__prev_used_bandwidth > 0.8 * MAX_USED_BANDWITH_TO_START_NEW_JOBS:
                        time.sleep(2.5)

                    if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds=10):
                        self.__running_jobs_log_timestamp = datetime.utcnow()
                        logger.info("running %s jobs, bandwidth used on network interface %s: %s (%s)",
                                    len(self.__running_jobs),
                                    NET_IF_TO_MONITOR,
                                    humanreadablesize(self.__prev_used_bandwidth, 'bps'),
                                    humanreadablesize(self.__prev_used_bandwidth / 8, 'Bps'))
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.error(e)

            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.error(e)

def main():
    # make sure we run in UTC timezone
    import os
    os.environ['TZ'] = 'UTC'

    from optparse import OptionParser
    from lofar.messaging import setQpidLogLevel
    from lofar.common.util import waitForInterrupt

    # Check the invocation arguments
    parser = OptionParser("%prog [options]",
                          description='runs the ingest transfer server which picks up as many jobs as it can handle from the given --ingest_job_queuename and transfers the dataproducts to the LTA, updates the LTA catalogue, and updates MoM')
    parser.add_option('-q', '--broker', dest='broker', type='string',
                      default=DEFAULT_BROKER,
                      help='Address of the qpid broker, default: %default')
    parser.add_option("--ingest_job_queuename", dest="ingest_job_queuename", type="string",
                      default=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
                      help="Name of the job queue. [default: %default]")
    parser.add_option("-p", "--max_nr_of_parallel_jobs", dest="max_nr_of_parallel_jobs", type="int",
                      default=MAX_NR_OF_JOBS,
                      help="Maximum number of parallel transfer jobs. [default: %default]")
    parser.add_option('--ingest_notification_busname', dest='ingest_notification_busname', type='string',
                      default=DEFAULT_INGEST_NOTIFICATION_BUSNAME,
                      help='Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default')
    parser.add_option("--ingest_notification_prefix", dest="ingest_notification_prefix", type="string",
                      default=DEFAULT_INGEST_NOTIFICATION_PREFIX,
                      help="The prefix for all notifications of this publisher, [default: %default]")
    parser.add_option("-u", "--user", dest="user", type="string", default=getpass.getuser(),
                      help="username to login with on the data source host, [default: %default]")
    parser.add_option("-l", "--lta_credentials", dest="lta_credentials", type="string",
                      default='LTA' if isProductionEnvironment() else 'LTA_test',
                      help="Name of lofar credentials for lta user/pass (see ~/.lofar/dbcredentials) [default=%default]")
    parser.add_option("-m", "--mom_credentials", dest="mom_credentials", type="string",
                      default='MoM_site' if isProductionEnvironment() else 'MoM_site_test',
                      help="Name of credentials for MoM user/pass (see ~/.lofar/dbcredentials) [default=%default]")
    parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
    (options, args) = parser.parse_args()

    setQpidLogLevel(logging.INFO)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.DEBUG if options.verbose else logging.INFO)

    logger.info('*****************************************')
    logger.info('Started ingest server on host %s', socket.gethostname())
    logger.info('*****************************************')

    ltacreds = dbcredentials.DBCredentials().get(options.lta_credentials)
    momcreds = dbcredentials.DBCredentials().get(options.mom_credentials)

    server = IngestTransferServer(job_queuename=options.ingest_job_queuename,
                                  notification_busname=options.ingest_notification_busname,
                                  notification_prefix=options.ingest_notification_prefix,
                                  broker=options.broker,
                                  mom_credentials=momcreds,
                                  lta_credentials=ltacreds,
                                  user=options.user,
                                  max_nr_of_parallel_jobs=options.max_nr_of_parallel_jobs)
    server.run()
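
# A minimal example invocation of this service from a shell, using the options defined above.
# The script name and the option values below are illustrative assumptions, not values taken
# from this file or its deployment:
#
#   python ingesttransferserver.py --broker localhost \
#                                  --max_nr_of_parallel_jobs 10 \
#                                  --lta_credentials LTA_test \
#                                  --mom_credentials MoM_site_test \
#                                  --verbose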

if __name__ == '__main__':
    main()

__all__ = ['main']