#!/usr/bin/env python3

# Copyright (C) 2018    ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

# $Id: $

import os
import time
from multiprocessing import Process, Event

import logging
logger = logging.getLogger(__name__)

from lofar.common.dbcredentials import Credentials, DBCredentials
from lofar.common.util import find_free_port, waitForInterrupt
from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.sas.resourceassignment.database.radb import RADatabase
from lofar.sas.resourceassignment.database.testing.radb_common_testing import RADBTestDatabaseInstance
from lofar.sas.resourceassignment.resourceassigner.raservice import RAService
from lofar.sas.resourceassignment.resourceassignmentservice.service import createService as createRADBService
from lofar.sas.resourceassignment.resourceassignmentestimator.service import createService as createEstimatorService


class RATestEnvironment:
    '''Create and run several ResourceAssigner services in an isolated test environment.

    The environment consists of a RADB test database instance, a database
    connection to it, and three services: the RADB service, the estimator
    service and the RAService. It can be used as a context manager, which
    starts everything on entry and stops/destroys everything on exit.
    '''

    # NOTE: the defaults below are evaluated once, when this module is imported,
    # so RA_EXCHANGE / RA_BROKER have to be set in the environment before import.
    def __init__(self,
                 exchange: str=os.environ.get("RA_EXCHANGE", DEFAULT_BUSNAME),
                 broker: str=os.environ.get("RA_BROKER", DEFAULT_BROKER)):
        '''Construct (but do not yet start) the test database instance and the services.

        :param exchange: name of the messaging exchange passed on to each service.
        :param broker: address of the message broker passed on to each service.
        '''
        self.radb_test_instance = RADBTestDatabaseInstance()
        self.radb = self.radb_test_instance.create_database_connection()
        self.radb_service = createRADBService(dbcreds=self.radb_test_instance.dbcreds,
                                              exchange=exchange, broker=broker)
        self.re_service = createEstimatorService(exchange=exchange, broker=broker)
        self.ra_service = RAService(radbcreds=self.radb_test_instance.dbcreds,
                                    exchange=exchange, broker=broker)

    def start(self) -> None:
        '''Create the test database, connect to it, and start all services listening.'''
        self.radb_test_instance.create()
        self.radb.connect()
        self.radb_service.start_listening()
        self.re_service.start_listening()
        self.ra_service.start_listening()

    def stop(self) -> None:
        '''Disconnect from the database, stop all services and destroy the test database.

        Every teardown step is attempted, even when an earlier one fails, so that
        a single failing step cannot leave services running or the test database
        behind. Each failure is logged.

        :raises Exception: the first exception raised by any teardown step,
                           re-raised after all steps have been attempted.
        '''
        # same order as before: connection first, services in reverse start order, database last.
        teardown_steps = (
            ("RADB connection", self.radb.disconnect),
            ("RA service", self.ra_service.stop_listening),
            ("estimator service", self.re_service.stop_listening),
            ("RADB service", self.radb_service.stop_listening),
            ("RADB test database instance", self.radb_test_instance.destroy),
        )

        first_error = None
        for description, step in teardown_steps:
            try:
                step()
            except Exception as e:
                logger.exception("Error while tearing down the %s: %s", description, e)
                if first_error is None:
                    first_error = e

        if first_error is not None:
            raise first_error

    def __enter__(self) -> 'RATestEnvironment':
        '''Start the environment; on a start-up failure tear it down again and re-raise.'''
        try:
            self.start()
        except Exception as e:
            logger.exception("Could not start the RA test environment: %s", e)
            try:
                self.stop()
            except Exception:
                # stop() already logged the details. Swallow its error here so that
                # the original start-up error below is the one the caller sees.
                logger.error("Teardown after the failed start did not complete cleanly")
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        '''Stop the environment. Exceptions from the with-body are not suppressed.'''
        self.stop()


def main():
    """instantiate, run and destroy a ResourceAssignment test environment"""
    from optparse import OptionParser, OptionGroup
    os.environ['TZ'] = 'UTC'

    parser = OptionParser('%prog [options]',
                          description='setup/run/teardown a full RA test environment including a fresh and isolated RA database, and resourceassigment services.')

    group = OptionGroup(parser, 'Messaging options')
    group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
                     help='Address of the message broker, default: %default')
    group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME,
                     help="Bus or queue where the TMSS messages are published. [default: %default]")
    parser.add_option_group(group)

    # positional arguments are not used by this program
    options, _ = parser.parse_args()

    logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)

    with RATestEnvironment(exchange=options.exchange, broker=options.broker) as instance:
        # print some nice info for the user to use the test servers...
        # use print instead of log for clean lines.
        # flush the log handlers first, so pending log lines do not end up in between the printed lines.
        for h in logging.root.handlers:
            h.flush()
        print()
        print()
        print("*****************************************************")
        print("RADB, and RA-services up and running...")
        print("*****************************************************")
        print("RADB Credentials ID: %s" % (instance.radb.dbcreds_id, ))
        print()
        print("Press Ctrl-C to exit (and remove the test database and stop the RA services automatically)")

        waitForInterrupt()


if __name__ == '__main__':
    main()