# #pandas #google-cloud-dataflow #apache-beam
Вопрос:
Я пытаюсь импортировать данные из API в свой GBQ и хочу использовать поток данных.
По неизвестным и невообразимым для меня причинам API просто возвращает URL-адрес «.csv.gz», который мне затем нужно загрузить и обработать, прежде чем отправлять данные в GBQ.
Кроме того, API имеет аутентификацию с помощью токена на предъявителя, поэтому я искал способ обработки загрузки и анализа данных, а также аутентификации и нашел:
pd.read_csv('https://app.SOMEPROVIDER.com/api/reporting/download/SOMEID.csv.gz', storage_options={'Authorization': 'Bearer ' bearer_token}, compression='gzip', header=0, sep=',', quotechar='"')
что фантастически работает при использовании его в моем локальном трубопроводе луча.
Однако, как только я загружаю конвейер в поток данных и запускаю его там, я получаю сообщение об ошибке
Ошибка значения: параметры хранилища передаются с файловым объектом или путем к файлу, отличным от fsspec.
Полный след:
"apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process File ".filename.py", line 144, in process File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 610, in read_csv return _read(filepath_or_buffer, kwds) File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 462, in _read parser = TextFileReader(filepath_or_buffer, **kwds) File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 819, in __init__ self._engine = self._make_engine(self.engine) File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1050, in _make_engine return mapping[engine](self.f, **self.options) # type: ignore[call-arg] File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1867, in __init__ self._open_handles(src, kwds) File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1362, in _open_handles self.handles = get_handle( File "/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line 558, in get_handle ioargs = _get_filepath_or_buffer( File "/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line 286, in _get_filepath_or_buffer raise ValueError( ValueError: storage_options passed with file object or non-fsspec file path During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 651, in do_work work_executor.execute() File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 875, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process File ".filename.py", line 144, in process File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 610, in read_csv return _read(filepath_or_buffer, kwds) File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 462, in _read parser = TextFileReader(filepath_or_buffer, **kwds) File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 819, in __init__ self._engine = self._make_engine(self.engine) File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1050, in _make_engine return mapping[engine](self.f, **self.options) # type: ignore[call-arg] File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1867, in __init__ self._open_handles(src, kwds) File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1362, in _open_handles self.handles = get_handle( File "/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line 558, in get_handle ioargs = _get_filepath_or_buffer( File "/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line 286, in _get_filepath_or_buffer raise ValueError( ValueError: storage_options passed with file object or non-fsspec file path [while running 'Fetch actual report data'] ```
Кто-нибудь знает, почему это работает локально, но не в облаке? Я предполагаю, что это может быть связано с файловой системой и временными файлами, но тогда сообщение об ошибке не имеет большого смысла…
Согласно документу pandas, параметр storage_options передается в urllib для https-ссылок и только в fsspec для путей s3 и gcs. смотрите здесь
Комментарии:
1. Является ли «bearer_token» строкой или путем к файлу ? Если это путь к файлу, он может быть недоступен для виртуальных машин. Похоже, ошибка связана с используемым вами API, поэтому я не уверен, что это значит.
Ответ №1:
Оказалось, что это была просто проблема с версией. Интерпретация аргумента параметров хранения в качестве информации об авторизации не существовала в версии pandas, которая включена в изображения потока данных, и когда я передал локальное «колесо» новейшей доступной версии pandas с --extra_package
параметром, проблема разрешилась сама собой.