pandas read_csv с функциями storage_options, работающими локально, но не в потоке данных

# #pandas #google-cloud-dataflow #apache-beam

Вопрос:

Я пытаюсь импортировать данные из API в свой GBQ и хочу использовать поток данных.

По неизвестным и невообразимым для меня причинам API просто возвращает URL-адрес «.csv.gz», который мне затем нужно загрузить и обработать, прежде чем отправлять данные в GBQ.

Кроме того, API имеет аутентификацию с помощью токена на предъявителя, поэтому я искал способ обработки загрузки и анализа данных, а также аутентификации и нашел:

 pd.read_csv('https://app.SOMEPROVIDER.com/api/reporting/download/SOMEID.csv.gz', storage_options={'Authorization': 'Bearer '  bearer_token}, compression='gzip', header=0, sep=',', quotechar='"')
 

что фантастически работает при использовании его в моем локальном трубопроводе луча.

Однако, как только я загружаю конвейер в поток данных и запускаю его там, я получаю сообщение об ошибке

Ошибка значения: параметры хранилища передаются с файловым объектом или путем к файлу, отличным от fsspec.

Полный след:

 "apache_beam/runners/common.py", line 1223, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 572, in
apache_beam.runners.common.SimpleInvoker.invoke_process File
".filename.py", line 144, in process File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
610, in read_csv return _read(filepath_or_buffer, kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
462, in _read parser = TextFileReader(filepath_or_buffer, **kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
819, in __init__ self._engine = self._make_engine(self.engine) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1050, in _make_engine return mapping[engine](self.f, **self.options) #
type: ignore[call-arg] File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1867, in __init__ self._open_handles(src, kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1362, in _open_handles self.handles = get_handle( File
"/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line
558, in get_handle ioargs = _get_filepath_or_buffer( File
"/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line
286, in _get_filepath_or_buffer raise ValueError( ValueError:
storage_options passed with file object or non-fsspec file path During
handling of the above exception, another exception occurred: Traceback
(most recent call last): File
"/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py",
line 651, in do_work work_executor.execute() File
"/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py",
line 179, in execute op.start() File
"dataflow_worker/shuffle_operations.py", line 63, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 64, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 79, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 80, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 84, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "apache_beam/runners/worker/operations.py", line 353, in
apache_beam.runners.worker.operations.Operation.output File
"apache_beam/runners/worker/operations.py", line 215, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "dataflow_worker/shuffle_operations.py", line 261, in
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "dataflow_worker/shuffle_operations.py", line 268, in
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "apache_beam/runners/worker/operations.py", line 353, in
apache_beam.runners.worker.operations.Operation.output File
"apache_beam/runners/worker/operations.py", line 215, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in
apache_beam.runners.worker.operations.DoOperation.process File
"apache_beam/runners/worker/operations.py", line 713, in
apache_beam.runners.worker.operations.DoOperation.process File
"apache_beam/runners/common.py", line 1225, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1290, in
apache_beam.runners.common.DoFnRunner._reraise_augmented File
"apache_beam/runners/common.py", line 1223, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 752, in
apache_beam.runners.common.PerWindowInvoker.invoke_process File
"apache_beam/runners/common.py", line 875, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1386, in
apache_beam.runners.common._OutputProcessor.process_outputs File
"apache_beam/runners/worker/operations.py", line 215, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in
apache_beam.runners.worker.operations.DoOperation.process File
"apache_beam/runners/worker/operations.py", line 713, in
apache_beam.runners.worker.operations.DoOperation.process File
"apache_beam/runners/common.py", line 1225, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented File
"apache_beam/runners/common.py", line 1223, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 572, in
apache_beam.runners.common.SimpleInvoker.invoke_process File
".filename.py", line 144, in process File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
610, in read_csv return _read(filepath_or_buffer, kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
462, in _read parser = TextFileReader(filepath_or_buffer, **kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
819, in __init__ self._engine = self._make_engine(self.engine) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1050, in _make_engine return mapping[engine](self.f, **self.options) #
type: ignore[call-arg] File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1867, in __init__ self._open_handles(src, kwds) File
"/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line
1362, in _open_handles self.handles = get_handle( File
"/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line
558, in get_handle ioargs = _get_filepath_or_buffer( File
"/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line
286, in _get_filepath_or_buffer raise ValueError( ValueError:
storage_options passed with file object or non-fsspec file path [while
running 'Fetch actual report data'] ```
 

Кто-нибудь знает, почему это работает локально, но не в облаке? Я предполагаю, что это может быть связано с файловой системой и временными файлами, но тогда сообщение об ошибке не имеет большого смысла…

Согласно документу pandas, параметр storage_options передается в urllib для https-ссылок и только в fsspec для путей s3 и gcs. смотрите здесь

Комментарии:

1. Является ли «bearer_token» строкой или путем к файлу ? Если это путь к файлу, он может быть недоступен для виртуальных машин. Похоже, ошибка связана с используемым вами API, поэтому я не уверен, что это значит.

Ответ №1:

Оказалось, что это была просто проблема с версией. Интерпретация аргумента параметров хранения в качестве информации об авторизации не существовала в версии pandas, которая включена в изображения потока данных, и когда я передал локальное «колесо» новейшей доступной версии pandas с --extra_package параметром, проблема разрешилась сама собой.