Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
TriggerEmailService.py 11.03 KiB
#!/usr/bin/env python
#
# Copyright (C) 2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import os
import smtplib
import logging

from lofar.sas.TriggerEmailService.Templates import ABORTED_TEMPLATE_BODY, ABORTED_TEMPLATE_SUBJECT
from lofar.sas.TriggerEmailService.Templates import ACCEPTED_TEMPLATE_BODY, ACCEPTED_TEMPLATE_SUBJECT
from lofar.sas.TriggerEmailService.Templates import FINISHED_TEMPLATE_BODY, FINISHED_TEMPLATE_SUBJECT
from lofar.sas.TriggerEmailService.Templates import REJECTED_TEMPLATE_BODY, REJECTED_TEMPLATE_SUBJECT
from lofar.sas.TriggerEmailService.Templates import RECEIVED_TEMPLATE_BODY, RECEIVED_TEMPLATE_SUBJECT

from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

from datetime import timedelta, datetime
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.common.util import waitForInterrupt
from lofar.messaging.messagebus import AbstractBusListener
from lofar.sas.TriggerEmailService.common.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME
from lofar.sas.TriggerEmailService.common.config import DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.sas.TriggerEmailService.common.config import DEFAULT_TRIGGER_NOTIFICATION_BUSNAME
from lofar.sas.TriggerEmailService.common.config import DEFAULT_TRIGGER_NOTIFICATION_SUBJECT
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lxml import etree
from StringIO import StringIO
from re import findall
import socket

logger = logging.getLogger(__name__)


def email(recipients, subject, body, attachment, attachment_name):
    if "LOFARENV" in os.environ:
        lofar_environment = os.environ['LOFARENV']

        if lofar_environment == "PRODUCTION":
            recipients.append("sos@astron.nl")
            recipients.append("operator@astron.nl")

    hostname = socket.gethostname()
    sender = "lofarsys@" + hostname
    commaspace = ', '

    msg = MIMEMultipart()
    msg.attach(MIMEText(body, 'plain'))
    msg["Subject"] = subject
    msg["From"] = "LOFAR Science Operations & Support <sos@astron.nl>"
    msg["To"] = commaspace.join(recipients)

    if attachment:
        txt = MIMEText(attachment)
        txt.add_header('Content-Disposition', "attachment; filename= %s" % attachment_name)
        msg.attach(txt)

    s = smtplib.SMTP('localhost')
    s.sendmail(sender, recipients, msg.as_string())
    s.quit()


class OTDBTriggerListener(OTDBBusListener):
    def __init__(self, momquery_rpc=MoMQueryRPC(), busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
                 subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT, broker=None, **kwargs):
        super(OTDBTriggerListener, self).__init__(busname=busname, subject=subject, broker=broker, **kwargs)

        self.mom_rpc_client = momquery_rpc

    def start_listening(self, **kwargs):
        self.mom_rpc_client.open()
        super(OTDBTriggerListener, self).start_listening(**kwargs)

    def stop_listening(self):
        self.mom_rpc_client.close()
        super(OTDBTriggerListener, self).stop_listening()

    def onObservationAborted(self, otdb_id, _):
        self.when_trigger_send_email(otdb_id, ABORTED_TEMPLATE_SUBJECT, ABORTED_TEMPLATE_BODY)

    def onObservationScheduled(self, otdb_id, _):
        self.when_trigger_send_email(otdb_id, ACCEPTED_TEMPLATE_SUBJECT, ACCEPTED_TEMPLATE_BODY)

    def onObservationFinished(self, otdb_id, _):
        self.when_trigger_send_email(otdb_id, FINISHED_TEMPLATE_SUBJECT, FINISHED_TEMPLATE_BODY)

    def onObservationConflict(self, otdb_id, _):
        self.when_trigger_send_email(otdb_id, REJECTED_TEMPLATE_SUBJECT, REJECTED_TEMPLATE_BODY)

    def onObservationError(self, otdb_id, _):
        self.when_trigger_send_email(otdb_id, REJECTED_TEMPLATE_SUBJECT, REJECTED_TEMPLATE_BODY)

    def when_trigger_send_email(self, otdb_id, template_subject, template_body):
        mom_id = None

        while not mom_id:  # we are sometimes to fast for MOM
            mom_id = self.mom_rpc_client.getMoMIdsForOTDBIds(otdb_id)[otdb_id]

        trigger_id = self.mom_rpc_client.get_trigger_id(mom_id)['trigger_id']

        if trigger_id:
            logger.info("Emailing otdb_id: %s, mom_id: %s, trigger_id: %s, template_subject: %s, template_body: %s",
                        otdb_id, mom_id, trigger_id, template_subject, template_body)
            self._send_email(otdb_id, mom_id, trigger_id, template_subject, template_body)

    def _send_email(self, otdb_id, mom_id, trigger_id, template_subject, template_body):
        subject, body = self._fill_template(otdb_id, mom_id, trigger_id, template_subject, template_body)
        recipients = self._get_recipients(mom_id)

        email(recipients, subject, body, None, "")

    def _fill_template(self, otdb_id, mom_id, trigger_id, template_subject, template_body):
        project = self.mom_rpc_client.getObjectDetails(mom_id)[mom_id]

        data = {
            "PROJECTNAME": project["project_name"], "TRIGGERID": trigger_id, "OBSSASID": otdb_id, "OBSMOMID": mom_id,
            "MOMLINK": "https://lofar.astron.nl/mom3/user/main/list/setUpProjectList.do"
        }

        subject = template_subject % data
        body = template_body % data

        return subject, body

    def _get_recipients(self, mom_id):
        recipients = []

        emails = self.mom_rpc_client.get_project_details(mom_id)
        for k, v in emails.items():
            recipients.append(v)

        return recipients


class TriggerNotificationListener(AbstractBusListener):
    def __init__(self, momquery_rpc=MoMQueryRPC(), busname=DEFAULT_TRIGGER_NOTIFICATION_BUSNAME,
                 subject=DEFAULT_TRIGGER_NOTIFICATION_SUBJECT, broker=None, **kwargs):
        """
        TriggerNotificationListener listens on the lofar trigger message bus and emails when trigger gets submitted.
        :param address: valid Qpid address (default: lofar.otdb.status)
        :param broker: valid Qpid broker host (default: None, which means localhost)
        additional parameters in kwargs:
            options=   <dict>  Dictionary of options passed to QPID
            exclusive= <bool>  Create an exclusive binding so no other services can consume duplicate
                               messages (default: False)
            numthreads= <int>  Number of parallel threads processing messages (default: 1)
            verbose=   <bool>  Output extra logging over stdout (default: False)
        """
        address = "%s/%s" % (busname, subject)
        super(TriggerNotificationListener, self).__init__(address, broker, **kwargs)

        self.mom_rpc_client = momquery_rpc

    def _handleMessage(self, msg):
        trigger_id = msg.content['trigger_id']
        project_name = msg.content['project']
        trigger_xml = msg.content['metadata']
        start_time, stop_time = self._get_observation_start_stop_times(trigger_xml)

        mom_id = self._get_mom_id(project_name)

        if mom_id:
            subject, body = self._fill_template(trigger_id, project_name, start_time, stop_time,
                                                RECEIVED_TEMPLATE_SUBJECT, RECEIVED_TEMPLATE_BODY)
            recipients = self._get_recipients(mom_id)

            email(recipients, subject, body, trigger_xml, "trigger.xml")
        else:
            logger.error("Trigger got entered for a non existing project: %s", project_name)

    def _get_mom_id(self, project_name):
        # todo add function to momqueryserivce for it (get mom2id for project name)
        mom_id = None

        projects = self.mom_rpc_client.getProjects()
        for project in projects:
            if project["name"] == project_name:
                mom_id = project["mom2id"]

        return mom_id

    def _get_recipients(self, mom_id):

        recipients = []

        emails = self.mom_rpc_client.get_project_details(mom_id)

        for k, v in emails.items():
            recipients.append(v)

        return recipients

    def _get_observation_start_stop_times(self, trigger_xml):
        # for now we work with duration to get stop time
        doc = etree.parse(StringIO(trigger_xml.encode('UTF-8')))

        start_times = doc.getroot().findall('specification/activity/observation/timeWindowSpecification/startTime')

        start_time = datetime.strptime(start_times[0].text, '%Y-%m-%dT%H:%M:%S')

        durations = doc.getroot().findall(
            'specification/activity/observation/timeWindowSpecification/duration/duration')

        duration = durations[0].text

        duration_seconds = self._iso8601_duration_as_seconds(duration)

        stop_time = start_time + timedelta(seconds=duration_seconds)

        return start_time, stop_time

    def start_listening(self, **kwargs):
        self.mom_rpc_client.open()
        super(TriggerNotificationListener, self).start_listening(**kwargs)

    def stop_listening(self):
        self.mom_rpc_client.close()
        super(TriggerNotificationListener, self).stop_listening()

    def _fill_template(self, trigger_id, project_name, start_time, stop_time, template_subject, template_body):
        data = {
            "PROJECTNAME": project_name, "TRIGGERID": trigger_id, "STARTTIME": start_time, "ENDTIME": stop_time
        }

        subject = template_subject % data
        body = template_body % data

        return subject, body

    def _iso8601_duration_as_seconds(self, duration):
        if duration[0] != 'P':
            raise ValueError('Not an ISO 8601 Duration string')
        seconds = 0
        for i, item in enumerate(duration.split('T')):
            for number, unit in findall('(?P<number>\d+)(?P<period>S|M|H|D|W|Y)', item):
                number = int(number)
                this = 0
                if unit == 'Y':
                    this = number * 31557600  # 365.25
                elif unit == 'W':
                    this = number * 604800
                elif unit == 'D':
                    this = number * 86400
                elif unit == 'H':
                    this = number * 3600
                elif unit == 'M':
                    # ambiguity ellivated with index i
                    if i == 0:
                        this = number * 2678400  # assume 30 days
                    else:
                        this = number * 60
                elif unit == 'S':
                    this = number
                seconds += this
        return seconds


def main():
    with OTDBTriggerListener():
        with TriggerNotificationListener():
            waitForInterrupt()


if __name__ == '__main__':
    main()