diff --git a/SAS/SpecificationServices/lib/specification_service.py b/SAS/SpecificationServices/lib/specification_service.py index 14084b01f366024877f2546d82602c247cf134b0..bde5c5e143573791f9a68aab7819f625da1e39e9 100644 --- a/SAS/SpecificationServices/lib/specification_service.py +++ b/SAS/SpecificationServices/lib/specification_service.py @@ -20,21 +20,20 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. -from lxml import etree -from io import BytesIO from collections import OrderedDict -from lofar.specificationservices.validation_service_rpc import ValidationRPC -from lofar.specificationservices.translation_service_rpc import TranslationRPC -from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC +from io import BytesIO -from lofar.messaging import RPCService, ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME, ServiceMessageHandler from lofar.common.util import waitForInterrupt - # TODO: mom.importxml uses old messaging interface from lofar.messagebus.message import MessageContent +from lofar.messaging import RPCService, ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME, \ + ServiceMessageHandler +from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC +from lofar.specificationservices.translation_service_rpc import TranslationRPC +from lofar.specificationservices.validation_service_rpc import ValidationRPC +from lxml import etree from .config import \ - VALIDATION_SERVICENAME, \ SPECIFICATION_SERVICENAME, \ MOMIMPORTXML_BUSNAME, \ MOMIMPORTXML_SUBJECT @@ -45,14 +44,16 @@ specificationtranslationrpc = TranslationRPC.create(exchange=DEFAULT_BUSNAME, br momimportxml_bus = ToBus(exchange=MOMIMPORTXML_BUSNAME, broker=DEFAULT_BROKER) -permitted_activities=["observation", "pipeline", "measurement"] -permitted_statuses=["opened", "approved"] +permitted_activities = ["observation", "pipeline", "measurement"] +permitted_statuses = ["opened", "approved"] import logging + logger = logging.getLogger(__name__) + def make_key(element): - source = element.find("source").text + source = element.find("source").text identifier = element.find("identifier").text return (source, identifier) @@ -65,15 +66,23 @@ def _parse_relation_tree(spec): # todo: Expose as service method? This requires conversion to xml again... containers = spec.findall('container') - folder_activity = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if x.find("type").text == "folder-activity"] - folder_folder = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if x.find("type").text == "folder-folder"] + folder_activity = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if + x.find("type").text == "folder-activity"] + folder_folder = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if + x.find("type").text == "folder-folder"] # Instead of normal dict we use an OrderedDict in order to be able to influence the output element order (through # input order of folder-activity relations). The generator in the constructor creates a sorted list, so this is # rather similar to a standard dict comprehension. - foldernames = OrderedDict((make_key(container.find("temporaryIdentifier")), container.find('folder').find('name').text) for container in containers) - parentfolders = OrderedDict((make_key(folder_id), make_key(parentfolder_id)) for (parentfolder_id, folder_id) in folder_folder) - activityfolders = OrderedDict((make_key(activity_id), make_key(folder_id)) for (folder_id, activity_id) in folder_activity ) + foldernames = OrderedDict((make_key(container.find("temporaryIdentifier")), + container.find('folder').find('name').text) for container in + containers) + parentfolders = OrderedDict( + (make_key(folder_id), make_key(parentfolder_id)) for (parentfolder_id, folder_id) in + folder_folder) + activityfolders = OrderedDict( + (make_key(activity_id), make_key(folder_id)) for (folder_id, activity_id) in + folder_activity) # check completeness for folder in list(activityfolders.values()): @@ -87,6 +96,7 @@ def _parse_relation_tree(spec): return activityfolders, parentfolders, foldernames + def _parse_project_code(spec): projectref = spec.find('projectReference') if projectref is not None: @@ -114,7 +124,7 @@ def _parse_activity_paths(spec): if folder in list(foldernames.keys()): path = foldernames[folder] + "/" + path else: - raise Exception("No folder name for key: " +str(folder)) + raise Exception("No folder name for key: " + str(folder)) if folder in list(parentfolders.keys()): folder = parentfolders[folder] else: @@ -123,9 +133,10 @@ def _parse_activity_paths(spec): paths[activikey] = path for key in list(paths.keys()): - logger.debug("Activity path -> "+str(key)+" --> "+ paths[key]) + logger.debug("Activity path -> " + str(key) + " --> " + paths[key]) return paths + def _check_specification(user, lofar_xml): """ Performs some checks to make sure the specification meets some criteria before we accespt it. @@ -141,22 +152,22 @@ def _check_specification(user, lofar_xml): activity_paths = _parse_activity_paths(spec) for path in list(activity_paths.values()): if _folderExists(path): - raise Exception("Innermost folder already exists: "+path) + raise Exception("Innermost folder already exists: " + path) project = _parse_project_code(spec) if not _isActive(project): - raise Exception("Project is not active: "+str(project)) + raise Exception("Project is not active: " + str(project)) activities = spec.findall('activity') for activity in activities: - key = (activity.find("temporaryIdentifier").find("source").text , + key = (activity.find("temporaryIdentifier").find("source").text, activity.find("temporaryIdentifier").find("identifier").text) if not key in list(activity_paths.keys()): # allow measurements, which are activities, but not contained in folders by definition! # todo: check, is this what we want? Or do we have to do attional checks, # todo: e.g. that the obs-measurement relation and the parent obs exists? if not activity.find("measurement") is not None: - raise Exception("Specified action has to be in folder: "+str(key)) + raise Exception("Specified action has to be in folder: " + str(key)) jobtype = None for action in permitted_activities: if activity.find(action) is not None: @@ -164,14 +175,16 @@ def _check_specification(user, lofar_xml): break logger.warning("!!! %s not found..." % (action,)) if jobtype is None: - raise Exception("Specified activity is not permitted: " + str(key) + " -> "+ str(permitted_activities) - + " not found in "+ str(activity.getchildren())) + raise Exception("Specified activity is not permitted: " + str(key) + " -> " + str( + permitted_activities) + + " not found in " + str(activity.getchildren())) status = activity.find("status") if status is None: - raise Exception("Activity has no status: "+str(key) ) + raise Exception("Activity has no status: " + str(key)) if status.text not in permitted_statuses: - raise Exception("Specified activity is not going to permitted status: " + str(key) + " -> '" - + str(status.text) + "' not in "+str(permitted_statuses)) + raise Exception( + "Specified activity is not going to permitted status: " + str(key) + " -> '" + + str(status.text) + "' not in " + str(permitted_statuses)) # measurements require observation permissions if jobtype == "measurement": @@ -179,31 +192,38 @@ def _check_specification(user, lofar_xml): _authenticateAction(str(user), str(project), str(jobtype), str(status.text)) + def _isActive(project): - logger.debug("Checking if project is active: "+ project) - response = momqueryrpc.isProjectActive(project) #todo mock this for testing + logger.debug("Checking if project is active: " + project) + response = momqueryrpc.isProjectActive(project) # todo mock this for testing return response['active'] + def _folderExists(path): - logger.debug("Checking if path exists -> "+ path) - response = momqueryrpc.folderExists(path) # todo mock this for testing + logger.debug("Checking if path exists -> " + path) + response = momqueryrpc.folderExists(path) # todo mock this for testing return response["exists"] + def _authenticateAction(user, project, jobtype, state): - logger.debug("Authenticate action -> "+ user +', ' +project +', '+ jobtype +', '+ state) - response = momqueryrpc.authorized_add_with_status(user,project,jobtype,state) # todo mock this for testing + logger.debug("Authenticate action -> " + user + ', ' + project + ', ' + jobtype + ', ' + state) + response = momqueryrpc.authorized_add_with_status(user, project, jobtype, + state) # todo mock this for testing return response['authorized'] + def _validate_lofarspec(lofar_xml): response = validationrpc.validate_specification(lofar_xml) if not response["valid"]: raise Exception("Invalid specification: " + response["error"]) + def _validate_momspec(mom_xml): response = validationrpc.validate_mom_specification(mom_xml) if not response["valid"]: raise Exception("Invalid MoM specification: " + response["error"]) + def _add_spec_to_mom(mom_xml): # Construct message payload using old-style (MessageBus) message format msg = MessageContent() @@ -212,7 +232,7 @@ def _add_spec_to_mom(mom_xml): msg.summary = "Translated LOFAR specifications" msg.momid = -1 msg.sasid = -1 - msg.payload = "\n%s\n" # MoM needs enters around the payload to avoid "Content not allowed in prolog" error + msg.payload = "\n%s\n" # MoM needs enters around the payload to avoid "Content not allowed in prolog" error content = msg.content() % (mom_xml,) emsg = EventMessage(subject=MOMIMPORTXML_SUBJECT, content=content) @@ -220,11 +240,13 @@ def _add_spec_to_mom(mom_xml): logger.debug("Send specs to MOM: " + mom_xml) + def _lofarxml_to_momxml(lofarxml): logger.debug("Translating LOFAR spec to MoM spec") - response = specificationtranslationrpc.specification_to_momspecification(lofarxml) + response = specificationtranslationrpc.specification_to_momspecification(lofarxml) return response["mom-specification"] + class SpecificationHandler(ServiceMessageHandler): def __init__(self, **kwargs): @@ -232,38 +254,37 @@ class SpecificationHandler(ServiceMessageHandler): momimportxml_bus.open() - def add_specification(self, user, lofar_xml): - logger.info("got specification from user " + str(user)) - logger.debug("LOFAR-XML: " + lofar_xml) + with momimportxml_bus: + logger.info("got specification from user " + str(user)) + logger.debug("LOFAR-XML: " + lofar_xml) - _validate_lofarspec(lofar_xml) - logger.info("lofar xml validates!") + _validate_lofarspec(lofar_xml) + logger.info("lofar xml validates!") - _check_specification(user, lofar_xml) - logger.info("lofar xml check successful!") + _check_specification(user, lofar_xml) + logger.info("lofar xml check successful!") - mom_xml = _lofarxml_to_momxml(lofar_xml) - _validate_momspec(mom_xml) - logger.info("mom xml validates!") + mom_xml = _lofarxml_to_momxml(lofar_xml) + _validate_momspec(mom_xml) + logger.info("mom xml validates!") - _add_spec_to_mom(mom_xml) - logger.info("Fired and forgot to mom.importXML") + _add_spec_to_mom(mom_xml) + logger.info("Fired and forgot to mom.importXML") def get_specification(self, user, id): - logger.info("getting spec of id "+str(id)) + logger.info("getting spec of id " + str(id)) response = momqueryrpc.get_trigger_spec(user, id) return response def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): return RPCService(SPECIFICATION_SERVICENAME, - SpecificationHandler, - exchange=busname, - broker=broker) + SpecificationHandler, + exchange=busname, + broker=broker) def main(): with create_service(): - waitForInterrupt() - + waitForInterrupt()