Skip to content
Snippets Groups Projects
Unverified Commit 025eec26 authored by SKAJohanVenter's avatar SKAJohanVenter
Browse files

SAR-276 Improved test

parent 148a603e
No related branches found
No related tags found
No related merge requests found
...@@ -53,7 +53,7 @@ def slow_task(): ...@@ -53,7 +53,7 @@ def slow_task():
def get_task(): def get_task():
class SlowTask(BaseCommand): class SlowTask(BaseCommand):
def do(self): def do(self):
time.sleep(2) time.sleep(1)
return SlowTask(target=None) return SlowTask(target=None)
...@@ -99,7 +99,7 @@ def stop_task(): ...@@ -99,7 +99,7 @@ def stop_task():
def do(self): def do(self):
assert not self.stopping_event.is_set() assert not self.stopping_event.is_set()
while not self.stopping_event.is_set(): while not self.stopping_event.is_set():
pass time.sleep(0.1)
return StopTask(target=None) return StopTask(target=None)
...@@ -265,11 +265,11 @@ class TestQueueManagerTasks: ...@@ -265,11 +265,11 @@ class TestQueueManagerTasks:
# Wait for a item on the queue # Wait for a item on the queue
while not qm.task_ids_in_queue: while not qm.task_ids_in_queue:
pass time.sleep(0.1)
# Wait for the queue to empty # Wait for the queue to empty
while not qm.task_status: while not qm.task_status:
pass time.sleep(0.1)
# Wait for all the callbacks to fire # Wait for all the callbacks to fire
while len(call_back_func.call_args_list) < 24: while len(call_back_func.call_args_list) < 24:
...@@ -316,7 +316,7 @@ class TestQueueManagerTasks: ...@@ -316,7 +316,7 @@ class TestQueueManagerTasks:
qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger) qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger)
unique_id_one, _ = qm.enqueue_task(simple_task(), 3) unique_id_one, _ = qm.enqueue_task(simple_task(), 3)
while not qm.task_result: while not qm.task_result:
pass time.sleep(0.1)
assert qm.get_task_state(unique_id=unique_id_one) == TaskState.COMPLETED assert qm.get_task_state(unique_id=unique_id_one) == TaskState.COMPLETED
def test_task_get_state_in_queued(self, slow_task): def test_task_get_state_in_queued(self, slow_task):
...@@ -333,7 +333,7 @@ class TestQueueManagerTasks: ...@@ -333,7 +333,7 @@ class TestQueueManagerTasks:
qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger) qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger)
unique_id_one, _ = qm.enqueue_task(progress_task()) unique_id_one, _ = qm.enqueue_task(progress_task())
while not qm.task_progress: while not qm.task_progress:
pass time.sleep(0.1)
assert qm.get_task_state(unique_id=unique_id_one) == TaskState.IN_PROGRESS assert qm.get_task_state(unique_id=unique_id_one) == TaskState.IN_PROGRESS
...@@ -343,51 +343,68 @@ class TestQueueManagerTasks: ...@@ -343,51 +343,68 @@ class TestQueueManagerTasks:
assert qm.get_task_state(unique_id="non_existing_id") == TaskState.NOT_FOUND assert qm.get_task_state(unique_id="non_existing_id") == TaskState.NOT_FOUND
@pytest.mark.forked
class TestQueueManagerExit: class TestQueueManagerExit:
"""Test the stopping and aborting.""" """Test the stopping and aborting."""
@pytest.mark.timeout(15) @pytest.mark.forked
@pytest.mark.timeout(5)
def test_exit_abort(self, abort_task, slow_task): def test_exit_abort(self, abort_task, slow_task):
"""Test aborting exit.""" """Test aborting exit."""
results = []
def catch_updates(name, result):
if name == "longRunningCommandResult":
tr = TaskResult.from_task_result(result)
results.append(
(
tr.unique_id,
tr.result_code,
)
)
cm = QueueWorkerComponentManager( cm = QueueWorkerComponentManager(
op_state_model=None, op_state_model=None,
logger=logger, logger=logger,
max_queue_size=10, max_queue_size=10,
num_workers=2, num_workers=2,
push_change_event=None, push_change_event=catch_updates,
) )
cm.enqueue(abort_task(), 0.1) cm.enqueue(abort_task(), 0.1)
# Wait for the command to start # Wait for the command to start
while not cm.task_status: while not cm.task_status:
pass time.sleep(0.1)
# Start aborting # Start aborting
cm.queue_manager.abort_tasks() cm.queue_manager.abort_tasks()
# Wait for the exit # Wait for the exit
while not cm.task_result: while not cm.task_result:
pass time.sleep(0.1)
# aborting state should be cleaned up since the queue is empty and # aborting state should be cleaned up since the queue is empty and
# nothing is in progress # nothing is in progress
while cm.queue_manager.is_aborting: while cm.queue_manager.is_aborting:
pass time.sleep(0.1)
# When aborting this should be rejected # When aborting this should be rejected
# Fill up the workers # Fill up the workers
cm.enqueue(slow_task()) cm.enqueue(slow_task())
cm.enqueue(slow_task()) cm.enqueue(slow_task())
assert not cm.queue_manager.is_aborting
# Abort tasks # Abort tasks
cm.queue_manager.abort_tasks() cm.queue_manager.abort_tasks()
assert cm.queue_manager.is_aborting
# Load up some tasks that should be aborted # Load up some tasks that should be aborted
cm.enqueue(slow_task()) cm.enqueue(slow_task())
cm.enqueue(slow_task()) cm.enqueue(slow_task())
unique_id, _ = cm.enqueue(slow_task()) unique_id, _ = cm.enqueue(slow_task())
while True: while True:
tr = TaskResult.from_task_result(cm.task_result) if (unique_id, ResultCode.ABORTED) in results:
if tr.unique_id == unique_id and tr.result_code == ResultCode.ABORTED:
break break
time.sleep(0.1) time.sleep(0.1)
...@@ -397,12 +414,14 @@ class TestQueueManagerExit: ...@@ -397,12 +414,14 @@ class TestQueueManagerExit:
# Wait for my slow command to finish # Wait for my slow command to finish
unique_id, _ = cm.enqueue(slow_task()) unique_id, _ = cm.enqueue(slow_task())
while True: while True:
tr = TaskResult.from_task_result(cm.task_result) if (unique_id, ResultCode.OK) in results:
if tr.unique_id == unique_id:
break break
time.sleep(0.1)
@pytest.mark.timeout(20) @pytest.mark.forked
@pytest.mark.timeout(5)
def test_exit_stop(self, stop_task): def test_exit_stop(self, stop_task):
"""Test stopping exit.""" """Test stopping exit."""
cm = QueueWorkerComponentManager( cm = QueueWorkerComponentManager(
...@@ -416,16 +435,19 @@ class TestQueueManagerExit: ...@@ -416,16 +435,19 @@ class TestQueueManagerExit:
# Wait for the command to start # Wait for the command to start
while not cm.task_status: while not cm.task_status:
pass time.sleep(0.1)
# Stop all threads # Stop all threads
cm.queue_manager.stop_tasks() cm.queue_manager.stop_tasks()
# Wait for the exit # Wait for the exit
while not cm.task_result: while not cm.task_result:
pass time.sleep(0.5)
# Wait for all the workers to stop # Wait for all the workers to stop
while not any([worker.is_alive() for worker in cm.queue_manager._threads]): while any([worker.is_alive() for worker in cm.queue_manager._threads]):
pass time.sleep(0.1)
@pytest.mark.forked
@pytest.mark.timeout(5) @pytest.mark.timeout(5)
def test_delete_queue(self, slow_task, stop_task, abort_task): def test_delete_queue(self, slow_task, stop_task, abort_task):
"""Test deleting the queue.""" """Test deleting the queue."""
...@@ -470,7 +492,7 @@ class TestComponentManager: ...@@ -470,7 +492,7 @@ class TestComponentManager:
class TestStress: class TestStress:
"""Stress test the queue manager.""" """Stress test the queue manager."""
@pytest.mark.timeout(20) @pytest.mark.timeout(30)
def test_stress(self, slow_task): def test_stress(self, slow_task):
"""Stress test the queue manager.""" """Stress test the queue manager."""
qm = QueueManager(max_queue_size=100, num_workers=50, logger=logger) qm = QueueManager(max_queue_size=100, num_workers=50, logger=logger)
...@@ -484,5 +506,5 @@ class TestStress: ...@@ -484,5 +506,5 @@ class TestStress:
# Wait for the queue to drain # Wait for the queue to drain
while qm._work_queue.qsize(): while qm._work_queue.qsize():
pass time.sleep(0.1)
del qm del qm
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment