-
Jan David Mol authoredJan David Mol authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
specification_service.py 8.96 KiB
#!/usr/bin/env python
# specification_service.py
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
from lxml import etree
from StringIO import StringIO
from lofar.specificationservices.validation_service_rpc import ValidationRPC
from lofar.specificationservices.translation_service_rpc import TranslationRPC
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.messaging import Service, EventMessage, ToBus
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt
from config import MOMQUERY_BUSNAME, \
MOMQUERY_SERVICENAME, \
VALIDATION_BUSNAME, \
VALIDATION_SERVICENAME, \
SPECIFICATION_SERVICENAME, \
SPECIFICATION_BUSNAME, \
SPECIFICATIONTRANSLATION_SERVICENAME, \
SPECIFICATIONTRANSLATION_BUSNAME, \
MOMIMPORTXML_BUSNAME, \
MOMIMPORTXML_SUBJECT
momqueryrpc = MoMQueryRPC(MOMQUERY_BUSNAME, MOMQUERY_SERVICENAME)
validationrpc = ValidationRPC(VALIDATION_BUSNAME, VALIDATION_SERVICENAME)
specificationtranslationrpc = TranslationRPC(SPECIFICATIONTRANSLATION_BUSNAME, SPECIFICATIONTRANSLATION_SERVICENAME)
momimportxml_bus = ToBus(address=MOMIMPORTXML_BUSNAME, broker=None)
permitted_activities=["observation", "pipeline", "measurement"]
permitted_statuses=["opened", "approved"]
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def _parse_activity_paths(spec):
"""
Parses relations on a lofar spec elementtree and returns a dict to lookup the path to the containing folder by
a given activity identifier. Paths are slash separated with project identifier as root.
"""
containers = spec.findall('container')
project = spec.find('projectReference').find('identifier').find('identifier').text
paths = {}
folder_activity = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if x.find("type").text == "folder-activity"]
folder_folder = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if x.find("type").text == "folder-folder"]
def make_key(element):
source = element.find("source").text
identifier = element.find("identifier").text
return (source, identifier)
dirnames = { make_key(container.find("temporaryIdentifier")): container.find('folder').find('name').text for container in containers }
parents = { make_key(folder_id): make_key(parentfolder_id) for (parentfolder_id, folder_id) in folder_folder }
for (folder_id, activity_id) in folder_activity:
key = make_key(folder_id)
activikey = make_key(activity_id)
path = ""
while key not in parents:
if key not in dirnames:
raise Exception("Reference to missing container? (%s)" % (key,))
path = dirnames[key] + "/" + path
key = parents[key]
path = project + "/" + path
paths[activikey] = path
for key in paths.keys():
logger.debug("Activity path -> "+str(key)+" --> "+ paths.get(key))
return paths
def _check_specification(user, lofar_xml):
"""
Performs some checks to make sure the specification meets some criteria before we accespt it.
E.g. activities have to be in new folders
"""
doc = etree.parse(StringIO(lofar_xml.encode('utf-8')))
spec = doc.getroot()
if spec.tag != "{http://www.astron.nl/LofarSpecification}specification":
raise Exception("Unexpected root element: ", spec.tag)
activity_paths = _parse_activity_paths(spec)
for path in activity_paths.values():
if _folderExists(path):
raise Exception("Innermost folder already exists: "+path)
project = spec.find('projectReference').find('identifier').find('identifier').text
if not _isActive(project):
raise Exception("Project is not active: "+str(project))
activities = spec.findall('activity')
for activity in activities:
key = (activity.find("temporaryIdentifier").find("source").text , activity.find("temporaryIdentifier").find("identifier").text)
if not key in activity_paths.keys():
# allow measurements, which are activities, but not contained in folders by definition!
# todo: check, is this what we want? Or do we have to do attional checks, e.g. that the obs-measurement relation and the parent obs exists?
if not activity.find("measurement") is not None:
raise Exception("Specified action has to be in folder: "+str(key))
jobtype = None
for action in permitted_activities:
if activity.find(action) is not None:
jobtype = action
break
logger.warning("!!! %s not found..." % (action,))
if jobtype is None:
raise Exception("Specified activity is not permitted: " + str(key) + " -> "+ str(permitted_activities)+" not found in "+ str(activity.getchildren()))
status = activity.find("status")
if status is None:
raise Exception("Activity has no status: "+str(key) )
if status.text not in permitted_statuses:
raise Exception("Specified activity is not going to permitted status: " + str(key) + " -> '"+str(status.text) + "' not in "+str(permitted_statuses))
_authenticateAction(str(user), str(project), str(jobtype), str(status.text))
def _isActive(project):
logger.debug("Checking if project is active: "+ project)
response = momqueryrpc.isProjectActive(project) #todo mock this for testing
return response.lower() == 'true' #response.get("isActive")
def _folderExists(path):
logger.debug("Checking if path exists -> "+ path)
response = momqueryrpc.folderExists(path) # todo mock this for testing
return response.lower() == 'true' #response.get("folderExists")
def _authenticateAction(user, project, jobtype, state):
logger.debug("Authenticate action -> "+ user +', ' +project +', '+ jobtype +', '+ state)
response = momqueryrpc.authorized_add_with_status(user,project,jobtype,state) # todo mock this for testing
return response.lower() == 'true' #response.get("isAuthorized")
def _validate_lofarspec(lofar_xml):
response = validationrpc.validate_specification(lofar_xml)
if not response["valid"]:
raise Exception("Invalid specification: " + response["error"])
def _validate_momspec(mom_xml):
response = validationrpc.validate_mom_specification(mom_xml)
if not response["valid"]:
raise Exception("Invalid MoM specification: " + response["error"])
def _add_spec_to_mom(mom_xml):
msg = EventMessage(context=MOMIMPORTXML_SUBJECT, content=mom_xml)
momimportxml_bus.send(msg)
logger.debug("Send specs to MOM: " + mom_xml)
def _lofarxml_to_momxml(lofarxml):
logger.debug("Translating LOFAR spec to MoM spec")
response = specificationtranslationrpc.specification_to_momspecification(lofarxml)
return response.get("mom-specification")
class SpecificationHandler(MessageHandlerInterface):
def __init__(self, **kwargs):
super(SpecificationHandler, self).__init__(**kwargs)
momimportxml_bus.open()
self.service2MethodMap = {
'add_specification': self.add_specification,
'get_specification': self.get_specification,
}
def add_specification(self, user, lofar_xml):
logger.info("got specification from user " + str(user))
logger.debug("LOFAR-XML: " + lofar_xml)
_validate_lofarspec(lofar_xml)
logger.info("lofar xml validates!")
_check_specification(user, lofar_xml)
logger.info("lofar xml check successful!")
mom_xml = _lofarxml_to_momxml(lofar_xml)
_validate_momspec(mom_xml)
logger.info("mom xml validates!")
_add_spec_to_mom(mom_xml)
logger.info("Fired and forgot to mom.importXML")
def get_specification(self, user, id):
logger.info("getting spec of id "+str(id))
# todo: return real trigger info as xml
raise NotImplementedError
def create_service(servicename=SPECIFICATION_SERVICENAME, busname=SPECIFICATION_BUSNAME):
return Service(servicename,
SpecificationHandler,
busname=busname,
use_service_methods=True,
)
def main():
with create_service():
waitForInterrupt()