#python-3.x #multithreading #concurrent.futures
#python-3.x #многопоточность #concurrent.futures
Вопрос:
Я пробовал различные подходы с многопоточностью python, чтобы увидеть, какой из них соответствует моим требованиям. Чтобы дать обзор, у меня есть куча элементов, которые мне нужно отправить в API. Затем, основываясь на ответе, некоторые элементы будут отправлены в базу данных, и все элементы будут зарегистрированы; например, для элемента, если API возвращает успех, этот элемент будет только зарегистрирован, но когда он возвращает сбой, этот элемент будет отправлен в базу данных для будущих попыток вместе с протоколированием.
Теперь, основываясь на ответе API, я могу отделить элементы успеха от сбоя и выполнить пакетный запрос со всеми элементами сбоя, что улучшит производительность моей базы данных. Для этого я собираю все запросы в одном месте и пытаюсь выполнить многопоточные вызовы API (поскольку это задача, связанная с вводом-выводом, я даже не думаю о многопроцессорной обработке), но в то же время мне нужно отслеживать, какой ответ принадлежит какому запросу.
Переходя к актуальному вопросу, я попробовал два разных подхода, которые, как я думал, дадут почти одинаковую производительность, но разница оказалась огромной.
Чтобы имитировать вызов API, я создал API на своем локальном хосте с режимом ожидания 500 мс (для среднего времени обработки). Пожалуйста, обратите внимание, что я хочу начать ведение журнала и вставку в базу данных после завершения всех вызовов API.
Подход — 1 (с потоковой обработкой.Поток и очередь.Очередь ())
import requests
import datetime
import threading
import queue
def target(data_q):
while not data_q.empty():
data_q.get()
response = requests.get("https://postman-echo.com/get?foo1=bar1amp;foo2=bar2")
print(response.status_code)
data_q.task_done()
if __name__ == "__main__":
data_q = queue.Queue()
for i in range(0, 20):
data_q.put(i)
start = datetime.datetime.now()
num_thread = 5
for _ in range(num_thread):
worker = threading.Thread(target=target(data_q))
worker.start()
data_q.join()
print('Time taken multi-threading: ' str(datetime.datetime.now() - start))
Я пробовал 5, 10, 20 и 30 раз, и результаты соответственно приведены ниже,
Время, затраченное на многопоточность: 0:00:06.625710
Время, затраченное на многопоточность: 0:00:13.326969
Время, затраченное на многопоточность: 0:00:26.435534
Время, затраченное на многопоточность: 0:00:40.737406
Что меня потрясло, так это то, что я попробовал то же самое без многопоточности и получил почти такую же производительность.
Затем, после некоторого поиска в Google, я познакомился с модулем futures.
Подход — 2 (с использованием concurrent.futures)
def fetch_url(im_url):
try:
response = requests.get(im_url)
return response.status_code
except Exception as e:
traceback.print_exc()
if __name__ == "__main__":
data = []
for i in range(0, 20):
data.append(i)
start = datetime.datetime.now()
urls = ["https://postman-echo.com/get?foo1=bar1amp;foo2=bar2" str(item) for item in data]
with futures.ThreadPoolExecutor(max_workers=5) as executor:
responses = executor.map(fetch_url, urls)
for ret in responses:
print(ret)
print('Time taken future concurrent: ' str(datetime.datetime.now() - start))
Снова с 5, 10, 20 и 30 раз, и результаты соответственно ниже,
Время, затраченное на параллельное будущее: 0:00:01.276891
Время, затраченное на параллельное будущее: 0:00:02.635949
Время, затраченное на параллельное будущее: 0:00:05.073299
Время, затраченное на параллельное будущее: 0:00:07.296873
Теперь я слышал об asyncio, но я еще не использовал его. Я также читал, что это дает даже лучшую производительность, чем futures.ThreadPoolExecutor() .
Последний вопрос, если оба подхода используют потоки (или я так думаю), то почему существует огромный разрыв в производительности? Я делаю что-то ужасно неправильное? Я огляделся. Не удалось найти удовлетворительного ответа. Любые мысли по этому поводу будут высоко оценены. Спасибо, что ответили на вопрос.
[Правка 1] Все это работает на python 3.8. [Правка 2] Обновлены примеры кода и время выполнения. Теперь они должны запускаться в любой системе.
Ответ №1:
В документации ThreadPoolExecutor
подробно объясняется, сколько потоков запускается, когда max_workers
параметр не указан, как в вашем примере. Поведение отличается в зависимости от конкретной версии Python, но количество запущенных задач, скорее всего, больше 3, количество потоков в первой версии с использованием очереди. Вы должны использовать futures.ThreadPoolExecutor(max_workers= 3)
для сравнения два подхода.
Для обновленного подхода — 1 я предлагаю немного изменить цикл for:
for _ in range(num_thread):
target_to_run= target(data_q)
print('target to run: {}'.format(target_to_run))
worker = threading.Thread(target= target_to_run)
worker.start()
Результат будет таким:
200
...
200
200
target to run: None
target to run: None
target to run: None
target to run: None
target to run: None
Time taken multi-threading: 0:00:10.846368
Проблема в том, что Thread
конструктор ожидает вызываемый объект или None
как его target
. Вы не предоставляете ему возможность вызова, скорее обработка очереди происходит при первом вызове target(data_q)
основным потоком, и запускается 5 потоков, которые ничего не делают, потому что их target
есть None
.
Комментарии:
1. привет, @xxa, большое спасибо за ваше внимание. Я попробовал то, что вы предложили. После выполнения
futures.ThreadPoolExecutor(max_workers= 3)
время, затраченное на выполнение, увеличилось, но ненамного. Теперь для завершения 100 элементов требуется 0:00:03.644387. Все еще намного меньше, чем какой подход -1 требуется для 100 элементов (0:00:10.535744). Опять же, просто из любопытства, я увеличилnum_thread = 30
для подхода-1, и он по-прежнему занимает 0:00:10.642669. Что-то здесь действительно не так.2. @SamM. problems — ваши результаты не воспроизводимы. В подходе 1 есть то
requests.request
, что выполняется только в вашей среде, и строкаworker = threading.Thread(target=target(data_q))
не работает в реальности, она должна читатьworker = threading.Thread(target=target, args=(data_q, ))
. Я предлагаю вам изменить примеры, которые используют случайные задержки в определенных точках, чтобы их мог запускать любой.3. ОК. Итак, я думаю, у нас здесь проблема. После изменения
worker = threading.Thread(target=target(data_q))
наworker = threading.Thread(target=target, args=(data_q, ))
производительность значительно улучшилась. Теперь разница между обоими подходами составляет 1 или 2 секунды. Как вы думаете, что произошло?4. Просто взглянув на код подхода 1, я не вижу, как он может работать. Если
response.status_code
это anint
, то он должен завершаться сTypeError: 'int' object is not callable
помощью .5. Я обновил код. Вы должны быть в состоянии запустить его сейчас. Дайте мне знать, если у вас возникнут какие-либо проблемы. Я не обновил метод вызова потока, чтобы вы могли увидеть разницу.