Newer
Older

Jorrit Schaap
committed
'done': len(finished_group_jobs),
'corr': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_CORRELATED]),
'bf': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_BEAMFORMED]),
'img': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_IMAGE]),
'unspec': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_UNSPECIFIED]),
'pulp': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_PULP]),
'failed': len(failed_group_jobs)}
# TODO: generate lta link

Jorrit Schaap
committed
# try:
# import mechanize
# import json
# browser = mechanize.Browser()
# browser.set_handle_robots(False)
# browser.addheaders = [('User-agent', 'Firefox')]

Jorrit Schaap
committed

Jorrit Schaap
committed
# obs_ids = sorted(list(set(job.get('ObservationId', -1) for job in all_group_jobs)))

Jorrit Schaap
committed

Jorrit Schaap
committed
# for obs_id in obs_ids:
# response = browser.open('http://scu001.control.lofar:7412/rest/tasks/otdb/%s' % obs_id)

Jorrit Schaap
committed

Jorrit Schaap
committed
# if response.code == 200:
# task = json.loads(response.read())

Jorrit Schaap
committed

Jorrit Schaap
committed
# except Exception as e:
# logger.error(e)

Jorrit Schaap
committed
if removed_group_jobs:
summary += '''\n\nTotal Removed before transfer: %s''' % (len(removed_group_jobs),)
report += summary
def file_listing_per_obs(jads, full_listing=False, dp_status_remark=''):
    """Render a per-observation listing of job admin dicts as one string.

    First a summary line 'otdb_id: <id> - #dataproducts: <n>' is produced for
    every observation id (ascending). Jobs without an 'ObservationId' are
    grouped under -1. With full_listing, a detail section follows for each
    observation. It lists every dataproduct, sorted by 'DataProduct', with its
    archive id, location and, if set, the jad's 'last_message'.

    :param jads: job admin dicts; each holds the job dict under key 'job'
        and may carry a 'last_message'.
    :param full_listing: when True, append the per-dataproduct detail sections.
    :param dp_status_remark: text put in each detail section header
        (e.g. 'failed').
    :return: the complete listing text.
    """
    # bucket the jads per observation id, keeping their original order
    per_obs = {}
    for jad in jads:
        per_obs.setdefault(jad['job'].get('ObservationId', -1), []).append(jad)
    sorted_ids = sorted(per_obs)

    lines = ['otdb_id: %s - #dataproducts: %s\n' % (obs_id, len(per_obs[obs_id]))
             for obs_id in sorted_ids]

    if full_listing:
        for obs_id in sorted_ids:
            lines.append('\notdb_id: %s - %s dataproducts listing\n' % (obs_id, dp_status_remark))
            for jad in sorted(per_obs[obs_id], key=lambda item: item['job'].get('DataProduct')):
                job = jad['job']
                entry = 'dataproduct: %s - archive_id: %s - location: %s' % (job['DataProduct'],
                                                                             job['ArchiveId'],
                                                                             job.get('Location'))
                if jad.get('last_message'):
                    entry += ' - message: %s' % (jad['last_message'])
                lines.append(entry + '\n')

    return ''.join(lines)
if current_group_jads:
report += "\n\n==== Scheduled/Running files: =====\n"
report += file_listing_per_obs(current_group_jads, False, 'scheduled/running')
if finished_group_jads:
report += "\n\n==== Finished files: =====\n"
report += file_listing_per_obs(finished_group_jads, False, 'finished')
if failed_group_jads:
report += "\n\n==== Failed files: =====\n"
report += file_listing_per_obs(failed_group_jads, True, 'failed')
if removed_group_jads:
report += "\n\n==== Removed jobs before transfer: =====\n"
report += file_listing_per_obs(removed_group_jads, False, 'removed')
return report
def sendJobGroupFinishedMail(self, job_group_id):
    """Mail the report of a finished export job group to everyone involved.

    The report (from getReport) goes to these recipients:
      - FINISHED_NOTIFICATION_MAILING_LIST;
      - the (possibly comma separated) 'Submitter' and 'email' addresses of
        the group's produced, failed and removed jobs;
      - only when no job failed or was removed, the PI and contact-author
        addresses of the jobs' MoM projects.
    FINISHED_NOTIFICATION_BCC_MAILING_LIST, when non-empty, gets a bcc copy.

    The mail is sent by running the external ``mailx`` command. A failure to
    send is logged, not raised.

    :param job_group_id: id of the export job group to report on.
    """
    # local import, in the same style as main()'s optparse import
    import subprocess

    report = self.getReport(job_group_id)
    logger.info(report)

    mailing_list = list(FINISHED_NOTIFICATION_MAILING_LIST)
    finished_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced)
    failed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed)
    removed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved)
    unfinished_group_jads = failed_group_jads + removed_group_jads
    done_group_jads = finished_group_jads + unfinished_group_jads
    done_group_jobs = [jad['job'] for jad in done_group_jads]
    submitters = [j['Submitter'] for j in done_group_jobs if 'Submitter' in j]
    extra_mail_addresses = [j['email'] for j in done_group_jobs if 'email' in j]

    try:
        if not unfinished_group_jads:
            # only for successful ingests,
            # try to get the PI's email address for this export's projects
            # and add these to the extra_mail_addresses
            done_group_mom_jobs = [job for job in done_group_jobs if job.get('Type', '').lower() == 'mom']
            # the MoM export id is taken from the second '_'-separated field of the JobId
            mom_export_ids = set(int(job['JobId'].split('_')[1]) for job in done_group_mom_jobs if 'JobId' in job)
            with MoMQueryRPC.create(exchange=self._tobus.exchange, broker=self._tobus.broker) as momrpc:
                mom_objects_details = momrpc.getObjectDetails(mom_export_ids)
                project_mom2ids = set(obj_details.get('project_mom2id') for obj_details in mom_objects_details.values())
                project_mom2ids = [x for x in project_mom2ids if x is not None]
                for project_mom2id in project_mom2ids:
                    project_details = momrpc.get_project_details(project_mom2id)
                    if project_details and 'pi_email' in project_details:
                        extra_mail_addresses.append(project_details['pi_email'])
                    if project_details and 'author_email' in project_details:
                        extra_mail_addresses.append(project_details['author_email'])

            if not extra_mail_addresses:
                report += '\n\nCould not find any PI\'s/Contact-author\'s email address in MoM to sent this email to.'
    except Exception as e:
        # best effort: a failing MoM lookup must not prevent sending the mail
        msg = 'error while trying to get PI\'s/Contact-author\'s email address for %s: %s' % (job_group_id, e)
        logger.error(msg)
        report += '\n\n' + msg

    # Submitter and email fields might contain comma separated strings:
    # join them into one long csv string, split it, and keep the unique
    # stripped entries that contain an '@'.
    submitters = list(set(s.strip() for s in ','.join(submitters).split(',') if '@' in s))
    extra_mail_addresses = list(set(s.strip() for s in ','.join(extra_mail_addresses).split(',') if '@' in s))

    # drop duplicates (e.g. a submitter who is also the PI) while keeping the order
    mailing_list = list(dict.fromkeys(mailing_list + submitters + extra_mail_addresses))
    if not mailing_list:
        logger.warning('no email recipients for sending notification email for export job %s', job_group_id)
        return

    # sorted, so the subject does not depend on set iteration order
    projects = sorted(set(j['Project'] for j in done_group_jobs if 'Project' in j))
    subject = "Ingest Export job %s of project %s " % (job_group_id, '/'.join(projects))
    if not unfinished_group_jads:
        subject += 'finished successfully'
    elif removed_group_jads:
        if len(removed_group_jads) == len(done_group_jads):
            subject += 'was removed completely before transfer'
        else:
            subject += 'was removed partially before transfer'
    else:
        # unfinished_group_jads is non-empty here, so done_group_jads is too (no division by zero)
        subject += 'finished with errors. %d/%d (%.1f%%) dataproducts did not transfer.' % (
            len(unfinished_group_jads),
            len(done_group_jads),
            100.0 * len(unfinished_group_jads) / len(done_group_jads))

    # Run mailx without a shell. The body is fed via stdin; subject and
    # addresses are passed as separate arguments. This way quotes, backticks
    # or '$' in job data, project names or exception texts can neither break
    # the command nor be executed, and the report needs no character stripping.
    cmd = ['mailx', '-s', subject]
    bcc_mailing_list_csv = ','.join(FINISHED_NOTIFICATION_BCC_MAILING_LIST)
    if bcc_mailing_list_csv:
        cmd += ['-b', bcc_mailing_list_csv]
    cmd += mailing_list

    try:
        # trailing newline, like the 'echo' that used to feed mailx
        returncode = subprocess.run(cmd, input=(report + '\n').encode('utf-8')).returncode
    except (OSError, UnicodeError) as e:
        # e.g. mailx is not installed, or an argument cannot be encoded
        logger.error('could not run mailx for export job %s: %s', job_group_id, e)
        returncode = None

    if returncode == 0:
        logger.info('sent notification email for export job %s to %s', job_group_id, mailing_list)
    else:
        logger.error('failed sending a notification email for export job %s to %s', job_group_id, mailing_list)

Jorrit Schaap
committed

Jorrit Schaap
committed
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
class IngestServiceMessageHandler(ServiceMessageHandler):
    """Service message handler that exposes part of an IngestJobManager's API.

    Each service method name is bound to the matching job_manager method.
    """

    def __init__(self, job_manager: IngestJobManager):
        super().__init__()
        # (service method name, job manager callable) pairs to expose
        exposed_methods = (
            ('RemoveExportJob', job_manager.removeExportJob),
            ('SetExportJobPriority', job_manager.setExportJobPriority),
            ('GetStatusReport', job_manager.getStatusReportDict),
            ('GetReport', job_manager.getReport),
            ('GetExportIds', job_manager.getExportIds),
        )
        for service_name, handler in exposed_methods:
            self.register_service_method(service_name, handler)
class IngestIncomingJobsHandler(AbstractMessageHandler):
    """Message handler that hands export jobs received from the bus to an IngestJobManager."""

    def __init__(self, job_manager: IngestJobManager):
        self._job_manager = job_manager
        super(IngestIncomingJobsHandler, self).__init__()

    def handle_message(self, msg: LofarMessage):
        """Parse the job xml carried by a CommandMessage and add it as a new job.

        If the message has a priority higher than the job's own 'priority'
        (DEFAULT_JOB_PRIORITY when absent), the message priority is used.
        Messages whose content does not parse into a job with a 'JobId'
        are logged and ignored.

        :raises ValueError: when msg is not a CommandMessage.
        """
        if not isinstance(msg, CommandMessage):
            raise ValueError("%s: Ignoring non-CommandMessage: %s" % (self.__class__.__name__, msg))

        job = parseJobXml(msg.content)
        if not (job and job.get('JobId')):
            # do not drop unusable messages silently
            logger.warning("%s: ignoring message without a valid job: %s", self.__class__.__name__, msg)
            return

        if msg.priority is not None and msg.priority > job.get('priority', DEFAULT_JOB_PRIORITY):
            job['priority'] = msg.priority

        logger.info("received job from bus: %s", job)
        job_admin_dict = {'job': job, 'job_xml': msg.content}
        self._job_manager.addNewJob(job_admin_dict, check_non_todo_dirs=True, add_old_jobs_from_disk=True)
class IngestEventMessageHandlerForJobManager(IngestEventMessageHandler):
    """Ingest event handler that forwards job events to an IngestJobManager's callbacks."""

    def __init__(self, job_manager: IngestJobManager):
        # Use the job manager's callbacks as this handler's event hooks.
        # They are bound before the base class is initialized.
        for hook_name in ('onJobStarted', 'onJobProgress', 'onJobTransferFailed', 'onJobFinished'):
            setattr(self, hook_name, getattr(job_manager, hook_name))

        subscribed_events = ['JobStarted', 'JobFinished', 'JobTransferFailed', 'TaskProgress', 'TaskFinished']
        super().__init__(subscribed_events)

Jorrit Schaap
committed
def main():
    """Command-line entry point: run the ingest job management server.

    Forces the process timezone to UTC and parses the command line options.
    It then configures INFO-level logging, builds an IngestJobManager from
    the options and runs it.
    """
    from optparse import OptionParser

    # make sure we run in UTC timezone
    os.environ['TZ'] = 'UTC'

    # Check the invocation arguments
    parser = OptionParser('%prog [options]', description='run the ingest job management server')
    add = parser.add_option
    add('-j', '--jobs_dir', dest='jobs_dir', type='string', default=JOBS_DIR,
        help='directory path where the jobs located on disk, default: %default')
    add('-r', '--max_num_retries', dest='max_num_retries', type='int', default=MAX_NR_OF_RETRIES,
        help='maximum number of retries for a failing job, default: %default')
    add('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
        help='Address of the qpid broker, default: %default')
    add('-e', '--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME,
        help='Name of the bus on the services listen. [default: %default]')
    # positional arguments are accepted but not used
    options = parser.parse_args()[0]

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    banner = '*****************************************'
    for line in (banner, 'Started IngestJobManager', banner):
        logger.info(line)

    IngestJobManager(exchange=options.exchange,
                     jobs_dir=options.jobs_dir,
                     max_num_retries=options.max_num_retries,
                     broker=options.broker).run()

Jorrit Schaap
committed

Jorrit Schaap
committed
# public API of this module
__all__ = ["IngestJobManager", "main"]

# run the server when executed as a script
if __name__ == '__main__':
    main()