#python #dask
#python #dask
Вопрос:
У меня есть большой файл BCP (12 ГБ), который я импортировал в dask и произвел некоторую обработку данных, которые я хочу импортировать на SQL server. Файл был уменьшен с 40 столбцов до 8 столбцов, и я хочу найти наилучший метод для импорта на SQL server. Я попытался использовать следующее:
import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from urllib.parse import quote_plus
pbar = ProgressBar()
pbar.register()
#windows authentication
#to_sql_uri = quote_plus(engine)
ddf.to_sql('test',
uri='mssql pyodbc://TEST_SERVER/TEST_DB?driver=SQL Server?Trusted_Connection=yes', if_exists='replace', index=False)
Этот метод занимает слишком много времени (3 дня и подсчет). Я подозревал, что это может быть так, поэтому я также попытался выполнить запись в файл BCP с намерением использовать SQL BCP, но опять же это занимает несколько дней:
df_train_grouped.compute().to_csv("F:TEST_FILE.bcp", sep='t')
Я относительно новичок в dask и, похоже, не могу найти простой пример наиболее эффективного метода для этого.
Ответ №1:
Вам не нужно использовать compute
, это материализует фрейм данных в память и, вероятно, является узким местом для вас. Вместо этого вы можете сделать
df_train_grouped.to_csv("F:TEST_FILE*.bcp", sep='t')
который создаст несколько выходных файлов параллельно — что, вероятно, именно то, что вы хотите.
Обратите внимание, что профилирование определит, связан ли ваш процесс с вводом-выводом (например, самим диском), и в этом случае вы ничего не можете сделать, или один из планировщиков на основе процессов (в идеале распределенный планировщик) может помочь с задачами хранения GIL.
Переход на многопроцессорный планировщик следующим образом улучшил производительность в этом конкретном случае:
dask.config.set(scheduler='processes') # overwrite default with multiprocessing scheduler
df_train_grouped.to_csv("F:TEST_FILE*.bcp", sep='t', chunksize=1000000)