Каков наиболее эффективный способ чтения большого CSV-файла (10 млн записей), расположенного на S3 (AWS), с помощью Python?

#python #pandas #csv #amazon-s3 #dask

#python #pandas #csv #amazon-s3 #dask

Вопрос:

Я пытался найти самый быстрый способ чтения большого csv-файла (более 10 миллионов записей) из S3 и выполнить пару простых операций с одним из столбцов (общее количество строк и среднее значение). Я провел пару тестов, и самым быстрым на данный момент было создание фрейма данных dask, но мне интересно, есть ли какая-либо другая альтернатива, которая может сделать все еще быстрее.

Есть предложения? Спасибо!

Тест 1. Pandas прочитал csv: 92.36531567573547 секунд

 start_time = time.time()
s3 = boto3.client('s3')
path =my_csvS3
use_column=['tip_amount']
df= pd.read_csv(path,usecols=use_column)
print(df.count)
print (df["tip_amount"].mean())
print("%s seconds" % ((time.time())-(start_time)))
 

Тест 2 Pandas читает csv по частям: 78.15214204788208 секунд

 import time
start_time = time.time()
tp = pd.read_csv(path, usecols=use_column, iterator=True, chunksize=5000000)  # gives TextFileReader
df = pd.concat(tp, ignore_index=True)
print(df.count)
print (df["tip_amount"].mean())
print("%s seconds" % ((time.time())-(start_time)))
 

Тест 3 dask dataframe: 54.183971881866455 секунд

 
import dask.dataframe as dd
import time
start_time = time.time()
s3 = boto3.client('s3')
df = dd.read_csv(path)
df = df['tip_amount'] 
cols=['tip_amount']
dfp = df.compute()
print(len(dfp))
print (dfp.mean())
print("%s seconds" % ((time.time())-(start_time)))
 

Комментарии:

1. Вы должны измерить, сколько времени требуется для одного только downlaod, который должен быть ограничен пропускной способностью, т. Е. Нет возможности обойти эту стоимость.

2. Примечание: вместо обработки файла в программе Python вы можете использовать Amazon Athena для обработки файла. Вы можете предоставить Athena SQL-запрос, и он сможет выполнить запрос к данным в S3 без необходимости загружать данные. Вы также можете вызвать Athena из своей собственной программы на Python. (Athena использует технологию Presto для выполнения быстрых параллельных запросов.)

Ответ №1:

Эта строка

 dfp = df.compute()
 

является антипаттером для dask. Вы разделяете нагрузку, btu, затем вы формируете один большой фрейм данных в памяти путем конкатенации. Вам было бы лучше вычислить то, что вы хотите, в исходных фрагментах (обратите внимание, что len это особенное в python, поэтому это менее аккуратно, чем могло бы быть

 dask.compute(df.shape[0], df.mean())
 

Кроме того, вы можете повысить производительность распределенного планировщика даже на одной машине для некоторых рабочих нагрузок. В этом случае я считаю, что большая часть работы выполняется без использования GIL, что является критическим соображением. Тем не менее, стоит оценить разницу.

Дополнительно: если вам действительно нужен только один столбец, укажите это в своем read_csv — это относится ко всем бэкэндам!

Комментарии:

1. Спасибо за ваши предложения @mdurant! Я протестировал ваши идеи, и это заняло немного больше времени, чем мой 3-й тест (read_csv только тот столбец, который мне нужен, и использовал dask.compute (df.shape[0],df.mean()) сверху… Я немного сбит с толку, потому что все, что вы сказали, имеет смысл, и это должно сэкономить как минимум несколько секунд…