Модуль насмешливого потока в python

#python #multithreading #unit-testing #pytest #python-unittest.mock

Вопрос:

Я пытаюсь имитировать функцию, которая использует многопоточность для запуска другой функции с другими параметрами и сохраняет результаты возврата в очередь. Я попытался использовать pytest и unitest, чтобы поиздеваться над ним, но, похоже, он все еще выполняет поток при вызове из тестовой функции:

 from threading import Thread
import threading
import time
import queue
from unittest import mock

def threaded_function(name):
    time.sleep(100)
    return name

def run_threads():
    thread_list = []
    result_list = []
    res_queue = queue.Queue()
    args_list = [("A"), ("B"), ("C")]
    for val in args_list:
        thread = Thread(target=lambda q, arg1: q.put(threaded_function(arg1)), args=(res_queue, val))
        thread.start()
        thread_list.append(thread)
    for thread in thread_list:
        thread.join()
    while not res_queue.empty():
        result_list.append(res_queue.get())
    return result_list
 

Ниже приведены макетные функции, которые я пытаюсь:

 @mock.patch("threading.Thread")
@mock.patch("queue.Queue")
def test_run_threads(mock_queue, mock_thread):
    new_queue = queue.Queue()
    new_queue.put("D")
    mock_queue.return_value = new_queue
    mock_thread.return_value = None
    result = run_threads()
    assert result == ["D"]


class MockThread:
    def __init__(self):
        pass

    def start():
        pass

    def join():
        pass


def test_run_threads2(monkeypatch):
    mock_thread = MockThread()
    monkeypatch.setattr(threading, "Thread", MockThread)
    result = run_threads()
    assert result == []
 

Ответ №1:

Согласно Unittest: Где исправлять, вам нужно исправить поток с того места, где он используется (или где его ищут). В вашей функции run_threads вы используете __main__.Threads вместо threading.Threads того , чтобы из-за того, как from threading import Thread импортируется. Удалите mock_thread.return_value = None , и теперь все ваши потоки run_threads будут магическими, которые не выполняют никаких функций.

Ваша следующая проблема — издеваться res_queue run_threads . При его исправлении у test_run_threads вас нет возможности res_queue заменить его другой очередью , потому что вы просто заменили все новые экземпляры queue.Queue на MagicMock .

Лучше переписать эту функцию, чтобы ее было легче тестировать. Я бы предложил разбить run_threads() его на две функции.

create_thread_list(args_list, res_queue): будет использоваться для создания нашего списка потоков. Разделив это, мы можем изменить args_list список аргументов на любой, который мы хотим проверить.

 def create_thread_list(args_list, res_queue):
    thread_list = []

    for val in args_list:
        thread = Thread(target=lambda q, arg1: q.put(threaded_function(arg1)), args=(res_queue, val))
        thread_list.append(thread)

    return thread_list
 

run_threads_2(thread_list, res_queue): будет использоваться для запуска потоков.

 def run_threads_2(thread_list, res_queue):
    result_list = []

    for th in thread_list:
        th.start()
    for th in thread_list:
        th.join()
    while not res_queue.empty():
        result_list.append((res_queue.get()))
    return result_list
 

Разделив их, вы можете передавать любые аргументы, которые хотите проверить для своих потоков.

Ниже приведены некоторые примеры того, как я бы проверил это сейчас:

 import queue
import time
from unittest.mock import patch
class MockThread2:
    def __init__(self, name, result_q):
        self.name = name
        self.result_q = result_q

    def start(self):
        self.result_q.put(self.name)

    def join(self):
        pass

class TestMultiThreadedFunctions(unittest.TestCase):

    def test_run_threads_2(self):
        arg_list = ['A', 'B', 'C']
        result_q = queue.Queue()

        # Testing if created threads actually call the target function
        # without actually calling the function.
        with patch('__main__.threaded_function') as mock_function:
            thread_list = create_thread_list(args_list=arg_list, res_queue=result_q)
            run_threads_2(thread_list=thread_list, res_queue=result_q)

            # Check if all threads ran
            self.assertEqual(len(arg_list), mock_function.call_count)

        arg_list = ['C', 'A', 'D', 'B', 'E']
        result_q = queue.Queue()
        # Using the threaded function, but just patching sleep
        with patch('time.sleep') as mock_sleep:
            thread_list = create_thread_list(args_list=arg_list, res_queue=result_q)
            result_list = run_threads_2(thread_list=thread_list, res_queue=result_q)

            self.assertListEqual(arg_list, result_list)

    def test_run_with_alternate_threads(self):
        # testing with MockThread and expecting nothing in the result_q
        result_q = queue.Queue()
        thread_list = [MockThread() for _ in range(5)]
        expected_list = []
        result_list = run_threads_2(thread_list=thread_list, res_queue=result_q)
        self.assertListEqual(expected_list, result_list)

        # testing with MockThread2
        result_q = queue.Queue()
        thread_list = [MockThread2(str(name), result_q) for name in range(5)]
        expected_list = ['0', '1', '2', '3', '4']
        result_list = run_threads_2(thread_list=thread_list, res_queue=result_q)
        self.assertListEqual(expected_list, result_list)