diff --git a/SAS/OTDB_Services/CMakeLists.txt b/SAS/OTDB_Services/CMakeLists.txt index 7fbb478aa2cf094af6ac01728ca83f94ed9cb51a..70ea0a7d272ff1ca1916da5820e1299d91ed6560 100644 --- a/SAS/OTDB_Services/CMakeLists.txt +++ b/SAS/OTDB_Services/CMakeLists.txt @@ -5,7 +5,7 @@ lofar_package(OTDB_Services 1.0 DEPENDS PyMessaging) lofar_find_package(Python 3.4 REQUIRED) include(PythonInstall) -find_python_module(pg REQUIRED) # sudo aptitude install python3-pg +#find_python_module(pg REQUIRED) # sudo aptitude install python3-pg lofar_add_bin_scripts( getOTDBParset diff --git a/SAS/SpecificationServices/lib/lofarxml_to_momxml_translator.py b/SAS/SpecificationServices/lib/lofarxml_to_momxml_translator.py index f795a8e9da3c57cc31c3bba7a5a3328fb7df0062..ee908d71fdc02fd9f3974e9c706ada11352f2d4e 100644 --- a/SAS/SpecificationServices/lib/lofarxml_to_momxml_translator.py +++ b/SAS/SpecificationServices/lib/lofarxml_to_momxml_translator.py @@ -48,10 +48,12 @@ from xmljson import Parker import re import datetime -from .config import VALIDATION_SERVICENAME, VALIDATION_BUSNAME +from .config import VALIDATION_SERVICENAME from .validation_service_rpc import ValidationRPC from .specification_service import _parse_relation_tree, make_key, _parse_project_code +from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME + from io import BytesIO import logging @@ -61,7 +63,7 @@ __changedBy__ = "Joern Kuensemoeller" logger = logging.getLogger(__name__) -validationrpc = ValidationRPC(VALIDATION_BUSNAME, VALIDATION_SERVICENAME) +validationrpc = ValidationRPC(DEFAULT_BUSNAME, DEFAULT_BROKER) # ------------------------ # -> For moving elements: diff --git a/SAS/SpecificationServices/lib/specification_service.py b/SAS/SpecificationServices/lib/specification_service.py index 6864e814573b3bef37a3e4e65614d2653e267004..4ce07da269adccd34f7a0962243c1fc39546f186 100644 --- a/SAS/SpecificationServices/lib/specification_service.py +++ b/SAS/SpecificationServices/lib/specification_service.py @@ -27,8 +27,7 @@ from lofar.specificationservices.validation_service_rpc import ValidationRPC from lofar.specificationservices.translation_service_rpc import TranslationRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC -from lofar.messaging import Service, ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.messaging.Service import MessageHandlerInterface +from lofar.messaging import RPCService, ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME, ServiceMessageHandler from lofar.common.util import waitForInterrupt # TODO: mom.importxml uses old messaging interface @@ -40,11 +39,11 @@ from .config import \ MOMIMPORTXML_BUSNAME, \ MOMIMPORTXML_SUBJECT -momqueryrpc = MoMQueryRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) +momqueryrpc = MoMQueryRPC.create(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) validationrpc = ValidationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) specificationtranslationrpc = TranslationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) -momimportxml_bus = ToBus(address=MOMIMPORTXML_BUSNAME, broker=DEFAULT_BROKER) +momimportxml_bus = ToBus(exchange=MOMIMPORTXML_BUSNAME, broker=DEFAULT_BROKER) permitted_activities=["observation", "pipeline", "measurement"] permitted_statuses=["opened", "approved"] @@ -216,7 +215,7 @@ def _add_spec_to_mom(mom_xml): msg.payload = "\n%s\n" # MoM needs enters around the payload to avoid "Content not allowed in prolog" error content = msg.content() % (mom_xml,) - emsg = EventMessage(context=MOMIMPORTXML_SUBJECT, content=content) + emsg = EventMessage(subject=MOMIMPORTXML_SUBJECT, content=content) momimportxml_bus.send(emsg) logger.debug("Send specs to MOM: " + mom_xml) @@ -226,17 +225,13 @@ def _lofarxml_to_momxml(lofarxml): response = specificationtranslationrpc.specification_to_momspecification(lofarxml) return response["mom-specification"] -class SpecificationHandler(MessageHandlerInterface): +class SpecificationHandler(ServiceMessageHandler): def __init__(self, **kwargs): - super(SpecificationHandler, self).__init__(**kwargs) + super(SpecificationHandler, self).__init__() momimportxml_bus.open() - self.service2MethodMap = { - 'add_specification': self.add_specification, - 'get_specification': self.get_specification, - } def add_specification(self, user, lofar_xml): logger.info("got specification from user " + str(user)) @@ -261,12 +256,11 @@ class SpecificationHandler(MessageHandlerInterface): return response -def create_service(busname=DEFAULT_BUSNAME): - return Service(SPECIFICATION_SERVICENAME, +def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + return RPCService(SPECIFICATION_SERVICENAME, SpecificationHandler, - busname=busname, - use_service_methods=True, - ) + exchange=busname, + broker=broker) def main(): diff --git a/SAS/SpecificationServices/lib/specification_service_rpc.py b/SAS/SpecificationServices/lib/specification_service_rpc.py index 1d89bce98ab90c9616f2fe2c1faa3279fc677ff0..11618bc9e9957960752be1a3d337402a5d4b8c7e 100644 --- a/SAS/SpecificationServices/lib/specification_service_rpc.py +++ b/SAS/SpecificationServices/lib/specification_service_rpc.py @@ -1,28 +1,26 @@ -from lofar.messaging.RPC import RPC, RPCException, RPCWrapper -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, RPCClientContextManagerMixin, RPCClient, DEFAULT_RPC_TIMEOUT from .config import SPECIFICATION_SERVICENAME import logging logger = logging.getLogger(__file__) -class SpecificationRPC(RPCWrapper): +class SpecificationRPC(RPCClientContextManagerMixin): def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, - timeout=120): - super(SpecificationRPC, self).__init__(busname, SPECIFICATION_SERVICENAME, broker, timeout=timeout) - + timeout=DEFAULT_RPC_TIMEOUT): + super().__init__() + self._rpc_client = RPCClient(service_name=SPECIFICATION_SERVICENAME, exchange=busname, broker=broker, timeout=timeout) def add_specification(self, user, lofar_xml): logger.info("Requesting addition of specification XML for user -> "+user) - result = self.rpc('add_specification', user, lofar_xml=lofar_xml) + result = self._rpc_client.execute('add_specification', user, lofar_xml=lofar_xml) logger.info("Received addition result -> " + str(result)) return result - def get_specification(self, user, id): logger.info("Requesting specification XML for user, id -> "+user+","+id) - result = self.rpc('get_specification', user, id=id) + result = self._rpc_client.execute('get_specification', user, id=id) logger.info("Received specification XML -> " + str(result)) return result diff --git a/SAS/SpecificationServices/lib/translation_service.py b/SAS/SpecificationServices/lib/translation_service.py index 586f887b3a8743e5f4d998ad781fbe3b6045c7ef..ae0ee741a541af47041eca63354cd493934358c3 100644 --- a/SAS/SpecificationServices/lib/translation_service.py +++ b/SAS/SpecificationServices/lib/translation_service.py @@ -46,8 +46,7 @@ logger = logging.getLogger(__name__) from lxml import etree from io import BytesIO -from lofar.messaging import Service, DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.messaging.Service import MessageHandlerInterface +from lofar.messaging import RPCService, ServiceMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.common.util import waitForInterrupt from .config import SPECIFICATIONTRANSLATION_SERVICENAME @@ -62,15 +61,9 @@ validationrpc = ValidationRPC(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER) FULL_TRANSLATION = "Full translation" MODEL_TRANSLATION = "Model translation" - -class SpecificationTranslationHandler(MessageHandlerInterface): +class SpecificationTranslationHandler(ServiceMessageHandler): def __init__(self, **kwargs): - super(SpecificationTranslationHandler, self).__init__(**kwargs) - - self.service2MethodMap = { - 'trigger_to_specification': self.trigger_to_specification, - 'specification_to_momspecification': self.specification_to_momspecification, - } + super(SpecificationTranslationHandler, self).__init__() def trigger_to_specification(self, trigger_spec, trigger_id, job_priority): @@ -152,12 +145,11 @@ class SpecificationTranslationHandler(MessageHandlerInterface): raise Exception("MoM specification validation after translation failed! -> " + str(response)) -def create_service(busname=DEFAULT_BUSNAME): - return Service(SPECIFICATIONTRANSLATION_SERVICENAME, - SpecificationTranslationHandler, - busname=busname, - use_service_methods=True, - ) +def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + return RPCService(SPECIFICATIONTRANSLATION_SERVICENAME, + SpecificationTranslationHandler, + exchange=busname, + broker=broker) def main(): diff --git a/SAS/SpecificationServices/lib/translation_service_rpc.py b/SAS/SpecificationServices/lib/translation_service_rpc.py index a23ea36a9da2729d683d3fe532bfd6fb338c2ac4..8f128ac2d81eb99c7fa670ad84f948a256fb5d95 100644 --- a/SAS/SpecificationServices/lib/translation_service_rpc.py +++ b/SAS/SpecificationServices/lib/translation_service_rpc.py @@ -1,22 +1,22 @@ -from lofar.messaging.RPC import RPC, RPCException, RPCWrapper -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import RPCClientContextManagerMixin, RPCClient, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT from .config import SPECIFICATIONTRANSLATION_SERVICENAME import logging logger = logging.getLogger(__file__) -class TranslationRPC(RPCWrapper): +class TranslationRPC(RPCClientContextManagerMixin): def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, - timeout=120): - super(TranslationRPC, self).__init__(busname, SPECIFICATIONTRANSLATION_SERVICENAME, broker, timeout=timeout) + timeout=DEFAULT_RPC_TIMEOUT): + super(TranslationRPC, self).__init__() + self._rpc_client = RPCClient(SPECIFICATIONTRANSLATION_SERVICENAME, busname, broker, timeout=timeout) def trigger_to_specification(self, trigger_spec, trigger_id, job_priority): logger.info("Requesting validation of trigger XML") - result = self.rpc('trigger_to_specification', + result = self._rpc_client.execute('trigger_to_specification', trigger_spec=trigger_spec, trigger_id=trigger_id, job_priority=job_priority) @@ -26,6 +26,6 @@ class TranslationRPC(RPCWrapper): def specification_to_momspecification(self, spec): logger.info("Requesting validation of trigger XML") - result = self.rpc('specification_to_momspecification', spec_xml=spec) + result = self._rpc_client.exchange('specification_to_momspecification', spec_xml=spec) logger.info("Received validation result -> " +str(result)) return result diff --git a/SAS/SpecificationServices/lib/validation_service.py b/SAS/SpecificationServices/lib/validation_service.py index 17741505b4296b26c0ca5c73fded3e644d81bcfe..73ea20dd748c4c40d83006b339661ea37e242a0d 100644 --- a/SAS/SpecificationServices/lib/validation_service.py +++ b/SAS/SpecificationServices/lib/validation_service.py @@ -28,8 +28,7 @@ import logging from io import BytesIO from lxml import etree import os -from lofar.messaging import Service, DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.messaging.Service import MessageHandlerInterface +from lofar.messaging import ServiceMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME, RPCService from lofar.common.util import waitForInterrupt from .config import TRIGGER_XSD, LOFARSPEC_XSD, MOMSPEC_XSD, VALIDATION_SERVICENAME @@ -70,15 +69,9 @@ def _validateXSD(xml, xsdpath): return {"valid": True} -class ValidationHandler(MessageHandlerInterface): +class ValidationHandler(ServiceMessageHandler): def __init__(self, **kwargs): - super(ValidationHandler, self).__init__(**kwargs) - - self.service2MethodMap = { - 'validate_trigger_specification': self.validate_trigger_specification, - 'validate_specification': self.validate_specification, - 'validate_mom_specification': self.validate_mom_specification, - } + super(ValidationHandler, self).__init__() def validate_trigger_specification(self, xml): return _validateXSD(xml, TRIGGER_XSD) @@ -93,12 +86,11 @@ class ValidationHandler(MessageHandlerInterface): -def create_service(busname=DEFAULT_BUSNAME): - return Service(VALIDATION_SERVICENAME, - ValidationHandler, - busname=busname, - use_service_methods=True, - ) +def create_service(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + return RPCService(VALIDATION_SERVICENAME, + ValidationHandler, + exchange=busname, + broker=broker) def main(): diff --git a/SAS/SpecificationServices/lib/validation_service_rpc.py b/SAS/SpecificationServices/lib/validation_service_rpc.py index e41b7f65823285f5b35ec2389e577c54f7d32f19..da4432e3098be113d493c8da8b2dd1c32efb0a36 100644 --- a/SAS/SpecificationServices/lib/validation_service_rpc.py +++ b/SAS/SpecificationServices/lib/validation_service_rpc.py @@ -1,34 +1,34 @@ -from lofar.messaging.RPC import RPC, RPCException, RPCWrapper -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import RPCClientContextManagerMixin, RPCClient, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT from .config import VALIDATION_SERVICENAME import logging logger = logging.getLogger(__file__) from ast import literal_eval -class ValidationRPC(RPCWrapper): +class ValidationRPC(RPCClientContextManagerMixin): def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, - timeout=120): - super(ValidationRPC, self).__init__(busname, VALIDATION_SERVICENAME, broker, timeout=timeout) + timeout=DEFAULT_RPC_TIMEOUT): + super(ValidationRPC, self).__init__() + self._rpc_client = RPCClient(VALIDATION_SERVICENAME, busname, broker, timeout=timeout) def validate_trigger_specification(self, xml): logger.info("Requesting validation of trigger XML") - result = self.rpc('validate_trigger_specification', xml=xml) + result = self._rpc_client.execute('validate_trigger_specification', xml=xml) logger.info("Received validation result -> " +str(result)) return result def validate_specification(self, xml): logger.info("Requesting validation of specification XML") - result = self.rpc('validate_specification', xml=xml) + result = self._rpc_client.execute('validate_specification', xml=xml) logger.info("Received validation result -> " +str(result)) return result def validate_mom_specification(self, xml): logger.info("Requesting validation of MoM specification XML") - result = self.rpc('validate_mom_specification', xml=xml) + result = self._rpc_client.execute('validate_mom_specification', xml=xml) logger.info("Received validation result -> " +str(result)) return result diff --git a/SAS/SpecificationServices/test/t_validation_service.py b/SAS/SpecificationServices/test/t_validation_service.py index c291cc69831379203e2e7fd4411a7e61f2ea1515..569471359d8ca3e2cbe5f246279154dd712cd0e8 100755 --- a/SAS/SpecificationServices/test/t_validation_service.py +++ b/SAS/SpecificationServices/test/t_validation_service.py @@ -46,12 +46,13 @@ class TestValidationService(unittest.TestCase): f.close() return xmlcontent - def test_validation_service_method_map_should_be_correct(self): - uut = ValidationHandler() - - self.assertEqual(uut.service2MethodMap["validate_trigger_specification"], uut.validate_trigger_specification) - self.assertEqual(uut.service2MethodMap["validate_specification"], uut.validate_specification) - self.assertEqual(uut.service2MethodMap["validate_mom_specification"], uut.validate_mom_specification) + # todo: the service map is not around anymore. Do we have to test anything equivalent with new RabbitMQ setup? + # def test_validation_service_method_map_should_be_correct(self): + # uut = ValidationHandler() + # + # self.assertEqual(uut.service2MethodMap["validate_trigger_specification"], uut.validate_trigger_specification) + # self.assertEqual(uut.service2MethodMap["validate_specification"], uut.validate_specification) + # self.assertEqual(uut.service2MethodMap["validate_mom_specification"], uut.validate_mom_specification) def test_validate_specification_should_raise_exception_on_invalid_xsd(self): xml = self.sample_lofar_xml diff --git a/SAS/TriggerServices/test/t_trigger_service.py b/SAS/TriggerServices/test/t_trigger_service.py index adc47644ffd86677d1037e53c7efa46e0ec524d0..6fc83bb19965f640c751edaad699d89d4ce8ee2d 100644 --- a/SAS/TriggerServices/test/t_trigger_service.py +++ b/SAS/TriggerServices/test/t_trigger_service.py @@ -24,7 +24,7 @@ import unittest import os import logging -from lofar.triggerservices.trigger_service import TriggerHandler, ALERTHandler, DEFAULT_TBB_PROJECT +from lofar.triggerservices.trigger_service import TriggerServiceMessageHandler, ALERTHandler, DEFAULT_TBB_PROJECT import lofar.triggerservices.trigger_service as serv from lofar.specificationservices.translation_service import SpecificationTranslationHandler from lxml import etree @@ -47,7 +47,7 @@ class TestTriggerHandler(unittest.TestCase): cls.trigger_xml = f.read() with mock.patch('lofar.triggerservices.trigger_service.notification_bus'): - cls.handler = TriggerHandler() + cls.handler = TriggerServiceMessageHandler() def setUp(self): logging.info('-----------------')