Newer
Older

Jorrit Schaap
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/env python
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener
from lofar.lta.ingest.common.job import *
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME, DEFAULT_INGEST_NOTIFICATION_PREFIX
from lofar.lta.ingest.server.config import DEFAULT_INGEST_BUSNAME, DEFAULT_INGEST_SERVICENAME
from lofar.lta.ingest.common.config import DEFAULT_BROKER
from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, DEFAULT_JOB_PRIORITY
from lofar.lta.ingest.server.config import DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME
from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_QUEUENAME, DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME
from lofar.messaging import CommandMessage, EventMessage, FromBus, ToBus
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import convertIntKeysToString
import os
import os.path
import fnmatch
import shutil
import time
from threading import RLock
from datetime import datetime, timedelta
import logging
logger = logging.getLogger()
class IngestServiceMessageHandler(MessageHandlerInterface):
    """Message handler which maps service method names onto a job manager."""

    def __init__(self, job_manager, **kwargs):
        """Store the job manager and build the service-name -> method map.

        :param job_manager: object providing removeExportJob,
                            setExportJobPriority, getStatusReportDict,
                            getReport and getExportIds.
        :param kwargs: passed on unchanged to the base class constructor.
        """
        super(IngestServiceMessageHandler, self).__init__(**kwargs)
        self._job_manager = job_manager

        # service method name -> bound method of the job manager
        method_map = {}
        method_map['RemoveExportJob'] = job_manager.removeExportJob
        method_map['SetExportJobPriority'] = job_manager.setExportJobPriority
        method_map['GetStatusReport'] = job_manager.getStatusReportDict
        method_map['GetReport'] = job_manager.getReport
        method_map['GetExportIds'] = job_manager.getExportIds
        self.service2MethodMap = method_map
class IngestJobManager:
def __init__(self,
             busname=DEFAULT_INGEST_BUSNAME,
             servicename=DEFAULT_INGEST_SERVICENAME,
             notification_busname=DEFAULT_INGEST_NOTIFICATION_BUSNAME,
             notification_prefix=DEFAULT_INGEST_NOTIFICATION_PREFIX,
             notification_listen_queue_name=DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME,
             incoming_job_queue_name=DEFAULT_INGEST_JOBS_QUEUENAME,
             jobs_for_transfer_queue_name=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME,
             jobs_dir=JOBS_DIR,
             max_num_retries=MAX_NR_OF_RETRIES,
             broker=None,
             **kwargs):
    """Create the job manager and all bus/queue/service objects it uses.

    Nothing is opened here; the connections are entered in run().

    :param busname: bus name given to the Service.
    :param servicename: name given to the Service.
    :param notification_busname: bus on which this manager sends its own notifications.
    :param notification_prefix: prefix for notification subjects, both for
                                listening (prefix + '*') and for sending.
    :param notification_listen_queue_name: queue the IngestBusListener listens on.
    :param incoming_job_queue_name: queue from which new jobs are received.
    :param jobs_for_transfer_queue_name: queue to which jobs are submitted for transfer.
    :param jobs_dir: base directory in which the job files are kept.
    :param max_num_retries: limit used by updateJobStatus to decide between retry and failed.
    :param broker: passed on to every bus/queue/service object.
    :param kwargs: accepted but not used by this constructor.
    """
    # listen for the notifications about the jobs, and route them to our handlers
    listener = IngestBusListener(busname=notification_listen_queue_name,
                                 subjects=notification_prefix+'*',
                                 broker=broker)
    listener.onJobStarted = self.onJobStarted
    listener.onJobProgress = self.onJobProgress
    listener.onJobTransferFailed = self.onJobTransferFailed
    listener.onJobFinished = self.onJobFinished
    self.__notification_listener = listener

    # in-memory administration: job_id -> job_admin_dict, guarded by a re-entrant lock
    self.__jobs_dir = jobs_dir
    self.__max_num_retries = max_num_retries
    self.__job_admin_dicts = {}
    self.__lock = RLock()
    self.__running = False

    self.notification_prefix = notification_prefix
    self.event_bus = ToBus(notification_busname, broker=broker)

    service_options = {'busname': busname,
                       'broker': broker,
                       'use_service_methods': True,
                       'numthreads': 1,
                       'handler_args': {'job_manager': self}}
    self.service = Service(servicename, IngestServiceMessageHandler, **service_options)

    # incoming jobs (from mom, eor, user ingest, etc)
    self.__incoming_job_queue = FromBus(incoming_job_queue_name, broker=broker)

    # queue into which this manager produces jobs,
    # which are consumed for actual data transfer by the ingest services
    self.__jobs_for_transfer_queue = ToBus(jobs_for_transfer_queue_name, broker=broker)

    # the peeker frombus is used to see if the queue we submit jobs to is emptied by consumers.
    # we want this manager to produce jobs just in time, so we can actively shuffle job priorities etc.
    self.__jobs_for_transfer_queue_peeker = FromBus(jobs_for_transfer_queue_name, broker=broker)

    # timestamps used to rate-limit the periodic actions in the event loop
    self.__running_jobs_log_timestamp = datetime.utcnow()
    self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()

Jorrit Schaap
committed
def quit(self):
self.__running = False
def run(self):
    """Run the event loop of the job manager until quit() is called.

    First the jobs dir is scanned to restore the state from disk. Then all
    connections are opened and the loop repeatedly:
      - produces jobs for the transfer queue (produceNextJobsIfPossible),
      - receives new jobs from the incoming job queue (for at most 10 seconds in a row),
      - puts stalled jobs back to to-do,
      - logs the number of running jobs (at most once per 10 seconds).

    A KeyboardInterrupt ends the loop; any other exception is logged and the loop continues.
    """
    self.__running = True

    # start with a full scan of the jobs dir to retrieve the state from disk
    self.scanJobsdir()

    logger.info('starting listening for new jobs and notifications')

    incoming_queue = self.__incoming_job_queue
    receive_timeout = 0.01
    max_receive_duration = timedelta(seconds=10)
    running_jobs_log_interval = timedelta(seconds=10)

    # open the connections...
    with self.service, incoming_queue, self.__jobs_for_transfer_queue, self.__notification_listener, self.event_bus:
        # ... and run the event loop
        logger.info('starting to produce jobs')

        while self.__running:
            try:
                # produce next jobs to the queue for the ingest transfer services
                self.produceNextJobsIfPossible()

                # receive any jobs from mom/user_ingest/eor/etc
                receive_started_at = datetime.utcnow()
                incoming_msg = incoming_queue.receive(timeout=receive_timeout)

                while incoming_msg:
                    logger.debug("received msg on job queue %s: %s", incoming_queue.address, incoming_msg)
                    incoming_queue.ack(incoming_msg)

                    if not isinstance(incoming_msg, CommandMessage):
                        logger.warn("unexpected message type: %s", incoming_msg)
                    else:
                        job = parseJobXml(incoming_msg.content)
                        if job and job.get('JobId'):
                            # a message priority which is higher than the job's own priority wins
                            if incoming_msg.priority is not None and incoming_msg.priority > job.get('priority', DEFAULT_JOB_PRIORITY):
                                job['priority'] = incoming_msg.priority

                            logger.info("received job on queue %s: %s", incoming_queue.address, job)

                            new_jad = {'job': job, 'job_xml': incoming_msg.content}
                            self.addNewJob(new_jad, check_done_dirs=True)

                    if datetime.utcnow() - receive_started_at > max_receive_duration:
                        # stop receiving for now, so we can also produce some jobs.
                        # we receive more in the next iteration.
                        break

                    # see if there are any more jobs to receive; if not, the while loop ends
                    incoming_msg = incoming_queue.receive(timeout=receive_timeout)

                # check if producing jobs are actually making progress.
                # maybe the ingest transfer server was shut down in the middle of a transfer?
                # the ingest transfer server is stateless, so it won't restart that job itself (by design).
                # while transferring, progress updates are given at very regular intervals,
                # so it is easy for us to detect here if a job is progressing or stalled (see onJobProgress)
                self.__putStalledJobsBackToToDo()

                # report on running jobs
                if datetime.utcnow() - self.__running_jobs_log_timestamp > running_jobs_log_interval:
                    self.__running_jobs_log_timestamp = datetime.utcnow()
                    nr_of_running_jobs = len(self.getJobAdminDicts(status=JobProducing))
                    if nr_of_running_jobs == 1:
                        logger.info('1 job is running')
                    elif nr_of_running_jobs > 1:
                        logger.info('%s jobs are running', nr_of_running_jobs)
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.error(e)

        logger.info('finished producing jobs')

    logger.info('finished listening for new jobs and notifications')
def nrOfUnfinishedJobs(self):
    """Return the number of job admin dicts given by getNotDoneJobAdminDicts()."""
    unfinished_jads = self.getNotDoneJobAdminDicts()
    return len(unfinished_jads)
def getJobAdminDictsFromDisk(self, job_status=None, job_type=None, job_group_id=None, job_id=None):
    """Read the job files from the directory which belongs to the given status/type/group.

    For JobRetry the direct sub directories (as given by getSubDirs) are scanned as well.

    :param job_status: status which determines (via jobDir) the directory to scan.
                       If truthy, it is also stored as 'status' in each result.
    :param job_type: job type, passed on to jobDir.
    :param job_group_id: job group id, passed on to jobDir.
    :param job_id: optional; if given only the job file(s) of this job are returned.
    :return: list of job admin dicts with keys 'path', 'job', 'job_xml', 'created_at'
             (and 'status' when job_status is truthy). Empty if the directory does not exist.
             Errors while listing/reading/parsing are logged and the item is skipped.
    """
    dir_path = self.jobDir(job_admin_dict=None, job_status=job_status, job_type=job_type, job_group_id=job_group_id)
    job_admin_dicts = []

    if not os.path.isdir(dir_path):
        return job_admin_dicts

    dir_paths = [dir_path]
    if job_status == JobRetry:
        dir_paths += IngestJobManager.getSubDirs(dir_path)

    pattern = ('*%s*.xml*' % job_id) if job_id else '*.xml*'

    for dp in dir_paths:
        try:
            if job_id:
                logger.info('scanning %s for job file of %s', dp, job_id)
            else:
                logger.info('scanning %s for job files', dp)

            xml_files = [os.path.join(dp, f) for f in os.listdir(dp) if fnmatch.fnmatch(f, pattern)]
            logger.debug('found %d xml files in %s', len(xml_files), dp)
        except Exception as e:
            logger.error(e)
            continue

        for path in xml_files:
            try:
                with open(path) as job_file:
                    file_content = job_file.read()

                job = parseJobXml(file_content)
                if not job or not job.get('JobId'):
                    continue

                # the filename pattern is a substring match, so a file of another job
                # (for example j<id>0.xml when looking for <id>) can match as well.
                # callers act on the result (addNewJob deletes the files), so only
                # accept files which really contain the requested job.
                if job_id and str(job['JobId']) != str(job_id):
                    logger.debug('skipping %s: it contains job %s, not %s', path, job['JobId'], job_id)
                    continue

                # use a utc timestamp, like all other timestamps in the job admin dicts
                # (they are compared to datetime.utcnow())
                created_at = datetime.utcfromtimestamp(os.lstat(path).st_ctime)

                job_admin_dicts.append({'path': path,
                                        'job': job,
                                        'job_xml': file_content,
                                        'created_at': created_at})
            except Exception as e:
                logger.error(e)

    if job_status:
        for job_admin_dict in job_admin_dicts:
            job_admin_dict['status'] = job_status

    logger.info('read %d job files from %s', len(job_admin_dicts), dir_path)
    return job_admin_dicts
def jobStatusBaseDir(self, jobstatus):
    """Return the directory below the jobs dir which belongs to the given job status.

    :param jobstatus: one of the Job* status values.
    :return: path of the status directory; the 'unknown' directory for any other value.
    """
    status_dirnames = ((JobToDo, 'to_do'),
                       (JobRetry, 'retry'),
                       (JobFailed, 'failed'),
                       (JobHold, 'on_hold'),
                       (JobScheduled, 'scheduled'),
                       (JobProducing, 'producing'),
                       (JobProduced, 'done'),
                       (JobRemoved, 'removed'))

    # first matching status wins
    for known_status, dirname in status_dirnames:
        if jobstatus == known_status:
            return os.path.join(self.__jobs_dir, dirname)

    return os.path.join(self.__jobs_dir, 'unknown')
def jobDir(self, job_admin_dict, job_status=None, job_type=None, job_group_id=None, retry_attempt=None):
    """Return the directory in which a job file is (to be) stored.

    :param job_admin_dict: if truthy, the status, type, group id and retry attempt
                           are taken from it and the other arguments are ignored.
                           Missing values default to type 'unknown', group 'unknown'
                           and retry attempt 1.
    :param job_status: job status, which determines the base directory.
    :param job_type: job type, used in the sub directory name of done jobs.
    :param job_group_id: job group id, used in the sub directory name of done jobs.
    :param retry_attempt: retry attempt number, used as sub directory name for JobRetry.
    :return: <base>/<retry_attempt> for JobRetry with a retry attempt,
             <base>/<type>_<group_id> for JobProduced/JobRemoved/JobFailed,
             else the base directory of the status.
    """
    if job_admin_dict:
        job_status = job_admin_dict['status']
        job = job_admin_dict['job']
        job_type = job.get('Type', 'unknown')
        job_group_id = job.get('job_group_id', 'unknown')
        retry_attempt = job_admin_dict.get('retry_attempt', 1)

    status_dir = self.jobStatusBaseDir(job_status)

    if job_status == JobRetry and retry_attempt is not None:
        return os.path.join(status_dir, str(retry_attempt))

    if job_status in [JobProduced, JobRemoved, JobFailed]:
        return os.path.join(status_dir, '%s_%s' % (job_type, job_group_id))

    return status_dir
def jobPath(self, job_admin_dict):
    """Return the path j<JobId>.xml in the job's directory as given by jobDir."""
    job_dir = self.jobDir(job_admin_dict)
    return os.path.join(job_dir, 'j%s.xml' % job_admin_dict['job']['JobId'])
def scanJobsdir(self):
    """Read the job files from disk and add them to the in-memory job administration.

    First all to-do, scheduled, producing and retry jobs are read. Then, for each
    (type, group id) found in the administration, the failed, produced and removed
    jobs of that group are read as well.
    """
    with self.__lock:
        logger.info('scanning jobs dirs in %s', self.__jobs_dir)

        for status in [JobToDo, JobScheduled, JobProducing, JobRetry]:
            job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status)
            for job_admin_dict in job_admin_dicts:
                self.addNewJob(job_admin_dict, check_done_dirs=False)
            logger.info('added %d existing %s jobs from disk', len(job_admin_dicts), jobState2String(status))

        # which (type, group_id) jobs were read?
        # jobs without a group id are filed by jobDir under group 'unknown',
        # so use that same default here, else their done dirs are never found.
        unique_type_groups = {(jad['job']['Type'], jad['job'].get('job_group_id', 'unknown'))
                              for jad in self.__job_admin_dicts.values()}

        # read the done jobs for these groups as well
        if unique_type_groups:
            logger.info('scanning for done jobs for %s', unique_type_groups)

            for job_type, job_group_id in unique_type_groups:
                for status in [JobFailed, JobProduced, JobRemoved]:
                    job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status,
                                                                    job_type=job_type,
                                                                    job_group_id=job_group_id)
                    for job_admin_dict in job_admin_dicts:
                        self.addNewJob(job_admin_dict, check_done_dirs=False)
                    logger.info('added %d existing %s jobs for %s %s',
                                len(job_admin_dicts), jobState2String(status), job_type, job_group_id)

        logger.info('finished scanning jobs')
def addNewJob(self, job_admin_dict, check_done_dirs=False):
    """Add a job to the in-memory administration and make sure its job file is on disk.

    The job_admin_dict is completed in place: 'status' (default JobToDo),
    'created_at' (default now, utc), 'updated_at', 'runs' and 'path'.

    :param job_admin_dict: dict with at least 'job' (with 'JobId' and 'Type').
                           'job_xml' is needed when there is no 'path' yet.
    :param check_done_dirs: if True, job files of this job in the failed/produced/removed
                            directories are deleted first (the job is resubmitted).
    """
    with self.__lock:
        job = job_admin_dict['job']
        job_id = job['JobId']
        job_group_id = job.get('job_group_id')
        job_type = job['Type']

        logger.info('adding new job %s in group %s %s with status %s',
                    job_id,
                    job_type,
                    job_group_id,
                    jobState2String(job_admin_dict.get('status', JobToDo)))

        if check_done_dirs:
            # jobDir files done jobs without a group id under group 'unknown',
            # so look there, and not in a '<type>_None' directory.
            done_group_id = job.get('job_group_id', 'unknown')

            # remove job from 'done' directories if present (this is a resubmitted job)
            for done_status in [JobFailed, JobProduced, JobRemoved]:
                matching_done_jads = self.getJobAdminDictsFromDisk(job_status=done_status,
                                                                   job_type=job_type,
                                                                   job_group_id=done_group_id,
                                                                   job_id=job_id)
                for done_jad in matching_done_jads:
                    try:
                        logger.info('removing done job %s from %s because it is resubmitted', job_id, done_jad['path'])
                        os.remove(done_jad['path'])
                    except Exception as e:
                        logger.error('error while removing done job %s from %s: %s', job_id, done_jad['path'], e)

        self.__job_admin_dicts[job_id] = job_admin_dict

        if 'status' not in job_admin_dict:
            job_admin_dict['status'] = JobToDo

        if 'created_at' not in job_admin_dict:
            job_admin_dict['created_at'] = datetime.utcnow()

        job_admin_dict['updated_at'] = job_admin_dict['created_at']

        # store start- finish times per try in runs
        job_admin_dict['runs'] = {}

        # make sure the directory for the job file exists
        job_dir = self.jobDir(job_admin_dict)
        if not os.path.isdir(job_dir):
            try:
                os.makedirs(job_dir)
            except OSError as e:
                logger.error(e)

        if 'path' not in job_admin_dict:
            # new job which is not on disk yet: store it
            path = self.jobPath(job_admin_dict)
            job_admin_dict['path'] = path
            try:
                if not os.path.exists(path):
                    logger.info('saving job %s on disk: %s', job_id, path)
                    with open(path, 'w') as job_file:
                        job_file.write(job_admin_dict['job_xml'])
            except Exception as e:
                logger.error(e)
        elif os.path.dirname(job_admin_dict['path']) != job_dir:
            # a new job should be located in the expected dir based on its status,
            # it is not, so force it there
            self.updateJobStatus(job_id, job_admin_dict['status'])
def updateJobStatus(self, job_id, new_status, lta_site=None, message=None):
    """Update the status of a job, and move its job file to the matching directory.

    A new status JobTransferFailed is translated into JobRetry (with an increased
    retry attempt), or into JobFailed when the maximum number of retries is
    reached or when the message contains 'not on disk'.
    For JobRemoved and JobFailed a notification is sent.
    When no not-done jobs are left in the job's group, the group-finished mail is
    sent and the done jobs of the group are removed from memory.

    Unknown job ids are logged and ignored. Exceptions are logged, not raised.

    :param job_id: id of the job to update.
    :param new_status: the new Job* status.
    :param lta_site: optional lta site, stored in the job admin dict when truthy.
    :param message: optional message, stored as 'last_message' (also when None).
    """
    with self.__lock:
        job_admin_dict = self.__job_admin_dicts.get(job_id)

        if not job_admin_dict:
            logger.error('updateJobStatus: unknown job %s with new status %s', job_id, jobState2String(new_status))
            return

        try:
            now = datetime.utcnow()

            # update updated_at timestamp
            job_admin_dict['updated_at'] = now

            current_status = job_admin_dict.get('status', JobToDo)
            current_retry_attempt = job_admin_dict.get('retry_attempt', 0)

            # start- and finish times are stored per try in runs
            runs = job_admin_dict.setdefault('runs', {})

            if new_status == JobProducing:
                # only start a new run record when the job starts producing.
                # progress updates arrive with JobProducing as well, and should
                # not overwrite the start time of the run.
                if current_status != JobProducing or 'started_at' not in runs.get(current_retry_attempt, {}):
                    runs[current_retry_attempt] = {'started_at': now}

            if new_status == JobProduced or new_status == JobTransferFailed:
                # there might be no run record (for example for a job which was read
                # from disk after a restart), so create one if needed instead of
                # failing with a KeyError and losing this status update.
                runs.setdefault(current_retry_attempt, {})['finished_at'] = now

            if lta_site:
                job_admin_dict['lta_site'] = lta_site

            job_admin_dict['last_message'] = message

            if new_status == JobTransferFailed:
                # special case for jobs which failed to transfer, which will be retried
                next_retry_attempt = current_retry_attempt + 1

                if next_retry_attempt < self.__max_num_retries:
                    if message and 'not on disk' in message:
                        logger.info('job %s transfer failed because source data was not on disk, not retrying anymore',
                                    job_id)
                        new_status = JobFailed
                    else:
                        new_status = JobRetry
                        job_admin_dict['retry_attempt'] = next_retry_attempt
                        job_admin_dict['job']['last_retry_attempt'] = next_retry_attempt == (self.__max_num_retries-1)
                else:
                    logger.info('job %s transfer failed %s times, not retrying anymore',
                                job_id,
                                job_admin_dict.get('retry_attempt', 0))
                    new_status = JobFailed

            if new_status != current_status:
                # update the internal status
                logger.info('updating job %s status from %s to %s%s',
                            job_id,
                            jobState2String(current_status),
                            jobState2String(new_status),
                            (' attempt #%d' % job_admin_dict.get('retry_attempt', 1))
                            if (new_status == JobRetry or current_status == JobRetry) else '')
                job_admin_dict['status'] = new_status

            # move the job file to its new status directory
            # determine current and new paths
            current_path = job_admin_dict.get('path', '')
            new_path = self.jobPath(job_admin_dict)
            new_dirname = os.path.dirname(new_path)

            # create dir if not exists
            if not os.path.isdir(new_dirname):
                try:
                    os.makedirs(new_dirname)
                except OSError as e:
                    logger.error(e)

            if new_path != current_path:
                # do actual file move
                logger.debug('moving job file from %s to %s.', current_path, new_path)
                shutil.move(current_path, new_path)
                job_admin_dict['path'] = new_path

            if new_status == JobRemoved or new_status == JobFailed:
                # send notification
                # this is (also) picked up by the ingestmomadapter
                # which also sends a status update to MoM, including the last_message
                if new_status == JobRemoved:
                    job_admin_dict['last_message'] = 'removed from queue'

                self._sendNotification(job_admin_dict)

            # finally, remove job from internal admin jobs dict if finished
            if job_admin_dict['status'] in [JobProduced, JobFailed, JobRemoved]:
                current_job_group_id = job_admin_dict['job'].get('job_group_id', 'unknown')
                current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=current_job_group_id)

                if not current_group_jads:
                    logger.info('all jobs in group %s are done', current_job_group_id)
                    self.sendJobGroupFinishedMail(current_job_group_id)

                    current_group_done_jads = self.getDoneJobAdminDicts(job_group_id=current_job_group_id)
                    logger.info('removing %s jobs of group %s from job management server memory',
                                len(current_group_done_jads),
                                current_job_group_id)

                    for jad in current_group_done_jads:
                        del self.__job_admin_dicts[jad['job']['JobId']]
                else:
                    logger.info('%s jobs to do in group %s', len(current_group_jads), current_job_group_id)
        except Exception as e:
            logger.error("updateJobStatus(job_id=%s, new_status=%s, lta_site=%s, message=%s) %s",
                         job_id,
                         jobState2String(new_status),
                         lta_site,
                         message,
                         e)
def _sendNotification(self, job_admin_dict):
    """Send an EventMessage on the event bus with the current status of the given job.

    The subject is the notification prefix followed by the part of the
    jobState2String text between the brackets. Exceptions are logged, not raised.

    :param job_admin_dict: job admin dict with 'job' (with 'JobId', 'ArchiveId', 'Project',
                           'Type' and 'DataProduct') and 'status'.
    """
    try:
        job = job_admin_dict['job']
        contentDict = {'job_id': job['JobId'],
                       'archive_id': job['ArchiveId'],
                       'project': job['Project'],
                       'type': job["Type"],
                       'dataproduct': job['DataProduct']}

        if 'ObservationId' in job:
            contentDict['otdb_id'] = job['ObservationId']

        # optional items of the job admin dict: (key in the job admin dict, key in the message)
        for jad_key, content_key in (('lta_site', 'lta_site'), ('last_message', 'message')):
            if jad_key in job_admin_dict:
                contentDict[content_key] = job_admin_dict[jad_key]

        # the job_group_id is a property of the job itself (that is where the rest
        # of this class reads it), so look there first. The job admin dict is
        # still checked as fallback.
        if 'job_group_id' in job:
            contentDict['export_id'] = job['job_group_id']
        elif 'job_group_id' in job_admin_dict:
            contentDict['export_id'] = job_admin_dict['job_group_id']

        # use the text between the brackets of the status string as subject suffix
        status = jobState2String(job_admin_dict['status'])
        status = status[status.index('(')+1:status.index(')')]

        msg = EventMessage(context=self.notification_prefix + status, content=contentDict)
        # remove message from queue's when not picked up within 48 hours,
        # otherwise mom might endlessly reject messages if it cannot handle them
        msg.ttl = 48*3600
        logger.info('Sending notification %s: %s', status, str(contentDict).replace('\n', ' '))
        self.event_bus.send(msg)
    except Exception as e:
        logger.error(str(e))
def removeExportJob(self, export_group_id):
    """Set the status of every job in the given export group to JobRemoved.

    :param export_group_id: the job group id of the export.
    """
    logger.info('removing export job %s', export_group_id)

    # getJobAdminDicts returns a list, so no jobs simply means no iterations
    for group_jad in self.getJobAdminDicts(job_group_id=export_group_id):
        self.updateJobStatus(group_jad['job']['JobId'], JobRemoved)

Jorrit Schaap
committed
def getExportIds(self):
with self.__lock:
return sorted(list(set([jad['job'].get('job_group_id', 'unknown_group') for jad in self.__job_admin_dicts.values()])))
def __putStalledJobsBackToToDo(self):
    """Put the producing and scheduled jobs which were not updated for 15 minutes back to to-do.

    This check is done at most once per minute; calls in between return immediately.
    """
    time_since_last_check = datetime.utcnow() - self.__last_putStalledJobsBackToToDo_timestamp
    if time_since_last_check < timedelta(minutes=1):
        return

    with self.__lock:
        now = datetime.utcnow()
        stalled_threshold = timedelta(minutes=15)

        # collect the stalled jobs first, because updateJobStatus can modify the administration
        stalled_jads = []
        for jad in self.__job_admin_dicts.values():
            is_active = jad['status'] == JobProducing or jad['status'] == JobScheduled
            if is_active and now - jad['updated_at'] >= stalled_threshold:
                stalled_jads.append(jad)

        for stalled_jad in stalled_jads:
            logger.info('putting job %s back to ToDo because it did not make any progress during the last 15min', stalled_jad['job']['JobId'])
            self.updateJobStatus(stalled_jad['job']['JobId'], JobToDo)

        self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow()

Jorrit Schaap
committed
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
def getNextJobToRun(self):
    '''get the next job to run.
    examine all 'to_do' and 'retry' jobs (jobs with a priority of 0 or less are paused and skipped).
    per status, the candidates are ordered by:
      1) priority, highest first
      2) for 'retry' jobs: the retry attempt, highest first
      3) source load balancing: the fewer jobs currently run (scheduled/producing)
         for the job's source host, the earlier the job is picked
      4) created_at timestamp, oldest first (FIFO)
    when there is a 'to_do' candidate, 'retry' jobs are only considered when
    they were not updated for at least 15 minutes.
    a candidate for a non-localhost source host is dropped when already more
    than one job is running for that source host.
    :return: the job admin dict of the next job to run, or None if there is none.
    '''
    # NOTE(review): the original description said 'retry' jobs are sorted by least
    # amount of retry attempts, but the comparison puts the highest retry attempt
    # first. The behaviour is kept as is; confirm which one is intended.

    # local import: cmp_to_key replaces the python2-only sorted(cmp=...)
    from functools import cmp_to_key

    #helper method to get the source host from the job's location
    def getSourceHost(job_admin_dict):
        try:
            host = job_admin_dict['job']['Location'].split(':')[0]
            if 'cep4' in host.lower() or 'cpu' in host.lower():
                return 'localhost'
            return host
        except Exception:
            # no (usable) location, treat as localhost
            return 'localhost'

    # count the number of running jobs per source host
    running_jads = self.getJobAdminDicts(status=JobProducing) + self.getJobAdminDicts(status=JobScheduled)
    running_hosts = {}
    for jad in running_jads:
        host = getSourceHost(jad)
        running_hosts[host] = running_hosts.get(host, 0) + 1

    def getPriority(job_admin_dict):
        return job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY)

    def jad_compare_func(jad_a, jad_b):
        #sort on priority first
        if getPriority(jad_a) != getPriority(jad_b):
            return getPriority(jad_b) - getPriority(jad_a)

        #equal priorities, so sort on next sort criterion, the retry attempt
        if jad_a['status'] == JobRetry and jad_b['status'] == JobRetry:
            if jad_a.get('retry_attempt', 0) != jad_b.get('retry_attempt', 0):
                return jad_b.get('retry_attempt', 0) - jad_a.get('retry_attempt', 0)

        # equal retry_attempt, so sort on next sort criterion,
        # jobs for which the number of running jobs on that host is lower
        # (load balance the source hosts)
        nrOfRunningJobsOnSourceHostA = running_hosts.get(getSourceHost(jad_a), 0)
        nrOfRunningJobsOnSourceHostB = running_hosts.get(getSourceHost(jad_b), 0)

        if nrOfRunningJobsOnSourceHostA != nrOfRunningJobsOnSourceHostB:
            return nrOfRunningJobsOnSourceHostA - nrOfRunningJobsOnSourceHostB

        #equal number of jobs on source hosts, so sort on next sort criterion, the created_at timestamp (FIFO)
        if jad_a['created_at'] < jad_b['created_at']:
            return -1
        if jad_a['created_at'] > jad_b['created_at']:
            return 1

        #TODO: we can add a lot of sort criteria in the future.
        #For now, stick with FIFO and retry_attempt, after priority and source_host_load_balancing
        return 0

    def getNextJobByStatus(status, min_age=None):
        job_admin_dicts = self.getJobAdminDicts(status=status)

        # filter out priority 0 jobs (which are paused)
        job_admin_dicts = [jad for jad in job_admin_dicts if jad['job'].get('priority', 0) > 0]

        if min_age:
            now = datetime.utcnow()
            job_admin_dicts = [jad for jad in job_admin_dicts if now - jad['updated_at'] >= min_age]

        job_admin_dicts = sorted(job_admin_dicts, key=cmp_to_key(jad_compare_func))
        if job_admin_dicts:
            logger.info('%s jobs with status %s waiting', len(job_admin_dicts), jobState2String(status))
            return job_admin_dicts[0]
        return None

    def hasTooManyRunningJobs(job_admin_dict):
        #limit the number of running jobs per source host if not localhost
        host = getSourceHost(job_admin_dict)
        return host != 'localhost' and running_hosts.get(host, 0) > 1

    with self.__lock:
        #get the next job to run, both for JobToDo and JobRetry
        next_to_do_jad = getNextJobByStatus(JobToDo)
        next_retry_jad = getNextJobByStatus(JobRetry, timedelta(minutes=15) if next_to_do_jad else None)

        if next_to_do_jad and hasTooManyRunningJobs(next_to_do_jad):
            next_to_do_jad = None

        if next_retry_jad and hasTooManyRunningJobs(next_retry_jad):
            next_retry_jad = None

        if next_to_do_jad and next_retry_jad:
            # if next_retry_jad has higher priority then next_to_do_jad, then return next_retry_jad
            if getPriority(next_retry_jad) > getPriority(next_to_do_jad):
                return next_retry_jad

            # or if next_to_do_jad has higher priority then next_retry_jad, then return next_to_do_jad
            if getPriority(next_to_do_jad) > getPriority(next_retry_jad):
                return next_to_do_jad

            # or if next_retry_jad is already waiting for over an hour, then return next_retry_jad
            if datetime.utcnow() - next_retry_jad['updated_at'] > timedelta(minutes=60):
                return next_retry_jad

            # or if next_retry_jad was updated more recently than next_to_do_jad
            # NOTE(review): the original comment said 'older than'; the comparison is kept as is.
            if next_retry_jad['updated_at'] > next_to_do_jad.get('updated_at', next_to_do_jad.get('created_at')):
                return next_retry_jad

        if next_to_do_jad:
            # just return the next_to_do_jad
            return next_to_do_jad

        # in all other cases, return next_retry_jad (which might be None)
        return next_retry_jad
def canProduceNextJob(self):
    """Tell whether a next job can be submitted to the jobs-for-transfer queue.

    :return: True if the queue holds no messages and less than 10 jobs have
             status JobScheduled. False if not. If the check itself fails,
             the error is logged and True is returned.
    """
    try:
        peeker = self.__jobs_for_transfer_queue_peeker
        with peeker:
            # only produce when the queue was emptied by the consumers
            if peeker.nr_of_messages_in_queue(0.01) != 0:
                return False

            nr_of_scheduled_jobs = len(self.getJobAdminDicts(status=JobScheduled))
            return nr_of_scheduled_jobs < 10
    except Exception as e:
        logger.error('canProduceNextJob: %s', e)

    return True
def produceNextJobsIfPossible(self):
    """Submit jobs to the jobs-for-transfer queue, for as long as that is possible.

    Stops when canProduceNextJob() says no, when there is no next job to run,
    or after 5 seconds. A job whose job file is no longer on disk is not
    submitted but removed from the in-memory administration.
    """
    max_duration = timedelta(seconds=5)
    started_at = datetime.utcnow()

    while self.canProduceNextJob() and datetime.utcnow() - started_at < max_duration:
        next_jad = self.getNextJobToRun()

        if not next_jad:
            return

        if not os.path.exists(next_jad.get('path')):
            missing_job_id = next_jad['job']['JobId']
            logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', missing_job_id, next_jad.get('path'))
            del self.__job_admin_dicts[missing_job_id]
        else:
            job_msg = CommandMessage(content=next_jad.get('job_xml'))
            job_msg.priority = next_jad['job'].get('priority', DEFAULT_JOB_PRIORITY)
            transfer_queue = self.__jobs_for_transfer_queue
            transfer_queue.send(job_msg)
            logger.info('submitted job %s to queue %s at %s', next_jad['job']['JobId'], transfer_queue.address, transfer_queue.broker)
            self.updateJobStatus(next_jad['job']['JobId'], JobScheduled)

        # do a little sleep to allow the ingesttransferserver consumers to pick up the submitted job,
        # so the queue is empty again, and we can submit yet another job
        time.sleep(0.1)

Jorrit Schaap
committed
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
def onJobStarted(self, job_notification_dict):
    """Handle a job-started notification: log it and set the job to JobProducing.

    :param job_notification_dict: notification content; 'job_id', 'lta_site' and 'message' are read.
    """
    self.__notification_listener._logJobNotification('started ', job_notification_dict)

    job_id = job_notification_dict.get('job_id')
    lta_site = job_notification_dict.get('lta_site')
    message = job_notification_dict.get('message')
    self.updateJobStatus(job_id, JobProducing, lta_site, message)
def onJobFinished(self, job_notification_dict):
    """Handle a job-finished notification: log it and set the job to JobProduced.

    If the notification contains a 'file_type', it is stored in the job first.

    :param job_notification_dict: notification content; 'job_id', 'lta_site', 'message'
                                  and (optionally) 'file_type' are read.
    """
    self.__notification_listener._logJobNotification('finished', job_notification_dict)

    # the job_id has to be taken from the notification before it can be used
    # for the lookup below (it used to be an undefined name there)
    job_id = job_notification_dict.get('job_id')

    # file_type might have changed to unspec for example
    if 'file_type' in job_notification_dict:
        with self.__lock:
            job_admin_dict = self.__job_admin_dicts.get(job_id)
            if job_admin_dict:
                job_admin_dict['job']['file_type'] = job_notification_dict['file_type']

    self.updateJobStatus(job_id,
                         JobProduced,
                         job_notification_dict.get('lta_site'),
                         job_notification_dict.get('message'))
def onJobTransferFailed(self, job_notification_dict):
    """Handle a transfer-failed notification: log it and report JobTransferFailed for the job.

    :param job_notification_dict: notification content; 'job_id', 'lta_site' and 'message' are read.
    """
    self.__notification_listener._logJobNotification('transfer failed ', job_notification_dict)

    job_id = job_notification_dict.get('job_id')
    lta_site = job_notification_dict.get('lta_site')
    message = job_notification_dict.get('message')
    self.updateJobStatus(job_id, JobTransferFailed, lta_site, message)
def onJobProgress(self, job_notification_dict):
    """Handle a progress notification: log it (at debug level) and touch the job.

    :param job_notification_dict: notification content; 'job_id', 'lta_site' and 'message' are read.
    """
    self.__notification_listener._logJobNotification('progress', job_notification_dict, level=logging.DEBUG)

    # touch the job by updating it with status JobProducing.
    # producing jobs which stay untouched are put back to JobToDo
    # (see __putStalledJobsBackToToDo, which uses a threshold of 15 minutes)
    job_id = job_notification_dict.get('job_id')
    lta_site = job_notification_dict.get('lta_site')
    message = job_notification_dict.get('message')
    self.updateJobStatus(job_id, JobProducing, lta_site, message)
@staticmethod
def getSubDirs(dir_path):
dir_lists = [[os.path.join(root,dir) for dir in dirs if root==dir_path] for root, dirs, files in os.walk(dir_path) if dirs]
if dir_lists:
return reduce(lambda x,y: x+y, dir_lists)
return []
def getDoneJobAdminDicts(self, job_group_id=None):
    """Return the job admin dicts with status failed, produced or removed.

    :param job_group_id: optional, limit the result to the jobs of this group.
    """
    done_statuses = [JobFailed, JobProduced, JobRemoved]
    return self.getJobAdminDicts(job_group_id=job_group_id, status=done_statuses)

Jorrit Schaap
committed
def getNotDoneJobAdminDicts(self, job_group_id=None):
    """Return the job admin dicts with status to-do, scheduled, producing or retry.

    :param job_group_id: optional, limit the result to the jobs of this group.
    """
    # a job which is producing (being transferred) is not done yet either.
    # without it, updateJobStatus considers a group to be finished (and mails and
    # cleans it up) while jobs of that group are still being transferred.
    not_done_statuses = [JobToDo, JobScheduled, JobProducing, JobRetry]
    return self.getJobAdminDicts(job_group_id=job_group_id, status=not_done_statuses)

Jorrit Schaap
committed
def getJobAdminDicts(self, job_group_id=None, status=None):
with self.__lock:
jads = [jad for jad in self.__job_admin_dicts.values()]

Jorrit Schaap
committed
if job_group_id != None:
job_group_id = str(job_group_id)
jads = [jad for jad in jads if str(jad['job'].get('job_group_id')) == job_group_id]
if status != None:
if isinstance(status, int):
jads = [jad for jad in jads if jad['status'] == status]
else:
statuses = set(status)
jads = [jad for jad in jads if jad['status'] in statuses]

Jorrit Schaap
committed
return jads

Jorrit Schaap
committed
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
def getStatusReportDict(self):
    """Build a status report for each export (job group) returned by getExportIds().

    Per export id the report holds:
      - 'priority': the minimum 'priority' of the not-done jobs
                    (4 when there are no not-done jobs),
      - 'submitters', 'projects', 'lta_sites': the unique values found in the
        jobs / job admin dicts of the export,
      - 'series': 'running_jobs' (number of runs in progress at each start/finish
        timestamp) and 'finished_jobs' (cumulative number of produced or failed
        jobs at each finish timestamp),
      - 'jobs': the number of jobs per status.

    When the report for an export cannot be built, the error is logged and that
    export is left out of the result.

    :return: the reports per export id, passed through convertIntKeysToString.
    """
    with self.__lock:
        export_ids = self.getExportIds()
        logger.info('getStatusReportDict export_ids: %s', export_ids)

        result = {}

        for export_id in export_ids:
            try:
                finished_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobProduced)
                failed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobFailed)
                removed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobRemoved)

                done_group_jads = finished_group_jads + failed_group_jads + removed_group_jads
                done_group_jobs = [jad['job'] for jad in done_group_jads]

                current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=export_id)
                current_group_jobs = [jad['job'] for jad in current_group_jads]

                all_group_jads = current_group_jads + done_group_jads
                all_group_jobs = current_group_jobs + done_group_jobs

                # NOTE(review): the fallback 4 presumably mirrors DEFAULT_JOB_PRIORITY - confirm
                if current_group_jobs:
                    priority = min(job.get('priority', DEFAULT_JOB_PRIORITY) for job in current_group_jobs)
                else:
                    priority = 4

                submitters = list(set(job['Submitter'] for job in all_group_jobs if 'Submitter' in job))
                projects = list(set(job['Project'] for job in all_group_jobs if 'Project' in job))
                lta_sites = list(set(jad['lta_site'] for jad in all_group_jads if 'lta_site' in jad))

                # +1 for each run started and -1 for each run finished, per timestamp
                job_run_events = {}
                for jad in all_group_jads:
                    for run in jad['runs'].values():
                        if 'started_at' in run:
                            started_timestamp = run['started_at']
                            job_run_events[started_timestamp] = job_run_events.get(started_timestamp, 0) + 1

                        if 'finished_at' in run:
                            finished_timestamp = run['finished_at']
                            job_run_events[finished_timestamp] = job_run_events.get(finished_timestamp, 0) - 1

                # the running sum of the events gives the number of runs in progress
                all_run_timestamps = sorted(job_run_events)
                running_jobs_values = []
                nr_running = 0
                for timestamp in all_run_timestamps:
                    nr_running += job_run_events[timestamp]
                    running_jobs_values.append(nr_running)

                # number of produced/failed jobs per finish timestamp of their final run
                job_finished_events = {}
                for jad in all_group_jads:
                    if jad['status'] in (JobProduced, JobFailed) and jad['runs']:
                        final_run = jad['runs'][max(jad['runs'].keys())]
                        if 'started_at' in final_run and 'finished_at' in final_run:
                            finished_timestamp = final_run['finished_at']
                            job_finished_events[finished_timestamp] = job_finished_events.get(finished_timestamp, 0) + 1

                # Accumulate the number of jobs per timestamp (not the number of
                # timestamps), so jobs finishing at the very same timestamp are
                # all counted.
                finished_timestamps = sorted(job_finished_events)
                finished_jobs_values = []
                nr_finished = 0
                for timestamp in finished_timestamps:
                    nr_finished += job_finished_events[timestamp]
                    finished_jobs_values.append(nr_finished)

                result[export_id] = {
                    'priority': priority,
                    'submitters': submitters,
                    'projects': projects,
                    'lta_sites': lta_sites,
                    'series': {
                        'running_jobs': {'timestamps': all_run_timestamps, 'values': running_jobs_values},
                        'finished_jobs': {'timestamps': finished_timestamps, 'values': finished_jobs_values},
                    },
                    'jobs': {
                        'running': len(self.getJobAdminDicts(job_group_id=export_id, status=JobProducing)),
                        'to_do': len(self.getJobAdminDicts(job_group_id=export_id, status=JobToDo)),
                        'scheduled': len(self.getJobAdminDicts(job_group_id=export_id, status=JobScheduled)),
                        'retry': len(self.getJobAdminDicts(job_group_id=export_id, status=JobRetry)),
                        'finished': len(finished_group_jads),
                        'failed': len(failed_group_jads),
                    },
                }
            except Exception as e:
                # best effort: skip this export, but say which one failed
                logger.error('getStatusReportDict: could not create the report for export %s: %s', export_id, e)

        return convertIntKeysToString(result)
def setExportJobPriority(self, export_id, priority):
    """Give all jobs of the given export (job group) a new priority.

    The priority is converted to int and limited to the range 0..9. For each job
    the in-memory job dict is updated and updatePriorityInJobFile is called with
    the job's 'path'. An error for one job is logged and does not stop the
    update of the remaining jobs.

    :param export_id: the id of the export (job group) whose jobs are updated.
    :param priority: the requested priority (anything int() accepts).
    """
    new_priority = min(9, max(0, int(priority)))

    with self.__lock:
        export_jads = self.getJobAdminDicts(job_group_id=export_id)
        logger.info('updating the priority of %s jobs of export %s to level %s', len(export_jads), export_id, new_priority)

        for export_jad in export_jads:
            try:
                # first the job we hold in memory...
                export_jad['job']['priority'] = new_priority

                # ...then its job file
                updatePriorityInJobFile(export_jad['path'], new_priority)
            except Exception as e:
                logger.error(e)
def getReport(self, job_group_id):
    """Create a plain text report on the jobs of the given job group (export).

    The report consists of a header (id, submitters, projects), a summary with
    the number of files per outcome and per file type, and a listing per
    observation id for the not-done, finished, failed and removed jobs.
    Only the failed jobs are listed per dataproduct.

    :param job_group_id: the id of the job group (export) to report on.
    :return: the report as one string.
    """
    with self.__lock:
        # still running/waiting jobs
        current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=job_group_id)
        current_group_jobs = [jad['job'] for jad in current_group_jads]

        # done jobs
        finished_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced)
        finished_group_jobs = [jad['job'] for jad in finished_group_jads]

        failed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed)
        failed_group_jobs = [jad['job'] for jad in failed_group_jads]

        removed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved)
        removed_group_jobs = [jad['job'] for jad in removed_group_jads]

        done_group_jobs = finished_group_jobs + failed_group_jobs + removed_group_jobs
        all_group_jobs = current_group_jobs + done_group_jobs

        # sorted, so the same jobs always yield the same header text
        submitters = sorted(set(j['Submitter'] for j in all_group_jobs if 'Submitter' in j))
        projects = sorted(set(j['Project'] for j in all_group_jobs if 'Project' in j))

        def nr_of_finished_files_of_type(file_type):
            # jobs without a 'file_type' never match (default -1)
            return sum(1 for j in finished_group_jobs if j.get('file_type', -1) == file_type)

        # NOTE(review): the continuation lines of both templates below start at
        # column 0 in the reviewed copy - confirm the intended leading whitespace.
        header = """=== Report on ingest Export Job (%(id)s) ===
User(s): %(user)s
Project: %(project)s""" % {'id': job_group_id,
                           'user': ', '.join(submitters),
                           'project': ', '.join(projects)}

        summary = """\n\n=== Summary ===
Total Files: %(total)i
- Failed: %(failed)i
- Success: %(done)i
- Interferometer: %(corr)i
- Beamformed: %(bf)i
- SkyImages: %(img)i
- Unspecified: %(unspec)i
- Pulsar Pipeline: %(pulp)i""" % {'total': len(all_group_jobs),
                                  'done': len(finished_group_jobs),
                                  'corr': nr_of_finished_files_of_type(FILE_TYPE_CORRELATED),
                                  'bf': nr_of_finished_files_of_type(FILE_TYPE_BEAMFORMED),
                                  'img': nr_of_finished_files_of_type(FILE_TYPE_IMAGE),
                                  'unspec': nr_of_finished_files_of_type(FILE_TYPE_UNSPECIFIED),
                                  'pulp': nr_of_finished_files_of_type(FILE_TYPE_PULP),
                                  'failed': len(failed_group_jobs)}

        # TODO: generate lta link
        #try:
            #import mechanize
            #import json
            #browser = mechanize.Browser()
            #browser.set_handle_robots(False)
            #browser.addheaders = [('User-agent', 'Firefox')]

            #obs_ids = sorted(list(set(job.get('ObservationId', -1) for job in all_group_jobs)))

            #for obs_id in obs_ids:
                #response = browser.open('http://scu001.control.lofar:7412/rest/tasks/otdb/%s' % obs_id)
                #if response.code == 200:
                    #task = json.loads(response.read())
        #except Exception as e:
            #logger.error(e)

        if removed_group_jobs:
            summary += '''\n\nTotal Removed before transfer: %s''' % (len(removed_group_jobs),)

        def file_listing_per_obs(jads, full_listing=False, dp_status_remark=''):
            # group the job admin dicts per observation id (-1 when a job has none)
            obs_jads_dict = {}
            for jad in jads:
                obs_jads_dict.setdefault(jad['job'].get('ObservationId', -1), []).append(jad)
            obs_ids = sorted(obs_jads_dict)

            # always: one line per observation with its number of dataproducts
            parts = ['otdb_id: %s - #dataproducts: %s\n' % (obs_id, len(obs_jads_dict[obs_id]))
                     for obs_id in obs_ids]

            if full_listing:
                for obs_id in obs_ids:
                    obs_jads = sorted(obs_jads_dict[obs_id], key=lambda jad: jad['job'].get('DataProduct'))
                    parts.append('\notdb_id: %s - %s dataproducts listing\n' % (obs_id, dp_status_remark))

                    for jad in obs_jads:
                        job = jad['job']
                        # .get: a job lacking one of these fields must not abort the report
                        line = 'dataproduct: %s - archive_id: %s - location: %s' % (job.get('DataProduct'),
                                                                                    job.get('ArchiveId'),
                                                                                    job.get('Location'))
                        if jad.get('last_message'):
                            # 1-tuple, so a tuple-valued message is formatted as one value
                            line += ' - message: %s' % (jad['last_message'],)
                        parts.append(line + '\n')

            return ''.join(parts)

        report_parts = [header, summary]

        # (job admin dicts, section title, full listing?, status remark)
        sections = [(current_group_jads, "\n\n==== Scheduled/Running files: =====\n", False, 'scheduled/running'),
                    (finished_group_jads, "\n\n==== Finished files: =====\n", False, 'finished'),
                    (failed_group_jads, "\n\n==== Failed files: =====\n", True, 'failed'),
                    (removed_group_jads, "\n\n==== Removed jobs before transfer: =====\n", False, 'removed')]

        for section_jads, title, full_listing, remark in sections:
            if section_jads:
                report_parts.append(title)
                report_parts.append(file_listing_per_obs(section_jads, full_listing, remark))

        return ''.join(report_parts)
def sendJobGroupFinishedMail(self, job_group_id):
report = self.getReport(job_group_id)
logger.info(report)
mailing_list = list(FINISHED_NOTIFICATION_MAILING_LIST)
finished_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced)
failed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed)
removed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved)
unfinished_group_jads = failed_group_jads + removed_group_jads