                                      'done': len(finished_group_jobs),
                                      'corr': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_CORRELATED]),
                                      'bf': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_BEAMFORMED]),
                                      'img': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_IMAGE]),
                                      'unspec': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_UNSPECIFIED]),
                                      'pulp': len([j for j in finished_group_jobs if j.get('file_type',-1) == FILE_TYPE_PULP]),
                                      'failed': len(failed_group_jobs)}

            # TODO: generate lta link
            # try:
            #     import mechanize
            #     import json
            #     browser = mechanize.Browser()
            #     browser.set_handle_robots(False)
            #     browser.addheaders = [('User-agent', 'Firefox')]
            #     obs_ids = sorted(list(set(job.get('ObservationId', -1) for job in all_group_jobs)))
            #     for obs_id in obs_ids:
            #         response = browser.open('http://scu001.control.lofar:7412/rest/tasks/otdb/%s' % obs_id)
            #         if response.code == 200:
            #             task = json.loads(response.read())

            if removed_group_jobs:
                summary += '''\n\nTotal Removed before transfer: %s''' % (len(removed_group_jobs),)

            report += summary

            def file_listing_per_obs(jads, full_listing=False, dp_status_remark=''):
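                '''Build a textual listing of the given job admin dicts grouped per observation:
                one line with the otdb_id and dataproduct count per observation, and optionally
                (full_listing=True) a per-dataproduct listing with archive_id, location and last message.'''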
                jobs = [jad['job'] for jad in jads]
                obs_ids = sorted(list(set(job.get('ObservationId', -1) for job in jobs)))
                obs_jads_dict = {obs_id: [] for obs_id in obs_ids}

                for jad in jads:
                    obs_jads_dict[jad['job'].get('ObservationId', -1)].append(jad)

                listing = ''
                for obs_id in obs_ids:
                    obs_jads = obs_jads_dict[obs_id]
                    listing += 'otdb_id: %s - #dataproducts: %s\n' % (obs_id, len(obs_jads))

                if full_listing:
                    for obs_id in obs_ids:
                        obs_jads = obs_jads_dict[obs_id]
                        obs_jads = sorted(obs_jads, key=lambda jad: jad['job'].get('DataProduct'))
                        listing += '\notdb_id: %s - %s dataproducts listing\n' % (obs_id, dp_status_remark)

                        for jad in obs_jads:
                            listing += 'dataproduct: %s - archive_id: %s - location: %s' % (jad['job']['DataProduct'],
                                                                                            jad['job']['ArchiveId'],
                                                                                            jad['job'].get('Location'))

                            if jad.get('last_message'):
                                listing += ' - message: %s' % (jad['last_message'])
                            listing += '\n'

                return listing

            if current_group_jads:
                report += "\n\n==== Scheduled/Running files: =====\n"
                report += file_listing_per_obs(current_group_jads, False, 'scheduled/running')

            if finished_group_jads:
                report += "\n\n==== Finished files: =====\n"
                report += file_listing_per_obs(finished_group_jads, False, 'finished')

            if failed_group_jads:
                report += "\n\n==== Failed files: =====\n"
                report += file_listing_per_obs(failed_group_jads, True, 'failed')

            if removed_group_jads:
                report += "\n\n==== Removed jobs before transfer: =====\n"
                report += file_listing_per_obs(removed_group_jads, False, 'removed')

            return report

    def sendJobGroupFinishedMail(self, job_group_id):
        report = self.getReport(job_group_id)
        # strip quotes and angle brackets, which would mess up the command-line mailx call below
        report = report.replace('\'', '').replace('\"', '').replace('<', '').replace('>', '')
        logger.info(report)

        mailing_list = list(FINISHED_NOTIFICATION_MAILING_LIST)

        finished_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobProduced)
        failed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobFailed)
        removed_group_jads = self.getJobAdminDicts(job_group_id=job_group_id, status=JobRemoved)
        unfinished_group_jads = failed_group_jads + removed_group_jads

        done_group_jads = finished_group_jads + failed_group_jads + removed_group_jads
        done_group_jobs = [jad['job'] for jad in done_group_jads]
        submitters = [j['Submitter'] for j in done_group_jobs if 'Submitter' in j]
        extra_mail_addresses = [j['email'] for j in done_group_jobs if 'email' in j]

        try:
            if len(unfinished_group_jads) == 0:
                # only for successful ingests,
                # try to get the PI's email address for this export's projects
                # and add these to the extra_mail_addresses
                done_group_mom_jobs = [job for job in done_group_jobs if job.get('Type', '').lower() == 'mom']
                mom_export_ids = set([int(job['JobId'].split('_')[1]) for job in done_group_mom_jobs if 'JobId' in job])
                with MoMQueryRPC.create(exchange=self._tobus.exchange, broker=self._tobus.broker) as momrpc:
                    mom_objects_details = momrpc.getObjectDetails(mom_export_ids)
                    project_mom2ids = set(obj_details.get('project_mom2id') for obj_details in mom_objects_details.values())
                    project_mom2ids = [x for x in project_mom2ids if x is not None]

                    for project_mom2id in project_mom2ids:
                        project_details = momrpc.get_project_details(project_mom2id)
                        if project_details and 'pi_email' in project_details:
                            extra_mail_addresses.append(project_details['pi_email'])
                        if project_details and 'author_email' in project_details:
                            extra_mail_addresses.append(project_details['author_email'])
                if not extra_mail_addresses:
                    report += '\n\nCould not find any PI\'s/Contact-author\'s email address in MoM to send this email to.'
        except Exception as e:
            msg = 'error while trying to get PI\'s/Contact-author\'s email address for %s: %s' % (job_group_id, e)
            logger.error(msg)
            report += '\n\n' + msg
        # submitters might contain comma-separated strings
        # join all submitter strings into one long csv string, split it, and get the unique submitters
        submitters = list(set([s.strip() for s in ','.join(submitters).split(',') if '@' in s]))
        # do the same for extra_mail_addresses
        extra_mail_addresses = list(set([s.strip() for s in ','.join(extra_mail_addresses).split(',') if '@' in s]))

        mailing_list += submitters + extra_mail_addresses
        if mailing_list:
            mailing_list_csv = ','.join(mailing_list)
            bcc_mailing_list_csv = ','.join(FINISHED_NOTIFICATION_BCC_MAILING_LIST)
            projects = set([j['Project'] for j in done_group_jobs if 'Project' in j])
            subject = "Ingest Export job %s of project %s " % (job_group_id, '/'.join(projects))
            if len(unfinished_group_jads) == 0:
                subject += 'finished successfully'
            else:
                if len(removed_group_jads) == len(done_group_jads):
                    subject += 'was removed completely before transfer'
                elif len(failed_group_jads) == 0:
                    subject += 'was removed partially before transfer'
                else:
                    subject += 'finished with errors. %d/%d (%.1f%%) dataproducts did not transfer.' % (len(unfinished_group_jads),
                                                                                                        len(done_group_jads),
                                                                                                        100.0 * len(unfinished_group_jads) / len(done_group_jads))
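            # hand the report to the system's mailx: -s sets the subject, -b adds the bcc recipients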
            if os.system('echo "%s"|mailx -s "%s" %s %s' % (report,
                                                            subject,
                                                            '-b ' + bcc_mailing_list_csv if bcc_mailing_list_csv else '',
                                                            mailing_list_csv)) == 0:
                logger.info('sent notification email for export job %s to %s', job_group_id, mailing_list)
            else:
                logger.error('failed sending a notification email for export job %s to %s', job_group_id, mailing_list)
        else:
            logger.warning('no email recipients for sending notification email for export job %s', job_group_id)


class IngestServiceMessageHandler(ServiceMessageHandler):
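    '''Exposes a subset of the job manager's methods as RPC service methods on the messaging bus.'''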
    def __init__(self, job_manager: IngestJobManager):
        super(IngestServiceMessageHandler, self).__init__()
        # register some of the job_manager's methods for the service
        self.register_service_method('RemoveExportJob', job_manager.removeExportJob)
        self.register_service_method('SetExportJobPriority', job_manager.setExportJobPriority)
        self.register_service_method('GetStatusReport', job_manager.getStatusReportDict)
        self.register_service_method('GetReport', job_manager.getReport)
        self.register_service_method('GetExportIds', job_manager.getExportIds)
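        # The methods registered above can be invoked remotely over the bus.
        # Illustrative sketch only (client class name and call signature are assumptions):
        #   with RPCClient(service_name=<ingest service name>, exchange=<exchange>) as rpc:
        #       report = rpc.execute('GetReport', job_group_id=<some export/job group id>)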


class IngestIncomingJobsHandler(AbstractMessageHandler):
    def __init__(self, job_manager: IngestJobManager):
        self._job_manager = job_manager
        super(IngestIncomingJobsHandler, self).__init__()

    def handle_message(self, msg: LofarMessage):
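        '''Parse the job XML carried by an incoming CommandMessage and hand the resulting job over to the job manager.'''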
        if not isinstance(msg, CommandMessage):
            raise ValueError("%s: Ignoring non-CommandMessage: %s" % (self.__class__.__name__, msg))

        job = parseJobXml(msg.content)
        if job and job.get('JobId'):
            if msg.priority is not None and msg.priority > job.get('priority', DEFAULT_JOB_PRIORITY):
                job['priority'] = msg.priority

            logger.info("received job from bus: %s", job)
            job_admin_dict = {'job': job, 'job_xml': msg.content}
            self._job_manager.addNewJob(job_admin_dict, check_non_todo_dirs=True, add_old_jobs_from_disk=True)

class IngestEventMessageHandlerForJobManager(IngestEventMessageHandler):
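    '''Dispatches incoming ingest event messages (job started/progress/transfer-failed/finished) to the corresponding job manager callbacks.'''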
    def __init__(self, job_manager: IngestJobManager):
        self.onJobStarted = job_manager.onJobStarted
        self.onJobProgress = job_manager.onJobProgress
        self.onJobTransferFailed = job_manager.onJobTransferFailed
        self.onJobFinished = job_manager.onJobFinished
        super().__init__(['JobStarted', 'JobFinished', 'JobTransferFailed', 'TaskProgress', 'TaskFinished'])


def main():
    from optparse import OptionParser

    # make sure we run in UTC timezone
    os.environ['TZ'] = 'UTC'

    # Check the invocation arguments
    parser = OptionParser('%prog [options]',
                          description='run the ingest job management server')
    parser.add_option('-j', '--jobs_dir', dest='jobs_dir', type='string', default=JOBS_DIR, help='directory path where the jobs are located on disk, default: %default')
    parser.add_option('-r', '--max_num_retries', dest='max_num_retries', type='int', default=MAX_NR_OF_RETRIES, help='maximum number of retries for a failing job, default: %default')
    parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default')
    parser.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME,
                      help="Name of the bus on which the services listen. [default: %default]")
    (options, args) = parser.parse_args()

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.INFO)

    logger.info('*****************************************')
    logger.info('Started IngestJobManager')
    logger.info('*****************************************')

    manager = IngestJobManager(exchange=options.exchange,
                               broker=options.broker,
                               jobs_dir=options.jobs_dir,
                               max_num_retries=options.max_num_retries)

    # run the manager's main loop (run() and the broker keyword are assumed here, mirroring the parsed options)
    manager.run()


__all__ = ["IngestJobManager", "main"]

if __name__ == '__main__':
    main()