Skip to content
Snippets Groups Projects
Commit d80516f9 authored by Auke Klazema's avatar Auke Klazema
Browse files

SW-836: Move some tests to unittest framework as integration tests

parent 9b02635e
No related branches found
No related tags found
1 merge request!86Resolve SW-836
...@@ -22,25 +22,34 @@ from subprocess import call ...@@ -22,25 +22,34 @@ from subprocess import call
import logging import logging
from lofar.common.cep4_utils import * from lofar.common.cep4_utils import *
from lofar.common.test_utils import unit_test from lofar.common.test_utils import integration_test
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@integration_test
class TestCep4Utils(unittest.TestCase): class TestCep4Utils(unittest.TestCase):
@unit_test def setUpClass(self):
try:
cep4_true_cmd = wrap_command_in_cep4_head_node_ssh_call(['true'])
if call(cep4_true_cmd) == 0:
logger.info('We can reach the CEP4 head node. Continuing with tests...')
else:
logger.warning('Cannot reach the CEP4 head node. skipping tests...')
raise unittest.SkipTest('Cannot reach the CEP4 head node. skipping tests...')
except:
raise unittest.SkipTest('Cannot reach the CEP4 head node. skipping tests...')
def test_01_wrap_command_in_cep4_head_node_ssh_call(self): def test_01_wrap_command_in_cep4_head_node_ssh_call(self):
cmd = wrap_command_in_cep4_head_node_ssh_call(['true']) cmd = wrap_command_in_cep4_head_node_ssh_call(['true'])
logger.info('executing command: %s', ' '.join(cmd)) logger.info('executing command: %s', ' '.join(cmd))
self.assertEqual(0, call(cmd)) self.assertEqual(0, call(cmd))
@unit_test
def test_02_get_cep4_available_cpu_nodes(self): def test_02_get_cep4_available_cpu_nodes(self):
node_nrs = get_cep4_available_cpu_nodes() node_nrs = get_cep4_available_cpu_nodes()
self.assertTrue(isinstance(node_nrs, list)) self.assertTrue(isinstance(node_nrs, list))
self.assertTrue(len(node_nrs) > 0) self.assertTrue(len(node_nrs) > 0)
@unit_test
def test_03_wrap_command_in_cep4_random_cpu_node_ssh_call(self): def test_03_wrap_command_in_cep4_random_cpu_node_ssh_call(self):
''' '''
this test calls and tests the functionality of the following methods via this test calls and tests the functionality of the following methods via
...@@ -50,7 +59,6 @@ class TestCep4Utils(unittest.TestCase): ...@@ -50,7 +59,6 @@ class TestCep4Utils(unittest.TestCase):
logger.info('executing command: %s', ' '.join(cmd)) logger.info('executing command: %s', ' '.join(cmd))
self.assertEqual(0, call(cmd)) self.assertEqual(0, call(cmd))
@unit_test
def test_04_wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(self): def test_04_wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(self):
''' '''
this test calls and tests the functionality of the following methods via this test calls and tests the functionality of the following methods via
...@@ -62,7 +70,6 @@ class TestCep4Utils(unittest.TestCase): ...@@ -62,7 +70,6 @@ class TestCep4Utils(unittest.TestCase):
logger.info('executing command: %s', ' '.join(cmd)) logger.info('executing command: %s', ' '.join(cmd))
self.assertEqual(0, call(cmd)) self.assertEqual(0, call(cmd))
@unit_test
def test_05_wrap_command_for_docker_in_cep4_head_node_ssh_call(self): def test_05_wrap_command_for_docker_in_cep4_head_node_ssh_call(self):
''' '''
this test calls and tests the functionality of wrap_command_for_docker and this test calls and tests the functionality of wrap_command_for_docker and
...@@ -75,7 +82,6 @@ class TestCep4Utils(unittest.TestCase): ...@@ -75,7 +82,6 @@ class TestCep4Utils(unittest.TestCase):
logger.info('executing command: %s', ' '.join(cmd)) logger.info('executing command: %s', ' '.join(cmd))
self.assertEqual(0, call(cmd)) self.assertEqual(0, call(cmd))
@unit_test
def test_06_get_slurm_info_from_within_docker_via_cep4_head(self): def test_06_get_slurm_info_from_within_docker_via_cep4_head(self):
''' '''
test to see if we can execute a command via ssh on the head node, test to see if we can execute a command via ssh on the head node,
...@@ -98,18 +104,4 @@ class TestCep4Utils(unittest.TestCase): ...@@ -98,18 +104,4 @@ class TestCep4Utils(unittest.TestCase):
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)
# first try if we can reach cep4
# this assumes the code in wrap_command_in_cep4_head_node_ssh_call is correct and working
# (which is also being tested in the unittests)
# if and only if the command to the head node succeeds, then we can do the tests
# otherwise, for whatever reason the ssh call fails, we skip the tests because we cannot reach cep4 head node.
cep4_true_cmd = wrap_command_in_cep4_head_node_ssh_call(['true'])
if call(cep4_true_cmd) == 0:
logger.info('We can reach the CEP4 head node. Continuing with tests...')
unittest.main() unittest.main()
else:
logger.warning('Cannot reach the CEP4 head node. skipping tests...')
#exit with special 'skipped' exit-code
exit(3)
...@@ -5,10 +5,15 @@ import uuid ...@@ -5,10 +5,15 @@ import uuid
import datetime import datetime
import logging import logging
from lofar.messaging.messagebus import TemporaryQueue from lofar.messaging.messagebus import TemporaryQueue
from lofar.common.test_utils import integration_test
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@integration_test
class CleanupServiceRpcTest(unittest.TestCase):
def integration_test(self):
with TemporaryQueue(__name__) as tmp_queue: with TemporaryQueue(__name__) as tmp_queue:
busname = tmp_queue.address busname = tmp_queue.address
...@@ -100,3 +105,8 @@ with TemporaryQueue(__name__) as tmp_queue: ...@@ -100,3 +105,8 @@ with TemporaryQueue(__name__) as tmp_queue:
#with createService(busname=busname): #with createService(busname=busname):
## and run all tests ## and run all tests
#unittest.main() #unittest.main()
if __name__ == "__main__":
logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)
unittest.main()
...@@ -31,14 +31,18 @@ StatusUpdateCommand : finction to update the status of a tree. ...@@ -31,14 +31,18 @@ StatusUpdateCommand : finction to update the status of a tree.
from lofar.sas.otdb.TreeService import create_service from lofar.sas.otdb.TreeService import create_service
from lofar.messaging import TemporaryExchange, RPCClient, BusListenerJanitor from lofar.messaging import TemporaryExchange, RPCClient, BusListenerJanitor
from lofar.sas.otdb.testing.otdb_common_testing import OTDBTestInstance from lofar.sas.otdb.testing.otdb_common_testing import OTDBTestInstance
import unittest
from lofar.common.test_utils import integration_test
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# todo: use unittest package for this
@integration_test
class TreeServiceTest(unittest.TestCase):
def test(self):
with OTDBTestInstance('t_TreeService.in.unittest_db.dump.gz') as test_db: with OTDBTestInstance('t_TreeService.in.unittest_db.dump.gz') as test_db:
def do_rpc_catch_exception(exc_text, rpc_instance, method_name, arg_dict): def do_rpc_catch_exception(exc_text, rpc_instance, method_name, arg_dict):
try: try:
...@@ -159,3 +163,7 @@ with OTDBTestInstance('t_TreeService.in.unittest_db.dump.gz') as test_db: ...@@ -159,3 +163,7 @@ with OTDBTestInstance('t_TreeService.in.unittest_db.dump.gz') as test_db:
do_rpc_catch_exception('on invalid key', otdbRPC, "TaskSetSpecification", {'OtdbID':1099266, do_rpc_catch_exception('on invalid key', otdbRPC, "TaskSetSpecification", {'OtdbID':1099266,
'Specification':{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.NoSuchKey':'NameOfTestHost'}}) 'Specification':{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.NoSuchKey':'NameOfTestHost'}})
if __name__ == "__main__":
logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)
unittest.main()
...@@ -34,6 +34,8 @@ import threading ...@@ -34,6 +34,8 @@ import threading
import sys import sys
from datetime import datetime, timedelta from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
import unittest
from lofar.common.test_utils import integration_test
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -41,8 +43,10 @@ logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=loggin ...@@ -41,8 +43,10 @@ logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=loggin
from lofar.sas.otdb.testing.otdb_common_testing import OTDBTestInstance from lofar.sas.otdb.testing.otdb_common_testing import OTDBTestInstance
# todo: use unittest package for this
@integration_test
class TreeStatusEventsTest(unittest.TestCase):
def test(self):
with OTDBTestInstance('t_TreeStatusEvents.in.unittest_db.dump.gz') as test_db: with OTDBTestInstance('t_TreeStatusEvents.in.unittest_db.dump.gz') as test_db:
with TemporaryExchange(__name__) as tmp_exchange: with TemporaryExchange(__name__) as tmp_exchange:
with tmp_exchange.create_temporary_queue() as tmp_queue: with tmp_exchange.create_temporary_queue() as tmp_queue:
...@@ -67,4 +71,8 @@ with OTDBTestInstance('t_TreeStatusEvents.in.unittest_db.dump.gz') as test_db: ...@@ -67,4 +71,8 @@ with OTDBTestInstance('t_TreeStatusEvents.in.unittest_db.dump.gz') as test_db:
except IndexError: except IndexError:
ok = False ok = False
sys.exit(not ok) # 0 = success self.assertTrue(ok)
if __name__ == "__main__":
logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment