#python #pandas #dask
#python #панды #dask
Вопрос:
У меня есть следующий код для чтения сжатого CSV-файла из байтов. Он работает с pandas.read_csv, однако он не работает с dask (dd.read_csv).
File in d['urls'][0]
— это ссылка на файл на Amazon S3, предоставленный сторонним сервисом.
import io
import requests
import pandas
import dask.dataframe as dd
output = io.BytesIO()
output.name = "chunk_1.csv.gz"
with requests.get(d['urls'][0], stream=True) as resp:
resp.raise_for_status()
for chunk in resp.iter_content(chunk_size=None):
if chunk:
output.write(chunk)
output.seek(0)
dd.read_csv(output, compression='gzip', blocksize=None) #Doesn't work
pd.read_csv(output, compression='gzip') # WORKS
Трассировка:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-13-39441d60668b> in <module>
13 output.seek(0)
14
---> 15 dd.read_csv(output, compression='gzip', blocksize=None) #Doesn't work
16
17 pd.read
~/opt/anaconda3/lib/python3.8/site-packages/dask/dataframe/io/csv.py in read(urlpath, blocksize, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
698 **kwargs,
699 ):
--> 700 return read_pandas(
701 reader,
702 urlpath,
~/opt/anaconda3/lib/python3.8/site-packages/dask/dataframe/io/csv.py in read_pandas(reader, urlpath, blocksize, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
533 sample = blocksize
534 b_lineterminator = lineterminator.encode()
--> 535 b_out = read_bytes(
536 urlpath,
537 delimiter=b_lineterminator,
~/opt/anaconda3/lib/python3.8/site-packages/dask/bytes/core.py in read_bytes(urlpath, delimiter, not_zero, blocksize, sample, compression, include_path, **kwargs)
93 """
94 if not isinstance(urlpath, (str, list, tuple, os.PathLike)):
---> 95 raise TypeError("Path should be a string, os.PathLike, list or tuple")
96
97 fs, fs_token, paths = get_fs_token_paths(urlpath, mode="rb", storage_options=kwargs)
TypeError: Path should be a string, os.PathLike, list or tuple
URL-адрес, из которого я пытаюсь получить файл, выглядит like https://user-ad-revenue.s3.amazonaws.com/data/XXXX/uar/tables/mediation/XXXX=v3/publisher_id%XXXXX/application_id%XXXXX/day=2020-12-27/report.csv.gz?AWSAccessKeyId=XXXXXamp;Expires=1609150335amp;Signature=XXXXX
Чтение из http с помощью dask dd.read_csv(d['urls'][0], compression='gzip', blocksize=None)
возвращает BadGzipFile: Not a gzipped file (b'<?')
, однако оно работает с pd.read_csv
Ответ №1:
Согласно документации dask read_csv
, первый параметр должен быть строкой или списком:
urlpathstring или список абсолютных или относительных путей к файлам.
Префикс с протоколом, подобным s3:// для чтения из альтернативных файловых систем. Для чтения из нескольких файлов вы можете передать глобальную строку или список путей, с оговоркой, что все они должны иметь один и тот же протокол.
Это отражается в трассировке:
Ошибка типа: путь должен быть строкой, os.PathLike, списком или кортежем
Обратите внимание, что это отличается от pandas read_csv:
filepath_or_bufferstr, объект path или файлоподобный объект Допустим любой допустимый строковый путь. Строка может быть URL-адресом. Допустимые схемы URL включают http, ftp, s3, gs и file. Для URL-адресов файлов ожидается хост. Локальным файлом может быть: file://localhost/path/to/table.csv.
Если вы хотите передать объект path, pandas принимает любую ОС.Похожий на путь.
Под файлоподобным объектом мы подразумеваем объекты с помощью метода read(), такие как дескриптор файла (например, через встроенную функцию open) или StringIO.
Однако вы можете напрямую прочитать файл в dask, поддерживаются следующие удаленные хранилища данных:
- Локальная или сетевая файловая система: file:// — локальная файловая система, используемая по умолчанию при отсутствии какого-либо протокола.
- Файловая система Hadoop: hdfs:// — распределенная файловая система Hadoop для устойчивых, реплицируемых файлов в кластере. Это использует PyArrow в качестве серверной части.
- Amazon S3: s3:// — удаленное двоичное хранилище Amazon S3, часто используемое с Amazon EC2, с использованием библиотеки s3fs.
- Облачное хранилище Google: gcs:// или gs:// — облачное хранилище Google, обычно используемое с вычислительным ресурсом Google с использованием gcsfs.
- Хранилище Microsoft Azure: adl: //, abfs: // или az:// — хранилище Microsoft Azure с использованием adlfs.
- HTTP (ы): http:// или https:// для чтения данных непосредственно с веб-серверов HTTP.
Подробнее об удаленных данных в документах dask.
Комментарии:
1. В этом случае я не могу читать напрямую из файловой системы или S3. Два доступных варианта — buffer или http. Если я прочитаю URL-адрес с помощью dd.read_csv(d [‘urls’] [0], сжатие = ‘gzip’, размер блока = Нет), я получаю ошибку BadGzipFile: не архивированный файл (b'<?’) Однако чтение непосредственно из URL отлично работает с pandas
2. @PoradaKev Я вижу, было бы нормально читать с помощью pandas и преобразовывать его в фрейм данных dask, используя что-то вроде from_pandas ?
3. Не полностью, для чтения файла таким образом требуется больше времени. CSV содержит более 20 миллионов строк, ранее я использовал только pandas для этого случая, но теперь я пытаюсь переписать код с помощью dask