#python #multithreading #dataframe
#python #многопоточность #фрейм данных
Вопрос:
У меня есть куча фреймов данных, хранящихся в словаре (‘df_dict’). Каждый фрейм данных содержит 100 строк. Для каждой строки мне нужно добавить столбцы (‘new_score’, ‘new_num_comments’, ‘upvote_ratio’) с текущими данными из Reddit. Я использую PRAW для доступа к API Reddit.
Поскольку для последовательного обновления каждой строки требуется так много времени, я пытаюсь использовать многопоточность для получения данных. Итак, я беру фрейм данных со 100 строками, запускаю поток для каждого и создаю экземпляр PRAW для каждого потока.
Каким-то образом мой код, кажется, работает и обновляет строки, но это занимает слишком много времени — ужасно много. Нет разницы в последовательном обновлении. Требуется почти 11,4 секунды, чтобы обновить одну строку с моей попыткой «многопоточности». Пока это занимает 0,2 секунды, если я делаю это последовательно. Что я делаю не так???
Вот мой код. Я попытался вырезать столько, сколько мог, и, очевидно, отредактировал свои учетные данные:
from threading import Thread, Lock
import praw
# Dataframes are stored in df_dict
mutex = Lock()
threads = []
class ReqThread (Thread):
def __init__(self, threadID, index, row):
Thread.__init__(self)
self.threadID = threadID
self.index = index
self.row = row
def run(self):
print("Starting %s" % self.threadID)
for row in self.row:
worker(index=self.index, row=self.row)
print("Exiting %s" % self.threadID)
def make_reddit():
return praw.Reddit(client_id=client_id, client_secret=client_secret, username=username, password=password, user_agent=user_agent)
def worker(index, row):
global df
print('Request-ID: %s' % row['id'])
reddit = make_reddit()
submission = reddit.submission(row['id'])
mutex.acquire()
df.at[index, 'new_score'] = submission.score
df.at[index, 'upvote_ratio'] = submission.upvote_ratio
df.at[index, 'new_num_comments'] = submission.num_comments
mutex.release()
for i in df_dict:
df = df_dict[i]
for index, row in df.iterrows():
t = ReqThread(threadID=index, index=index, row=row)
t.start()
threads.append(t)
for thread in threads:
thread.join()
df.to_csv('u_{i}.csv'.format(i=i))
РЕДАКТИРОВАТЬ: снова подсчитано, сколько занимает моя «многопоточность».
Ответ №1:
То, с чем вы, похоже, сталкиваетесь, — это уловка Python Threading.
Почему эти «потоки» медленнее?
Несмотря на то, что может показаться, что потоки Python не являются фактическими потоками, в модуле _thread и threading используются потоки ОС, которые отлично подходят для параллелизма, связанного с вводом-выводом, но в меньшей степени для задач, связанных с процессором, это сводится к Python и GIL (глобальная блокировка интерпретатора), чтобы сохранить все потокобезопасным. Поскольку вы не получаете преимущества от нескольких ядер, вы только получите накладные расходы на балансировку нескольких потоков ОС.
Итак, как мне выполнить многоядерную обработку в Python?
Чтобы обойти проблему GIL, python использует подпроцессы, чтобы существенно сбалансировать нагрузку и использовать несколько ядер, такие модули, как multiprocessing, используют модуль subprocess внутри, вы можете попробовать это сами, если создадите пул процессов из 4 рабочих элементов и запустите их, обратите внимание, как он также порождает 4 процесса python?
Важные моменты, на которые следует обратить внимание
- Модули потоков в Python на самом деле обычно являются системами потоков операционной системы.
- GIL ограничивает большую часть параллельной обработки в python.
- IPC может быть намного сложнее, потому что вы не получаете возможность свободно передавать переменные между процессами. Смотрите документацию по многопроцессорной обработке для получения дополнительной информации.