From c685607f5db622dbf58475b30a1c35a6fa40831d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20K=C3=BCnsem=C3=B6ller?= <jkuensem@physik.uni-bielefeld.de> Date: Thu, 13 Jun 2019 13:31:32 +0000 Subject: [PATCH] SW-699: Alter SpecificationServices and tests to work with new RabbbitMQ messaging --- SAS/OTDB_Services/CMakeLists.txt | 2 +- .../lib/lofarxml_to_momxml_translator.py | 6 +++-- .../lib/specification_service.py | 26 +++++++------------ .../lib/specification_service_rpc.py | 16 +++++------- .../lib/translation_service.py | 24 ++++++----------- .../lib/translation_service_rpc.py | 14 +++++----- .../lib/validation_service.py | 24 ++++++----------- .../lib/validation_service_rpc.py | 16 ++++++------ .../test/t_validation_service.py | 13 +++++----- SAS/TriggerServices/test/t_trigger_service.py | 4 +-- 10 files changed, 62 insertions(+), 83 deletions(-) diff --git a/SAS/OTDB_Services/CMakeLists.txt b/SAS/OTDB_Services/CMakeLists.txt index 7fbb478aa2c..70ea0a7d272 100644 --- a/SAS/OTDB_Services/CMakeLists.txt +++ b/SAS/OTDB_Services/CMakeLists.txt @@ -5,7 +5,7 @@ lofar_package(OTDB_Services 1.0 DEPENDS PyMessaging) lofar_find_package(Python 3.4 REQUIRED) include(PythonInstall) -find_python_module(pg REQUIRED) # sudo aptitude install python3-pg +#find_python_module(pg REQUIRED) # sudo aptitude install python3-pg lofar_add_bin_scripts( getOTDBParset diff --git a/SAS/SpecificationServices/lib/lofarxml_to_momxml_translator.py b/SAS/SpecificationServices/lib/lofarxml_to_momxml_translator.py index f795a8e9da3..ee908d71fdc 100644 --- a/SAS/SpecificationServices/lib/lofarxml_to_momxml_translator.py +++ b/SAS/SpecificationServices/lib/lofarxml_to_momxml_translator.py @@ -48,10 +48,12 @@ from xmljson import Parker import re import datetime -from .config import VALIDATION_SERVICENAME, VALIDATION_BUSNAME +from .config import VALIDATION_SERVICENAME from .validation_service_rpc import ValidationRPC from .specification_service import _parse_relation_tree, make_key, _parse_project_code +from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME + from io import BytesIO import logging @@ -61,7 +63,7 @@ __changedBy__ = "Joern Kuensemoeller" logger = logging.getLogger(__name__) -validationrpc = ValidationRPC(VALIDATION_BUSNAME, VALIDATION_SERVICENAME) +validationrpc = ValidationRPC(DEFAULT_BUSNAME, DEFAULT_BROKER) # ------------------------ # -> For moving elements: diff --git a/SAS/SpecificationServices/lib/specification_service.py b/SAS/SpecificationServices/lib/specification_service.py index 6864e814573..4ce07da269a 100644 --- a/SAS/SpecificationServices/lib/specification_service.py +++ b/SAS/SpecificationServices/lib/specification_service.py @@ -27,8 +27,7 @@ from lofar.specificationservices.validation_service_rpc import ValidationRPC from lofar.specificationservices.translation_service_rpc import TranslationRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC -from lofar.messaging import Service, ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.messaging.Service import MessageHandlerInterface +from lofar.messaging import RPCService, ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME, ServiceMessageHandler from lofar.common.util import waitForInterrupt # TODO: mom.importxml uses old messaging interface @@ -40,11 +39,11 @@ from .config import \ MOMIMPORTXML_BUSNAME, \ MOMIMPORTXML_SUBJECT -momqueryrpc = MoMQueryRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) +momqueryrpc = MoMQueryRPC.create(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) validationrpc = ValidationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) specificationtranslationrpc = TranslationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) -momimportxml_bus = ToBus(address=MOMIMPORTXML_BUSNAME, broker=DEFAULT_BROKER) +momimportxml_bus = ToBus(exchange=MOMIMPORTXML_BUSNAME, broker=DEFAULT_BROKER) permitted_activities=["observation", "pipeline", "measurement"] permitted_statuses=["opened", "approved"] @@ -216,7 +215,7 @@ def _add_spec_to_mom(mom_xml): msg.payload = "\n%s\n" # MoM needs enters around the payload to avoid "Content not allowed in prolog" error content = msg.content() % (mom_xml,) - emsg = EventMessage(context=MOMIMPORTXML_SUBJECT, content=content) + emsg = EventMessage(subject=MOMIMPORTXML_SUBJECT, content=content) momimportxml_bus.send(emsg) logger.debug("Send specs to MOM: " + mom_xml) @@ -226,17 +225,13 @@ def _lofarxml_to_momxml(lofarxml): response = specificationtranslationrpc.specification_to_momspecification(lofarxml) return response["mom-specification"] -class SpecificationHandler(MessageHandlerInterface): +class SpecificationHandler(ServiceMessageHandler): def __init__(self, **kwargs): - super(SpecificationHandler, self).__init__(**kwargs) + super(SpecificationHandler, self).__init__() momimportxml_bus.open() - self.service2MethodMap = { - 'add_specification': self.add_specification, - 'get_specification': self.get_specification, - } def add_specification(self, user, lofar_xml): logger.info("got specification from user " + str(user)) @@ -261,12 +256,11 @@ class SpecificationHandler(MessageHandlerInterface): return response -def create_service(busname=DEFAULT_BUSNAME): - return Service(SPECIFICATION_SERVICENAME, +def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + return RPCService(SPECIFICATION_SERVICENAME, SpecificationHandler, - busname=busname, - use_service_methods=True, - ) + exchange=busname, + broker=broker) def main(): diff --git a/SAS/SpecificationServices/lib/specification_service_rpc.py b/SAS/SpecificationServices/lib/specification_service_rpc.py index 1d89bce98ab..11618bc9e99 100644 --- a/SAS/SpecificationServices/lib/specification_service_rpc.py +++ b/SAS/SpecificationServices/lib/specification_service_rpc.py @@ -1,28 +1,26 @@ -from lofar.messaging.RPC import RPC, RPCException, RPCWrapper -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, RPCClientContextManagerMixin, RPCClient, DEFAULT_RPC_TIMEOUT from .config import SPECIFICATION_SERVICENAME import logging logger = logging.getLogger(__file__) -class SpecificationRPC(RPCWrapper): +class SpecificationRPC(RPCClientContextManagerMixin): def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, - timeout=120): - super(SpecificationRPC, self).__init__(busname, SPECIFICATION_SERVICENAME, broker, timeout=timeout) - + timeout=DEFAULT_RPC_TIMEOUT): + super().__init__() + self._rpc_client = RPCClient(service_name=SPECIFICATION_SERVICENAME, exchange=busname, broker=broker, timeout=timeout) def add_specification(self, user, lofar_xml): logger.info("Requesting addition of specification XML for user -> "+user) - result = self.rpc('add_specification', user, lofar_xml=lofar_xml) + result = self._rpc_client.execute('add_specification', user, lofar_xml=lofar_xml) logger.info("Received addition result -> " + str(result)) return result - def get_specification(self, user, id): logger.info("Requesting specification XML for user, id -> "+user+","+id) - result = self.rpc('get_specification', user, id=id) + result = self._rpc_client.execute('get_specification', user, id=id) logger.info("Received specification XML -> " + str(result)) return result diff --git a/SAS/SpecificationServices/lib/translation_service.py b/SAS/SpecificationServices/lib/translation_service.py index 586f887b3a8..ae0ee741a54 100644 --- a/SAS/SpecificationServices/lib/translation_service.py +++ b/SAS/SpecificationServices/lib/translation_service.py @@ -46,8 +46,7 @@ logger = logging.getLogger(__name__) from lxml import etree from io import BytesIO -from lofar.messaging import Service, DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.messaging.Service import MessageHandlerInterface +from lofar.messaging import RPCService, ServiceMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.common.util import waitForInterrupt from .config import SPECIFICATIONTRANSLATION_SERVICENAME @@ -62,15 +61,9 @@ validationrpc = ValidationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) FULL_TRANSLATION = "Full translation" MODEL_TRANSLATION = "Model translation" - -class SpecificationTranslationHandler(MessageHandlerInterface): +class SpecificationTranslationHandler(ServiceMessageHandler): def __init__(self, **kwargs): - super(SpecificationTranslationHandler, self).__init__(**kwargs) - - self.service2MethodMap = { - 'trigger_to_specification': self.trigger_to_specification, - 'specification_to_momspecification': self.specification_to_momspecification, - } + super(SpecificationTranslationHandler, self).__init__() def trigger_to_specification(self, trigger_spec, trigger_id, job_priority): @@ -152,12 +145,11 @@ class SpecificationTranslationHandler(MessageHandlerInterface): raise Exception("MoM specification validation after translation failed! -> " + str(response)) -def create_service(busname=DEFAULT_BUSNAME): - return Service(SPECIFICATIONTRANSLATION_SERVICENAME, - SpecificationTranslationHandler, - busname=busname, - use_service_methods=True, - ) +def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + return RPCService(SPECIFICATIONTRANSLATION_SERVICENAME, + SpecificationTranslationHandler, + exchange=busname, + broker=broker) def main(): diff --git a/SAS/SpecificationServices/lib/translation_service_rpc.py b/SAS/SpecificationServices/lib/translation_service_rpc.py index a23ea36a9da..8f128ac2d81 100644 --- a/SAS/SpecificationServices/lib/translation_service_rpc.py +++ b/SAS/SpecificationServices/lib/translation_service_rpc.py @@ -1,22 +1,22 @@ -from lofar.messaging.RPC import RPC, RPCException, RPCWrapper -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import RPCClientContextManagerMixin, RPCClient, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT from .config import SPECIFICATIONTRANSLATION_SERVICENAME import logging logger = logging.getLogger(__file__) -class TranslationRPC(RPCWrapper): +class TranslationRPC(RPCClientContextManagerMixin): def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, - timeout=120): - super(TranslationRPC, self).__init__(busname, SPECIFICATIONTRANSLATION_SERVICENAME, broker, timeout=timeout) + timeout=DEFAULT_RPC_TIMEOUT): + super(TranslationRPC, self).__init__() + self._rpc_client = RPCClient(SPECIFICATIONTRANSLATION_SERVICENAME, busname, broker, timeout=timeout) def trigger_to_specification(self, trigger_spec, trigger_id, job_priority): logger.info("Requesting validation of trigger XML") - result = self.rpc('trigger_to_specification', + result = self._rpc_client.execute('trigger_to_specification', trigger_spec=trigger_spec, trigger_id=trigger_id, job_priority=job_priority) @@ -26,6 +26,6 @@ class TranslationRPC(RPCWrapper): def specification_to_momspecification(self, spec): logger.info("Requesting validation of trigger XML") - result = self.rpc('specification_to_momspecification', spec_xml=spec) + result = self._rpc_client.exchange('specification_to_momspecification', spec_xml=spec) logger.info("Received validation result -> " +str(result)) return result diff --git a/SAS/SpecificationServices/lib/validation_service.py b/SAS/SpecificationServices/lib/validation_service.py index 17741505b42..73ea20dd748 100644 --- a/SAS/SpecificationServices/lib/validation_service.py +++ b/SAS/SpecificationServices/lib/validation_service.py @@ -28,8 +28,7 @@ import logging from io import BytesIO from lxml import etree import os -from lofar.messaging import Service, DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.messaging.Service import MessageHandlerInterface +from lofar.messaging import ServiceMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME, RPCService from lofar.common.util import waitForInterrupt from .config import TRIGGER_XSD, LOFARSPEC_XSD, MOMSPEC_XSD, VALIDATION_SERVICENAME @@ -70,15 +69,9 @@ def _validateXSD(xml, xsdpath): return {"valid": True} -class ValidationHandler(MessageHandlerInterface): +class ValidationHandler(ServiceMessageHandler): def __init__(self, **kwargs): - super(ValidationHandler, self).__init__(**kwargs) - - self.service2MethodMap = { - 'validate_trigger_specification': self.validate_trigger_specification, - 'validate_specification': self.validate_specification, - 'validate_mom_specification': self.validate_mom_specification, - } + super(ValidationHandler, self).__init__() def validate_trigger_specification(self, xml): return _validateXSD(xml, TRIGGER_XSD) @@ -93,12 +86,11 @@ class ValidationHandler(MessageHandlerInterface): -def create_service(busname=DEFAULT_BUSNAME): - return Service(VALIDATION_SERVICENAME, - ValidationHandler, - busname=busname, - use_service_methods=True, - ) +def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + return RPCService(VALIDATION_SERVICENAME, + ValidationHandler, + exchange=busname, + broker=broker) def main(): diff --git a/SAS/SpecificationServices/lib/validation_service_rpc.py b/SAS/SpecificationServices/lib/validation_service_rpc.py index e41b7f65823..da4432e3098 100644 --- a/SAS/SpecificationServices/lib/validation_service_rpc.py +++ b/SAS/SpecificationServices/lib/validation_service_rpc.py @@ -1,34 +1,34 @@ -from lofar.messaging.RPC import RPC, RPCException, RPCWrapper -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import RPCClientContextManagerMixin, RPCClient, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT from .config import VALIDATION_SERVICENAME import logging logger = logging.getLogger(__file__) from ast import literal_eval -class ValidationRPC(RPCWrapper): +class ValidationRPC(RPCClientContextManagerMixin): def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, - timeout=120): - super(ValidationRPC, self).__init__(busname, VALIDATION_SERVICENAME, broker, timeout=timeout) + timeout=DEFAULT_RPC_TIMEOUT): + super(ValidationRPC, self).__init__() + self._rpc_client = RPCClient(VALIDATION_SERVICENAME, busname, broker, timeout=timeout) def validate_trigger_specification(self, xml): logger.info("Requesting validation of trigger XML") - result = self.rpc('validate_trigger_specification', xml=xml) + result = self._rpc_client.execute('validate_trigger_specification', xml=xml) logger.info("Received validation result -> " +str(result)) return result def validate_specification(self, xml): logger.info("Requesting validation of specification XML") - result = self.rpc('validate_specification', xml=xml) + result = self._rpc_client.execute('validate_specification', xml=xml) logger.info("Received validation result -> " +str(result)) return result def validate_mom_specification(self, xml): logger.info("Requesting validation of MoM specification XML") - result = self.rpc('validate_mom_specification', xml=xml) + result = self._rpc_client.execute('validate_mom_specification', xml=xml) logger.info("Received validation result -> " +str(result)) return result diff --git a/SAS/SpecificationServices/test/t_validation_service.py b/SAS/SpecificationServices/test/t_validation_service.py index c291cc69831..569471359d8 100755 --- a/SAS/SpecificationServices/test/t_validation_service.py +++ b/SAS/SpecificationServices/test/t_validation_service.py @@ -46,12 +46,13 @@ class TestValidationService(unittest.TestCase): f.close() return xmlcontent - def test_validation_service_method_map_should_be_correct(self): - uut = ValidationHandler() - - self.assertEqual(uut.service2MethodMap["validate_trigger_specification"], uut.validate_trigger_specification) - self.assertEqual(uut.service2MethodMap["validate_specification"], uut.validate_specification) - self.assertEqual(uut.service2MethodMap["validate_mom_specification"], uut.validate_mom_specification) + # todo: the service map is not around anymore. Do we have to test anything equivalent with new RabbitMQ setup? + # def test_validation_service_method_map_should_be_correct(self): + # uut = ValidationHandler() + # + # self.assertEqual(uut.service2MethodMap["validate_trigger_specification"], uut.validate_trigger_specification) + # self.assertEqual(uut.service2MethodMap["validate_specification"], uut.validate_specification) + # self.assertEqual(uut.service2MethodMap["validate_mom_specification"], uut.validate_mom_specification) def test_validate_specification_should_raise_exception_on_invalid_xsd(self): xml = self.sample_lofar_xml diff --git a/SAS/TriggerServices/test/t_trigger_service.py b/SAS/TriggerServices/test/t_trigger_service.py index adc47644ffd..6fc83bb1996 100644 --- a/SAS/TriggerServices/test/t_trigger_service.py +++ b/SAS/TriggerServices/test/t_trigger_service.py @@ -24,7 +24,7 @@ import unittest import os import logging -from lofar.triggerservices.trigger_service import TriggerHandler, ALERTHandler, DEFAULT_TBB_PROJECT +from lofar.triggerservices.trigger_service import TriggerServiceMessageHandler, ALERTHandler, DEFAULT_TBB_PROJECT import lofar.triggerservices.trigger_service as serv from lofar.specificationservices.translation_service import SpecificationTranslationHandler from lxml import etree @@ -47,7 +47,7 @@ class TestTriggerHandler(unittest.TestCase): cls.trigger_xml = f.read() with mock.patch('lofar.triggerservices.trigger_service.notification_bus'): - cls.handler = TriggerHandler() + cls.handler = TriggerServiceMessageHandler() def setUp(self): logging.info('-----------------') -- GitLab