Skip to content
Snippets Groups Projects
Commit c685607f authored by Jörn Künsemöller's avatar Jörn Künsemöller
Browse files

SW-699: Alter SpecificationServices and tests to work with new RabbbitMQ messaging

parent e8199d59
Branches
Tags
No related merge requests found
...@@ -5,7 +5,7 @@ lofar_package(OTDB_Services 1.0 DEPENDS PyMessaging) ...@@ -5,7 +5,7 @@ lofar_package(OTDB_Services 1.0 DEPENDS PyMessaging)
lofar_find_package(Python 3.4 REQUIRED) lofar_find_package(Python 3.4 REQUIRED)
include(PythonInstall) include(PythonInstall)
find_python_module(pg REQUIRED) # sudo aptitude install python3-pg #find_python_module(pg REQUIRED) # sudo aptitude install python3-pg
lofar_add_bin_scripts( lofar_add_bin_scripts(
getOTDBParset getOTDBParset
......
...@@ -48,10 +48,12 @@ from xmljson import Parker ...@@ -48,10 +48,12 @@ from xmljson import Parker
import re import re
import datetime import datetime
from .config import VALIDATION_SERVICENAME, VALIDATION_BUSNAME from .config import VALIDATION_SERVICENAME
from .validation_service_rpc import ValidationRPC from .validation_service_rpc import ValidationRPC
from .specification_service import _parse_relation_tree, make_key, _parse_project_code from .specification_service import _parse_relation_tree, make_key, _parse_project_code
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from io import BytesIO from io import BytesIO
import logging import logging
...@@ -61,7 +63,7 @@ __changedBy__ = "Joern Kuensemoeller" ...@@ -61,7 +63,7 @@ __changedBy__ = "Joern Kuensemoeller"
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
validationrpc = ValidationRPC(VALIDATION_BUSNAME, VALIDATION_SERVICENAME) validationrpc = ValidationRPC(DEFAULT_BUSNAME, DEFAULT_BROKER)
# ------------------------ # ------------------------
# -> For moving elements: # -> For moving elements:
......
...@@ -27,8 +27,7 @@ from lofar.specificationservices.validation_service_rpc import ValidationRPC ...@@ -27,8 +27,7 @@ from lofar.specificationservices.validation_service_rpc import ValidationRPC
from lofar.specificationservices.translation_service_rpc import TranslationRPC from lofar.specificationservices.translation_service_rpc import TranslationRPC
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.messaging import Service, ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.messaging import RPCService, ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME, ServiceMessageHandler
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt from lofar.common.util import waitForInterrupt
# TODO: mom.importxml uses old messaging interface # TODO: mom.importxml uses old messaging interface
...@@ -40,11 +39,11 @@ from .config import \ ...@@ -40,11 +39,11 @@ from .config import \
MOMIMPORTXML_BUSNAME, \ MOMIMPORTXML_BUSNAME, \
MOMIMPORTXML_SUBJECT MOMIMPORTXML_SUBJECT
momqueryrpc = MoMQueryRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) momqueryrpc = MoMQueryRPC.create(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER)
validationrpc = ValidationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) validationrpc = ValidationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER)
specificationtranslationrpc = TranslationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) specificationtranslationrpc = TranslationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER)
momimportxml_bus = ToBus(address=MOMIMPORTXML_BUSNAME, broker=DEFAULT_BROKER) momimportxml_bus = ToBus(exchange=MOMIMPORTXML_BUSNAME, broker=DEFAULT_BROKER)
permitted_activities=["observation", "pipeline", "measurement"] permitted_activities=["observation", "pipeline", "measurement"]
permitted_statuses=["opened", "approved"] permitted_statuses=["opened", "approved"]
...@@ -216,7 +215,7 @@ def _add_spec_to_mom(mom_xml): ...@@ -216,7 +215,7 @@ def _add_spec_to_mom(mom_xml):
msg.payload = "\n%s\n" # MoM needs enters around the payload to avoid "Content not allowed in prolog" error msg.payload = "\n%s\n" # MoM needs enters around the payload to avoid "Content not allowed in prolog" error
content = msg.content() % (mom_xml,) content = msg.content() % (mom_xml,)
emsg = EventMessage(context=MOMIMPORTXML_SUBJECT, content=content) emsg = EventMessage(subject=MOMIMPORTXML_SUBJECT, content=content)
momimportxml_bus.send(emsg) momimportxml_bus.send(emsg)
logger.debug("Send specs to MOM: " + mom_xml) logger.debug("Send specs to MOM: " + mom_xml)
...@@ -226,17 +225,13 @@ def _lofarxml_to_momxml(lofarxml): ...@@ -226,17 +225,13 @@ def _lofarxml_to_momxml(lofarxml):
response = specificationtranslationrpc.specification_to_momspecification(lofarxml) response = specificationtranslationrpc.specification_to_momspecification(lofarxml)
return response["mom-specification"] return response["mom-specification"]
class SpecificationHandler(MessageHandlerInterface): class SpecificationHandler(ServiceMessageHandler):
def __init__(self, **kwargs): def __init__(self, **kwargs):
super(SpecificationHandler, self).__init__(**kwargs) super(SpecificationHandler, self).__init__()
momimportxml_bus.open() momimportxml_bus.open()
self.service2MethodMap = {
'add_specification': self.add_specification,
'get_specification': self.get_specification,
}
def add_specification(self, user, lofar_xml): def add_specification(self, user, lofar_xml):
logger.info("got specification from user " + str(user)) logger.info("got specification from user " + str(user))
...@@ -261,12 +256,11 @@ class SpecificationHandler(MessageHandlerInterface): ...@@ -261,12 +256,11 @@ class SpecificationHandler(MessageHandlerInterface):
return response return response
def create_service(busname=DEFAULT_BUSNAME): def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
return Service(SPECIFICATION_SERVICENAME, return RPCService(SPECIFICATION_SERVICENAME,
SpecificationHandler, SpecificationHandler,
busname=busname, exchange=busname,
use_service_methods=True, broker=broker)
)
def main(): def main():
......
from lofar.messaging.RPC import RPC, RPCException, RPCWrapper from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, RPCClientContextManagerMixin, RPCClient, DEFAULT_RPC_TIMEOUT
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from .config import SPECIFICATION_SERVICENAME from .config import SPECIFICATION_SERVICENAME
import logging import logging
logger = logging.getLogger(__file__) logger = logging.getLogger(__file__)
class SpecificationRPC(RPCWrapper): class SpecificationRPC(RPCClientContextManagerMixin):
def __init__(self, busname=DEFAULT_BUSNAME, def __init__(self, busname=DEFAULT_BUSNAME,
broker=DEFAULT_BROKER, broker=DEFAULT_BROKER,
timeout=120): timeout=DEFAULT_RPC_TIMEOUT):
super(SpecificationRPC, self).__init__(busname, SPECIFICATION_SERVICENAME, broker, timeout=timeout) super().__init__()
self._rpc_client = RPCClient(service_name=SPECIFICATION_SERVICENAME, exchange=busname, broker=broker, timeout=timeout)
def add_specification(self, user, lofar_xml): def add_specification(self, user, lofar_xml):
logger.info("Requesting addition of specification XML for user -> "+user) logger.info("Requesting addition of specification XML for user -> "+user)
result = self.rpc('add_specification', user, lofar_xml=lofar_xml) result = self._rpc_client.execute('add_specification', user, lofar_xml=lofar_xml)
logger.info("Received addition result -> " + str(result)) logger.info("Received addition result -> " + str(result))
return result return result
def get_specification(self, user, id): def get_specification(self, user, id):
logger.info("Requesting specification XML for user, id -> "+user+","+id) logger.info("Requesting specification XML for user, id -> "+user+","+id)
result = self.rpc('get_specification', user, id=id) result = self._rpc_client.execute('get_specification', user, id=id)
logger.info("Received specification XML -> " + str(result)) logger.info("Received specification XML -> " + str(result))
return result return result
...@@ -46,8 +46,7 @@ logger = logging.getLogger(__name__) ...@@ -46,8 +46,7 @@ logger = logging.getLogger(__name__)
from lxml import etree from lxml import etree
from io import BytesIO from io import BytesIO
from lofar.messaging import Service, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.messaging import RPCService, ServiceMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt from lofar.common.util import waitForInterrupt
from .config import SPECIFICATIONTRANSLATION_SERVICENAME from .config import SPECIFICATIONTRANSLATION_SERVICENAME
...@@ -62,15 +61,9 @@ validationrpc = ValidationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) ...@@ -62,15 +61,9 @@ validationrpc = ValidationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER)
FULL_TRANSLATION = "Full translation" FULL_TRANSLATION = "Full translation"
MODEL_TRANSLATION = "Model translation" MODEL_TRANSLATION = "Model translation"
class SpecificationTranslationHandler(ServiceMessageHandler):
class SpecificationTranslationHandler(MessageHandlerInterface):
def __init__(self, **kwargs): def __init__(self, **kwargs):
super(SpecificationTranslationHandler, self).__init__(**kwargs) super(SpecificationTranslationHandler, self).__init__()
self.service2MethodMap = {
'trigger_to_specification': self.trigger_to_specification,
'specification_to_momspecification': self.specification_to_momspecification,
}
def trigger_to_specification(self, trigger_spec, trigger_id, job_priority): def trigger_to_specification(self, trigger_spec, trigger_id, job_priority):
...@@ -152,12 +145,11 @@ class SpecificationTranslationHandler(MessageHandlerInterface): ...@@ -152,12 +145,11 @@ class SpecificationTranslationHandler(MessageHandlerInterface):
raise Exception("MoM specification validation after translation failed! -> " + str(response)) raise Exception("MoM specification validation after translation failed! -> " + str(response))
def create_service(busname=DEFAULT_BUSNAME): def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
return Service(SPECIFICATIONTRANSLATION_SERVICENAME, return RPCService(SPECIFICATIONTRANSLATION_SERVICENAME,
SpecificationTranslationHandler, SpecificationTranslationHandler,
busname=busname, exchange=busname,
use_service_methods=True, broker=broker)
)
def main(): def main():
......
from lofar.messaging.RPC import RPC, RPCException, RPCWrapper from lofar.messaging import RPCClientContextManagerMixin, RPCClient, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from .config import SPECIFICATIONTRANSLATION_SERVICENAME from .config import SPECIFICATIONTRANSLATION_SERVICENAME
import logging import logging
logger = logging.getLogger(__file__) logger = logging.getLogger(__file__)
class TranslationRPC(RPCWrapper): class TranslationRPC(RPCClientContextManagerMixin):
def __init__(self, busname=DEFAULT_BUSNAME, def __init__(self, busname=DEFAULT_BUSNAME,
broker=DEFAULT_BROKER, broker=DEFAULT_BROKER,
timeout=120): timeout=DEFAULT_RPC_TIMEOUT):
super(TranslationRPC, self).__init__(busname, SPECIFICATIONTRANSLATION_SERVICENAME, broker, timeout=timeout) super(TranslationRPC, self).__init__()
self._rpc_client = RPCClient(SPECIFICATIONTRANSLATION_SERVICENAME, busname, broker, timeout=timeout)
def trigger_to_specification(self, trigger_spec, trigger_id, job_priority): def trigger_to_specification(self, trigger_spec, trigger_id, job_priority):
logger.info("Requesting validation of trigger XML") logger.info("Requesting validation of trigger XML")
result = self.rpc('trigger_to_specification', result = self._rpc_client.execute('trigger_to_specification',
trigger_spec=trigger_spec, trigger_spec=trigger_spec,
trigger_id=trigger_id, trigger_id=trigger_id,
job_priority=job_priority) job_priority=job_priority)
...@@ -26,6 +26,6 @@ class TranslationRPC(RPCWrapper): ...@@ -26,6 +26,6 @@ class TranslationRPC(RPCWrapper):
def specification_to_momspecification(self, spec): def specification_to_momspecification(self, spec):
logger.info("Requesting validation of trigger XML") logger.info("Requesting validation of trigger XML")
result = self.rpc('specification_to_momspecification', spec_xml=spec) result = self._rpc_client.exchange('specification_to_momspecification', spec_xml=spec)
logger.info("Received validation result -> " +str(result)) logger.info("Received validation result -> " +str(result))
return result return result
...@@ -28,8 +28,7 @@ import logging ...@@ -28,8 +28,7 @@ import logging
from io import BytesIO from io import BytesIO
from lxml import etree from lxml import etree
import os import os
from lofar.messaging import Service, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.messaging import ServiceMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME, RPCService
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt from lofar.common.util import waitForInterrupt
from .config import TRIGGER_XSD, LOFARSPEC_XSD, MOMSPEC_XSD, VALIDATION_SERVICENAME from .config import TRIGGER_XSD, LOFARSPEC_XSD, MOMSPEC_XSD, VALIDATION_SERVICENAME
...@@ -70,15 +69,9 @@ def _validateXSD(xml, xsdpath): ...@@ -70,15 +69,9 @@ def _validateXSD(xml, xsdpath):
return {"valid": True} return {"valid": True}
class ValidationHandler(MessageHandlerInterface): class ValidationHandler(ServiceMessageHandler):
def __init__(self, **kwargs): def __init__(self, **kwargs):
super(ValidationHandler, self).__init__(**kwargs) super(ValidationHandler, self).__init__()
self.service2MethodMap = {
'validate_trigger_specification': self.validate_trigger_specification,
'validate_specification': self.validate_specification,
'validate_mom_specification': self.validate_mom_specification,
}
def validate_trigger_specification(self, xml): def validate_trigger_specification(self, xml):
return _validateXSD(xml, TRIGGER_XSD) return _validateXSD(xml, TRIGGER_XSD)
...@@ -93,12 +86,11 @@ class ValidationHandler(MessageHandlerInterface): ...@@ -93,12 +86,11 @@ class ValidationHandler(MessageHandlerInterface):
def create_service(busname=DEFAULT_BUSNAME): def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
return Service(VALIDATION_SERVICENAME, return RPCService(VALIDATION_SERVICENAME,
ValidationHandler, ValidationHandler,
busname=busname, exchange=busname,
use_service_methods=True, broker=broker)
)
def main(): def main():
......
from lofar.messaging.RPC import RPC, RPCException, RPCWrapper from lofar.messaging import RPCClientContextManagerMixin, RPCClient, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from .config import VALIDATION_SERVICENAME from .config import VALIDATION_SERVICENAME
import logging import logging
logger = logging.getLogger(__file__) logger = logging.getLogger(__file__)
from ast import literal_eval from ast import literal_eval
class ValidationRPC(RPCWrapper): class ValidationRPC(RPCClientContextManagerMixin):
def __init__(self, busname=DEFAULT_BUSNAME, def __init__(self, busname=DEFAULT_BUSNAME,
broker=DEFAULT_BROKER, broker=DEFAULT_BROKER,
timeout=120): timeout=DEFAULT_RPC_TIMEOUT):
super(ValidationRPC, self).__init__(busname, VALIDATION_SERVICENAME, broker, timeout=timeout) super(ValidationRPC, self).__init__()
self._rpc_client = RPCClient(VALIDATION_SERVICENAME, busname, broker, timeout=timeout)
def validate_trigger_specification(self, xml): def validate_trigger_specification(self, xml):
logger.info("Requesting validation of trigger XML") logger.info("Requesting validation of trigger XML")
result = self.rpc('validate_trigger_specification', xml=xml) result = self._rpc_client.execute('validate_trigger_specification', xml=xml)
logger.info("Received validation result -> " +str(result)) logger.info("Received validation result -> " +str(result))
return result return result
def validate_specification(self, xml): def validate_specification(self, xml):
logger.info("Requesting validation of specification XML") logger.info("Requesting validation of specification XML")
result = self.rpc('validate_specification', xml=xml) result = self._rpc_client.execute('validate_specification', xml=xml)
logger.info("Received validation result -> " +str(result)) logger.info("Received validation result -> " +str(result))
return result return result
def validate_mom_specification(self, xml): def validate_mom_specification(self, xml):
logger.info("Requesting validation of MoM specification XML") logger.info("Requesting validation of MoM specification XML")
result = self.rpc('validate_mom_specification', xml=xml) result = self._rpc_client.execute('validate_mom_specification', xml=xml)
logger.info("Received validation result -> " +str(result)) logger.info("Received validation result -> " +str(result))
return result return result
......
...@@ -46,12 +46,13 @@ class TestValidationService(unittest.TestCase): ...@@ -46,12 +46,13 @@ class TestValidationService(unittest.TestCase):
f.close() f.close()
return xmlcontent return xmlcontent
def test_validation_service_method_map_should_be_correct(self): # todo: the service map is not around anymore. Do we have to test anything equivalent with new RabbitMQ setup?
uut = ValidationHandler() # def test_validation_service_method_map_should_be_correct(self):
# uut = ValidationHandler()
self.assertEqual(uut.service2MethodMap["validate_trigger_specification"], uut.validate_trigger_specification) #
self.assertEqual(uut.service2MethodMap["validate_specification"], uut.validate_specification) # self.assertEqual(uut.service2MethodMap["validate_trigger_specification"], uut.validate_trigger_specification)
self.assertEqual(uut.service2MethodMap["validate_mom_specification"], uut.validate_mom_specification) # self.assertEqual(uut.service2MethodMap["validate_specification"], uut.validate_specification)
# self.assertEqual(uut.service2MethodMap["validate_mom_specification"], uut.validate_mom_specification)
def test_validate_specification_should_raise_exception_on_invalid_xsd(self): def test_validate_specification_should_raise_exception_on_invalid_xsd(self):
xml = self.sample_lofar_xml xml = self.sample_lofar_xml
......
...@@ -24,7 +24,7 @@ import unittest ...@@ -24,7 +24,7 @@ import unittest
import os import os
import logging import logging
from lofar.triggerservices.trigger_service import TriggerHandler, ALERTHandler, DEFAULT_TBB_PROJECT from lofar.triggerservices.trigger_service import TriggerServiceMessageHandler, ALERTHandler, DEFAULT_TBB_PROJECT
import lofar.triggerservices.trigger_service as serv import lofar.triggerservices.trigger_service as serv
from lofar.specificationservices.translation_service import SpecificationTranslationHandler from lofar.specificationservices.translation_service import SpecificationTranslationHandler
from lxml import etree from lxml import etree
...@@ -47,7 +47,7 @@ class TestTriggerHandler(unittest.TestCase): ...@@ -47,7 +47,7 @@ class TestTriggerHandler(unittest.TestCase):
cls.trigger_xml = f.read() cls.trigger_xml = f.read()
with mock.patch('lofar.triggerservices.trigger_service.notification_bus'): with mock.patch('lofar.triggerservices.trigger_service.notification_bus'):
cls.handler = TriggerHandler() cls.handler = TriggerServiceMessageHandler()
def setUp(self): def setUp(self):
logging.info('-----------------') logging.info('-----------------')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment