From c60272a9eda8d84e6bc570ecf54ec87dea9341d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20K=C3=BCnsem=C3=B6ller?= <jkuensem@physik.uni-bielefeld.de> Date: Fri, 10 Feb 2017 09:49:42 +0000 Subject: [PATCH] Task #10283 - Measurements are now properly added. Calls to momqueryservice updated. --- .gitattributes | 1 + .../lib/specification_service.py | 30 ++- .../lib/translation_service.py | 180 ++++++++++-------- SAS/Trigger_Services/lib/trigger_service.py | 20 +- .../test/setup_queues_and_services.sh | 32 ++++ 5 files changed, 170 insertions(+), 93 deletions(-) create mode 100755 SAS/Trigger_Services/test/setup_queues_and_services.sh diff --git a/.gitattributes b/.gitattributes index c0468c6b2d0..ebeb7cf01ca 100644 --- a/.gitattributes +++ b/.gitattributes @@ -5842,6 +5842,7 @@ SAS/Trigger_Services/lib/config.py -text SAS/Trigger_Services/lib/trigger_service.py -text SAS/Trigger_Services/lib/trigger_service_rpc.py -text SAS/Trigger_Services/test/CMakeLists.txt -text +SAS/Trigger_Services/test/setup_queues_and_services.sh -text SAS/Trigger_Services/test/trigger-testing.xml -text SAS/Trigger_Services/test/trigger_misc_testing_nov2016.xml -text SAS/XML_generator/CMakeLists.txt -text diff --git a/SAS/Specification_Services/lib/specification_service.py b/SAS/Specification_Services/lib/specification_service.py index 90cb25078fa..423566de526 100644 --- a/SAS/Specification_Services/lib/specification_service.py +++ b/SAS/Specification_Services/lib/specification_service.py @@ -55,6 +55,10 @@ logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) def _parse_activity_paths(spec): + """ + Parses relations on a lofar spec elementtree and returns a dict to lookup the path to the containing folder by + a given activity identifier. Paths are slash separated with project identifier as root. + """ containers = spec.findall('container') project = spec.find('projectReference').find('identifier').find('identifier').text @@ -97,6 +101,10 @@ def _parse_activity_paths(spec): def _check_specification(user, lofar_xml): + """ + Performs some checks to make sure the specification meets some criteria before we accespt it. + E.g. activities have to be in new folders + """ doc = etree.parse(StringIO(lofar_xml.encode('utf-8'))) spec = doc.getroot() @@ -117,7 +125,9 @@ def _check_specification(user, lofar_xml): for activity in activities: key = (activity.find("temporaryIdentifier").find("source").text , activity.find("temporaryIdentifier").find("identifier").text) if not key in activity_paths.keys(): - if not activity.find("measurement") is not None: # allow measurements, which are not in folders by definition! + # allow measurements, which are activities, but not contained in folders by definition! + # todo: check, is this what we want? Or do we have to do attional checks, e.g. that the obs-measurement relation and the parent obs exists? + if not activity.find("measurement") is not None: raise Exception("Specified action has to be in folder! "+str(key)) jobtype = None for action in permitted_activities: @@ -138,21 +148,21 @@ def _check_specification(user, lofar_xml): def _isActive(project): logger.debug("Checking if project is active: "+ project) - #response = momqueryrpc.isProjectActive(project) #todo mock this for testing - # return response.get("isActive") - return True + response = momqueryrpc.isProjectActive(project) #todo mock this for testing + return response.lower() == 'true' #response.get("isActive") + #return True def _folderExists(path): logger.debug("Checking if path exists -> "+ path) - #response = momqueryrpc.folderExists(path) # todo mock this for testing - #return response.get("folderExists") - return False + response = momqueryrpc.folderExists(path) # todo mock this for testing + return response.lower() == 'true' #response.get("folderExists") + #return False def _authenticateAction(user, project, jobtype, state): logger.debug("Authenticate action -> "+ user +', ' +project +', '+ jobtype +', '+ state) - #response = momqueryrpc.authState(user,project,jobtype,state) # todo mock this for testing - #return response.get("isAuthorized") - return True + response = momqueryrpc.authorized_add_with_status(user,project,jobtype,state) # todo mock this for testing + return response.lower() == 'true' #response.get("isAuthorized") + #return True def _validate_lofarspec(lofar_xml): response = validationrpc.validate_specification(lofar_xml) diff --git a/SAS/Specification_Services/lib/translation_service.py b/SAS/Specification_Services/lib/translation_service.py index 10422246834..2663a3754b3 100644 --- a/SAS/Specification_Services/lib/translation_service.py +++ b/SAS/Specification_Services/lib/translation_service.py @@ -23,6 +23,7 @@ # This Service translates specifications, e.g. from a generic LOFAR spec to MoM spec. +# Want to change something? Before you tumble down this rabbit hole: Grab a coffee. (Make it strong.) import logging logger = logging.getLogger(__name__) @@ -48,13 +49,17 @@ from validation_service_rpc import ValidationRPC # These specification elements are to be recoded for MoM as json MOM_ACTIVITY_EXTRASPECS = [ - "triggerId", - "priority", - "qualityOfService", - "observation::timeWindowSpecification", - "observation::stationSelectionSpecification", - #"pipeline..." # no pipeline time constraints...? - ] + "triggerId", + "priority", + "qualityOfService", + "observation::timeWindowSpecification", + "observation::stationSelectionSpecification", + #"pipeline..." # no pipeline time constraints...? +] + +# These activity types can carry a misc element for extraspec. +# Measurements do not have that, but have to have a parent observation that has. +ACTIVITIES_WITH_MOM_EXTRASPECS = ['observation', 'pipeline'] # These have to be put somewhere else for mom. First item is appended to second item (-> order matters!). # ('::' is separator, '.' doesn't work due to occurrence in namespace uri). @@ -129,7 +134,7 @@ MOM_ACTIVITY_ATTRIBUTE_MAPPING = OrderedDict([ ("measurement::dec","measurement::measurementAttributes"), ("measurement::equinox","measurement::measurementAttributes"), ("measurement::subbandsSpecification","measurement::measurementAttributes"), - ("measurement::tiedArrayBeams","measurement::measurementAttributes"), + ("measurement::tiedArrayBeams","measurement::measurementAttributes::specification"), # todo: If used, LofarBeamMeasurementSpecificationAttributesType requires more items! ("measurement::measurementType","measurement::measurementAttributes"), # todo: add other measurements? Currently not defined on LofarBase.xsd, so these cannot occur... ]) @@ -161,39 +166,39 @@ def _jsonify(xml): def _encode_mom_extraspecs(activity): - """ - encodes extra specs on an activity element as json - return the json string - """ - try: - # move extraspec elements from activity to new element tree - extraspecelement = etree.Element("extraspec") # holds data for misc - for extraspec in MOM_ACTIVITY_EXTRASPECS: - elements = extraspec.split("::") - source = activity - target = extraspecelement - prevelement = None - for element in elements: - # walk orginal tree to source: - source = source.find(element) - if source is None: - break - # create parents in new element tree: - if prevelement: - if target.find(prevelement) is None: - target = etree.SubElement(target, prevelement) - else: - target = target.find(prevelement) - prevelement = element - if source is not None: - source.getparent().remove(source) - target.append(source) - - # Jsonify extraspec tree and add to misc element on the original activity element. - json = _jsonify(etree.tostring(extraspecelement)) - return json - except Exception as err: - raise Exception("Error while encoding MoM extraspecs -> "+err.message) + """ + encodes extra specs on an activity element as json + return the json string + """ + try: + # move extraspec elements from activity to new element tree + extraspecelement = etree.Element("extraspec") # holds data for misc + for extraspec in MOM_ACTIVITY_EXTRASPECS: + elements = extraspec.split("::") + source = activity + target = extraspecelement + prevelement = None + for element in elements: + # walk orginal tree to source: + source = source.find(element) + if source is None: + break + # create parents in new element tree: + if prevelement: + if target.find(prevelement) is None: + target = etree.SubElement(target, prevelement) + else: + target = target.find(prevelement) + prevelement = element + if source is not None: + source.getparent().remove(source) + target.append(source) + + # Jsonify extraspec tree and add to misc element on the original activity element. + json = _jsonify(etree.tostring(extraspecelement)) + return json + except Exception as err: + raise Exception("Error while encoding MoM extraspecs -> "+err.message) @@ -214,14 +219,14 @@ def _create_foldertree_in_momproject(spec, mom_project): # populate folder element dictionary. folder identifier is key for container in containers: - container_source = container.find("temporaryIdentifier").find("source").text - container_identifier = container.find("temporaryIdentifier").find("identifier").text - key = (container_source, container_identifier) - folder = container.find('folder') - momfolder = etree.Element('{http://www.astron.nl/MoM2-Lofar}folder') - for child in folder.getchildren(): - momfolder.append(child) - folders[key] = momfolder + container_source = container.find("temporaryIdentifier").find("source").text + container_identifier = container.find("temporaryIdentifier").find("identifier").text + key = (container_source, container_identifier) + folder = container.find('folder') + momfolder = etree.Element('{http://www.astron.nl/MoM2-Lofar}folder') + for child in folder.getchildren(): + momfolder.append(child) + folders[key] = momfolder # parent folder dictionary, child idenfifier is key. for (parentfolder_id, folder_id) in folder_folder: @@ -268,6 +273,7 @@ def _create_foldertree_in_momproject(spec, mom_project): top = folder.find('topology') if top is not None: folder.remove(top) + folder.insert(0, top) # Not only containers can contain children, but also activities. @@ -277,10 +283,10 @@ def _create_foldertree_in_momproject(spec, mom_project): observations = {} for obs_act in observation_acts: - obs_source = obs_act.find("temporaryIdentifier").find("source").text - obs_identifier = obs_act.find("temporaryIdentifier").find("identifier").text - key = (obs_source, obs_identifier) - observations[key] = obs_act.find('observation') + obs_source = obs_act.find("temporaryIdentifier").find("source").text + obs_identifier = obs_act.find("temporaryIdentifier").find("identifier").text + key = (obs_source, obs_identifier) + observations[key] = obs_act.find('observation') for (obs_id, measurement_id) in observation_measurement: key = (obs_id.find("source").text , obs_id.find("identifier").text) @@ -331,12 +337,12 @@ def _mommify(activity): momtype = None momtype_cc = None try: - t = act.attrib["{http://www.w3.org/2001/XMLSchema-instance}type"] - if t: - momtype = t.split(':')[1] - momtype_cc = momtype[:1].lower() + momtype[1:] + t = act.attrib["{http://www.w3.org/2001/XMLSchema-instance}type"] + if t: + momtype = t.split(':')[1] + momtype_cc = momtype[:1].lower() + momtype[1:] except Exception as err: - logger.error("Could not determine a more specific MoM type from type attribute for the activity -> "+activitytype+" "+str(err)) + logger.error("Could not determine a more specific MoM type from type attribute for the activity -> "+activitytype+" "+str(err)) # momtype/_cc should now be present for pipelines/measurements but not observations # restructure elements according to mapping.: @@ -354,22 +360,26 @@ def _mommify(activity): d = d.replace(activitytype, "{http://www.astron.nl/MoM2-Lofar}"+momtype_cc) if d is not "": dst_node = _find_or_create_subelement(dst_node, d) - logger.warn("Ignoring empty string in mapping. -> "+str(dst)) + else: + logger.warn("Ignoring empty string in mapping. -> "+str(dst)) src_node.getparent().remove(src_node) dst_node.append(src_node) - # jsonify new specs that MoM does not know about and put them as json in misc element: - if momtype_cc is not None: - # use the specific type if present (pipelines/measurements) - atts = _find_or_create_subelement(act, "{http://www.astron.nl/MoM2-Lofar}"+momtype_cc+"Attributes") - userspec = _find_or_create_subelement(atts,'userSpecification', 0) # goes first here + if activitytype in ACTIVITIES_WITH_MOM_EXTRASPECS: + # jsonify new specs that MoM does not know about and put them as json in misc element: + if momtype_cc is not None: + # use the specific type if present (pipelines/measurements) + atts = _find_or_create_subelement(act, "{http://www.astron.nl/MoM2-Lofar}"+momtype_cc+"Attributes") + userspec = _find_or_create_subelement(atts,'userSpecification', 0) # goes first here + else: + atts = _find_or_create_subelement(act, "{http://www.astron.nl/MoM2-Lofar}"+activitytype+"Attributes") + userspec = _find_or_create_subelement(atts,'userSpecification') + json = _encode_mom_extraspecs(activity) + misc = _find_or_create_subelement(userspec, "misc") + misc.text=json else: - atts = _find_or_create_subelement(act, "{http://www.astron.nl/MoM2-Lofar}"+activitytype+"Attributes") - userspec = _find_or_create_subelement(atts,'userSpecification') - json = _encode_mom_extraspecs(activity) - misc = _find_or_create_subelement(userspec, "misc") - misc.text=json + _encode_mom_extraspecs(activity) # to remove the extraspec elements, but ignore json, # convert the very specific bits: @@ -394,6 +404,7 @@ def _mommify(activity): # move stuff to new mom element momact.append(child) + print "!!!!!"+str(item), str(momact) return item raise Exception("Cannot translate activity for MoM! -> "+ str(ACTIVITY_TYPES) + " not found in "+str(activity.getchildren())) @@ -479,13 +490,32 @@ class SpecificationTranslationHandler(MessageHandlerInterface): logger.debug("No key "+str(key)+" in "+str(activityparents)) raise Exception("No parent for key "+str(key)) - # add and restructure activity in MoM-comprehensible form: + # restructure activity in MoM-comprehensible form item = _mommify(activity) + + # Add the mommified item to it's parent children = _find_or_create_subelement(parent, "children") index = len(children.findall('item')) item.attrib["index"] = str(index) children.append(item) + # Some activities, like observations, can also serve as containers for measurements. + # While all the containers are set up separately, we have to update the reference to the new mommified + # parent activity, should it change at this step, so the child activity can be added to it. + # Note: This is probably super inefficient, but will have to do for now. + for atype in ACTIVITY_TYPES: + old_act = activity.find(atype) + if old_act in activityparents.values(): + new_act = item.find("{http://www.astron.nl/MoM2-Lofar}"+atype) + print "GOTCHA!", old_act, new_act + if new_act is not None: + for k,v in activityparents.items(): + if v == old_act: + activityparents[k] = new_act + else: + raise Exception('Could not update mommified activity reference ->' + str(atype)) + + # todo: entity # todo: - parse entities, create dataproducts # todo: - parse activity-entity relations, add dataproducts to their activity @@ -507,13 +537,13 @@ class SpecificationTranslationHandler(MessageHandlerInterface): def create_service(servicename=SPECIFICATIONTRANSLATION_SERVICENAME, busname=SPECIFICATIONTRANSLATION_BUSNAME): return Service(servicename, - SpecificationTranslationHandler, - busname=busname, - use_service_methods=True, - ) + SpecificationTranslationHandler, + busname=busname, + use_service_methods=True, + ) def main(): with create_service(): - waitForInterrupt() + waitForInterrupt() diff --git a/SAS/Trigger_Services/lib/trigger_service.py b/SAS/Trigger_Services/lib/trigger_service.py index aab0caf3cae..031017f6f48 100644 --- a/SAS/Trigger_Services/lib/trigger_service.py +++ b/SAS/Trigger_Services/lib/trigger_service.py @@ -52,8 +52,9 @@ notification_bus = ToBus(address=TRIGGER_ADDITION_NOTIFICATION_BUSNAME, broker=N def _auth_allows_triggers(project): - #return momqueryrpc.allowsTriggers(project) - return True + response = momqueryrpc.allows_triggers(project) + return response.lower() == 'true' + #return True def _validate_trigger(trigger_xml): @@ -63,7 +64,7 @@ def _validate_trigger(trigger_xml): else: msg = "Got invalid trigger XML" logger.error(msg) - raise Exception(msg+" -> " + response) + raise Exception(msg+" -> " + str(response)) @@ -74,19 +75,22 @@ def _validate_lofarspec(lofar_xml): else: msg = "Translation to Lofar specification failed!" logger.error(msg) - raise Exception(msg+" -> " + response) + raise Exception(msg+" -> " + str(response)) def _add_trigger(username, hostname, projectname, metadata): logger.info("Adding trigger") - # trigger_id = momqueryrpc.addTrigger(username,hostname ,projectname,metadata) - trigger_id = 42 + response = momqueryrpc.add_trigger(username,hostname ,projectname,metadata) + #trigger_id = response.get('trigger_id') + trigger_id = response # no dict? + #trigger_id = '42' return trigger_id def _get_project_priority(project): logger.info("Getting project priority for project"+ str(project)) - # prio = momqueryrpc.getProjectPriority(project) - prio = 1 + response = momqueryrpc.get_project_priority(project) + prio = response + #prio = 1 return prio def _add_specification(user, lofar_xml): diff --git a/SAS/Trigger_Services/test/setup_queues_and_services.sh b/SAS/Trigger_Services/test/setup_queues_and_services.sh new file mode 100755 index 00000000000..e7508555cdc --- /dev/null +++ b/SAS/Trigger_Services/test/setup_queues_and_services.sh @@ -0,0 +1,32 @@ +#/bin/sh + + +source ~/dev/Triggers/Triggers-Task9895/build/gnu_debug/lofarinit.sh + +killall qpidd +killall specificationservice +killall specificationvalidationservice +killall specificationtranslationservice +killall triggerservice + +echo "Setting up queues" +cd dev/qpid-tools-0.32/src/py/ +qpidd & +./qpid-config add exchange topic specificationvalidationbus +./qpid-config add exchange topic specificationtranslationbus +./qpid-config add exchange topic specificationbus +./qpid-config add exchange topic triggerbus +./qpid-config add exchange topic lofar.ts.notification +./qpid-config add exchange topic momquerybus +./qpid-config add exchange topic mom.importxml + + + +echo "Starting services" +specificationservice & +specificationvalidationservice & +specificationtranslationservice & +triggerservice & + +wait + -- GitLab