Объединение фреймов данных dask приводит к ошибке диска из-за временных файлов .partd в /tmp

#python #merge #dask #temporary-files

#python #слияние #dask #временные файлы

Вопрос:

Я пытался объединить 32 файла с общим столбцом id в dask. В общей сложности файлы имеют размер 82,4 ГБ. Я итеративно просматриваю файлы и выполняю внешнее слияние для некоторых, а затем левое слияние для других. Я не устанавливал распределенный клиент, поскольку не был уверен, что это необходимо. Я также попытался установить столбец id в качестве индекса и использовать его для объединения, но это не сработало. Я также пробовал pandas, но получил ошибку памяти, хотя я думал, что это поместится в оперативную память (524 ГБ).

Проблема в том, что dask создает так много больших временных файлов .partd, что заполняет доступные 950 ГБ. Не похоже, что он полностью использует оперативную память. Я подозреваю, что рабочий процесс создает много временных файлов, и они не очищаются во время процесса, даже если они не используются. Они удаляются, как только скрипты завершаются ошибкой из-за нехватки места на диске.

Любое предложение улучшить это было бы полезно! Большое вам спасибо!

Код выглядит следующим образом:

 #!/usr/bin/env python

import pandas as pd
import dask.dataframe as dd


if __name__ == '__main__':

    list_df=pd.read_csv("path_2_files.csv")
    for index, row in list_df.iterrows():
        print("Starting " str(index))
        if (index==0):
            ddf = dd.read_csv(row['path'] row['filename'] , sep="t",
                              dtype={'id': 'object', 
                                     'data1': 'float64', 
                                     'data2': 'float64', 
                                     'data3': 'float64' 
                                     })
            #Check for duplicates and drop
            ddf=ddf.drop_duplicates()
            print(len(ddf))
        else:
            ddf2 = dd.read_csv(row['path']   row['filename'], 
                                dtype={'id': 'object', 
                                       'data1': 'float64', 
                                       'data2': 'float64', 
                                       'data3': 'float64' 
                                       })
            #Check for duplicates and drop
            ddf2=ddf2.drop_duplicates()
            
            if (row['type']=='test1'):
                ddf = dd.merge(ddf, ddf2, on='id', how="outer")
                print(len(ddf))
                del ddf2
            elif (row['type']=='test2' or row['type']=='test3'):
                ddf = dd.merge(ddf, ddf2, on='id', how="left")
                print(len(ddf))
                del ddf2

    ddf.to_parquet("s3://some/path/merged_file.parquet", object_encoding='utf8')
  

Комментарии:

1. Если у вас есть какой-либо другой раздел с большим объемом дискового пространства, чем /tmp , вы можете изменить каталог этих временных файлов, как описано в этой проблеме