Запись большого фрейма данных dask в файл

#python #dask

#python #dask

Вопрос:

У меня есть большой файл BCP (12 ГБ), который я импортировал в dask и произвел некоторую обработку данных, которые я хочу импортировать на SQL server. Файл был уменьшен с 40 столбцов до 8 столбцов, и я хочу найти наилучший метод для импорта на SQL server. Я попытался использовать следующее:

 import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from urllib.parse import quote_plus

pbar = ProgressBar()
pbar.register()
#windows authentication 
#to_sql_uri = quote_plus(engine)
ddf.to_sql('test', 
           uri='mssql pyodbc://TEST_SERVER/TEST_DB?driver=SQL Server?Trusted_Connection=yes', if_exists='replace', index=False)
  

Этот метод занимает слишком много времени (3 дня и подсчет). Я подозревал, что это может быть так, поэтому я также попытался выполнить запись в файл BCP с намерением использовать SQL BCP, но опять же это занимает несколько дней:

 df_train_grouped.compute().to_csv("F:TEST_FILE.bcp", sep='t')
  

Я относительно новичок в dask и, похоже, не могу найти простой пример наиболее эффективного метода для этого.

Ответ №1:

Вам не нужно использовать compute , это материализует фрейм данных в память и, вероятно, является узким местом для вас. Вместо этого вы можете сделать

 df_train_grouped.to_csv("F:TEST_FILE*.bcp", sep='t')
  

который создаст несколько выходных файлов параллельно — что, вероятно, именно то, что вы хотите.

Обратите внимание, что профилирование определит, связан ли ваш процесс с вводом-выводом (например, самим диском), и в этом случае вы ничего не можете сделать, или один из планировщиков на основе процессов (в идеале распределенный планировщик) может помочь с задачами хранения GIL.

Переход на многопроцессорный планировщик следующим образом улучшил производительность в этом конкретном случае:

 dask.config.set(scheduler='processes')  # overwrite default with multiprocessing scheduler
df_train_grouped.to_csv("F:TEST_FILE*.bcp", sep='t', chunksize=1000000)