From 17ef11437fce9c06fce4b1dfa83c44c356d5e1de Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Tue, 16 Apr 2019 11:29:21 +0000 Subject: [PATCH] SW-658: replaced SOAPpy by python3 compliant package pysimplesoap. Adapted ingestmomadapter to use pysimplesoap. Added test t_ingestmomadapter to tests soap functionality. --- .gitattributes | 3 + .../LTAIngestAdminServer/lib/CMakeLists.txt | 2 + .../lib/ingestmomadapter.py | 142 ++++++++++++------ .../LTAIngestAdminServer/test/CMakeLists.txt | 1 + .../test/t_ingestmomadapter.py | 69 +++++++++ .../test/t_ingestmomadapter.run | 6 + .../test/t_ingestmomadapter.sh | 3 + .../LTAIngestServerCommon/config.py | 10 +- .../lib/CMakeLists.txt | 2 + .../LTAIngestTransferServer/lib/momclient.py | 36 ++--- 10 files changed, 204 insertions(+), 70 deletions(-) create mode 100755 LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.py create mode 100755 LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.run create mode 100755 LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.sh diff --git a/.gitattributes b/.gitattributes index c17f1c8ee39..156df1f8c5a 100644 --- a/.gitattributes +++ b/.gitattributes @@ -2107,6 +2107,9 @@ LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/CMakeLists.txt -text LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.py -text LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.run -text LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestjobmanagementserver.sh -text +LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.py -text +LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.run -text +LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.sh -text LTA/LTAIngest/LTAIngestServer/LTAIngestServerCommon/config.py -text LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/bin/CMakeLists.txt -text LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/bin/ingestpipeline -text diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/CMakeLists.txt b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/CMakeLists.txt index 8bb1244b5f1..643f06c1837 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/CMakeLists.txt +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/CMakeLists.txt @@ -1,4 +1,6 @@ +find_python_module(pysimplesoap REQUIRED) # sudo pip3 install PySimpleSOAP + python_install(ingestmomadapter.py ingestjobmanagementserver.py DESTINATION lofar/lta/ingest/server) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py index f733ee01cf1..6d8993b93e1 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py @@ -35,18 +35,15 @@ from lofar.messaging import CommandMessage, EventMessage, ToBus from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME from lofar.common.datetimeutils import totalSeconds +from lofar.common.util import waitForInterrupt import sys import time from datetime import datetime, timedelta from threading import Thread -try: - import SOAPpy -except ImportError as ie: - print(str(ie)) - print('Please install SOAPpy: sudo pip install SOAPpy') - exit(-1) +from http.server import HTTPServer +import pysimplesoap as soap import logging logger = logging.getLogger() @@ -111,7 +108,7 @@ class IngestBusListenerForMomAdapter(IngestBusListener): class IngestMomAdapter: - def __init__(self, momClient, + def __init__(self, momClient = None, notification_queue_name=DEFAULT_MOMINGESTADAPTER_NOTIFICATION_QUEUENAME, notification_subjects=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, notification_publish_busname=DEFAULT_INGEST_NOTIFICATION_BUSNAME, @@ -124,7 +121,9 @@ class IngestMomAdapter: broker=None, mom_broker=None, **kwargs): - self.__momClient = momClient + self._opened = False + + self.__momClient = MoMClient() if momClient is None else momClient if not mom_broker: mom_broker = broker @@ -142,47 +141,88 @@ class IngestMomAdapter: self.__job_queue = ToBus(job_queue_name, broker=broker) try: - logger.info('Setting up MoM SOAP server on %s:%s', mom_xmlrpc_host, mom_xmlrpc_port) - self._server = SOAPpy.SOAPServer((mom_xmlrpc_host, mom_xmlrpc_port)) - self._server.registerFunction(self.onXmlRPCJobReceived, 'urn:pipeline.export', 'new_job') + url = 'http://%s:%s' % (mom_xmlrpc_host, mom_xmlrpc_port) + logger.info('Setting up MoM SOAP server on %s', url) + # self._server = SOAPpy.SOAPServer((mom_xmlrpc_host, mom_xmlrpc_port)) + # self._server.registerFunction(self.onXmlRPCJobReceived, 'urn:pipeline.export', 'new_job') + dispatcher = soap.server.SoapDispatcher(name="mom_xmlrpc", location=url, action=url, namespace="urn:pipeline.export", debug=True) + dispatcher.register_function('new_job', + self.onXmlRPCJobReceived, + args={'fileName':str , 'fileContent': str}, + returns={'new_job_result': bool}) + + self._server = HTTPServer((mom_xmlrpc_host, mom_xmlrpc_port), soap.server.SOAPHandler) + self._server.dispatcher = dispatcher + except IOError as e: - logger.error('Could not listen on %s:%s. %s', mom_xmlrpc_host, mom_xmlrpc_port, e) + logger.error('Could not listen on %s. %s', url, e) sys.exit(-1) + def server_address(self): + """get the xml-rpc server address as host,port tuple""" + return self._server.server_address + + def server_url(self): + """get the xml-rpc server address as url""" + return "http://%s:%s" % self._server.server_address + + def __enter__(self): + self.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def open(self): + if self._opened: + return + + self.__job_queue.open() + self.__event_bus.open() + + self.__momClient.login() + + #run the soap server in a seperate thread + logger.info('Starting MoM SOAP server on %s:%s', self._server.server_address[0], self._server.server_address[1]) + self._server_thread = Thread(target=self._server.serve_forever) + self._server_thread.daemon = True + self._server_thread.start() + + self._opened = True + + def close(self): + # shutdown soap server and wait for its thread to complete + logger.info('Shutting down MoM SOAP server on %s:%s', self._server.server_address[0], self._server.server_address[1]) + self._server.shutdown() + self._server_thread.join() + logger.info('MoM SOAP server stopped') + + self.__momClient.logout() + + self.__job_queue.close() + self.__event_bus.close() + self._opened = False + def run(self): - with self.__job_queue: - with self.__event_bus: - #run the soap server in a seperate thread - logger.info('Starting MoM SOAP server on %s:%s', self._server.server_address[0], self._server.server_address[1]) - t = Thread(target=self._server.serve_forever) - t.daemon = True - t.start() - - logger.info('*****************************************') - logger.info('Started IngestMomAdapter') - logger.info('*****************************************') - - #run event loop forever until KeyboardInterrupt - while True: - #open __ingest_notification_listener, closes on leaving scope - with self.__ingest_notification_listener: - #run for 15 min, doing nothing except listing for event and KeyboardInterrupt - for i in range(15*60): - try: - time.sleep(1) - except KeyboardInterrupt: - break - #15min loop ended, or KeyboardInterrupt - #leave scope of __ingest_notification_listener - #so it reconnects in next iteration of while loop - #and it can (re)process any messages which were not ack'ed - #because mom might have been unreachable - - #shutdown soap server - logger.info('Shutting down MoM SOAP server on %s', self._server.server_address) - self._server.shutdown() - t.join() - logger.info('MoM SOAP server stopped') + with self: + #run event loop forever until KeyboardInterrupt + running = True + while running: + #open __ingest_notification_listener, closes on leaving scope + with self.__ingest_notification_listener: + #run for 15 min, doing nothing except listing for event and KeyboardInterrupt + for i in range(15*60): + try: + time.sleep(1) + except KeyboardInterrupt: + logger.info("Ctrl-C pressed..") + running = False + break + #15min loop ended, or KeyboardInterrupt + #leave scope of __ingest_notification_listener + #so it reconnects in next iteration of while loop + #and it can (re)process any messages which were not ack'ed + #because mom might have been unreachable def onXmlRPCJobReceived(self, fileName, fileContent): try: @@ -194,13 +234,20 @@ class IngestMomAdapter: logger.debug('submitting job %s to queue %s at %s', job['JobId'], self.__job_queue.address, self.__job_queue.broker) msg = CommandMessage(content=fileContent) - msg.priority=job.get('priority', 4) + try: + msg.priority = int(job.get('priority', 4)) + except Exception as e: + logger.error("Cannot set priority in job message: %s", e) + self.__job_queue.send(msg) logger.info('submitted job %s to queue %s at %s', job['JobId'], self.__job_queue.address, self.__job_queue.broker) else: logger.info("Could not parse message as job: %s", fileContent) except Exception as e: logger.error(e) + return False + + return True def _update_mom_status_if_applicable(self, job_dict, status): if job_dict.get('type','').lower() == 'mom': @@ -371,6 +418,9 @@ def main(): mom_servicename=options.mom_query_servicename, broker=options.broker, mom_broker=options.mom_query_service_broker) + logger.info('*****************************************') + logger.info('Started IngestMomAdapter') + logger.info('*****************************************') adapter.run() logger.info('Stopped IngestMomAdapter') diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/CMakeLists.txt b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/CMakeLists.txt index ce220ad8428..045aa645ab7 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/CMakeLists.txt +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/CMakeLists.txt @@ -1,5 +1,6 @@ include(LofarCTest) lofar_add_test(t_ingestjobmanagementserver) +lofar_add_test(t_ingestmomadapter) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.py new file mode 100755 index 00000000000..d33ce2d5c44 --- /dev/null +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 + +import uuid +import unittest +from unittest import mock +from random import randint +from pysimplesoap.client import SoapClient + +import logging +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) +logger = logging.getLogger(__name__) + +from lofar.messaging.messagebus import TemporaryQueue +from lofar.lta.ingest.server.ingestmomadapter import IngestMomAdapter +from lofar.lta.ingest.common.job import createJobXml + +class TestIngestMoMAdapter(unittest.TestCase): + def setUp(self): + self.tmp_queue1 = TemporaryQueue("t_ingestmomadapter_lofar.lta.ingest.jobs") + self.tmp_queue1.open() + self.addCleanup(self.tmp_queue1.close) + + self.tmp_queue2 = TemporaryQueue("t_ingestmomadapter_lofar.lta.ingest.notification") + self.tmp_queue2.open() + self.addCleanup(self.tmp_queue2.close) + + self.tmp_queue3 = TemporaryQueue("t_ingestmomadapter_lofar.lta.ingest.notification.momingestadapter") + self.tmp_queue3.open() + self.addCleanup(self.tmp_queue3.close) + + momclient_patcher = mock.patch('lofar.lta.ingest.server.momclient.MoMClient') + self.addCleanup(momclient_patcher.stop) + self.momclient_mock = momclient_patcher.start() + + self.adapter = IngestMomAdapter(job_queue_name=self.tmp_queue1.address, + notification_publish_busname=self.tmp_queue2.address, + notification_queue_name=self.tmp_queue3.address, + mom_xmlrpc_host='localhost', + mom_xmlrpc_port=randint(2345, 4567), # pick random port to reduce chance of clashes + momClient = self.momclient_mock) + + # Note: at this moment we only test the xml-rpc soap receiver. + # TODO: add more tests + + def test_onXmlRPCJobReceived_no_soap(self): + """test the handler routine onXmlRPCJobReceived to check if a job_xml is converted to a message and send on the correct bus""" + with self.adapter: + job_xml = createJobXml('project', 0, 1, 'dp_id', 2, '/tmp/path/to/dataproduct') + self.adapter.onXmlRPCJobReceived('my_job_file.xml', job_xml) + + with self.tmp_queue1.create_frombus() as job_receiver: + job_msg = job_receiver.receive() + self.assertEqual(job_xml, job_msg.body) + + def test_mom_soap_to_job_queue(self): + """assuming test_onXmlRPCJobReceived_no_soap passes, test the correct behaviour when called via soap xml-rpc""" + with self.adapter: + job_xml = createJobXml('project', 0, 1, 'dp_id', 2, '/tmp/path/to/dataproduct') + + soap_client = SoapClient(location=self.adapter.server_url(), namespace="urn:pipeline.export") + soap_client.new_job(fileName='my_job_file.xml', fileContent=job_xml) + + with self.tmp_queue1.create_frombus() as job_receiver: + job_msg = job_receiver.receive() + self.assertEqual(job_xml, job_msg.body) + + +if __name__ == '__main__': + unittest.main() diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.run b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.run new file mode 100755 index 00000000000..43e82cabd73 --- /dev/null +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.run @@ -0,0 +1,6 @@ +#!/bin/bash + +# Run the unit test +source python-coverage.sh +python_coverage_test "*LTAIngest*" t_ingestmomadapter.py + diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.sh b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.sh new file mode 100755 index 00000000000..103004cf421 --- /dev/null +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingestmomadapter.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh t_ingestmomadapter diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestServerCommon/config.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestServerCommon/config.py index 36670b04aeb..4615c5967cf 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestServerCommon/config.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestServerCommon/config.py @@ -1,6 +1,7 @@ from lofar.messaging import adaptNameToEnvironment from lofar.common import isProductionEnvironment from lofar.common import isTestEnvironment +from socket import gethostname #server config is same as common config plus extra's from lofar.lta.ingest.common.config import * @@ -13,10 +14,12 @@ DEFAULT_MOMINGESTADAPTER_NOTIFICATION_QUEUENAME = '%s.momingestadapter' % DEFAUL #e.g.: qpid-config bind lofar.lta.ingest.notification lofar.lta.ingest.notification.jobmanager DEFAULT_JOBMANAGER_NOTIFICATION_QUEUENAME = '%s.jobmanager' % DEFAULT_INGEST_NOTIFICATION_BUSNAME -DEFAULT_MOM_XMLRPC_HOST=hostnameToIp('lexar003') if isProductionEnvironment() else '10.144.4.78' +DEFAULT_MOM_XMLRPC_HOST=hostnameToIp('lexar003.lexar.control.lofar' if isProductionEnvironment() and 'lexar' in gethostname() else + 'lexar004.lexar.control.lofar' if isTestEnvironment() and 'lexar' in gethostname() else + 'localhost') DEFAULT_MOM_XMLRPC_PORT=2010 if isProductionEnvironment() else 2009 -MOM_BASE_URL = 'https://lcs029.control.lofar:8443/' if isProductionEnvironment() else 'https://lcs028.control.lofar:8443/' +MOM_BASE_URL = 'https://lcs029.control.lofar:8443/' if isProductionEnvironment() else 'http://lofartest.control.lofar:8080/' LTA_BASE_URL = 'https://%s:%s@lta-ingest.lofar.eu:9443/' if isProductionEnvironment() else 'https://%s:%s@lta-ingest-test.lofar.eu:19443/' @@ -28,7 +31,8 @@ MAX_NR_OF_JOBS=40 MAX_USED_BANDWITH_TO_START_NEW_JOBS=9.9e9 #Gbps NET_IF_TO_MONITOR=['p2p1.2030', # outgoing traffic to Juelich 'p2p1.2033', # outgoing traffic to Poznan - 'p2p1.992'] # outgoing traffic to SARA + 'p2p1.992' # outgoing traffic to SARA + ] if isProductionEnvironment() else [] GLOBUS_TIMEOUT = 1800 diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/CMakeLists.txt b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/CMakeLists.txt index 707afebc1af..758be518df8 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/CMakeLists.txt +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/CMakeLists.txt @@ -1,4 +1,6 @@ +find_python_module(mechanize REQUIRED) #sudo pip3 install mechanize + python_install(ingesttransferserver.py ltacp.py unspecifiedSIP.py diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py index d949e77d3e7..8ab951b0054 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py @@ -12,13 +12,7 @@ from lofar.common.util import humanreadablesize logger = logging.getLogger() -try: - import mechanize -except ImportError as e: - print(e) - print("please install python3 'mechanize' package: sudo pip3 install mechanize") - print() - exit(1) +import mechanize class MoMClient: @@ -58,7 +52,7 @@ class MoMClient: self.MAX_MOM_RETRIES = 3 - def __login(self): + def login(self): try: if self.__logged_in: return @@ -76,7 +70,7 @@ class MoMClient: except Exception as e: raise Exception("Logging into MoM on %s failed: %s" % (self.__momURLlogin, str(e))) - def __logout(self): + def logout(self): try: self.__browser.open(self.__momURLlogout) self.__logged_in = False @@ -84,11 +78,11 @@ class MoMClient: logger.warning("Logging out of MoM failed: " + str(e)) def __enter__(self): - self.__login() + self.login() return self def __exit__(self, exc_type, exc_val, exc_tb): - self.__logout() + self.logout() def setStatus(self, export_id, status_id, message = None): try: @@ -97,7 +91,7 @@ class MoMClient: # so, upon error, retry a couple of times with a pause, else just return for mom_retry in range(self.MAX_MOM_RETRIES): if not self.__logged_in: - self.__login() + self.login() params = {"exportId" : export_id, "status" : status_id} statusUrl = self.__momURLsetStatus + '?' + urllib.parse.urlencode(params) @@ -136,7 +130,7 @@ class MoMClient: jobState2String(int(status_id)), statusUrl, reply) - self.__logout() + self.logout() if 'DOCTYPE HTML PUBLIC' in reply: logger.error('MoM returned login screen instead of SIP for archive_id=%s mom_id=%s using url %s and data %s', archive_id, mom_id, self.__momURLgetSIP, data) @@ -150,7 +144,7 @@ class MoMClient: logger.error('MoMClient.setStatus could not update status of %s to %s: %s', export_id, jobState2String(int(status_id)), e) - self.__logout() + self.logout() return False def uploadDataAndGetSIP(self, archive_id, storage_ticket, filename, uri, filesize, md5_checksum, adler32_checksum, validate = True): @@ -162,8 +156,8 @@ class MoMClient: start = time.time() logger.info("MoMClient.uploadDataAndGetSIP with archiveId %s - StorageTicket %s - FileName %s - Uri %s", archive_id, storage_ticket, filename, uri) - if not self.__logged_in: - self.__login() + if not self.logged_in: + self.login() xmlcontent = """<?xml version="1.0" encoding="UTF-8"?> <lofar:DataProduct archiveId="%s" xmlns:lofar="http://www.astron.nl/MoM2-Lofar"> @@ -204,7 +198,7 @@ class MoMClient: # logout, even though we think we should be logged in properly # it's mom who thinks we should login again, even though we have a proper session. # next retry, we'll login automatically again - self.__logout() + self.logout() if mom_retry == (self.MAX_MOM_RETRIES - 1): # for some reason mom cannot handle the uploadDataAndGetSIP @@ -247,7 +241,7 @@ class MoMClient: humanreadablesize(len(result)), result[:512].replace('\n', '')) return result except Exception as e: - self.__logout() + self.logout() raise Exception("getting SIP from MoM failed: " + str(e)) return '' @@ -260,8 +254,8 @@ class MoMClient: # so, upon error, retry a couple of times with a pause, else just return for mom_retry in range(self.MAX_MOM_RETRIES): try: - if not self.__logged_in: - self.__login() + if not self.logged_in: + self.login() mom_id = archive_id - 1000000 # stupid mom one million archive_id offset @@ -289,7 +283,7 @@ class MoMClient: return result except Exception as e: - self.__logout() + self.logout() raise Exception("getting SIP from MoM failed: " + str(e)) return '' -- GitLab