#python-3.x #pandas #multithreading #threadpool
#python-3.x #pandas #многопоточность #пул потоков
Вопрос:
У меня есть приложение, которое прочитало бы, скажем, 50 файлов CSV большого размера, около 400 МБ каждый. Теперь я читаю их, чтобы создать фрейм данных и в конечном итоге объединить все это в один фрейм данных. Я хочу сделать это параллельно, чтобы ускорить общий процесс. Итак, мой приведенный ниже код выглядит примерно так:
import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool
from time import time
Class dataProvider:
def __init__(self):
self.df=pd.DataFrame()
self.pool = ThreadPool(processes=40)
self.df_abc=pd.DataFrame()
self.df_xyz=pd.DataFrame()
self.start=time()
def get_csv_data(self,filename):
return pd.read_csv(filename)
def get_all_csv_data(self,filename):
self.start=time()
df_1 = self.pool.apply_sync(self.get_csv_data,('1.csv',), callback=concatDf)
df_2 = self.pool.apply_sync(self.get_csv_data,('2.csv',), callback=concatDf)
total_time=time()-self.start
def concatDf(self):
self.df_abc=pd.concat([df_1,df_2])
self.df_xyz=self.df_abc.iloc[:,1:]
return self.df_xyz
Я вижу ниже проблему с кодом:
- Если тот же самый обратный вызов вызывается моим вызовом apply_sync, то как я узнаю, что текущий обратный вызов был вызван именно тем вызовом, который указан в строке df_1 или df_2 выше? 2) Я хочу объединить выходные данные другого apply_sync, как я могу это сделать в функции обратного вызова concatDf?
- Как я узнаю, что завершены обратные вызовы всех вызовов apply_sync, чтобы я мог вернуть обратно объединенный фрейм данных всех 50 csv?
- Есть ли лучший и эффективный способ сделать это?
Спасибо
Комментарии:
1. Вы ограничены оперативной памятью?
2. @mrzo У меня достаточно оперативной памяти выше 300 ГБ, идея состоит в том, чтобы читать и объединять эти файлы параллельно, поскольку каждый из них может занять, скажем, 30 секунд. поэтому в идеале я не хочу 30 * 50, а затем для запуска процесса объединения. Спасибо
3. Одновременное объединение фреймов данных намного эффективнее, чем их итеративное объединение, как вы пытаетесь это сделать. Итак, почему вы хотите это сделать? Вы пробовали мой ответ?
4. @mrzo Да, я рассмотрю ваш ответ и обновлю вас. Спасибо за вашу помощь
Ответ №1:
Редактировать: Используйте это решение, только если у вас достаточно оперативной памяти.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import pandas as pd
from glob import glob
files = glob("*.csv")
def read_file(file):
return pd.read_csv(file)
# I would recommend to try out whether ThreadPoolExecutor or
# ProcessPoolExecutor is faster on your system:
with ThreadPoolExecutor(4) as pool:
df = pd.concat(pool.map(read_file, files))
print(df)