Dask read_csv не может считываться из BytesIO

#python #pandas #dask

#python #панды #dask

Вопрос:

У меня есть следующий код для чтения сжатого CSV-файла из байтов. Он работает с pandas.read_csv, однако он не работает с dask (dd.read_csv).

File in d['urls'][0] — это ссылка на файл на Amazon S3, предоставленный сторонним сервисом.

 import io
import requests
import pandas
import dask.dataframe as dd 

output = io.BytesIO()
output.name = "chunk_1.csv.gz"
with requests.get(d['urls'][0], stream=True) as resp:
    resp.raise_for_status()
    for chunk in resp.iter_content(chunk_size=None):
        if chunk:
            output.write(chunk)
output.seek(0)

dd.read_csv(output, compression='gzip', blocksize=None) #Doesn't work

pd.read_csv(output, compression='gzip') # WORKS
 

Трассировка:

 ---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-13-39441d60668b> in <module>
     13 output.seek(0)
     14 
---> 15 dd.read_csv(output, compression='gzip', blocksize=None) #Doesn't work
     16 
     17 pd.read

~/opt/anaconda3/lib/python3.8/site-packages/dask/dataframe/io/csv.py in read(urlpath, blocksize, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
    698         **kwargs,
    699     ):
--> 700         return read_pandas(
    701             reader,
    702             urlpath,

~/opt/anaconda3/lib/python3.8/site-packages/dask/dataframe/io/csv.py in read_pandas(reader, urlpath, blocksize, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
    533         sample = blocksize
    534     b_lineterminator = lineterminator.encode()
--> 535     b_out = read_bytes(
    536         urlpath,
    537         delimiter=b_lineterminator,

~/opt/anaconda3/lib/python3.8/site-packages/dask/bytes/core.py in read_bytes(urlpath, delimiter, not_zero, blocksize, sample, compression, include_path, **kwargs)
     93     """
     94     if not isinstance(urlpath, (str, list, tuple, os.PathLike)):
---> 95         raise TypeError("Path should be a string, os.PathLike, list or tuple")
     96 
     97     fs, fs_token, paths = get_fs_token_paths(urlpath, mode="rb", storage_options=kwargs)

TypeError: Path should be a string, os.PathLike, list or tuple
 

URL-адрес, из которого я пытаюсь получить файл, выглядит like https://user-ad-revenue.s3.amazonaws.com/data/XXXX/uar/tables/mediation/XXXX=v3/publisher_id%XXXXX/application_id%XXXXX/day=2020-12-27/report.csv.gz?AWSAccessKeyId=XXXXXamp;Expires=1609150335amp;Signature=XXXXX

Чтение из http с помощью dask dd.read_csv(d['urls'][0], compression='gzip', blocksize=None) возвращает BadGzipFile: Not a gzipped file (b'<?') , однако оно работает с pd.read_csv

Ответ №1:

Согласно документации dask read_csv , первый параметр должен быть строкой или списком:

urlpathstring или список абсолютных или относительных путей к файлам.

Префикс с протоколом, подобным s3:// для чтения из альтернативных файловых систем. Для чтения из нескольких файлов вы можете передать глобальную строку или список путей, с оговоркой, что все они должны иметь один и тот же протокол.

Это отражается в трассировке:

Ошибка типа: путь должен быть строкой, os.PathLike, списком или кортежем

Обратите внимание, что это отличается от pandas read_csv:

filepath_or_bufferstr, объект path или файлоподобный объект Допустим любой допустимый строковый путь. Строка может быть URL-адресом. Допустимые схемы URL включают http, ftp, s3, gs и file. Для URL-адресов файлов ожидается хост. Локальным файлом может быть: file://localhost/path/to/table.csv.

Если вы хотите передать объект path, pandas принимает любую ОС.Похожий на путь.

Под файлоподобным объектом мы подразумеваем объекты с помощью метода read(), такие как дескриптор файла (например, через встроенную функцию open) или StringIO.

Однако вы можете напрямую прочитать файл в dask, поддерживаются следующие удаленные хранилища данных:

  • Локальная или сетевая файловая система: file:// — локальная файловая система, используемая по умолчанию при отсутствии какого-либо протокола.
  • Файловая система Hadoop: hdfs:// — распределенная файловая система Hadoop для устойчивых, реплицируемых файлов в кластере. Это использует PyArrow в качестве серверной части.
  • Amazon S3: s3:// — удаленное двоичное хранилище Amazon S3, часто используемое с Amazon EC2, с использованием библиотеки s3fs.
  • Облачное хранилище Google: gcs:// или gs:// — облачное хранилище Google, обычно используемое с вычислительным ресурсом Google с использованием gcsfs.
  • Хранилище Microsoft Azure: adl: //, abfs: // или az:// — хранилище Microsoft Azure с использованием adlfs.
  • HTTP (ы): http:// или https:// для чтения данных непосредственно с веб-серверов HTTP.

Подробнее об удаленных данных в документах dask.

Комментарии:

1. В этом случае я не могу читать напрямую из файловой системы или S3. Два доступных варианта — buffer или http. Если я прочитаю URL-адрес с помощью dd.read_csv(d [‘urls’] [0], сжатие = ‘gzip’, размер блока = Нет), я получаю ошибку BadGzipFile: не архивированный файл (b'<?’) Однако чтение непосредственно из URL отлично работает с pandas

2. @PoradaKev Я вижу, было бы нормально читать с помощью pandas и преобразовывать его в фрейм данных dask, используя что-то вроде from_pandas ?

3. Не полностью, для чтения файла таким образом требуется больше времени. CSV содержит более 20 миллионов строк, ранее я использовал только pandas для этого случая, но теперь я пытаюсь переписать код с помощью dask