Skip to content
Snippets Groups Projects
Commit e52c4a19 authored by Mattia Mancini's avatar Mattia Mancini
Browse files

SW-720: python2 python3 conversion and rabbitmq implementation left over

parent fdfece49
Branches
Tags
1 merge request!4Lofar release 4 0 minor fixes
...@@ -20,21 +20,20 @@ ...@@ -20,21 +20,20 @@
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
from lxml import etree
from io import BytesIO
from collections import OrderedDict from collections import OrderedDict
from lofar.specificationservices.validation_service_rpc import ValidationRPC from io import BytesIO
from lofar.specificationservices.translation_service_rpc import TranslationRPC
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.messaging import RPCService, ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME, ServiceMessageHandler
from lofar.common.util import waitForInterrupt from lofar.common.util import waitForInterrupt
# TODO: mom.importxml uses old messaging interface # TODO: mom.importxml uses old messaging interface
from lofar.messagebus.message import MessageContent from lofar.messagebus.message import MessageContent
from lofar.messaging import RPCService, ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME, \
ServiceMessageHandler
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.specificationservices.translation_service_rpc import TranslationRPC
from lofar.specificationservices.validation_service_rpc import ValidationRPC
from lxml import etree
from .config import \ from .config import \
VALIDATION_SERVICENAME, \
SPECIFICATION_SERVICENAME, \ SPECIFICATION_SERVICENAME, \
MOMIMPORTXML_BUSNAME, \ MOMIMPORTXML_BUSNAME, \
MOMIMPORTXML_SUBJECT MOMIMPORTXML_SUBJECT
...@@ -49,8 +48,10 @@ permitted_activities=["observation", "pipeline", "measurement"] ...@@ -49,8 +48,10 @@ permitted_activities=["observation", "pipeline", "measurement"]
permitted_statuses = ["opened", "approved"] permitted_statuses = ["opened", "approved"]
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def make_key(element): def make_key(element):
source = element.find("source").text source = element.find("source").text
identifier = element.find("identifier").text identifier = element.find("identifier").text
...@@ -65,15 +66,23 @@ def _parse_relation_tree(spec): ...@@ -65,15 +66,23 @@ def _parse_relation_tree(spec):
# todo: Expose as service method? This requires conversion to xml again... # todo: Expose as service method? This requires conversion to xml again...
containers = spec.findall('container') containers = spec.findall('container')
folder_activity = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if x.find("type").text == "folder-activity"] folder_activity = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if
folder_folder = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if x.find("type").text == "folder-folder"] x.find("type").text == "folder-activity"]
folder_folder = [(x.find("parent"), x.find("child")) for x in spec.findall("relation") if
x.find("type").text == "folder-folder"]
# Instead of normal dict we use an OrderedDict in order to be able to influence the output element order (through # Instead of normal dict we use an OrderedDict in order to be able to influence the output element order (through
# input order of folder-activity relations). The generator in the constructor creates a sorted list, so this is # input order of folder-activity relations). The generator in the constructor creates a sorted list, so this is
# rather similar to a standard dict comprehension. # rather similar to a standard dict comprehension.
foldernames = OrderedDict((make_key(container.find("temporaryIdentifier")), container.find('folder').find('name').text) for container in containers) foldernames = OrderedDict((make_key(container.find("temporaryIdentifier")),
parentfolders = OrderedDict((make_key(folder_id), make_key(parentfolder_id)) for (parentfolder_id, folder_id) in folder_folder) container.find('folder').find('name').text) for container in
activityfolders = OrderedDict((make_key(activity_id), make_key(folder_id)) for (folder_id, activity_id) in folder_activity ) containers)
parentfolders = OrderedDict(
(make_key(folder_id), make_key(parentfolder_id)) for (parentfolder_id, folder_id) in
folder_folder)
activityfolders = OrderedDict(
(make_key(activity_id), make_key(folder_id)) for (folder_id, activity_id) in
folder_activity)
# check completeness # check completeness
for folder in list(activityfolders.values()): for folder in list(activityfolders.values()):
...@@ -87,6 +96,7 @@ def _parse_relation_tree(spec): ...@@ -87,6 +96,7 @@ def _parse_relation_tree(spec):
return activityfolders, parentfolders, foldernames return activityfolders, parentfolders, foldernames
def _parse_project_code(spec): def _parse_project_code(spec):
projectref = spec.find('projectReference') projectref = spec.find('projectReference')
if projectref is not None: if projectref is not None:
...@@ -126,6 +136,7 @@ def _parse_activity_paths(spec): ...@@ -126,6 +136,7 @@ def _parse_activity_paths(spec):
logger.debug("Activity path -> " + str(key) + " --> " + paths[key]) logger.debug("Activity path -> " + str(key) + " --> " + paths[key])
return paths return paths
def _check_specification(user, lofar_xml): def _check_specification(user, lofar_xml):
""" """
Performs some checks to make sure the specification meets some criteria before we accespt it. Performs some checks to make sure the specification meets some criteria before we accespt it.
...@@ -164,13 +175,15 @@ def _check_specification(user, lofar_xml): ...@@ -164,13 +175,15 @@ def _check_specification(user, lofar_xml):
break break
logger.warning("!!! %s not found..." % (action,)) logger.warning("!!! %s not found..." % (action,))
if jobtype is None: if jobtype is None:
raise Exception("Specified activity is not permitted: " + str(key) + " -> "+ str(permitted_activities) raise Exception("Specified activity is not permitted: " + str(key) + " -> " + str(
permitted_activities)
+ " not found in " + str(activity.getchildren())) + " not found in " + str(activity.getchildren()))
status = activity.find("status") status = activity.find("status")
if status is None: if status is None:
raise Exception("Activity has no status: " + str(key)) raise Exception("Activity has no status: " + str(key))
if status.text not in permitted_statuses: if status.text not in permitted_statuses:
raise Exception("Specified activity is not going to permitted status: " + str(key) + " -> '" raise Exception(
"Specified activity is not going to permitted status: " + str(key) + " -> '"
+ str(status.text) + "' not in " + str(permitted_statuses)) + str(status.text) + "' not in " + str(permitted_statuses))
# measurements require observation permissions # measurements require observation permissions
...@@ -179,31 +192,38 @@ def _check_specification(user, lofar_xml): ...@@ -179,31 +192,38 @@ def _check_specification(user, lofar_xml):
_authenticateAction(str(user), str(project), str(jobtype), str(status.text)) _authenticateAction(str(user), str(project), str(jobtype), str(status.text))
def _isActive(project): def _isActive(project):
logger.debug("Checking if project is active: " + project) logger.debug("Checking if project is active: " + project)
response = momqueryrpc.isProjectActive(project) # todo mock this for testing response = momqueryrpc.isProjectActive(project) # todo mock this for testing
return response['active'] return response['active']
def _folderExists(path): def _folderExists(path):
logger.debug("Checking if path exists -> " + path) logger.debug("Checking if path exists -> " + path)
response = momqueryrpc.folderExists(path) # todo mock this for testing response = momqueryrpc.folderExists(path) # todo mock this for testing
return response["exists"] return response["exists"]
def _authenticateAction(user, project, jobtype, state): def _authenticateAction(user, project, jobtype, state):
logger.debug("Authenticate action -> " + user + ', ' + project + ', ' + jobtype + ', ' + state) logger.debug("Authenticate action -> " + user + ', ' + project + ', ' + jobtype + ', ' + state)
response = momqueryrpc.authorized_add_with_status(user,project,jobtype,state) # todo mock this for testing response = momqueryrpc.authorized_add_with_status(user, project, jobtype,
state) # todo mock this for testing
return response['authorized'] return response['authorized']
def _validate_lofarspec(lofar_xml): def _validate_lofarspec(lofar_xml):
response = validationrpc.validate_specification(lofar_xml) response = validationrpc.validate_specification(lofar_xml)
if not response["valid"]: if not response["valid"]:
raise Exception("Invalid specification: " + response["error"]) raise Exception("Invalid specification: " + response["error"])
def _validate_momspec(mom_xml): def _validate_momspec(mom_xml):
response = validationrpc.validate_mom_specification(mom_xml) response = validationrpc.validate_mom_specification(mom_xml)
if not response["valid"]: if not response["valid"]:
raise Exception("Invalid MoM specification: " + response["error"]) raise Exception("Invalid MoM specification: " + response["error"])
def _add_spec_to_mom(mom_xml): def _add_spec_to_mom(mom_xml):
# Construct message payload using old-style (MessageBus) message format # Construct message payload using old-style (MessageBus) message format
msg = MessageContent() msg = MessageContent()
...@@ -220,11 +240,13 @@ def _add_spec_to_mom(mom_xml): ...@@ -220,11 +240,13 @@ def _add_spec_to_mom(mom_xml):
logger.debug("Send specs to MOM: " + mom_xml) logger.debug("Send specs to MOM: " + mom_xml)
def _lofarxml_to_momxml(lofarxml): def _lofarxml_to_momxml(lofarxml):
logger.debug("Translating LOFAR spec to MoM spec") logger.debug("Translating LOFAR spec to MoM spec")
response = specificationtranslationrpc.specification_to_momspecification(lofarxml) response = specificationtranslationrpc.specification_to_momspecification(lofarxml)
return response["mom-specification"] return response["mom-specification"]
class SpecificationHandler(ServiceMessageHandler): class SpecificationHandler(ServiceMessageHandler):
def __init__(self, **kwargs): def __init__(self, **kwargs):
...@@ -232,8 +254,8 @@ class SpecificationHandler(ServiceMessageHandler): ...@@ -232,8 +254,8 @@ class SpecificationHandler(ServiceMessageHandler):
momimportxml_bus.open() momimportxml_bus.open()
def add_specification(self, user, lofar_xml): def add_specification(self, user, lofar_xml):
with momimportxml_bus:
logger.info("got specification from user " + str(user)) logger.info("got specification from user " + str(user))
logger.debug("LOFAR-XML: " + lofar_xml) logger.debug("LOFAR-XML: " + lofar_xml)
...@@ -266,4 +288,3 @@ def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): ...@@ -266,4 +288,3 @@ def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
def main(): def main():
with create_service(): with create_service():
waitForInterrupt() waitForInterrupt()
...@@ -97,7 +97,7 @@ class SpecificationTranslationHandler(ServiceMessageHandler): ...@@ -97,7 +97,7 @@ class SpecificationTranslationHandler(ServiceMessageHandler):
lofarspec = etree.Element("{http://www.astron.nl/LofarSpecification}specification", nsmap=spec.nsmap) lofarspec = etree.Element("{http://www.astron.nl/LofarSpecification}specification", nsmap=spec.nsmap)
for child in spec.getchildren(): for child in spec.getchildren():
lofarspec.append(child) lofarspec.append(child)
specification_xml = etree.tostring(lofarspec, pretty_print=True) specification_xml = etree.tostring(lofarspec, pretty_print=True).decode('utf8')
logger.debug(specification_xml) logger.debug(specification_xml)
except Exception as err: except Exception as err:
logger.error("Exception while translating trigger -> " + str(err)) logger.error("Exception while translating trigger -> " + str(err))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment