dask: увеличение скорости загрузки нескольких файлов в один фрейм данных

#python #pandas #performance #dask

#python #pandas #Производительность #dask

Вопрос:

Я довольно регулярно объединяю несколько тысяч фреймов данных разумного размера (~ 1 миллион строк). Хотя я могу заставить pandas работать read_csv , это ужасное решение из-за чрезвычайно больших накладных расходов.

Мне нужно более быстрое решение этой проблемы, и dask, по-видимому, имеет эту множественную функциональность csv, встроенную в их read_csv / read_table функции.

Однако я не заметил значительного улучшения скорости с помощью этих решений.

Есть ли способ увеличить скорость процесса следующего типа? :

 import io
import re
import numpy as np
import dask.bag as dbag
import dask.dataframe as ddf

def filter_data(fp, ix_col = 'index_here', val_col = 'some_value'):
    dask_frame = ddf.read_table(fp)
    # filter to only one column and index (like a series)
    series = dask_frame[[ix_col, val_col]].set_index(ix_col)

    # Rename it to be the filename / file_id
    file_id = re.match("file_(. ).txt", fp)[1]
    series.columns = [file_id]
    return series

def get_dataframe(file_paths):
    # Make a collection
    dasks_bag = dbag.from_sequence(file_paths)

    # Open the files as dask frame and filter each to series-like frames
    filtered_dfs = dasks_bag.map(filter_data)

    # Compute pandas dataframe on each within the list
    filtered_dfs = filtered_dfs.compute()

    # concatenate them together
    df = ddf.concat(filtered_dfs, axis = 1)

    # Compute on concatenated again, so it becomes pandas dataframe
    return df.dropna(how = "all").compute()



# Just write some random files here
paths = ['file_120202021.txt', 'file_123.txt', 'file_12330.txt']
for fp in paths:
    with open(fp, 'w') as f:
        f.write('index_heretsome_valuetother_colsn')
        for row in range(0,1000):
            for val, other_col in np.random.rand(1, 2):
                f.write(str(row) 't' str(val) 't' str(other_col) 'n')

# Make a dataframe with dask
get_dataframe(paths)
  

Редактировать:
Здесь у меня есть небольшой скрипт, который показывает сбой dask:
Время, необходимое для dask на моем компьютере, составляет 1,87 секунды
, в то время как время, необходимое для pandas, составляет 0,29 секунды

Очевидно, я делаю это неправильно, поскольку dask был специально создан для более быстрых вычислений на фреймах данных.

 import io
import re
import numpy as np
import pandas as pd
import dask.bag as dbag
import dask.dataframe as ddf
import time

def get_dask_dataframe(file_paths,  ix_col = 'index_here', val_col = 'some_value'):
    # Make a collection
    dasks_bag = dbag.from_sequence(file_paths)

    # read and filter to data of interest
    dask_frames = ddf.read_table(file_paths, include_path_column = True)[[ix_col, val_col, 'path']]

    # Make pandas dataframe
    df = dask_frames.compute()

    # Pivot since read_table puts path in one column
    df = df.pivot_table(values = val_col, index = ix_col, columns = 'path')
    return df.dropna(how = "all")

def get_pandas_dataframe(file_paths, ix_col = 'index_here', val_col = 'some_value'):
    # Make a collection
    l = []
    for f in file_paths:
        series = pd.read_csv(f, sep = 't')[[ix_col, val_col]].set_index(ix_col)
        # Rename it to be the filename / file_id
        file_id = re.match("file_(. ).txt", f)[1]
        series.columns = [file_id]
        l  = [series]

    # concatenate them together
    df = pd.concat(l, axis = 1)
    return df.dropna(how = "all")


# Just write a whole bunch of random files
paths = ['file_' str(i) '.txt' for i in range(0, 100)]
for fp in paths:
    with open(fp, 'w') as f:
        f.write('index_heretsome_valuetother_colsn')
        for row in range(0,1000):
            for val, other_col in np.random.rand(1, 2):
                f.write(str(row) 't' str(val) 't' str(other_col) 'n')

t0 = time.time()
# Make a dataframe with dask
df1 = get_dask_dataframe(paths)
t1 = time.time()
print(t1-t0)

t0 = time.time()
# Make a dataframe with dask
df2 = get_pandas_dataframe(paths)
t1 = time.time()
print(t1-t0)