Как выполнять многопроцессорные операции с фреймом данных

#python #pandas #multiprocessing

#python #pandas #многопроцессорность

Вопрос:

Мне приходится выполнять множество операций с фреймом данных, и это занимает много времени при использовании одного ядра. Я пытаюсь реализовать многопроцессорность.

Прямо сейчас, когда я пытаюсь выяснить, как это работает, я использую более простую версию, где я просто хочу добавлять значения из данных

 import multiprocessing
import pandas as pd

def add_values(a):
    df = pd.DataFrame([{'n':a}])
    return df

df = pd.DataFrame([{'n':0}])
data = [9, 4, 5]
with multiprocessing.Pool(processes=4) as pool:
    df = df.add(pool.map(add_values, data))

df
  

Я хотел бы, чтобы df возвращал фрейм данных с n = 18, но я получаю это сообщение об ошибке ValueError: Невозможно принудительно преобразовать в серию, длина должна быть 1: задано 3

Ответ №1:

Проблема здесь в том, как вы обрабатываете возвращаемое значение из ваших многопроцессорных вызовов. pool.map() возвращает list . В данном конкретном случае это будет список фреймов данных, т.Е. то, до чего расширяется ваш вызов, эквивалентно df = df.add([dfn9, dfn4, dfn5]) , где dfnX s — это разные фреймы данных.

Этот ввод не ожидается и не обрабатывается df.add() , который ожидает чего-то, что может быть превращено в pd.Series объект и добавлено к исходному фрейму. Вместо этого вам нужно взять этот список и «вручную» уменьшить его, например, как:

 import multiprocessing
import pandas as pd

def add_values(a):
    df = pd.DataFrame([{'n':a}])
    return df

df = pd.DataFrame([{'n':0}])
data = [9, 4, 5]
with multiprocessing.Pool(processes=4) as pool:
    #df = df.add(pool.map(add_values, data)) does not work
    dfs = pool.map(add_values, data)

print(type(dfs))
# Reducing return values
for d in dfs:
    df = df.add(d)

print(df)
  

Сокращение должно происходить в одном процессе, поскольку разные процессы не используют одно и то же df (вместо этого все они имеют идентичные копии).

В качестве дополнительного примечания, я думаю, вам также следует рассмотреть возможность использования multithreading rahter than multiprocessing . Это может быть проще, поскольку потоки могут совместно использовать одну и ту же память и уменьшать потребность в копировании памяти. Кроме того, поскольку pandas уменьшается объем GIL, не возникает проблемы одновременного выполнения только одного потока.