diff --git a/tests/long_running_tasks/test_task_queue_manager.py b/tests/long_running_tasks/test_task_queue_manager.py index a9fad70e82b1c48d9957cff5eae040b0198c7cfc..ac34c9bce5e72964af0664f6b145b7b5c027087f 100644 --- a/tests/long_running_tasks/test_task_queue_manager.py +++ b/tests/long_running_tasks/test_task_queue_manager.py @@ -53,7 +53,7 @@ def slow_task(): def get_task(): class SlowTask(BaseCommand): def do(self): - time.sleep(2) + time.sleep(1) return SlowTask(target=None) @@ -99,7 +99,7 @@ def stop_task(): def do(self): assert not self.stopping_event.is_set() while not self.stopping_event.is_set(): - pass + time.sleep(0.1) return StopTask(target=None) @@ -265,11 +265,11 @@ class TestQueueManagerTasks: # Wait for a item on the queue while not qm.task_ids_in_queue: - pass + time.sleep(0.1) # Wait for the queue to empty while not qm.task_status: - pass + time.sleep(0.1) # Wait for all the callbacks to fire while len(call_back_func.call_args_list) < 24: @@ -316,7 +316,7 @@ class TestQueueManagerTasks: qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger) unique_id_one, _ = qm.enqueue_task(simple_task(), 3) while not qm.task_result: - pass + time.sleep(0.1) assert qm.get_task_state(unique_id=unique_id_one) == TaskState.COMPLETED def test_task_get_state_in_queued(self, slow_task): @@ -333,7 +333,7 @@ class TestQueueManagerTasks: qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger) unique_id_one, _ = qm.enqueue_task(progress_task()) while not qm.task_progress: - pass + time.sleep(0.1) assert qm.get_task_state(unique_id=unique_id_one) == TaskState.IN_PROGRESS @@ -343,51 +343,68 @@ class TestQueueManagerTasks: assert qm.get_task_state(unique_id="non_existing_id") == TaskState.NOT_FOUND -@pytest.mark.forked class TestQueueManagerExit: """Test the stopping and aborting.""" - @pytest.mark.timeout(15) + @pytest.mark.forked + @pytest.mark.timeout(5) def test_exit_abort(self, abort_task, slow_task): """Test aborting exit.""" + + results = [] + + def catch_updates(name, result): + if name == "longRunningCommandResult": + tr = TaskResult.from_task_result(result) + results.append( + ( + tr.unique_id, + tr.result_code, + ) + ) + cm = QueueWorkerComponentManager( op_state_model=None, logger=logger, max_queue_size=10, num_workers=2, - push_change_event=None, + push_change_event=catch_updates, ) cm.enqueue(abort_task(), 0.1) # Wait for the command to start while not cm.task_status: - pass + time.sleep(0.1) # Start aborting cm.queue_manager.abort_tasks() + # Wait for the exit while not cm.task_result: - pass + time.sleep(0.1) # aborting state should be cleaned up since the queue is empty and # nothing is in progress while cm.queue_manager.is_aborting: - pass + time.sleep(0.1) # When aborting this should be rejected # Fill up the workers cm.enqueue(slow_task()) cm.enqueue(slow_task()) + + assert not cm.queue_manager.is_aborting # Abort tasks cm.queue_manager.abort_tasks() + assert cm.queue_manager.is_aborting + # Load up some tasks that should be aborted cm.enqueue(slow_task()) cm.enqueue(slow_task()) unique_id, _ = cm.enqueue(slow_task()) while True: - tr = TaskResult.from_task_result(cm.task_result) - if tr.unique_id == unique_id and tr.result_code == ResultCode.ABORTED: + if (unique_id, ResultCode.ABORTED) in results: break time.sleep(0.1) @@ -397,12 +414,14 @@ class TestQueueManagerExit: # Wait for my slow command to finish unique_id, _ = cm.enqueue(slow_task()) + while True: - tr = TaskResult.from_task_result(cm.task_result) - if tr.unique_id == unique_id: + if (unique_id, ResultCode.OK) in results: break + time.sleep(0.1) - @pytest.mark.timeout(20) + @pytest.mark.forked + @pytest.mark.timeout(5) def test_exit_stop(self, stop_task): """Test stopping exit.""" cm = QueueWorkerComponentManager( @@ -416,16 +435,19 @@ class TestQueueManagerExit: # Wait for the command to start while not cm.task_status: - pass + time.sleep(0.1) + # Stop all threads cm.queue_manager.stop_tasks() # Wait for the exit while not cm.task_result: - pass + time.sleep(0.5) + # Wait for all the workers to stop - while not any([worker.is_alive() for worker in cm.queue_manager._threads]): - pass + while any([worker.is_alive() for worker in cm.queue_manager._threads]): + time.sleep(0.1) + @pytest.mark.forked @pytest.mark.timeout(5) def test_delete_queue(self, slow_task, stop_task, abort_task): """Test deleting the queue.""" @@ -470,7 +492,7 @@ class TestComponentManager: class TestStress: """Stress test the queue manager.""" - @pytest.mark.timeout(20) + @pytest.mark.timeout(30) def test_stress(self, slow_task): """Stress test the queue manager.""" qm = QueueManager(max_queue_size=100, num_workers=50, logger=logger) @@ -484,5 +506,5 @@ class TestStress: # Wait for the queue to drain while qm._work_queue.qsize(): - pass + time.sleep(0.1) del qm