Разница в производительности между многопоточностью с использованием очереди и фьючерсов.ThreadPoolExecutor использует список в python3?

#python-3.x #multithreading #concurrent.futures

#python-3.x #многопоточность #concurrent.futures

Вопрос:

Я пробовал различные подходы с многопоточностью python, чтобы увидеть, какой из них соответствует моим требованиям. Чтобы дать обзор, у меня есть куча элементов, которые мне нужно отправить в API. Затем, основываясь на ответе, некоторые элементы будут отправлены в базу данных, и все элементы будут зарегистрированы; например, для элемента, если API возвращает успех, этот элемент будет только зарегистрирован, но когда он возвращает сбой, этот элемент будет отправлен в базу данных для будущих попыток вместе с протоколированием.

Теперь, основываясь на ответе API, я могу отделить элементы успеха от сбоя и выполнить пакетный запрос со всеми элементами сбоя, что улучшит производительность моей базы данных. Для этого я собираю все запросы в одном месте и пытаюсь выполнить многопоточные вызовы API (поскольку это задача, связанная с вводом-выводом, я даже не думаю о многопроцессорной обработке), но в то же время мне нужно отслеживать, какой ответ принадлежит какому запросу.

Переходя к актуальному вопросу, я попробовал два разных подхода, которые, как я думал, дадут почти одинаковую производительность, но разница оказалась огромной.

Чтобы имитировать вызов API, я создал API на своем локальном хосте с режимом ожидания 500 мс (для среднего времени обработки). Пожалуйста, обратите внимание, что я хочу начать ведение журнала и вставку в базу данных после завершения всех вызовов API.

Подход — 1 (с потоковой обработкой.Поток и очередь.Очередь ())

 import requests
import datetime
import threading
import queue

def target(data_q):
    while not data_q.empty():
        data_q.get()
        response = requests.get("https://postman-echo.com/get?foo1=bar1amp;foo2=bar2")
        print(response.status_code)
        data_q.task_done()

if __name__ == "__main__":
    data_q = queue.Queue()
    for i in range(0, 20):
        data_q.put(i)

    start = datetime.datetime.now()
    num_thread = 5
    for _ in range(num_thread):
        worker = threading.Thread(target=target(data_q))
        worker.start()

    data_q.join()

    print('Time taken multi-threading: ' str(datetime.datetime.now() - start))
  

Я пробовал 5, 10, 20 и 30 раз, и результаты соответственно приведены ниже,
Время, затраченное на многопоточность: 0:00:06.625710
Время, затраченное на многопоточность: 0:00:13.326969
Время, затраченное на многопоточность: 0:00:26.435534
Время, затраченное на многопоточность: 0:00:40.737406

Что меня потрясло, так это то, что я попробовал то же самое без многопоточности и получил почти такую же производительность.

Затем, после некоторого поиска в Google, я познакомился с модулем futures.

Подход — 2 (с использованием concurrent.futures)

 def fetch_url(im_url):
    try:
        response = requests.get(im_url)
        return response.status_code
    except Exception as e:
        traceback.print_exc()

if __name__ == "__main__":
    data = []
    for i in range(0, 20):
        data.append(i)

    start = datetime.datetime.now()
    urls = ["https://postman-echo.com/get?foo1=bar1amp;foo2=bar2"   str(item) for item in data]
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        responses = executor.map(fetch_url, urls)
        for ret in responses:
            print(ret)
    print('Time taken future concurrent: '   str(datetime.datetime.now() - start))
  

Снова с 5, 10, 20 и 30 раз, и результаты соответственно ниже,
Время, затраченное на параллельное будущее: 0:00:01.276891
Время, затраченное на параллельное будущее: 0:00:02.635949
Время, затраченное на параллельное будущее: 0:00:05.073299
Время, затраченное на параллельное будущее: 0:00:07.296873

Теперь я слышал об asyncio, но я еще не использовал его. Я также читал, что это дает даже лучшую производительность, чем futures.ThreadPoolExecutor() .

Последний вопрос, если оба подхода используют потоки (или я так думаю), то почему существует огромный разрыв в производительности? Я делаю что-то ужасно неправильное? Я огляделся. Не удалось найти удовлетворительного ответа. Любые мысли по этому поводу будут высоко оценены. Спасибо, что ответили на вопрос.

[Правка 1] Все это работает на python 3.8. [Правка 2] Обновлены примеры кода и время выполнения. Теперь они должны запускаться в любой системе.

Ответ №1:

В документации ThreadPoolExecutor подробно объясняется, сколько потоков запускается, когда max_workers параметр не указан, как в вашем примере. Поведение отличается в зависимости от конкретной версии Python, но количество запущенных задач, скорее всего, больше 3, количество потоков в первой версии с использованием очереди. Вы должны использовать futures.ThreadPoolExecutor(max_workers= 3) для сравнения два подхода.

Для обновленного подхода — 1 я предлагаю немного изменить цикл for:

 for _ in range(num_thread):
    target_to_run= target(data_q)
    print('target to run: {}'.format(target_to_run))
    worker = threading.Thread(target= target_to_run)
    worker.start()
  

Результат будет таким:

 200
...
200
200
target to run: None
target to run: None
target to run: None
target to run: None
target to run: None
Time taken multi-threading: 0:00:10.846368
  

Проблема в том, что Thread конструктор ожидает вызываемый объект или None как его target . Вы не предоставляете ему возможность вызова, скорее обработка очереди происходит при первом вызове target(data_q) основным потоком, и запускается 5 потоков, которые ничего не делают, потому что их target есть None .

Комментарии:

1. привет, @xxa, большое спасибо за ваше внимание. Я попробовал то, что вы предложили. После выполнения futures.ThreadPoolExecutor(max_workers= 3) время, затраченное на выполнение, увеличилось, но ненамного. Теперь для завершения 100 элементов требуется 0:00:03.644387. Все еще намного меньше, чем какой подход -1 требуется для 100 элементов (0:00:10.535744). Опять же, просто из любопытства, я увеличил num_thread = 30 для подхода-1, и он по-прежнему занимает 0:00:10.642669. Что-то здесь действительно не так.

2. @SamM. problems — ваши результаты не воспроизводимы. В подходе 1 есть то requests.request , что выполняется только в вашей среде, и строка worker = threading.Thread(target=target(data_q)) не работает в реальности, она должна читать worker = threading.Thread(target=target, args=(data_q, )) . Я предлагаю вам изменить примеры, которые используют случайные задержки в определенных точках, чтобы их мог запускать любой.

3. ОК. Итак, я думаю, у нас здесь проблема. После изменения worker = threading.Thread(target=target(data_q)) на worker = threading.Thread(target=target, args=(data_q, )) производительность значительно улучшилась. Теперь разница между обоими подходами составляет 1 или 2 секунды. Как вы думаете, что произошло?

4. Просто взглянув на код подхода 1, я не вижу, как он может работать. Если response.status_code это an int , то он должен завершаться с TypeError: 'int' object is not callable помощью .

5. Я обновил код. Вы должны быть в состоянии запустить его сейчас. Дайте мне знать, если у вас возникнут какие-либо проблемы. Я не обновил метод вызова потока, чтобы вы могли увидеть разницу.