# #python #google-cloud-platform #google-cloud-dataflow #apache-beam
Question:
I am building a Dataflow pipeline that takes messages received from Pub/Sub, transforms them by calling an API, and then writes them to BigQuery.
Writing from Pub/Sub to BigQuery without the API call works fine. However, when I try to use the requests library, it always gives me the following error:
```
NameError: name 'requests' is not defined [while running 'CustomParse-ptransform-11727']
passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service.cc:631
generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 581, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "PubSub2BQ.py", line 43, in process
```
Here is the relevant code:
```python
import argparse
import json
import requests
import os
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


class MtToJson(beam.DoFn):
    def __init__(self):
        pass

    def process(self, message):
        x = requests.post('serverAdress', data=message)
        yield x.text


def run():
    # Parsing arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."',
        default=INPUT_SUBSCRIPTION,
    )
    parser.add_argument(
        "--output_table", help="Output BigQuery Table", default=BIGQUERY_TABLE
    )
    parser.add_argument(
        "--output_schema",
        help="Output BigQuery Schema in text format",
        default=BIGQUERY_SCHEMA,
    )
    known_args, pipeline_args = parser.parse_known_args()

    # Creating pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True

    # Defining our pipeline and its steps
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=known_args.input_subscription, timestamp_attribute=None
            )
            | "CustomParse" >> beam.ParDo(MtToJson())
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                known_args.output_table,
                schema=known_args.output_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
```
The server address has been omitted.
I have verified that requests comes preinstalled on GCP Dataflow workers for both Python 2.7 and 3.7. I launch the job with the Dataflow runner and a requirements.txt that includes the requests module, but it does not seem to get installed.
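For context, the usual workaround for this kind of NameError on Dataflow (not something stated in the question, just a common pattern) is to either pickle the main session, so that top-level imports travel to the workers, or to perform the import inside the DoFn itself. A minimal sketch of both options, assuming the rest of the pipeline stays as above:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Option 1: pickle the main session so module-level imports such as
# `requests` are restored on each Dataflow worker.
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True


# Option 2: resolve the import on the worker itself, once per DoFn instance.
class MtToJson(beam.DoFn):
    def setup(self):
        import requests  # runs on the worker, after the package is installed there
        self._requests = requests

    def process(self, message):
        # 'serverAdress' is the placeholder URL from the question
        x = self._requests.post('serverAdress', data=message)
        yield x.text
```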
Edit:
I changed the Apache Beam SDK to a lower version:
requirements.txt:
```
requests>=2.20.1
apache-beam[gcp]==2.25.0
```
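As a hedged aside: pinning apache-beam[gcp] in requirements.txt makes the staging step resolve the SDK's entire transitive dependency tree, and the `--no-binary :all:` flag in the failing pip command below forces everything to be downloaded as source archives. The workers take their SDK version from the SDK that launched the job, so a trimmed file that leaves the SDK out might contain only:

```
requests>=2.20.1
```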
and now I get a new, enormous error message:
File "/usr/lib/python3.8/subprocess.py", line 512, in run
raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', './requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.
Pip install failed for package: -r
Output from execution of subprocess: b'Collecting requests>=2.20.1n File was already downloaded /tmp/dataflow-requirements-cache/requests-2.25.1.tar.gznCollecting httplib2>=0.17.2n File was already downloaded /tmp/dataflow-requirements-cache/httplib2-0.19.1.tar.gzn Installing build dependencies: startedn Installing build dependencies: finished with status 'done'n Getting requirements to build wheel: startedn Getting requirements to build wheel: finished with status 'done'n Preparing wheel metadata: startedn Preparing wheel metadata: finished with status 'done'nCollecting apache-beam[gcp]==2.25.0n File was already downloaded /tmp/dataflow-requirements-cache/apache-beam-2.25.0.zipnCollecting certifi>=2017.4.17n File was already downloaded /tmp/dataflow-requirements-cache/certifi-2021.5.30.tar.gznCollecting chardet<5,>=3.0.2n File was already downloaded /tmp/dataflow-requirements-cache/chardet-4.0.0.tar.gznCollecting idna<3,>=2.5n File was already downloaded /tmp/dataflow-requirements-cache/idna-2.10.tar.gznCollecting urllib3<1.27,>=1.21.1n File was already downloaded /tmp/dataflow-requirements-cache/urllib3-1.26.5.tar.gznCollecting pyparsing<3,>=2.4.2n File was already downloaded /tmp/dataflow-requirements-cache/pyparsing-2.4.7.tar.gznCollecting crcmod<2.0,>=1.7n File was already downloaded /tmp/dataflow-requirements-cache/crcmod-1.7.tar.gznCollecting dill<0.3.2,>=0.3.1.1n File was already downloaded /tmp/dataflow-requirements-cache/dill-0.3.1.1.tar.gznCollecting fastavro<2,>=0.21.4n File was already downloaded /tmp/dataflow-requirements-cache/fastavro-1.4.1.tar.gznCollecting future<1.0.0,>=0.18.2n File was already downloaded /tmp/dataflow-requirements-cache/future-0.18.2.tar.gznCollecting grpcio<2,>=1.29.0n File was already downloaded /tmp/dataflow-requirements-cache/grpcio-1.38.0.tar.gznCollecting hdfs<3.0.0,>=2.1.0n File was already downloaded /tmp/dataflow-requirements-cache/hdfs-2.6.0.tar.gznCollecting mock<3.0.0,>=1.0.1n File was already downloaded /tmp/dataflow-requirements-cache/mock-2.0.0.tar.gznCollecting numpy<2,>=1.14.3n File was already downloaded /tmp/dataflow-requirements-cache/numpy-1.20.3.zipn Installing build dependencies: startedn Installing build dependencies: still running...n Installing build dependencies: finished with status 'done'n Getting requirements to build wheel: startedn Getting requirements to build wheel: finished with status 'done'n Preparing wheel metadata: startedn Preparing wheel metadata: finished with status 'done'nCollecting oauth2client<5,>=2.0.1n File was already downloaded /tmp/dataflow-requirements-cache/oauth2client-4.1.3.tar.gznCollecting protobuf<4,>=3.12.2n File was already downloaded /tmp/dataflow-requirements-cache/protobuf-3.17.3.tar.gznCollecting pydot<2,>=1.2.0n File was already downloaded /tmp/dataflow-requirements-cache/pydot-1.4.2.tar.gznCollecting pymongo<4.0.0,>=3.8.0n File was already downloaded /tmp/dataflow-requirements-cache/pymongo-3.11.4.tar.gznCollecting python-dateutil<3,>=2.8.0n File was already downloaded /tmp/dataflow-requirements-cache/python-dateutil-2.8.1.tar.gzn Installing build dependencies: startedn Installing build dependencies: finished with status 'done'n Getting requirements to build wheel: startedn Getting requirements to build wheel: finished with status 'done'n Preparing wheel metadata: startedn Preparing wheel metadata: finished with status 'done'nCollecting pytz>=2018.3n File was already downloaded /tmp/dataflow-requirements-cache/pytz-2021.1.tar.gznCollecting typing-extensions<3.8.0,>=3.7.0n File was already 
downloaded /tmp/dataflow-requirements-cache/typing_extensions-3.7.4.3.tar.gznCollecting avro-python3!=1.9.2,<1.10.0,>=1.8.1n File was already downloaded /tmp/dataflow-requirements-cache/avro-python3-1.9.2.1.tar.gznCollecting pyarrow<0.18.0,>=0.15.1n File was already downloaded /tmp/dataflow-requirements-cache/pyarrow-0.17.1.tar.gzn Installing build dependencies: startedn Installing build dependencies: still running...n Installing build dependencies: still running...n Installing build dependencies: finished with status 'done'n Getting requirements to build wheel: startedn Getting requirements to build wheel: finished with status 'done'n Preparing wheel metadata: startedn Preparing wheel metadata: finished with status 'done'nCollecting cachetools<5,>=3.1.0n File was already downloaded /tmp/dataflow-requirements-cache/cachetools-4.2.2.tar.gzn Installing build dependencies: startedn Installing build dependencies: finished with status 'done'n Getting requirements to build wheel: startedn Getting requirements to build wheel: finished with status 'done'n Preparing wheel metadata: startedn Preparing wheel metadata: finished with status 'done'nCollecting google-apitools<0.5.32,>=0.5.31n File was already downloaded /tmp/dataflow-requirements-cache/google-apitools-0.5.31.tar.gznCollecting google-auth<2,>=1.18.0n File was already downloaded /tmp/dataflow-requirements-cache/google-auth-1.30.2.tar.gznCollecting google-cloud-bigquery<2,>=1.6.0n File was already downloaded /tmp/dataflow-requirements-cache/google-cloud-bigquery-1.28.0.tar.gznCollecting google-cloud-bigtable<2,>=0.31.1n File was already downloaded /tmp/dataflow-requirements-cache/google-cloud-bigtable-
It looks like it wants to download a bunch of packages that I removed from the requirements file a long time ago. Is there a way to clear the cache?
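The cache path in the log, /tmp/dataflow-requirements-cache, lives on the machine that submits the job, where Beam stages the requirements. A minimal sketch of clearing it, assuming it is run on that submitting machine:

```python
# Hedged sketch: delete Dataflow's local requirements staging cache so the
# next job submission resolves requirements.txt from scratch. The path comes
# from the error log above; equivalent to `rm -rf /tmp/dataflow-requirements-cache`.
import shutil

shutil.rmtree("/tmp/dataflow-requirements-cache", ignore_errors=True)
```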
Comments:
1. According to the Dataflow worker dependencies, the requests module should already be installed on the workers. Are you using the same version that is listed on that page (2.24.0)?
2. @DanielOliveira In the GCP console I can see that it is running 2.29.0.
3. That is probably the Apache Beam version you are describing, since requests only seems to go up to 2.25.1 at the moment. I was wondering which version you pinned in your requirements.txt. In fact, it would be more helpful if you could include the contents of your requirements.txt in the question, if you can.
4. I have updated the question, since I am now getting something different.