#python #multithreading #unit-testing #pytest #python-unittest.mock
Вопрос:
Я пытаюсь имитировать функцию, которая использует многопоточность для запуска другой функции с другими параметрами и сохраняет результаты возврата в очередь. Я попытался использовать pytest и unitest, чтобы поиздеваться над ним, но, похоже, он все еще выполняет поток при вызове из тестовой функции:
from threading import Thread
import threading
import time
import queue
from unittest import mock
def threaded_function(name):
time.sleep(100)
return name
def run_threads():
thread_list = []
result_list = []
res_queue = queue.Queue()
args_list = [("A"), ("B"), ("C")]
for val in args_list:
thread = Thread(target=lambda q, arg1: q.put(threaded_function(arg1)), args=(res_queue, val))
thread.start()
thread_list.append(thread)
for thread in thread_list:
thread.join()
while not res_queue.empty():
result_list.append(res_queue.get())
return result_list
Ниже приведены макетные функции, которые я пытаюсь:
@mock.patch("threading.Thread")
@mock.patch("queue.Queue")
def test_run_threads(mock_queue, mock_thread):
new_queue = queue.Queue()
new_queue.put("D")
mock_queue.return_value = new_queue
mock_thread.return_value = None
result = run_threads()
assert result == ["D"]
class MockThread:
def __init__(self):
pass
def start():
pass
def join():
pass
def test_run_threads2(monkeypatch):
mock_thread = MockThread()
monkeypatch.setattr(threading, "Thread", MockThread)
result = run_threads()
assert result == []
Ответ №1:
Согласно Unittest: Где исправлять, вам нужно исправить поток с того места, где он используется (или где его ищут). В вашей функции run_threads
вы используете __main__.Threads
вместо threading.Threads
того , чтобы из-за того, как from threading import Thread
импортируется. Удалите mock_thread.return_value = None
, и теперь все ваши потоки run_threads
будут магическими, которые не выполняют никаких функций.
Ваша следующая проблема — издеваться res_queue
run_threads
. При его исправлении у test_run_threads
вас нет возможности res_queue
заменить его другой очередью , потому что вы просто заменили все новые экземпляры queue.Queue
на MagicMock
.
Лучше переписать эту функцию, чтобы ее было легче тестировать. Я бы предложил разбить run_threads()
его на две функции.
create_thread_list(args_list, res_queue):
будет использоваться для создания нашего списка потоков. Разделив это, мы можем изменить args_list
список аргументов на любой, который мы хотим проверить.
def create_thread_list(args_list, res_queue):
thread_list = []
for val in args_list:
thread = Thread(target=lambda q, arg1: q.put(threaded_function(arg1)), args=(res_queue, val))
thread_list.append(thread)
return thread_list
run_threads_2(thread_list, res_queue):
будет использоваться для запуска потоков.
def run_threads_2(thread_list, res_queue):
result_list = []
for th in thread_list:
th.start()
for th in thread_list:
th.join()
while not res_queue.empty():
result_list.append((res_queue.get()))
return result_list
Разделив их, вы можете передавать любые аргументы, которые хотите проверить для своих потоков.
Ниже приведены некоторые примеры того, как я бы проверил это сейчас:
import queue
import time
from unittest.mock import patch
class MockThread2:
def __init__(self, name, result_q):
self.name = name
self.result_q = result_q
def start(self):
self.result_q.put(self.name)
def join(self):
pass
class TestMultiThreadedFunctions(unittest.TestCase):
def test_run_threads_2(self):
arg_list = ['A', 'B', 'C']
result_q = queue.Queue()
# Testing if created threads actually call the target function
# without actually calling the function.
with patch('__main__.threaded_function') as mock_function:
thread_list = create_thread_list(args_list=arg_list, res_queue=result_q)
run_threads_2(thread_list=thread_list, res_queue=result_q)
# Check if all threads ran
self.assertEqual(len(arg_list), mock_function.call_count)
arg_list = ['C', 'A', 'D', 'B', 'E']
result_q = queue.Queue()
# Using the threaded function, but just patching sleep
with patch('time.sleep') as mock_sleep:
thread_list = create_thread_list(args_list=arg_list, res_queue=result_q)
result_list = run_threads_2(thread_list=thread_list, res_queue=result_q)
self.assertListEqual(arg_list, result_list)
def test_run_with_alternate_threads(self):
# testing with MockThread and expecting nothing in the result_q
result_q = queue.Queue()
thread_list = [MockThread() for _ in range(5)]
expected_list = []
result_list = run_threads_2(thread_list=thread_list, res_queue=result_q)
self.assertListEqual(expected_list, result_list)
# testing with MockThread2
result_q = queue.Queue()
thread_list = [MockThread2(str(name), result_q) for name in range(5)]
expected_list = ['0', '1', '2', '3', '4']
result_list = run_threads_2(thread_list=thread_list, res_queue=result_q)
self.assertListEqual(expected_list, result_list)