#!/usr/bin/env python # Copyright (C) 2015 # ASTRON (Netherlands Institute for Radio Astronomy) # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands # # This file is part of the LOFAR software suite. # The LOFAR software suite is free software: you can redistribute it # and/or modify it under the terms of the GNU General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # The LOFAR software suite is distributed in the hope that it will be # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener from lofar.lta.ingest.common.job import * from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME, DEFAULT_INGEST_NOTIFICATION_PREFIX from lofar.lta.ingest.server.config import DEFAULT_INGEST_BUSNAME, DEFAULT_INGEST_SERVICENAME from lofar.lta.ingest.common.config import DEFAULT_BROKER from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, DEFAULT_JOB_PRIORITY from lofar.lta.ingest.server.config import DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_QUEUENAME, DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME from lofar.messaging import CommandMessage, EventMessage, FromBus, ToBus from lofar.messaging import Service from lofar.messaging.Service import MessageHandlerInterface from lofar.common.util import convertIntKeysToString from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME from lofar.common import isProductionEnvironment import os import os.path import fnmatch import shutil import time from threading import RLock from datetime import datetime, timedelta import logging logger = logging.getLogger() class IngestServiceMessageHandler(MessageHandlerInterface): def __init__(self, job_manager, **kwargs): super(IngestServiceMessageHandler, self).__init__(**kwargs) self._job_manager = job_manager self.service2MethodMap = { 'RemoveExportJob': self._job_manager.removeExportJob, 'SetExportJobPriority': self._job_manager.setExportJobPriority, 'GetStatusReport': self._job_manager.getStatusReportDict, 'GetReport': self._job_manager.getReport, 'GetExportIds': self._job_manager.getExportIds } class IngestJobManager: def __init__(self, busname=DEFAULT_INGEST_BUSNAME, servicename=DEFAULT_INGEST_SERVICENAME, notification_busname=DEFAULT_INGEST_NOTIFICATION_BUSNAME, notification_prefix=DEFAULT_INGEST_NOTIFICATION_PREFIX, notification_listen_queue_name=DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME, incoming_job_queue_name=DEFAULT_INGEST_JOBS_QUEUENAME, jobs_for_transfer_queue_name=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME, jobs_dir=JOBS_DIR, max_num_retries=MAX_NR_OF_RETRIES, mom_busname=DEFAULT_MOMQUERY_BUSNAME, mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, broker=None, mom_broker=None, **kwargs): self.__notification_listener = IngestBusListener(busname=notification_listen_queue_name, subjects=notification_prefix+'*', broker=broker) self.__notification_listener.onJobStarted = self.onJobStarted self.__notification_listener.onJobProgress = self.onJobProgress self.__notification_listener.onJobTransferFailed = self.onJobTransferFailed self.__notification_listener.onJobFinished = self.onJobFinished self.__jobs_dir = jobs_dir self.__max_num_retries = max_num_retries self.__job_admin_dicts = {} self.__lock = RLock() self.__running = False self.notification_prefix = notification_prefix self.event_bus = ToBus(notification_busname, broker=broker) if not mom_broker: mom_broker = broker self.__momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=mom_broker, timeout=180) self.service = Service(servicename, IngestServiceMessageHandler, busname=busname, broker=broker, use_service_methods=True, numthreads=1, handler_args={'job_manager': self}) #incoming jobs (from mom, eor, user ingest, etc) self.__incoming_job_queue = FromBus(incoming_job_queue_name, broker=broker) #queue into which this manager produces jobs, which are consumened for actual data transfer by the ingestservices self.__jobs_for_transfer_queue = ToBus(jobs_for_transfer_queue_name, broker=broker) # the peeker frombus is used to see if the managed job queue we submit jobs to is emptyied by consumers # we want this manager to produce jobs just in time, so we can actively shuffle jobs priorities etc. self.__jobs_for_transfer_queue_peeker = FromBus(jobs_for_transfer_queue_name, broker=broker) self.__running_jobs_log_timestamp = datetime.utcnow() self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow() def quit(self): self.__running = False def run(self): self.__running = True # start with full jobs dir scan to retreive state from disk self.scanJobsdir() logger.info('starting listening for new jobs and notifications') #open queue connections... with self.service as _1, self.__incoming_job_queue as _2, self.__jobs_for_transfer_queue as _3, self.__notification_listener as _4, self.event_bus as _5: #... and run the event loop, #produce jobs to managed job queue for ingest transfer services #receive new jobs logger.info('starting to produce jobs') while self.__running: try: #produce next jobs self.produceNextJobsIfPossible() #receive any jobs from mom/user_ingest/eor/etc receive_start = datetime.utcnow() msg = self.__incoming_job_queue.receive(timeout=0.01) while msg: logger.debug("received msg on job queue %s: %s", self.__incoming_job_queue.address, msg) self.__incoming_job_queue.ack(msg) if isinstance(msg, CommandMessage): job = parseJobXml(msg.content) if job and job.get('JobId'): if msg.priority != None and msg.priority > job.get('priority', DEFAULT_JOB_PRIORITY): job['priority'] = msg.priority logger.info("received job on queue %s: %s", self.__incoming_job_queue.address, job) job_admin_dict = { 'job': job, 'job_xml': msg.content } self.addNewJob(job_admin_dict, check_done_dirs=True) else: logger.warn("unexpected message type: %s", msg) if datetime.utcnow() - receive_start > timedelta(seconds=10): # break out of receiving while loop early, # so we can also produce some jobs # receive more in next iteration break #see if there are any more jobs to receive and process them, else jump out of while loop msg = self.__incoming_job_queue.receive(timeout=0.01) # check if producing jobs are actually making progress # maybe the ingest transfer server was shut down in the middle of a transer? # the ingest transfer server is stateless, so it won't restart that job itself (by design) # when transfering at very regular intervals progress updates are given # so it is easy for us to detect here if the job is progressing or stalled (see onJobProgress) self.__putStalledJobsBackToToDo() #report on running jobs if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds=10): self.__running_jobs_log_timestamp = datetime.utcnow() producing_jads = self.getJobAdminDicts(status=JobProducing) if producing_jads: if len(producing_jads) == 1: logger.info('1 job is running') else: logger.info('%s jobs are running', len(producing_jads)) except KeyboardInterrupt: break except Exception as e: logger.error(e) logger.info('finished producing jobs') logger.info('finished listening for new jobs and notifications') def nrOfUnfinishedJobs(self): return len(self.getNotDoneJobAdminDicts()) def getJobAdminDictsFromDisk(self, job_status=None, job_type=None, job_group_id=None, job_id=None): dir_path = self.jobDir(job_admin_dict=None, job_status=job_status, job_type=job_type, job_group_id=job_group_id) job_admin_dicts = [] if not os.path.isdir(dir_path): return job_admin_dicts dir_paths = [dir_path] if job_status==JobRetry: dir_paths += IngestJobManager.getSubDirs(dir_path) for dp in dir_paths: try: if job_id: logger.info('scanning %s for job file of %s', dp, job_id) else: logger.info('scanning %s for job files', dp) pattern = ('*%s*.xml*' % job_id) if job_id else '*.xml*' xml_files = [os.path.join(dp, f) for f in os.listdir(dp) if fnmatch.fnmatch(f, pattern)] logger.debug('found %d xml files in %s', len(xml_files), dp) for path in xml_files: try: with open(path) as file: file_content = file.read() job = parseJobXml(file_content) if job and job.get('JobId'): job_admin_dict = { 'path': path, 'job': job, 'job_xml': file_content, 'created_at': datetime.fromtimestamp(os.lstat(path).st_ctime) } job_admin_dicts.append(job_admin_dict) except Exception as e: logger.error(e) except Exception as e: logger.error(e) if job_status: for job_admin_dict in job_admin_dicts: job_admin_dict['status'] = job_status logger.info('read %d job files from %s', len(job_admin_dicts), dir_path) return job_admin_dicts def jobStatusBaseDir(self, jobstatus): if jobstatus == JobToDo: return os.path.join(self.__jobs_dir, 'to_do') if jobstatus == JobRetry: return os.path.join(self.__jobs_dir, 'retry') if jobstatus == JobFailed: return os.path.join(self.__jobs_dir, 'failed') if jobstatus == JobHold: return os.path.join(self.__jobs_dir, 'on_hold') if jobstatus == JobScheduled: return os.path.join(self.__jobs_dir, 'scheduled') if jobstatus == JobProducing: return os.path.join(self.__jobs_dir, 'producing') if jobstatus == JobProduced: return os.path.join(self.__jobs_dir, 'done') if jobstatus == JobRemoved: return os.path.join(self.__jobs_dir, 'removed') return os.path.join(self.__jobs_dir, 'unknown') def jobDir(self, job_admin_dict, job_status=None, job_type=None, job_group_id=None, retry_attempt=None): if job_admin_dict: return self.jobDir(job_admin_dict=None, job_status=job_admin_dict['status'], job_type=job_admin_dict['job'].get('Type', 'unknown'), job_group_id=job_admin_dict['job'].get('job_group_id', 'unknown'), retry_attempt=job_admin_dict.get('retry_attempt', 1)) base_dir = self.jobStatusBaseDir(job_status) if job_status == JobRetry and retry_attempt != None: return os.path.join(base_dir, str(retry_attempt)) elif job_status in [JobProduced, JobRemoved, JobFailed]: group_dir = '%s_%s' % (job_type, job_group_id) return os.path.join(base_dir, group_dir) return base_dir def jobPath(self, job_admin_dict): dir = self.jobDir(job_admin_dict) filename = 'j%s.xml' % job_admin_dict['job']['JobId'] path = os.path.join(dir, filename) return path def scanJobsdir(self): with self.__lock: logger.info('scanning jobs dirs in %s', self.__jobs_dir) for status in [JobToDo, JobScheduled, JobProducing, JobRetry]: job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status) for job_admin_dict in job_admin_dicts: self.addNewJob(job_admin_dict, check_done_dirs=False) logger.info('added %d existing %s jobs from disk', len(job_admin_dicts), jobState2String(status)) #which (type, group_id) jobs were read #read the done jobs for these groups as well unique_type_groups = set([(jad['job']['Type'], jad['job'].get('job_group_id', 'unknown_group')) for jad in self.__job_admin_dicts.values()]) if unique_type_groups: logger.info('scanning for done jobs for %s', unique_type_groups) for job_type, job_group_id in unique_type_groups: for status in [JobFailed, JobProduced, JobRemoved]: job_admin_dicts = self.getJobAdminDictsFromDisk(job_status=status, job_type=job_type, job_group_id=job_group_id) for job_admin_dict in job_admin_dicts: self.addNewJob(job_admin_dict, check_done_dirs=False) logger.info('added %d existing %s jobs for %s %s', len(job_admin_dicts), jobState2String(status), job_type, job_group_id) logger.info('finished scanning jobs') def addNewJob(self, job_admin_dict, check_done_dirs=False): with self.__lock: job_id = job_admin_dict['job']['JobId'] job_group_id = job_admin_dict['job'].get('job_group_id') job_type=job_admin_dict['job']['Type'] logger.info('adding new job %s in group %s %s with status %s', job_id, job_type, job_group_id, jobState2String(job_admin_dict.get('status', JobToDo))) if check_done_dirs: #remove job from 'done' directories if present (this is a resubmitted job) for done_status in [JobFailed, JobProduced, JobRemoved]: matching_done_jads = self.getJobAdminDictsFromDisk(job_status=done_status, job_type=job_type, job_group_id=job_group_id, job_id=job_id) for done_jad in matching_done_jads: try: logger.info('removing done job %s from %s because it is resubmitted', job_id, done_jad['path']) os.remove(done_jad['path']) except Exception as e: logger.error('error while removing done job %s from %s: %s', job_id, done_jad['path'], e) self.__job_admin_dicts[job_id] = job_admin_dict if 'status' not in job_admin_dict: job_admin_dict['status'] = JobToDo if 'created_at' not in job_admin_dict: job_admin_dict['created_at'] = datetime.utcnow() job_admin_dict['updated_at'] = job_admin_dict['created_at'] #store start- finish times per try in runs job_admin_dict['runs'] = {} #store new job todo_dir = self.jobDir(job_admin_dict) #create dir dir if not exists if not os.path.isdir(todo_dir): try: os.makedirs(todo_dir) except OSError as e: logger.error(e) if 'path' not in job_admin_dict: path = self.jobPath(job_admin_dict) job_admin_dict['path'] = path try: if not os.path.exists(path): logger.info('saving job %s on disk: %s', job_id, path) with open(path, 'w') as file: file.write(job_admin_dict['job_xml']) except Exception as e: logger.error(e) else: job_dirname = os.path.dirname(job_admin_dict['path']) expected_dirname = self.jobDir(job_admin_dict) if job_dirname != expected_dirname: # a new todo job should be located in the expected dir based on its status, # it is not, so force it there self.updateJobStatus(job_admin_dict['job']['JobId'], job_admin_dict['status']) def updateJobStatus(self, job_id, new_status, lta_site=None, message=None): with self.__lock: job_admin_dict = self.__job_admin_dicts.get(job_id) if not job_admin_dict: logger.error('updateJobStatus: unknown job %s with new status %s', job_id, jobState2String(new_status)) return try: #update updated_at timestamp job_admin_dict['updated_at'] = datetime.utcnow() if new_status == JobProducing: job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)] = {} job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['started_at'] = datetime.utcnow() if new_status == JobProduced or new_status == JobTransferFailed: job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['finished_at'] = datetime.utcnow() if lta_site: job_admin_dict['lta_site'] = lta_site job_admin_dict['last_message'] = message current_status = job_admin_dict.get('status', JobToDo) if new_status == JobTransferFailed: #special case for jobs which failed to transer, which will be retried current_retry_attempt = job_admin_dict.get('retry_attempt', 0) next_retry_attempt = current_retry_attempt+1 if next_retry_attempt < self.__max_num_retries: if message and 'not on disk' in message: logger.info('job %s transfer failed because source data was not on disk, not retrying anymore', job_id) new_status = JobFailed else: new_status = JobRetry job_admin_dict['retry_attempt'] = next_retry_attempt job_admin_dict['job']['last_retry_attempt'] = next_retry_attempt == (self.__max_num_retries-1) else: logger.info('job %s transfer failed %s times, not retrying anymore', job_id, job_admin_dict.get('retry_attempt', 0)) new_status = JobFailed if new_status != current_status: #update the internal status logger.info('updating job %s status from %s to %s%s', job_id, jobState2String(current_status), jobState2String(new_status), (' attempt #%d' % job_admin_dict.get('retry_attempt', 1)) if (new_status == JobRetry or current_status == JobRetry) else '') job_admin_dict['status'] = new_status #move the job file to its new status directory #determine current and new paths current_path = job_admin_dict.get('path', '') new_path = self.jobPath(job_admin_dict) new_dirname = os.path.dirname(new_path) #create dir dir if not exists if not os.path.isdir(new_dirname): try: os.makedirs(new_dirname) except OSError as e: logger.error(e) if new_path != current_path: #do actual file move logger.debug('moving job file from %s to %s.', current_path, new_path) shutil.move(current_path, new_path) job_admin_dict['path'] = new_path if new_status == JobRemoved or new_status == JobFailed: #send notification #this is (also) picked up by the ingestmomadapter #which also sends a status update to MoM, including the last_message if new_status == JobRemoved: job_admin_dict['last_message'] = 'removed from queue' self._sendNotification(job_admin_dict) #finally, remove job from interal admin jobs dict if finished if job_admin_dict['status'] in [JobProduced, JobFailed, JobRemoved]: current_job_group_id = job_admin_dict['job'].get('job_group_id', 'unknown') current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=current_job_group_id) if len(current_group_jads) == 0: logger.info('all jobs in group %s are done', current_job_group_id) self.sendJobGroupFinishedMail(current_job_group_id) current_group_done_jads = self.getDoneJobAdminDicts(job_group_id=current_job_group_id) logger.info('removing %s jobs of group %s from job management server memory', len(current_group_done_jads), current_job_group_id) for jad in current_group_done_jads: del self.__job_admin_dicts[jad['job']['JobId']] else: logger.info('%s jobs to do in group %s', len(current_group_jads), current_job_group_id) except Exception as e: logger.error("updateJobStatus(job_id=%s, new_status=%s, lta_site=%s, message=%s) %s", job_id, jobState2String(new_status), lta_site, message, e) def _sendNotification(self, job_admin_dict): try: job = job_admin_dict['job'] contentDict = { 'job_id': job['JobId'], 'archive_id': job['ArchiveId'], 'project': job['Project'], 'type': job["Type"], 'dataproduct': job['DataProduct'] } if 'ObservationId' in job: contentDict['otdb_id'] = job['ObservationId'] if 'lta_site' in job_admin_dict: contentDict['lta_site'] = job_admin_dict['lta_site'] if 'last_message' in job_admin_dict: contentDict['message'] = job_admin_dict['last_message'] if 'job_group_id' in job_admin_dict: contentDict['export_id'] = job_admin_dict['job_group_id'] status = jobState2String(job_admin_dict['status']) status = status[status.index('(')+1:status.index(')')] msg = EventMessage(context=self.notification_prefix + status, content=contentDict) # remove message from queue's when not picked up within 48 hours, # otherwise mom might endlessly reject messages if it cannot handle them msg.ttl = 48*3600 logger.info('Sending notification %s: %s' % (status, str(contentDict).replace('\n', ' '))) self.event_bus.send(msg) except Exception as e: logger.error(str(e)) def removeExportJob(self, export_group_id): logger.info('removing export job %s', export_group_id) job_admin_dicts = self.getJobAdminDicts(job_group_id=export_group_id) if job_admin_dicts: for jad in job_admin_dicts: self.updateJobStatus(jad['job']['JobId'], JobRemoved) def getExportIds(self): with self.__lock: return sorted(list(set([jad['job'].get('job_group_id', 'unknown_group') for jad in self.__job_admin_dicts.values()]))) def __putStalledJobsBackToToDo(self): if datetime.utcnow() - self.__last_putStalledJobsBackToToDo_timestamp < timedelta(minutes=1): return with self.__lock: now = datetime.utcnow() threshold = timedelta(minutes=15) stalled_job_admin_dicts = [jad for jad in self.__job_admin_dicts.values() if (jad['status'] == JobProducing or jad['status'] == JobScheduled) and now - jad['updated_at'] >= threshold] for jad in stalled_job_admin_dicts: logger.info('putting job %s back to ToDo because it did not make any progress during the last 15min', jad['job']['JobId']) self.updateJobStatus(jad['job']['JobId'], JobToDo) self.__last_putStalledJobsBackToToDo_timestamp = datetime.utcnow() def getNextJobToRun(self): '''get the next job to run. examine all 'to_do' and 'retry' jobs higher priority jobs always go first equal priority jobs will return 'to_do' jobs over 'retry' jobs 'retry' jobs are sorted by least amount of retry attempts source load balancing: the more jobs transfer from a certain source host, the less likely it is a next job will be for that source host as well ''' #helper method to get the source host from the job's location def getSourceHost(job_admin_dict): try: host = job_admin_dict['job']['Location'].split(':')[0] if 'cep4' in host.lower() or 'cpu' in host.lower(): return 'localhost' return host except: return 'localhost' running_jads = self.getJobAdminDicts(status=JobProducing) + self.getJobAdminDicts(status=JobScheduled) running_hosts = {} for jad in running_jads: host = getSourceHost(jad) running_hosts[host] = running_hosts.get(host, 0) + 1 with self.__lock: def getNextJobByStatus(status, min_age=None): def jad_compare_func(jad_a, jad_b): #sort on priority first if jad_a['job'].get('priority', DEFAULT_JOB_PRIORITY) != jad_b['job'].get('priority', DEFAULT_JOB_PRIORITY): return jad_b['job'].get('priority', DEFAULT_JOB_PRIORITY) - jad_a['job'].get('priority', DEFAULT_JOB_PRIORITY) #equal priorities, so sort on next sort criterion, the retry attempt if jad_a['status'] == JobRetry and jad_b['status'] == JobRetry: if jad_a.get('retry_attempt', 0) != jad_b.get('retry_attempt', 0): return jad_b.get('retry_attempt', 0) - jad_a.get('retry_attempt', 0) # equal retry_attempt, so sort on next sort criterion, # jobs for which the number of running jobs on that host is lower # (load balance the source hosts) nrOfRunningJobsOnSourceHostA = running_hosts.get(getSourceHost(jad_a), 0) nrOfRunningJobsOnSourceHostB = running_hosts.get(getSourceHost(jad_b), 0) if nrOfRunningJobsOnSourceHostA != nrOfRunningJobsOnSourceHostB: return nrOfRunningJobsOnSourceHostA - nrOfRunningJobsOnSourceHostB #equal number of jobs on source hosts, so sort on next sort criterion, the created_at timestamp (FIFO) if jad_a['created_at'] < jad_b['created_at']: return -1 if jad_a['created_at'] > jad_b['created_at']: return 1 #TODO: we can add a lot of sort criteria in the future. #For now, stick with FIFO and retry_attempt, after priority and source_host_load_balancing return 0 job_admin_dicts = self.getJobAdminDicts(status=status) # filter out priority 0 jobs (which are paused) job_admin_dicts = [jad for jad in job_admin_dicts if jad['job'].get('priority', 0) > 0] if min_age: now = datetime.utcnow() job_admin_dicts = [jad for jad in job_admin_dicts if now - jad['updated_at'] >= min_age] job_admin_dicts = sorted(job_admin_dicts, cmp=jad_compare_func) if job_admin_dicts: logger.info('%s jobs with status %s waiting', len(job_admin_dicts), jobState2String(status)) return job_admin_dicts[0] return None #get the next job to run, both for JobToDo and JobRetry next_to_do_jad = getNextJobByStatus(JobToDo) next_retry_jad = getNextJobByStatus(JobRetry, timedelta(minutes=15) if next_to_do_jad else None) #limit the number of running jobs per source host if not localhost if next_to_do_jad and getSourceHost(next_to_do_jad) != 'localhost': if running_hosts.get(getSourceHost(next_to_do_jad), 0) > 1: next_to_do_jad = None #limit the number of running jobs per source host if not localhost if next_retry_jad and getSourceHost(next_retry_jad) != 'localhost': if running_hosts.get(getSourceHost(next_retry_jad), 0) > 1: next_retry_jad = None if next_to_do_jad and next_retry_jad: # if next_retry_jad has higher priority then next_to_do_jad, then return next_retry_jad if next_retry_jad['job'].get('priority', DEFAULT_JOB_PRIORITY) > next_to_do_jad['job'].get('priority', DEFAULT_JOB_PRIORITY): return next_retry_jad # or if next_to_do_jad has higher priority then next_retry_jad, then return next_to_do_jad if next_to_do_jad['job'].get('priority', DEFAULT_JOB_PRIORITY) > next_retry_jad['job'].get('priority', DEFAULT_JOB_PRIORITY): return next_to_do_jad # or if next_retry_jad is already waiting for over an hour, then return next_retry_jad if datetime.utcnow() - next_retry_jad['updated_at'] > timedelta(minutes=60): return next_retry_jad # or if next_retry_jad is older than next_to_do_jad if next_retry_jad['updated_at'] > next_to_do_jad.get('updated_at', next_to_do_jad.get('created_at')): return next_retry_jad if next_to_do_jad: # just return the next_to_do_jad return next_to_do_jad # in all other cases, return next_retry_jad (which might be None) return next_retry_jad def canProduceNextJob(self): # test if the managed_job_queue is empty try: with self.__jobs_for_transfer_queue_peeker: num_scheduled = self.__jobs_for_transfer_queue_peeker.nr_of_messages_in_queue(0.01) if num_scheduled == 0: scheduled_jads = self.getJobAdminDicts(status=JobScheduled) return len(scheduled_jads) < 10 return False except Exception as e: logger.error('canProduceNextJob: %s', e) return True def produceNextJobsIfPossible(self): start_producing_timestamp = datetime.utcnow() while self.canProduceNextJob() and datetime.utcnow() - start_producing_timestamp < timedelta(seconds=5): job_admin_dict = self.getNextJobToRun() if job_admin_dict: if os.path.exists(job_admin_dict.get('path')): msg = CommandMessage(content=job_admin_dict.get('job_xml')) msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY) self.__jobs_for_transfer_queue.send(msg) logger.info('submitted job %s to queue %s at %s', job_admin_dict['job']['JobId'], self.__jobs_for_transfer_queue.address, self.__jobs_for_transfer_queue.broker) self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled) else: job_id = job_admin_dict['job']['JobId'] logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path')) del self.__job_admin_dicts[job_id] #do a little sleep to allow the ingesttransferserver consumers to pick up the submitted job #so the queue is empty again #and we can submit yet another job time.sleep(0.1) else: return def onJobStarted(self, job_notification_dict): self.__notification_listener._logJobNotification('started ', job_notification_dict); self.updateJobStatus(job_notification_dict.get('job_id'), JobProducing, job_notification_dict.get('lta_site'), job_notification_dict.get('message')) def onJobFinished(self, job_notification_dict): self.__notification_listener._logJobNotification('finished', job_notification_dict); # file_type might have changed to unspec for example if 'file_type' in job_notification_dict: with self.__lock: job_admin_dict = self.__job_admin_dicts.get(job_id) if job_admin_dict: job_admin_dict['job']['file_type'] = job_notification_dict['file_type'] self.updateJobStatus(job_notification_dict.get('job_id'), JobProduced, job_notification_dict.get('lta_site'), job_notification_dict.get('message')) def onJobTransferFailed(self, job_notification_dict): self.__notification_listener._logJobNotification('transfer failed ', job_notification_dict); self.updateJobStatus(job_notification_dict.get('job_id'), JobTransferFailed, job_notification_dict.get('lta_site'), job_notification_dict.get('message')) def onJobProgress(self, job_notification_dict): self.__notification_listener._logJobNotification('progress', job_notification_dict, level=logging.DEBUG); #touch job #producing jobs which are untouched for 5min are put back to JobToDo self.updateJobStatus(job_notification_dict.get('job_id'), JobProducing, job_notification_dict.get('lta_site'), job_notification_dict.get('message')) @staticmethod def getSubDirs(dir_path): dir_lists = [[os.path.join(root,dir) for dir in dirs if root==dir_path] for root, dirs, files in os.walk(dir_path) if dirs] if dir_lists: return reduce(lambda x,y: x+y, dir_lists) return [] def getDoneJobAdminDicts(self, job_group_id=None): return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobFailed, JobProduced, JobRemoved]) def getNotDoneJobAdminDicts(self, job_group_id=None): return self.getJobAdminDicts(job_group_id=job_group_id, status=[JobToDo, JobScheduled, JobProducing, JobRetry]) def getJobAdminDicts(self, job_group_id=None, status=None): with self.__lock: jads = [jad for jad in self.__job_admin_dicts.values()] if job_group_id != None: job_group_id = str(job_group_id) jads = [jad for jad in jads if str(jad['job'].get('job_group_id')) == job_group_id] if status != None: if isinstance(status, int): jads = [jad for jad in jads if jad['status'] == status] else: statuses = set(status) jads = [jad for jad in jads if jad['status'] in statuses] return jads def getStatusReportDict(self): with self.__lock: export_ids = self.getExportIds() logger.info('getStatusReportDict export_ids: %s', export_ids) result = {} for export_id in export_ids: try: finished_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobProduced) failed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobFailed) removed_group_jads = self.getJobAdminDicts(job_group_id=export_id, status=JobRemoved) done_group_jads = finished_group_jads + failed_group_jads + removed_group_jads done_group_jobs = [jad['job'] for jad in done_group_jads] current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=export_id) current_group_jobs = [jad['job'] for jad in current_group_jads] all_group_jads = current_group_jads + done_group_jads all_group_jobs = current_group_jobs + done_group_jobs priority = min([job.get('priority', DEFAULT_JOB_PRIORITY) for job in current_group_jobs]) if current_group_jobs else 4 submitters = list(set([job['Submitter'] for job in all_group_jobs if 'Submitter' in job])) projects = list(set([job['Project'] for job in all_group_jobs if 'Project' in job])) lta_sites = list(set([jad['lta_site'] for jad in all_group_jads if 'lta_site' in jad])) job_run_events = {} for jad in all_group_jads: for run in jad['runs'].values(): if 'started_at' in run: started_timestamp = run['started_at'] if started_timestamp not in job_run_events: job_run_events[started_timestamp] = 0 job_run_events[started_timestamp] += 1 if 'finished_at' in run: finished_timestamp = run['finished_at'] if finished_timestamp not in job_run_events: job_run_events[finished_timestamp] = 0 job_run_events[finished_timestamp] -= 1 all_run_timestamps = sorted(job_run_events.keys()) running_jobs_values = [] if all_run_timestamps: prev_value = 0 for t in all_run_timestamps: value = prev_value + job_run_events[t] running_jobs_values.append(value) prev_value = value job_finised_events = {} for jad in all_group_jads: if jad['status'] == JobProduced or jad['status'] == JobFailed: if jad['runs']: final_run = max(jad['runs'].keys()) run = jad['runs'][final_run] if 'started_at' in run and 'finished_at' in run: finished_timestamp = run['finished_at'] if finished_timestamp not in job_finised_events: job_finised_events[finished_timestamp] = 0 job_finised_events[finished_timestamp] += 1 finished_jobs_values = [] finished_timestamps = sorted(job_finised_events.keys()) for i, t in enumerate(finished_timestamps): finished_jobs_values.append(i + 1) result[export_id] = { 'priority' : priority, 'submitters' : submitters, 'projects' : projects, 'lta_sites' : lta_sites, 'series': { 'running_jobs': { 'timestamps': all_run_timestamps, 'values': running_jobs_values }, 'finished_jobs': { 'timestamps': finished_timestamps, 'values': finished_jobs_values } }, 'jobs': { 'running': len(self.getJobAdminDicts(job_group_id=export_id, status=JobProducing)), 'to_do': len(self.getJobAdminDicts(job_group_id=export_id, status=JobToDo)), 'scheduled': len(self.getJobAdminDicts(job_group_id=export_id, status=JobScheduled)), 'retry': len(self.getJobAdminDicts(job_group_id=export_id, status=JobRetry)), 'finished': len(finished_group_jads), 'failed': len(failed_group_jads) } } except Exception as e: logger.error(e) return convertIntKeysToString(result) def setExportJobPriority(self, export_id, priority): priority = max(0, min(9, int(priority))) with self.__lock: jads = self.getJobAdminDicts(job_group_id=export_id) logger.info('updating the priority of %s jobs of export %s to level %s', len(jads), export_id, priority) for jad in jads: try: #update local copy jad['job']['priority'] = priority #persist to disk updatePriorityInJobFile(jad['path'], priority) except Exception as e: logger.error(e) def getReport(self, job_group_id): with self.__lock: #still running/waiting jobs current_group_jads = self.getNotDoneJobAdminDicts(job_group_id=job_group_id) current_group_jobs = [jad['job'] for jad in current_group_jads] #done jobs finished_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced) finished_group_jobs = [jad['job'] for jad in finished_group_jads] failed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed) failed_group_jobs = [jad['job'] for jad in failed_group_jads] removed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved) removed_group_jobs = [jad['job'] for jad in removed_group_jads] done_group_jobs = finished_group_jobs + failed_group_jobs + removed_group_jobs all_group_jobs = current_group_jobs + done_group_jobs submitters = set([j['Submitter'] for j in all_group_jobs if 'Submitter' in j]) projects = set([j['Project'] for j in all_group_jobs if 'Project' in j]) report = '' header = """=== Report on ingest Export Job (%(id)s) === User(s): %(user)s Project: %(project)s""" % { 'id': job_group_id, 'user': ', '.join(submitters), 'project': ', '.join(projects)} report += header summary = """\n\n=== Summary === Total Files: %(total)i - Failed: %(failed)i - Success: %(done)i - Interferometer: %(corr)i - Beamformed: %(bf)i - SkyImages: %(img)i - Unspecified: %(unspec)i - Pulsar Pipeline: %(pulp)i""" % {'total': len(all_group_jobs), 'done': len(finished_group_jobs), 'corr': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_CORRELATED]), 'bf': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_BEAMFORMED]), 'img': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_IMAGE]), 'unspec': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_UNSPECIFIED]), 'pulp': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_PULP]), 'failed': len(failed_group_jobs)} # TODO: generate lta link #try: #import mechanize #import json #browser = mechanize.Browser() #browser.set_handle_robots(False) #browser.addheaders = [('User-agent', 'Firefox')] #obs_ids = sorted(list(set(job.get('ObservationId', -1) for job in all_group_jobs))) #for obs_id in obs_ids: #response = browser.open('http://scu001.control.lofar:7412/rest/tasks/otdb/%s' % obs_id) #if response.code == 200: #task = json.loads(response.read()) #except Exception as e: #logger.error(e) if removed_group_jobs: summary += '''\n\nTotal Removed before transfer: %s''' % (len(removed_group_jobs),) report += summary def file_listing_per_obs(jads, full_listing=False, dp_status_remark=''): jobs = [jad['job'] for jad in jads] obs_ids = sorted(list(set(job.get('ObservationId', -1) for job in jobs))) obs_jads_dict = {obs_id:[] for obs_id in obs_ids} for jad in jads: obs_jads_dict[jad['job'].get('ObservationId', -1)].append(jad) listing = '' for obs_id in obs_ids: obs_jads = obs_jads_dict[obs_id] listing += 'otdb_id: %s - #dataproducts: %s\n' % (obs_id, len(obs_jads)) if full_listing: for obs_id in obs_ids: obs_jads = obs_jads_dict[obs_id] obs_jads = sorted(obs_jads, key=lambda jad: jad['job'].get('DataProduct')) listing += '\notdb_id: %s - %s dataproducts listing\n' % (obs_id, dp_status_remark) for jad in obs_jads: listing += 'dataproduct: %s - archive_id: %s - location: %s' % (jad['job']['DataProduct'], jad['job']['ArchiveId'], jad['job'].get('Location')) if jad.get('last_message'): listing += ' - message: %s' % (jad['last_message']) listing += '\n' return listing if current_group_jads: report += "\n\n==== Scheduled/Running files: =====\n" report += file_listing_per_obs(current_group_jads, False, 'scheduled/running') if finished_group_jads: report += "\n\n==== Finished files: =====\n" report += file_listing_per_obs(finished_group_jads, False, 'finished') if failed_group_jads: report += "\n\n==== Failed files: =====\n" report += file_listing_per_obs(failed_group_jads, True, 'failed') if removed_group_jads: report += "\n\n==== Removed jobs before transfer: =====\n" report += file_listing_per_obs(removed_group_jads, False, 'removed') return report def sendJobGroupFinishedMail(self, job_group_id): report = self.getReport(job_group_id) logger.info(report) mailing_list = list(FINISHED_NOTIFICATION_MAILING_LIST) finished_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced) failed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed) removed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved) unfinished_group_jads = failed_group_jads + removed_group_jads done_group_jads = finished_group_jads + failed_group_jads + removed_group_jads done_group_jobs = [jad['job'] for jad in done_group_jads] submitters = [j['Submitter'] for j in done_group_jobs if 'Submitter' in j] extra_mail_addresses = [j['email'] for j in done_group_jobs if 'email' in j] try: if len(unfinished_group_jads) == 0: # only for successful ingests, # try to get the PI's email address for this export's projects # and add these to the extra_mail_addresses done_group_mom_jobs = [job for job in done_group_jobs if job.get('type','').lower() == 'mom'] mom_export_ids = set([job['JobId'].split('_')[1] for job in done_group_mom_jobs if 'JobId' in job]) project_mom2ids = set([self.__momrpc.getObjectDetails(mom_export_id).get('project_mom2id') for mom_export_id in mom_export_ids]) project_mom2ids = [x for x in project_mom2ids if x is not None] for project_mom2id in project_mom2ids: project_details = self.__momrpc.get_project_details(project_mom2id) if project_details and 'pi_email' in project_details: extra_mail_addresses.append(project_details['pi_email']) except Exception as e: logger.error('error while trying to get PI\'s email address for %s: %s', job_group_id, e) #submitters might contain comma seperated strings #join all sumbitterstrings in one long csv string, split it, and get the unique submitters submitters = list(set([s.strip() for s in ','.join(submitters).split(',') if '@' in s])) #do the same for extra_mail_addresses extra_mail_addresses = list(set([s.strip() for s in ','.join(extra_mail_addresses).split(',') if '@' in s])) mailing_list += submitters + extra_mail_addresses if mailing_list: #make it a csv list of addresses mailing_list = ','.join(mailing_list) projects = set([j['Project'] for j in done_group_jobs if 'Project' in j]) subject = "Ingest Export job %s of project %s " % (job_group_id, '/'.join(projects)) if len(unfinished_group_jads) == 0: subject += 'finished successfully' elif removed_group_jads : if len(removed_group_jads) == len(done_group_jads): subject += 'was removed completely before transfer' else: subject += 'was removed partially before transfer' else: subject += 'finished with errors. %d/%d (%.1f%%) dataproducts did not transfer.' % (len(unfinished_group_jads), len(done_group_jads), 100.0*len(unfinished_group_jads)/len(done_group_jads)) if os.system('echo "%s"|mailx -s "%s" %s' % (report, subject, mailing_list)) == 0: logger.info('sent notification email for export job %s to %s', job_group_id, mailing_list) else: logger.error('failed sending a notification email for export job %s to %s', job_group_id, mailing_list) def main(): from lofar.messaging import setQpidLogLevel from optparse import OptionParser # make sure we run in UTC timezone os.environ['TZ'] = 'UTC' # Check the invocation arguments parser = OptionParser('%prog [options]', description='run the ingest job management server') parser.add_option('-j', '--jobs_dir', dest='jobs_dir', type='string', default=JOBS_DIR, help='directory path where the jobs located on disk, default: %default') parser.add_option('-r', '--max_num_retries', dest='max_num_retries', type='int', default=MAX_NR_OF_RETRIES, help='maximum number of retries for a failing job, default: %default') parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') parser.add_option("--ingest_busname", dest="ingest_busname", type="string", default=DEFAULT_INGEST_BUSNAME, help="Name of the bus on which the ingest service listens. [default: %default]") parser.add_option("--ingest_servicename", dest="ingest_servicename", type="string", default=DEFAULT_INGEST_SERVICENAME, help="Name of the ingest service. [default: %default]") parser.add_option('--ingest_notification_listen_queuename', dest='ingest_notification_listen_queuename', type='string', default=DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME, help='Name of the notification listener queue on the qpid broker on which the ingest notifications are published, default: %default') parser.add_option('--ingest_notification_busname', dest='ingest_notification_busname', type='string', default=DEFAULT_INGEST_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default') parser.add_option('--ingest_notification_prefix', dest='ingest_notification_prefix', type='string', default=DEFAULT_INGEST_NOTIFICATION_PREFIX, help='Prefix of subject(s) to listen for on the ingest_notification_listen_queuename on the qpid broker, default: %default') parser.add_option('--incoming_job_queue_name', dest='incoming_job_queue_name', type='string', default=DEFAULT_INGEST_JOBS_QUEUENAME, help='name of the incoming job queue (jobs comming from mom/useringest/etc), default: %default') parser.add_option('--jobs_for_transfer_queue_name', dest='jobs_for_transfer_queue_name', type='string', default=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME, help='name of the managed job queue (which jobs are handled by the ingestservices), default: %default') parser.add_option('--mom_query_busname', dest='mom_query_busname', type='string', default=DEFAULT_MOMQUERY_BUSNAME, help='Name of the bus exchange on the qpid broker on which the momqueryservice listens, default: %default') parser.add_option('--mom_query_servicename', dest='mom_query_servicename', type='string', default=DEFAULT_MOMQUERY_SERVICENAME, help='Name of the momqueryservice, default: %default') parser.add_option('--mom_query_service_broker', dest='mom_query_service_broker', type='string', default='scu001.control.lofar' if isProductionEnvironment() else 'scu199.control.lofar', help='Address of the qpid broker where the mom query service runs, default: %default') (options, args) = parser.parse_args() setQpidLogLevel(logging.INFO) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logger.info('*****************************************') logger.info('Started IngestJobManager') logger.info('*****************************************') manager = IngestJobManager(busname=options.ingest_busname, servicename=options.ingest_servicename, notification_busname=options.ingest_notification_busname, notification_listen_queue_name=options.ingest_notification_listen_queuename, notification_prefix=options.ingest_notification_prefix, incoming_job_queue_name=options.incoming_job_queue_name, jobs_for_transfer_queue_name=options.jobs_for_transfer_queue_name, jobs_dir=options.jobs_dir, max_num_retries=options.max_num_retries, mom_busname=options.mom_query_busname, mom_servicename=options.mom_query_servicename, broker=options.broker, mom_broker=options.mom_query_service_broker) manager.run() if __name__ == '__main__': main() __all__ = ["IngestJobManager"]