Попытка использовать dask delayed для чтения и подсчета строк в файлах csv

#python #delay #dask

#python #задержка #dask

Вопрос:

У меня есть 5 CSV-файлов, содержащих около 1 МЛН записей каждый. Я использую dask, чтобы попытаться прочитать их параллельно, и подсчитываю записи в каждой из них, затем суммирую, чтобы получить общее количество записей. Вот мой код:

 import dask.dataframe as dd
import dask

counts = []

def read_file(fn):
    df = dask.delayed(dd.read_csv)(fn)
    return len(df.index)

for i in range(5):
    filename="c://parallel//test" str(i) ".csv"
    print(filename)
    counts.append(read_file(filename))
    

dask.compute(sum(counts))
  

Однако я получаю следующее сообщение об ошибке при попытке запустить приведенный выше код.

 ~anaconda3libsite-packagesdaskdelayed.py in __len__(self)
    549     def __len__(self):
    550         if getattr(self, "_length", None) is None:
--> 551             raise TypeError("Delayed objects of unspecified length have no len()")
    552         return self._length
    553 

TypeError: Delayed objects of unspecified length have no len()
  

Если я заменю строку return len(df.index) жестко запрограммированным значением, таким как return 1000 , все будет работать, как ожидалось

Кто-нибудь может показать мне, как обойти это.

Заранее спасибо

Ответ №1:

Не смешивайте коллекции (dataframe) в отложенных функциях. Решение, которое вы ищете, должно быть проще:

 import dask.dataframe as dd

filenames = ["c://parallel//test" str(i) ".csv" for i in range(5)]
df = dd.read_csv(filenames)
len(df)