#python #pandas #csv #amazon-s3 #dask
#python #pandas #csv #amazon-s3 #dask
Вопрос:
Я пытался найти самый быстрый способ чтения большого csv-файла (более 10 миллионов записей) из S3 и выполнить пару простых операций с одним из столбцов (общее количество строк и среднее значение). Я провел пару тестов, и самым быстрым на данный момент было создание фрейма данных dask, но мне интересно, есть ли какая-либо другая альтернатива, которая может сделать все еще быстрее.
Есть предложения? Спасибо!
Тест 1. Pandas прочитал csv: 92.36531567573547 секунд
start_time = time.time()
s3 = boto3.client('s3')
path =my_csvS3
use_column=['tip_amount']
df= pd.read_csv(path,usecols=use_column)
print(df.count)
print (df["tip_amount"].mean())
print("%s seconds" % ((time.time())-(start_time)))
Тест 2 Pandas читает csv по частям: 78.15214204788208 секунд
import time
start_time = time.time()
tp = pd.read_csv(path, usecols=use_column, iterator=True, chunksize=5000000) # gives TextFileReader
df = pd.concat(tp, ignore_index=True)
print(df.count)
print (df["tip_amount"].mean())
print("%s seconds" % ((time.time())-(start_time)))
Тест 3 dask dataframe: 54.183971881866455 секунд
import dask.dataframe as dd
import time
start_time = time.time()
s3 = boto3.client('s3')
df = dd.read_csv(path)
df = df['tip_amount']
cols=['tip_amount']
dfp = df.compute()
print(len(dfp))
print (dfp.mean())
print("%s seconds" % ((time.time())-(start_time)))
Комментарии:
1. Вы должны измерить, сколько времени требуется для одного только downlaod, который должен быть ограничен пропускной способностью, т. Е. Нет возможности обойти эту стоимость.
2. Примечание: вместо обработки файла в программе Python вы можете использовать Amazon Athena для обработки файла. Вы можете предоставить Athena SQL-запрос, и он сможет выполнить запрос к данным в S3 без необходимости загружать данные. Вы также можете вызвать Athena из своей собственной программы на Python. (Athena использует технологию Presto для выполнения быстрых параллельных запросов.)
Ответ №1:
Эта строка
dfp = df.compute()
является антипаттером для dask. Вы разделяете нагрузку, btu, затем вы формируете один большой фрейм данных в памяти путем конкатенации. Вам было бы лучше вычислить то, что вы хотите, в исходных фрагментах (обратите внимание, что len
это особенное в python, поэтому это менее аккуратно, чем могло бы быть
dask.compute(df.shape[0], df.mean())
Кроме того, вы можете повысить производительность распределенного планировщика даже на одной машине для некоторых рабочих нагрузок. В этом случае я считаю, что большая часть работы выполняется без использования GIL, что является критическим соображением. Тем не менее, стоит оценить разницу.
Дополнительно: если вам действительно нужен только один столбец, укажите это в своем read_csv
— это относится ко всем бэкэндам!
Комментарии:
1. Спасибо за ваши предложения @mdurant! Я протестировал ваши идеи, и это заняло немного больше времени, чем мой 3-й тест (read_csv только тот столбец, который мне нужен, и использовал dask.compute (df.shape[0],df.mean()) сверху… Я немного сбит с толку, потому что все, что вы сказали, имеет смысл, и это должно сэкономить как минимум несколько секунд…