Skip to content
Snippets Groups Projects
Commit ddca902f authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-828: use a common OTDBTestInstance uing a common OTDBCommonTestMixin for testing otdb services

parent 2c3e7c38
No related branches found
No related tags found
2 merge requests!74Lofar release 4 0,!72Resolve SW-828
......@@ -98,7 +98,7 @@ def main():
create_service(options.busname, dbcreds)
def create_service(busname, dbcreds):
def create_service(busname, dbcreds, state_file_path='~/.lofar/otdb_treestatusevent_state'):
alive = True
connected = False
otdb_connection = None
......@@ -124,7 +124,7 @@ def create_service(busname, dbcreds):
if connected:
# Get start_time (= creation time of last retrieved record if any)
try:
treestatuseventfilename = os.path.expanduser('~/.lofar/otdb_treestatusevent_state')
treestatuseventfilename = os.path.expanduser(state_file_path)
with open(treestatuseventfilename, 'r') as f:
line = f.readline()
if line.rfind('.') > 0:
......
# $Id: CMakeLists.txt 1576 2015-09-29 15:22:28Z loose $
include(LofarCTest)
if(BUILD_TESTING)
include(LofarCTest)
include(FindPythonModule)
find_python_module(dateutil)
include(PythonInstall)
python_install(otdb_common_testing.py DESTINATION lofar/sas/otdb/testing)
lofar_add_test(t_TreeService)
lofar_add_test(t_TreeStatusEvents)
endif()
lofar_find_package(Python 3.4 REQUIRED)
find_python_module(testing.postgresql)
lofar_add_test(t_TreeService)
lofar_add_test(t_TreeStatusEvents)
#!/usr/bin/env python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id$
import subprocess
import os
import logging
logger = logging.getLogger(__name__)
from lofar.common.testing.postgres import PostgresTestMixin
class OTDBCommonTestMixin(PostgresTestMixin):
'''
A common test mixin class from which you can derive to get a freshly setup postgres testing instance with the latest OTDB sql setup scripts applied.
'''
gzipped_schema_dump_filename = ''
@classmethod
def apply_database_schema(cls):
logger.info('applying OTDB sql schema to %s', cls.dbcreds)
cmd1 = ['gzip', '-dc', cls.gzipped_schema_dump_filename]
cmd2 = ['psql', '-U', cls.dbcreds.user, '-h', cls.dbcreds.host,
'-p', str(cls.dbcreds.port), cls.dbcreds.database]
logger.info('executing: %s', ' '.join(cmd1))
logger.info('executing: %s', ' '.join(cmd2))
proc1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
proc2 = subprocess.Popen(cmd2, stdin=proc1.stdout)
proc1.wait(timeout=60)
proc2.wait(timeout=60)
if proc1.returncode != 0:
raise RuntimeError("Could not execute cmd: '%s' error=%s" % (' '.join(cmd1), proc1.stderr))
if proc2.returncode != 0:
raise RuntimeError("Could not execute cmd: '%s' error=%s" % (' '.join(cmd2), proc2.stderr))
class OTDBTestInstance():
'''Helper class which uses the OTDBCommonTestMixin without a unittest.TestCase to setup/teardown a test OTDB instance'''
def __init__(self, gzipped_schema_dump_filename):
OTDBCommonTestMixin.gzipped_schema_dump_filename = gzipped_schema_dump_filename
self._db_creator = OTDBCommonTestMixin()
@property
def db(self):
return self._db_creator.db
@property
def dbcreds(self):
return self._db_creator.dbcreds
def __enter__(self):
try:
self._db_creator.setUpClass()
return self
except Exception as e:
logger.error(e)
self._db_creator.tearDownClass()
raise
def __exit__(self, exc_type, exc_val, exc_tb):
self._db_creator.tearDownClass()
......@@ -28,49 +28,15 @@ KeyUpdateCommand : function to update the value of multiple (existing) ke
StatusUpdateCommand : finction to update the status of a tree.
"""
import logging
import testing.postgresql
import psycopg2
import subprocess
from lofar.sas.otdb.TreeService import create_service
from lofar.common.dbcredentials import Credentials
from lofar.messaging import TemporaryExchange, RPCClient
from lofar.messaging import TemporaryExchange, RPCClient, BusListenerJanitor
from lofar.sas.otdb.testing.otdb_common_testing import OTDBTestInstance
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
try:
postgresql = testing.postgresql.PostgresqlFactory()()
database_credentials = Credentials()
database_credentials.host = postgresql.dsn()['host']
database_credentials.database = postgresql.dsn()['database']
database_credentials.port = postgresql.dsn()['port']
# connect to test-db as root
conn = psycopg2.connect(**postgresql.dsn())
cursor = conn.cursor()
# set credentials to be used during tests
database_credentials.user = 'otdb_test_user'
database_credentials.password = 'otdb_test_password' # cannot be empty...
# create user role
query = "CREATE USER %s WITH SUPERUSER PASSWORD '%s'" % (database_credentials.user, database_credentials.password)
cursor.execute(query)
conn.commit()
conn.close()
cmd1 = ['gzip', '-dc', 't_TreeService.in.unittest_db.dump.gz']
cmd2 = ['psql', '-U', database_credentials.user, '-h', database_credentials.host,
'-p', str(database_credentials.port), database_credentials.database]
proc1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
proc2 = subprocess.Popen(cmd2, stdin=proc1.stdout)
proc1.wait(timeout=60)
proc2.wait(timeout=60)
with OTDBTestInstance('t_TreeService.in.unittest_db.dump.gz') as test_db:
def do_rpc_catch_exception(exc_text, rpc_instance, method_name, arg_dict):
try:
print("** Executing {0}({1})...".format(method_name, arg_dict))
......@@ -90,7 +56,7 @@ try:
with TemporaryExchange(__name__) as tmp_exchange:
exchange = tmp_exchange.address
with create_service(exchange=exchange, dbcreds=database_credentials) as service:
with BusListenerJanitor(create_service(exchange=exchange, dbcreds=test_db.dbcreds)) as service:
with RPCClient(service_name=service.service_name, exchange=exchange, timeout=10) as otdbRPC: # Existing: otdb_id:1099268, mom_id:353713
do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 1099268, 'MomID': 353713 })
......@@ -190,5 +156,3 @@ try:
do_rpc_catch_exception('on invalid key', otdbRPC, "TaskSetSpecification", {'OtdbID':1099266,
'Specification':{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.NoSuchKey':'NameOfTestHost'}})
finally:
postgresql.stop()
......@@ -28,69 +28,33 @@ KeyUpdateCommand : function to update the value of multiple (existing) ke
StatusUpdateCommand : finction to update the status of a tree.
"""
import sys, pg
import logging
import testing.postgresql
import psycopg2
import subprocess
from lofar.messaging.messagebus import *
from lofar.sas.otdb.TreeStatusEvents import create_service
from lofar.common.dbcredentials import Credentials
import threading
import sys
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
try:
postgresql = testing.postgresql.PostgresqlFactory()()
database_credentials = Credentials()
database_credentials.host = postgresql.dsn()['host']
database_credentials.database = postgresql.dsn()['database']
database_credentials.port = postgresql.dsn()['port']
# connect to test-db as root
conn = psycopg2.connect(**postgresql.dsn())
cursor = conn.cursor()
# set credentials to be used during tests
database_credentials.user = 'otdb_test_user'
database_credentials.password = 'otdb_test_password' # cannot be empty...
# create user role
query = "CREATE USER %s WITH SUPERUSER PASSWORD '%s'" % (database_credentials.user, database_credentials.password)
cursor.execute(query)
conn.commit()
conn.close()
cmd1 = ['gzip', '-dc', 't_TreeStatusEvents.in.unittest_db.dump.gz']
cmd2 = ['psql', '-U', database_credentials.user, '-h', database_credentials.host,
'-p', str(database_credentials.port), database_credentials.database]
proc1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
proc2 = subprocess.Popen(cmd2, stdin=proc1.stdout)
proc1.wait(timeout=60)
proc2.wait(timeout=60)
otdb_connection = pg.connect(**database_credentials.pg_connect_options())
from lofar.sas.otdb.testing.otdb_common_testing import OTDBTestInstance
with OTDBTestInstance('t_TreeStatusEvents.in.unittest_db.dump.gz') as test_db:
with TemporaryExchange(__name__) as tmp_exchange:
with TemporaryQueue(__name__, exchange=tmp_exchange.address) as tmp_queue:
with tmp_exchange.create_temporary_queue() as tmp_queue:
with tmp_queue.create_frombus() as frombus:
t = threading.Thread(target=create_service, args=(tmp_exchange.address, database_credentials))
t = threading.Thread(target=create_service, args=(tmp_exchange.address, test_db.dbcreds, '/dev/nullz'))
t.daemon = True
t.start()
otdb_connection.query("select setTreeState(1, %d, %d::INT2,'%s')" % (1099266, 500, False))
msg = frombus.receive(timeout=5, acknowledge=True) # TreeStateEVent are send every 2 seconds
test_db.db.executeQuery("select setTreeState(1, %d, %d::INT2,'%s'::boolean);" % (1099266, 500, False))
test_db.db.commit()
msg = frombus.receive(timeout=500, acknowledge=True) # TreeStateEvent are send every 2 seconds
logger.info(msg)
try:
ok = (msg.content['treeID'] == 1099266 and msg.content['state'] == 'queued')
except IndexError:
ok = False
sys.exit(not ok) # 0 = success
finally:
postgresql.stop()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment