Skip to content
Snippets Groups Projects
Commit 90b7a769 authored by Jörn Künsemöller's avatar Jörn Künsemöller
Browse files

Task #9895 - notifications, some stub for the spec checks, ...

parent 5b7cf041
No related branches found
No related tags found
No related merge requests found
Showing
with 177 additions and 116 deletions
......@@ -5798,7 +5798,6 @@ SAS/Specification_Services/lib/translation_service_rpc.py -text
SAS/Specification_Services/lib/validation_service.py -text
SAS/Specification_Services/lib/validation_service_rpc.py -text
SAS/Specification_Services/test/CMakeLists.txt -text
SAS/Specification_Services/test/setup_queues_and_services.sh -text
SAS/Trigger_Services/CMakeLists.txt -text
SAS/Trigger_Services/__init__.py -text
SAS/Trigger_Services/bin/CMakeLists.txt -text
......@@ -5807,6 +5806,7 @@ SAS/Trigger_Services/bin/triggerservice -text
SAS/Trigger_Services/django_rest/CMakeLists.txt -text
SAS/Trigger_Services/django_rest/db.sqlite3 -text
SAS/Trigger_Services/django_rest/manage.py -text
SAS/Trigger_Services/django_rest/readme.txt -text
SAS/Trigger_Services/django_rest/restinterface/.idea/.name -text
SAS/Trigger_Services/django_rest/restinterface/.idea/encodings.xml -text
SAS/Trigger_Services/django_rest/restinterface/.idea/inspectionProfiles/Project_Default.xml -text
......@@ -5840,7 +5840,6 @@ SAS/Trigger_Services/lib/config.py -text
SAS/Trigger_Services/lib/trigger_service.py -text
SAS/Trigger_Services/lib/trigger_service_rpc.py -text
SAS/Trigger_Services/test/CMakeLists.txt -text
SAS/Trigger_Services/test/setup_queues_and_services.sh -text
SAS/Trigger_Services/test/trigger-testing.xml -text
SAS/Trigger_Services/test/trigger_misc_testing_nov2016.xml -text
SAS/XML_generator/CMakeLists.txt -text
......
......@@ -138,7 +138,7 @@
<xsd:element name="status" minOccurs="0" type="xsd:string"/><!-- default would be opened, might need to be come an enum -->
<xsd:element name="qualityOfService" type="QualityOfServiceType" default="THROUGHPUT"/><!-- qualityOfService and TriggerId might need to move to the base: classes -->
<xsd:element name="priority" type="PriorityType"/>
<xsd:element name="TriggerId" minOccurs="0" type="base:Identifier"/>
<xsd:element name="triggerId" minOccurs="0" type="base:Identifier"/>
</xsd:sequence>
</xsd:complexType>
<xsd:annotation>
......@@ -212,7 +212,7 @@
<xsd:extension base="Relation">
<xsd:sequence>
<xsd:element name="sibling" minOccurs="1" maxOccurs="unbounded" type="base:Identifier"/>
<xsd:element name="type" type="SibblingRelationType"/>
<xsd:element name="type" type="SiblingRelationType"/>
</xsd:sequence>
</xsd:extension>
</xsd:complexContent>
......
<?xml version="1.0" encoding="utf-8" ?>
<xsd:schema targetNamespace="http://www.astron.nl/LofarTrigger"version="0.3" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://www.astron.nl/LofarTrigger" xmlns:spec="http://www.astron.nl/LofarSpecification" xmlns:base="http://www.astron.nl/LofarBase">
<xsd:schema targetNamespace="http://www.astron.nl/LofarTrigger" version="0.3" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://www.astron.nl/LofarTrigger" xmlns:spec="http://www.astron.nl/LofarSpecification" xmlns:base="http://www.astron.nl/LofarBase">
<xsd:import schemaLocation="LofarSpecification.xsd" namespace="http://www.astron.nl/LofarSpecification"/>
<xsd:import schemaLocation="LofarBase.xsd" namespace="http://www.astron.nl/LofarBase"/>
<xsd:annotation>
......@@ -20,8 +20,8 @@
<xsd:element name="contactInformation" type="base:ContactInformation"/><!-- Does not need to be the authenticated user -->
<xsd:element name="userName" type="xsd:string"/><!-- The same as the authenticated user -->
<xsd:element name="comment" type="xsd:string" minOccurs="0"/><!-- Freeform field for the user, will not be machine interpreted -->
<xsd:element name="event" ref="base:Event"/><!--We only support one event, in the case of multiple events, the trigger was generated by a meta event caused by the other events.-->
<xsd:element name="specification" ref="spec:Specification"/>
<xsd:element name="event" type="base:Event"/><!--We only support one event, in the case of multiple events, the trigger was generated by a meta event caused by the other events.-->
<xsd:element ref="spec:specification"/>
<xsd:element name="generatorVersion" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
......
......@@ -8,8 +8,8 @@ CONNECTION_STRING = '127.0.0.1'
MOMQUERY_BUSNAME = "momquerybus"
MOMQUERY_SERVICENAME = "momqueryservice"
VALIDATION_BUSNAME = "validationbus"
VALIDATION_SERVICENAME = "validationservice"
VALIDATION_BUSNAME = "specificationvalidationbus"
VALIDATION_SERVICENAME = "specificationvalidationservice"
SPECIFICATION_BUSNAME = "specificationbus"
SPECIFICATION_SERVICENAME = "specificationservice"
......@@ -17,6 +17,8 @@ SPECIFICATION_SERVICENAME = "specificationservice"
SPECIFICATIONTRANSLATION_BUSNAME = "specificationtranslationbus"
SPECIFICATIONTRANSLATION_SERVICENAME = "specificationtranslationservice"
MOMIMPORTXML_BUSNAME = "mom.importxml"
MOMIMPORTXML_SUBJECT = "add specification"
# XSD paths (for validation service)
......
......@@ -20,59 +20,95 @@
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#XMLPATH_TRIGGER="../test/trigger.xml"
from lxml import etree
from StringIO import StringIO
from lofar.specificationservices.validation_service_rpc import ValidationRPC
from lofar.specificationservices.translation_service_rpc import TranslationRPC
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.messaging import Service
from lofar.messaging import Service, EventMessage, ToBus
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt
from config import MOMQUERY_BUSNAME, MOMQUERY_SERVICENAME, VALIDATION_BUSNAME, VALIDATION_SERVICENAME, SPECIFICATION_SERVICENAME, SPECIFICATION_BUSNAME, SPECIFICATIONTRANSLATION_SERVICENAME, SPECIFICATIONTRANSLATION_BUSNAME
from config import MOMQUERY_BUSNAME, \
MOMQUERY_SERVICENAME, \
VALIDATION_BUSNAME, \
VALIDATION_SERVICENAME, \
SPECIFICATION_SERVICENAME, \
SPECIFICATION_BUSNAME, \
SPECIFICATIONTRANSLATION_SERVICENAME, \
SPECIFICATIONTRANSLATION_BUSNAME, \
MOMIMPORTXML_BUSNAME, \
MOMIMPORTXML_SUBJECT
momqueryrpc = MoMQueryRPC(MOMQUERY_BUSNAME, MOMQUERY_SERVICENAME)
validationrpc = ValidationRPC(VALIDATION_BUSNAME, VALIDATION_SERVICENAME)
specificationtranslationrpc = TranslationRPC(SPECIFICATIONTRANSLATION_BUSNAME, SPECIFICATIONTRANSLATION_SERVICENAME)
momimportxml_bus = ToBus(address=MOMIMPORTXML_BUSNAME, broker=None)
permitted_actions=["observation", "pipeline"]
permitted_statuses=["opened", "approved"]
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def _check_specification(lofar_xml):
folder = "innermost" # where to find this in xml???
#if momqueryrpc.folderExists(folder):
# raise Exception("Folder already exists!")
processes = [] # where to obtain these?
for process in processes:
pass
def _check_specification(user, lofar_xml):
# todo: obtain innermost folder from relations!
folder = "innermost"
if _folderExists(folder):
raise Exception("Innermost folder already exists! ("+folder+")")
# todo: determine projects;
projects = []
for project in projects:
if not _isActive(project):
raise Exception("Project is not active! "+str(project))
actions = []
for action in actions:
continue
# todo: i. Check whether the specifications are in folder(s), return ERROR if not.
# --> folder-relation exists?
infolder = True
if not infolder:
raise Exception("Specified action has to be in folder!"+str(action))
# todo: ii. Whether the specifications are observations or pipelines
if action.tag not in permitted_actions:
raise Exception("Specified action is not permitted!"+str(action))
# todo: iii. Whether the specifications go to opened or approved
status = action.find("status")
if status is not None and status.text not in permitted_statuses:
raise Exception("Specified action is not going to permitted status!"+str(action))
# todo: iv. Queries the MoMQueryService whether user can open or approve specifications in the selected project(s), return ERROR if not.
_authenticateAction(user, project, jobtype, state)
return True
def _authenticate(user, project, jobtype, state):
# todo: momqueryrpc.authState(user,project,jobtype,state)
def _isActive(project):
#return momqueryrpc.isProjectActive(project) #todo mock this for testing
return True
def _folderExists(path):
#return momqueryrpc.folderExists(path) # todo mock this for testing
return False
def _authenticateAction(user, project, jobtype, state):
#momqueryrpc.authState(user,project,jobtype,state) # todo mock this for testing
return True
def _validate_lofarspec(lofar_xml):
return validationrpc.validate_specification(lofar_xml)
#return validationhandler.validate_specification(lofar_xml)
def _validate_momspec(mom_xml):
return validationrpc.validate_mom_specification(mom_xml)
#return validationhandler.validate_mom_specification(mom_xml)
def _add_spec_to_mom(mom_xml):
# todo: momrpc.importXML(mom_xml) -- which service is this?
return True
msg = EventMessage(context=MOMIMPORTXML_SUBJECT, content=mom_xml)
momimportxml_bus.send(msg)
def _lofarxml_to_momxml(lofarxml):
momxml = specificationtranslationrpc.specification_to_momspecification(lofarxml)
......@@ -85,33 +121,29 @@ class SpecificationHandler(MessageHandlerInterface):
self.service2MethodMap = {
'add_specification': self.add_specification,
'get_specification': self.get_specification,
}
def add_specification(self, user, lofar_xml):
logger.info("got specification from user " + str(user))
logger.debug("LOFAR-XML: " + lofar_xml)
id = None
if _authenticate(None, None, None, None):
logger.info("authenticated!")
if _validate_lofarspec(lofar_xml):
logger.info("lofar xml validates!")
if _check_specification(lofar_xml):
logger.info("lofar xml check successful!")
mom_xml = _lofarxml_to_momxml(lofar_xml)
if _validate_momspec(mom_xml):
logger.info("mom xml validates!")
_add_spec_to_mom(mom_xml)
else:
logger.error("Could not create valid LOFAR XML: "+lofar_xml)
if _validate_lofarspec(lofar_xml):
logger.info("lofar xml validates!")
if _check_specification(user, lofar_xml):
logger.info("lofar xml check successful!")
mom_xml = _lofarxml_to_momxml(lofar_xml)
if _validate_momspec(mom_xml):
logger.info("mom xml validates!")
_add_spec_to_mom(mom_xml)
logger.info("Fired and forgot to mom.importXML")
else:
logger.error("Could not create valid LOFAR XML: "+lofar_xml)
return id
def get_specification(self, user, id):
logger.info("getting spec of id "+str(id))
# todo: return real trigger info as xml
# for testing:
# with open(XMLPATH_TRIGGER) as xml:
# return xml.read()
return "This functionality is not implemented yet, sorry!"
......
......@@ -36,9 +36,12 @@ from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt
from config import MOM_EXTRASPECS, SPECIFICATIONTRANSLATION_SERVICENAME, SPECIFICATIONTRANSLATION_BUSNAME, VALIDATION_SERVICENAME, VALIDATION_BUSNAME
from config import MOM_EXTRASPECS, \
SPECIFICATIONTRANSLATION_SERVICENAME, SPECIFICATIONTRANSLATION_BUSNAME, \
VALIDATION_SERVICENAME, VALIDATION_BUSNAME
from validation_service_rpc import ValidationRPC
validationrpc = ValidationRPC(VALIDATION_BUSNAME, VALIDATION_SERVICENAME)
......@@ -67,26 +70,29 @@ class SpecificationTranslationHandler(MessageHandlerInterface):
# pick the specification element
doc = etree.parse(StringIO(trigger_spec.encode('UTF-8')))
spec = doc.getroot().find('specification')
spec = doc.getroot().find('{http://www.astron.nl/LofarSpecification}specification')
logger.debug("root is "+spec.tag)
# inject priority # todo: if optional item, move to trigger service
priority = spec.find("priority")
priority.text = str(job_priority)
# inject identifier and project priority # todo: check if we have to restrict original priority setting! Should the injection happen here or in trigger service?
activities = spec.findall('.//activity')
for activity in activities:
priority = activity.find('priority')
priority.text = str(int(priority.text) + int(job_priority))
identifier = activity.find("triggerId")
if identifier is not None:
activity.remove(identifier)
identifier = etree.SubElement(activity, 'triggerId')
etree.SubElement(identifier, 'source').text = 'MoM'
etree.SubElement(identifier, 'identifier').text = str(trigger_id)
# inject identifier # todo: if optional item, move to trigger service
identifier = spec.find("activity").find("temporaryIdentifier") # todo: discuss where this is supposed to really go!
source = identifier.find("source")
ident = identifier.find("identifier")
source.text = "Trigger"
ident.text = str(trigger_id)
# create new document with correct namespace and return xml as string:
# todo: check whether a type reference can be used to have the correct namespace on specification!
lofar_spec = etree.Element("{http://www.astron.nl/LofarSpecification}specification")
for child in spec.getchildren():
lofar_spec.append(child)
specification_xml = etree.tostring(lofar_spec)
#lofar_spec = etree.Element("{http://www.astron.nl/LofarSpecification}specification")
#for child in spec.getchildren():
# lofar_spec.append(child)
specification_xml = etree.tostring(spec)
logger.debug("specification after translation from trigger -> "+specification_xml)
if validationrpc.validate_specification(specification_xml):
......
......@@ -54,11 +54,12 @@ def _validateXSD(xml, xsdpath):
try:
xmlschema.assertValid(doc)
except Exception as err:
logger.error(err)
logger.error(err)
raise err
logger.debug("valid -> "+str(valid))
return valid
#class ValidationHandler():
class ValidationHandler(MessageHandlerInterface):
def __init__(self, **kwargs):
super(ValidationHandler, self).__init__(**kwargs)
......@@ -73,7 +74,7 @@ class ValidationHandler(MessageHandlerInterface):
return str(_validateXSD(xml, TRIGGER_XSD))
def validate_specification(self, xml):
# todo: further checks
# todo: further checks -> build relation graph, check for consistency!
return str(_validateXSD(xml, LOFARSPEC_XSD))
def validate_mom_specification(self, xml):
......
#/bin/sh
echo "Setting up queues"
./qpid-config add exchange topic translationbus
./qpid-config add exchange topic specificationtranslationbus
./qpid-config add exchange topic specificationbus
./qpid-config add exchange topic triggerbus
./qpid-config add exchange topic validationbus
echo "Starting spec service"
specificationservice &
echo "Starting spec translation service"
specificationtranslationservice &
echo "Starting spec validation service"
specificationvalidationservice &
\ No newline at end of file
------------------------------
Trigger rest interface README.
------------------------------
This web service will build as part of the package Trigger_Services.
The executable 'triggerrestinterface' or running 'manage.py runserver' will start a local service (-> http://localhost:8000/triggers/).
The interface is not making use of a local database, but the django rest framework requires one, and so the service
will create a local sqlite database file if none is present.
Run 'manage.py migrate' to remove warnings abot missing migrations.
Run 'manage.py collectstatic' to collect static files for deployment in Apache
\ No newline at end of file
......@@ -4,5 +4,5 @@
# Messaging
DEFAULT_TRIGGER_SUBMISSION_NOTIFICATION_BUSNAME='lofar.ts.notification'
DEFAULT_TRIGGER_SUBMISSION_NOTIFICATION_SUBJECT='TS.TriggerSubmitted'
\ No newline at end of file
TRIGGER_SUBMISSION_NOTIFICATION_BUSNAME='lofar.ts.notification'
TRIGGER_SUBMISSION_NOTIFICATION_SUBJECT='TS.TriggerSubmitted'
\ No newline at end of file
......@@ -31,12 +31,16 @@ logger = logging.getLogger(__name__)
triggerrpc = TriggerRPC()
specrpc = SpecificationRPC()
from config import DEFAULT_TRIGGER_SUBMISSION_NOTIFICATION_BUSNAME, DEFAULT_TRIGGER_SUBMISSION_NOTIFICATION_SUBJECT
notification_bus = ToBus(address=DEFAULT_TRIGGER_SUBMISSION_NOTIFICATION_BUSNAME, broker=None)
from config import TRIGGER_SUBMISSION_NOTIFICATION_BUSNAME, TRIGGER_SUBMISSION_NOTIFICATION_SUBJECT
notification_bus = ToBus(address=TRIGGER_SUBMISSION_NOTIFICATION_BUSNAME, broker=None)
class TriggerListView(views.APIView):
def __init__(self, **kwargs):
super(TriggerListView, self).__init__(**kwargs)
notification_bus.open()
def get(self,request, format=None, **kwargs):
logger.debug('got GET from -> '+str(request.META['REMOTE_ADDR']))
return Response("Listing not implemented yet, sorry! Post trigger XML on this URL to add a new trigger.", status=status.HTTP_501_NOT_IMPLEMENTED)
......@@ -44,10 +48,12 @@ class TriggerListView(views.APIView):
def post(self, request, format=None, **kwargs):
logger.debug('got POST from -> '+str(request.META['REMOTE_ADDR']))
IP = str(request.META['REMOTE_ADDR'])
logger.debug('got POST from -> '+IP)
#logger.debug('received text -> '+str(request.body))
#logger.debug('received data -> '+str(request.data))
logger.debug('from user -> '+str( request.user))
self._sendNotification(str(request.user), IP)
# OPTIONALLY USE DATA MODEL:
......@@ -66,10 +72,9 @@ class TriggerListView(views.APIView):
# OR: USE RECEIVED XML DIRECTLY:
xml = str(request.body)
logger.debug('calling trigger handler')
try:
id = self._handle_trigger(str(request.user), xml)
id = self._handle_trigger(str(request.user), IP, xml)
except Exception as err:
traceback.print_exc()
return Response('Provided data has some issues! (Details: '+str(err)+")", status=status.HTTP_400_BAD_REQUEST)
......@@ -83,12 +88,16 @@ class TriggerListView(views.APIView):
root.tag = newname
return etree.tostring(root)
def _handle_trigger(self, user, xml):
return triggerrpc.handle_trigger(user, xml)
def _handle_trigger(self, user, host, xml):
return triggerrpc.handle_trigger(user, host, xml)
def _sendNotification(self, user, IP):
msg = EventMessage(context=DEFAULT_TRIGGER_SUBMISSION_NOTIFICATION_SUBJECT, content="Trigger received by "+str(user)+" (IP:"+IP+")")
notification_bus.send(msg)
msg = EventMessage(context=TRIGGER_SUBMISSION_NOTIFICATION_SUBJECT, content="Trigger received by "+str(user)+" (IP:"+IP+")")
try:
notification_bus.send(msg)
except Exception as err:
logger.error("Could not send notification ->" + str(err))
......
......@@ -8,8 +8,8 @@ CONNECTION_STRING = '127.0.0.1'
MOMQUERY_BUSNAME = "momquerybus"
MOMQUERY_SERVICENAME = "momqueryservice"
VALIDATION_BUSNAME = "validationbus"
VALIDATION_SERVICENAME = "validationservice"
VALIDATION_BUSNAME = "specificationvalidationbus"
VALIDATION_SERVICENAME = "specificationvalidationservice"
SPECIFICATION_BUSNAME = "specificationbus"
SPECIFICATION_SERVICENAME = "specificationservice"
......@@ -19,3 +19,6 @@ SPECIFICATIONTRANSLATION_SERVICENAME = "specificationtranslationservice"
TRIGGER_BUSNAME = "triggerbus"
TRIGGER_SERVICENAME = "triggerservice"
TRIGGER_ADDITION_NOTIFICATION_BUSNAME = "lofar.ts.notification"
TRIGGER_ADDITION_NOTIFICATION_SUBJECT='TS.TriggerAdded'
\ No newline at end of file
......@@ -24,7 +24,7 @@
from StringIO import StringIO
from lxml import etree
from lofar.messaging import Service
from lofar.messaging import Service, EventMessage, ToBus
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt
......@@ -33,7 +33,12 @@ from lofar.specificationservices.specification_service_rpc import SpecificationR
from lofar.specificationservices.validation_service_rpc import ValidationRPC
from lofar.specificationservices.translation_service_rpc import TranslationRPC
from config import MOMQUERY_BUSNAME, MOMQUERY_SERVICENAME, SPECIFICATION_BUSNAME, SPECIFICATION_SERVICENAME, VALIDATION_BUSNAME, VALIDATION_SERVICENAME, SPECIFICATIONTRANSLATION_BUSNAME, SPECIFICATIONTRANSLATION_SERVICENAME, TRIGGER_SERVICENAME, TRIGGER_BUSNAME
from config import MOMQUERY_BUSNAME, MOMQUERY_SERVICENAME, \
SPECIFICATION_BUSNAME, SPECIFICATION_SERVICENAME, \
VALIDATION_BUSNAME, VALIDATION_SERVICENAME, \
SPECIFICATIONTRANSLATION_BUSNAME, SPECIFICATIONTRANSLATION_SERVICENAME, \
TRIGGER_SERVICENAME, TRIGGER_BUSNAME, \
TRIGGER_ADDITION_NOTIFICATION_BUSNAME, TRIGGER_ADDITION_NOTIFICATION_SUBJECT
import logging
logger = logging.getLogger(__name__)
......@@ -43,9 +48,11 @@ validationrpc = ValidationRPC(VALIDATION_BUSNAME, VALIDATION_SERVICENAME)
specificationrpc = SpecificationRPC(SPECIFICATION_BUSNAME, SPECIFICATION_SERVICENAME)
translationrpc = TranslationRPC(SPECIFICATIONTRANSLATION_BUSNAME, SPECIFICATIONTRANSLATION_SERVICENAME)
notification_bus = ToBus(address=TRIGGER_ADDITION_NOTIFICATION_BUSNAME, broker=None)
def _auth_allows_triggers(project):
# todo: momqueryrpc.authAllowsTriggers(project)
#return momqueryrpc.allowsTriggers(project)
return True
......@@ -56,15 +63,16 @@ def _validate_trigger(trigger_xml):
def _validate_lofarspec(lofar_xml):
return validationrpc.validate_specification(lofar_xml)
def _add_trigger(trigger_xml):
def _add_trigger(username, hostname, projectname, metadata):
logger.info("Adding trigger")
# trigger_id = momqueryrp.addTrigger(xml)
# trigger_id = momqueryrpc.addTrigger(username,hostname ,projectname,metadata)
trigger_id = 42
return trigger_id
def _get_job_priority(project):
# prio = momqueryrp.getProjectPriority(project)
prio = 20
def _get_project_priority(project):
logger.info("Getting project priority for project"+ str(project))
# prio = momqueryrpc.getProjectPriority(project)
prio = 1
return prio
def _add_specification(user, lofar_xml):
......@@ -78,9 +86,12 @@ def _translate_trigger_to_specification(trigger_xml, trigger_id, job_priority):
return specification_xml
def _send_notification(text):
logger.info("sending notification")
# todo: send notification to bus
def _sendNotification(self, user, IP):
try:
msg = EventMessage(context=TRIGGER_ADDITION_NOTIFICATION_SUBJECT, content="Trigger from "+str(user)+" was added under ID "+str(id))
notification_bus.send(msg)
except Exception as err:
logger.error("Could not send notification ->" + str(err))
def _parse_project_id(trigger_xml):
......@@ -88,27 +99,35 @@ def _parse_project_id(trigger_xml):
ref = doc.find("projectReference")
return ref.find("identifier").find("identifier").text
def _parse_project_priority(trigger_xml):
doc = etree.parse(StringIO(trigger_xml.encode('utf-8')))
ref = doc.find("projectReference")
return ref.find("identifier").find("identifier").text
class TriggerHandler(MessageHandlerInterface):
def __init__(self, **kwargs):
super(TriggerHandler, self).__init__(**kwargs)
notification_bus.open()
self.service2MethodMap = {
'handle_trigger': self.handle_trigger,
}
def handle_trigger(self, user, trigger_xml):
def handle_trigger(self, user, host, trigger_xml):
logger.info("Handling trigger from user -> "+str(user))
trigger_id = None
if _validate_trigger(trigger_xml):
logger.info("Trigger XML is valid")
project = _parse_project_id(trigger_xml)
logger.debug('project is -> ' + str(project))
priority = _get_job_priority(project) #todo: how to determine job_prio based on project prio? Consider priority from trigger xml?
priority = _get_project_priority(project)
logger.debug('project priority is ->' + str(priority))
if _auth_allows_triggers(project):
logger.info("trigger is authorized, adding to trigger and specification")
trigger_id = _add_trigger(trigger_xml)
trigger_id = _add_trigger(str(user), host, project, trigger_xml) # todo: How to determine hostname from Qpid message?
logger.debug("Trigger was assigned id -> "+str(trigger_id))
if trigger_id:
lofar_xml = _translate_trigger_to_specification(trigger_xml, trigger_id, priority)
......@@ -120,7 +139,7 @@ class TriggerHandler(MessageHandlerInterface):
logger.error(msg)
raise Exception(msg)
else:
msg = "There was a problem adding the trigger!"
msg = "There was a problem adding the trigger to the database!"
logger.error(msg)
raise Exception(msg)
else:
......@@ -134,7 +153,7 @@ class TriggerHandler(MessageHandlerInterface):
raise Exception(msg)
logger.info("trigger handling done. -> "+str(trigger_id))
return str(trigger_id)
return str(trigger_id) # todo: Design document asks to return the specification status. Does that make sense and is it obtainable at all?
def create_service(servicename=TRIGGER_SERVICENAME, busname=TRIGGER_BUSNAME):
return Service(servicename,
......
......@@ -14,8 +14,8 @@ class TriggerRPC(RPCWrapper):
super(TriggerRPC, self).__init__(busname, servicename, broker, timeout=timeout)
def handle_trigger(self, user, trigger_xml):
def handle_trigger(self, user, host, trigger_xml):
logger.info("Requesting handling of trigger")
result = self.rpc('handle_trigger', user=user, trigger_xml=trigger_xml)
result = self.rpc('handle_trigger', user=user, host=host, trigger_xml=trigger_xml)
logger.info("Received trigger handling result -> " +result)
return result
#/bin/sh
echo "Setting up queues"
./qpid-config add exchange topic triggerbus
echo "Starting trigger service"
triggerservice &
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment