Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
specification_service.py 8.96 KiB
#!/usr/bin/env python

# specification_service.py
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

from lxml import etree
from StringIO import StringIO
from lofar.specificationservices.validation_service_rpc import ValidationRPC
from lofar.specificationservices.translation_service_rpc import TranslationRPC
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC

from lofar.messaging import Service, EventMessage, ToBus
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt

from config import MOMQUERY_BUSNAME, \
    MOMQUERY_SERVICENAME, \
    VALIDATION_BUSNAME, \
    VALIDATION_SERVICENAME, \
    SPECIFICATION_SERVICENAME, \
    SPECIFICATION_BUSNAME, \
    SPECIFICATIONTRANSLATION_SERVICENAME, \
    SPECIFICATIONTRANSLATION_BUSNAME, \
    MOMIMPORTXML_BUSNAME, \
    MOMIMPORTXML_SUBJECT

momqueryrpc = MoMQueryRPC(MOMQUERY_BUSNAME, MOMQUERY_SERVICENAME)
validationrpc = ValidationRPC(VALIDATION_BUSNAME, VALIDATION_SERVICENAME)
specificationtranslationrpc = TranslationRPC(SPECIFICATIONTRANSLATION_BUSNAME, SPECIFICATIONTRANSLATION_SERVICENAME)

momimportxml_bus = ToBus(address=MOMIMPORTXML_BUSNAME, broker=None)

permitted_activities=["observation", "pipeline", "measurement"]
permitted_statuses=["opened", "approved"]

import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def _parse_activity_paths(spec):
    """
    Parses relations on a lofar spec elementtree and returns a dict to lookup the path to the containing folder by
    a given activity identifier. Paths are slash separated with project identifier as root.
    """

    containers = spec.findall('container')
    project = spec.find('projectReference').find('identifier').find('identifier').text
    paths = {}

    folder_activity = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if x.find("type").text == "folder-activity"]
    folder_folder = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if x.find("type").text == "folder-folder"]

    def make_key(element):
        source     = element.find("source").text
        identifier = element.find("identifier").text
        return (source, identifier)

    dirnames = { make_key(container.find("temporaryIdentifier")): container.find('folder').find('name').text for container in containers }
    parents  = { make_key(folder_id): make_key(parentfolder_id) for (parentfolder_id, folder_id) in folder_folder }

    for (folder_id, activity_id) in folder_activity:
        key = make_key(folder_id)
        activikey = make_key(activity_id)
        path = ""
        while key not in parents:
            if key not in dirnames:
                raise Exception("Reference to missing container? (%s)" % (key,))
            path = dirnames[key] + "/" + path
            key = parents[key]
        path = project + "/" + path
        paths[activikey] = path

    for key in paths.keys():
        logger.debug("Activity path -> "+str(key)+" --> "+ paths.get(key))
    return paths


def _check_specification(user, lofar_xml):
    """
    Performs some checks to make sure the specification meets some criteria before we accespt it.
    E.g. activities have to be in new folders
    """

    doc = etree.parse(StringIO(lofar_xml.encode('utf-8')))
    spec = doc.getroot()

    if spec.tag != "{http://www.astron.nl/LofarSpecification}specification":
        raise Exception("Unexpected root element: ", spec.tag)

    activity_paths = _parse_activity_paths(spec)
    for path in activity_paths.values():
        if _folderExists(path):
            raise Exception("Innermost folder already exists: "+path)

    project = spec.find('projectReference').find('identifier').find('identifier').text
    if not _isActive(project):
        raise Exception("Project is not active: "+str(project))

    activities = spec.findall('activity')
    for activity in activities:
        key = (activity.find("temporaryIdentifier").find("source").text , activity.find("temporaryIdentifier").find("identifier").text)
        if not key in activity_paths.keys():
            # allow measurements, which are activities, but not contained in folders by definition!
            # todo: check, is this what we want? Or do we have to do attional checks, e.g. that the obs-measurement relation and the parent obs exists?
            if not activity.find("measurement") is not None:
                raise Exception("Specified action has to be in folder: "+str(key))
        jobtype = None
        for action in permitted_activities:
            if activity.find(action) is not None:
                jobtype = action
                break
            logger.warning("!!! %s not found..." % (action,))
        if jobtype is None:
            raise Exception("Specified activity is not permitted: " + str(key) + " -> "+ str(permitted_activities)+" not found in "+ str(activity.getchildren()))
        status = activity.find("status")
        if status is None:
            raise Exception("Activity has no status: "+str(key) )
        if status.text not in permitted_statuses:
            raise Exception("Specified activity is not going to permitted status: " + str(key) + " -> '"+str(status.text) + "' not in "+str(permitted_statuses))
        _authenticateAction(str(user), str(project), str(jobtype), str(status.text))

def _isActive(project):
    logger.debug("Checking if project is active: "+ project)
    response = momqueryrpc.isProjectActive(project) #todo mock this for testing
    return response.lower()  == 'true' #response.get("isActive")

def _folderExists(path):
    logger.debug("Checking if path exists -> "+ path)
    response = momqueryrpc.folderExists(path) # todo mock this for testing
    return response.lower()  == 'true' #response.get("folderExists")

def _authenticateAction(user, project, jobtype, state):
    logger.debug("Authenticate action -> "+ user +', ' +project +', '+ jobtype +', '+ state)
    response = momqueryrpc.authorized_add_with_status(user,project,jobtype,state) # todo mock this for testing
    return response.lower()  == 'true' #response.get("isAuthorized")

def _validate_lofarspec(lofar_xml):
    response = validationrpc.validate_specification(lofar_xml)
    if not response["valid"]:
        raise Exception("Invalid specification: " + response["error"])

def _validate_momspec(mom_xml):
    response = validationrpc.validate_mom_specification(mom_xml)
    if not response["valid"]:
        raise Exception("Invalid MoM specification: " + response["error"])

def _add_spec_to_mom(mom_xml):
    msg = EventMessage(context=MOMIMPORTXML_SUBJECT, content=mom_xml)
    momimportxml_bus.send(msg)

    logger.debug("Send specs to MOM: " + mom_xml)

def _lofarxml_to_momxml(lofarxml):
    logger.debug("Translating LOFAR spec to MoM spec")
    response =  specificationtranslationrpc.specification_to_momspecification(lofarxml)
    return response.get("mom-specification")

class SpecificationHandler(MessageHandlerInterface):

    def __init__(self, **kwargs):
        super(SpecificationHandler, self).__init__(**kwargs)

        momimportxml_bus.open()

        self.service2MethodMap = {
            'add_specification': self.add_specification,
            'get_specification': self.get_specification,
            }

    def add_specification(self, user, lofar_xml):
        logger.info("got specification from user " + str(user))
        logger.debug("LOFAR-XML: " + lofar_xml)

        _validate_lofarspec(lofar_xml)
        logger.info("lofar xml validates!")

        _check_specification(user, lofar_xml)
        logger.info("lofar xml check successful!")

        mom_xml = _lofarxml_to_momxml(lofar_xml)
        _validate_momspec(mom_xml)
        logger.info("mom xml validates!")

        _add_spec_to_mom(mom_xml)
        logger.info("Fired and forgot to mom.importXML")

    def get_specification(self, user, id):
        logger.info("getting spec of id "+str(id))
        # todo: return real trigger info as xml
        raise NotImplementedError


def create_service(servicename=SPECIFICATION_SERVICENAME, busname=SPECIFICATION_BUSNAME):
    return Service(servicename,
                       SpecificationHandler,
                       busname=busname,
                       use_service_methods=True,
    )


def main():
    with create_service():
       waitForInterrupt()