Requests module not found in Dataflow runner

# #python #google-cloud-platform #google-cloud-dataflow #apache-beam

Question:

I'm building a Dataflow pipeline that reads messages from Pub/Sub, transforms them by calling an API, and then writes them to BigQuery.

Writing from Pub/Sub to BigQuery without the API call works fine. However, as soon as I try to use the requests library, I always get the following error:

 NameError: name 'requests' is not defined [while running 'CustomParse-ptransform-11727']

passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service.cc:631
generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 581, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "PubSub2BQ.py", line 43, in process
 

Here is the relevant code:

 import argparse
import json
import requests
import os
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


class MtToJson(beam.DoFn):
    def __init__(self):
        pass

    def process(self, message):
        x = requests.post('serverAdress', data=message)
        yield x.text

def run():
    # Parsing arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."',
        default=INPUT_SUBSCRIPTION,
    )
    parser.add_argument(
        "--output_table", help="Output BigQuery Table", default=BIGQUERY_TABLE
    )
    parser.add_argument(
        "--output_schema",
        help="Output BigQuery Schema in text format",
        default=BIGQUERY_SCHEMA,
    )
    known_args, pipeline_args = parser.parse_known_args()

    # Creating pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True

    # Defining our pipeline and its steps
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=known_args.input_subscription, timestamp_attribute=None )
            | "CustomParse" >> beam.ParDo(MtToJson())
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                known_args.output_table,
                schema=known_args.output_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )

 

The server address is omitted.

I checked that requests comes preinstalled on GCP workers and supports Python 2.7 and 3.7.

I'm launching the job on the Dataflow runner with a requirements.txt that includes the requests module, but it doesn't seem to get installed.

Edit:

I pinned the Apache Beam SDK to a lower version:

requirements.txt

 requests>=2.20.1
apache-beam[gcp]==2.25.0
 

and now I get a new, huge error message:

  File "/usr/lib/python3.8/subprocess.py", line 512, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', './requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.

 Pip install failed for package: -r
 Output from execution of subprocess:
Collecting requests>=2.20.1
  File was already downloaded /tmp/dataflow-requirements-cache/requests-2.25.1.tar.gz
Collecting httplib2>=0.17.2
  File was already downloaded /tmp/dataflow-requirements-cache/httplib2-0.19.1.tar.gz
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting apache-beam[gcp]==2.25.0
  File was already downloaded /tmp/dataflow-requirements-cache/apache-beam-2.25.0.zip
Collecting certifi>=2017.4.17
  File was already downloaded /tmp/dataflow-requirements-cache/certifi-2021.5.30.tar.gz
Collecting chardet<5,>=3.0.2
  File was already downloaded /tmp/dataflow-requirements-cache/chardet-4.0.0.tar.gz
Collecting idna<3,>=2.5
  File was already downloaded /tmp/dataflow-requirements-cache/idna-2.10.tar.gz
Collecting urllib3<1.27,>=1.21.1
  File was already downloaded /tmp/dataflow-requirements-cache/urllib3-1.26.5.tar.gz
Collecting pyparsing<3,>=2.4.2
  File was already downloaded /tmp/dataflow-requirements-cache/pyparsing-2.4.7.tar.gz
Collecting crcmod<2.0,>=1.7
  File was already downloaded /tmp/dataflow-requirements-cache/crcmod-1.7.tar.gz
Collecting dill<0.3.2,>=0.3.1.1
  File was already downloaded /tmp/dataflow-requirements-cache/dill-0.3.1.1.tar.gz
Collecting fastavro<2,>=0.21.4
  File was already downloaded /tmp/dataflow-requirements-cache/fastavro-1.4.1.tar.gz
Collecting future<1.0.0,>=0.18.2
  File was already downloaded /tmp/dataflow-requirements-cache/future-0.18.2.tar.gz
Collecting grpcio<2,>=1.29.0
  File was already downloaded /tmp/dataflow-requirements-cache/grpcio-1.38.0.tar.gz
Collecting hdfs<3.0.0,>=2.1.0
  File was already downloaded /tmp/dataflow-requirements-cache/hdfs-2.6.0.tar.gz
Collecting mock<3.0.0,>=1.0.1
  File was already downloaded /tmp/dataflow-requirements-cache/mock-2.0.0.tar.gz
Collecting numpy<2,>=1.14.3
  File was already downloaded /tmp/dataflow-requirements-cache/numpy-1.20.3.zip
  Installing build dependencies: started
  Installing build dependencies: still running...
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting oauth2client<5,>=2.0.1
  File was already downloaded /tmp/dataflow-requirements-cache/oauth2client-4.1.3.tar.gz
Collecting protobuf<4,>=3.12.2
  File was already downloaded /tmp/dataflow-requirements-cache/protobuf-3.17.3.tar.gz
Collecting pydot<2,>=1.2.0
  File was already downloaded /tmp/dataflow-requirements-cache/pydot-1.4.2.tar.gz
Collecting pymongo<4.0.0,>=3.8.0
  File was already downloaded /tmp/dataflow-requirements-cache/pymongo-3.11.4.tar.gz
Collecting python-dateutil<3,>=2.8.0
  File was already downloaded /tmp/dataflow-requirements-cache/python-dateutil-2.8.1.tar.gz
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting pytz>=2018.3
  File was already downloaded /tmp/dataflow-requirements-cache/pytz-2021.1.tar.gz
Collecting typing-extensions<3.8.0,>=3.7.0
  File was already downloaded /tmp/dataflow-requirements-cache/typing_extensions-3.7.4.3.tar.gz
Collecting avro-python3!=1.9.2,<1.10.0,>=1.8.1
  File was already downloaded /tmp/dataflow-requirements-cache/avro-python3-1.9.2.1.tar.gz
Collecting pyarrow<0.18.0,>=0.15.1
  File was already downloaded /tmp/dataflow-requirements-cache/pyarrow-0.17.1.tar.gz
  Installing build dependencies: started
  Installing build dependencies: still running...
  Installing build dependencies: still running...
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting cachetools<5,>=3.1.0
  File was already downloaded /tmp/dataflow-requirements-cache/cachetools-4.2.2.tar.gz
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting google-apitools<0.5.32,>=0.5.31
  File was already downloaded /tmp/dataflow-requirements-cache/google-apitools-0.5.31.tar.gz
Collecting google-auth<2,>=1.18.0
  File was already downloaded /tmp/dataflow-requirements-cache/google-auth-1.30.2.tar.gz
Collecting google-cloud-bigquery<2,>=1.6.0
  File was already downloaded /tmp/dataflow-requirements-cache/google-cloud-bigquery-1.28.0.tar.gz
Collecting google-cloud-bigtable<2,>=0.31.1
  File was already downloaded /tmp/dataflow-requirements-cache/google-cloud-bigtable-
 

It looks like it wants to download a bunch of packages that I removed from the requirements file long ago. Is there a way to clear the cache?
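For reference, the error message above shows where the launcher caches the downloaded requirements (`/tmp/dataflow-requirements-cache` on the machine that submits the job). Deleting that directory forces pip to re-download only what the current requirements.txt resolves to. A minimal sketch, using the path taken from the error message:

```python
import os
import shutil

# The staging cache used by the Dataflow launcher, as it appears
# in the pip error output above.
cache_dir = "/tmp/dataflow-requirements-cache"

# Remove the cache if it exists; the next job submission rebuilds it
# from the current requirements.txt.
if os.path.isdir(cache_dir):
    shutil.rmtree(cache_dir)

print(os.path.isdir(cache_dir))  # → False
```

Note that many of those "removed" packages can still legitimately reappear: `pip download --no-binary :all:` resolves the full transitive dependency tree of `apache-beam[gcp]`, not just the lines in requirements.txt.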

Comments:

1. According to the Dataflow worker dependencies, the requests module should already be installed on the workers. Are you using the same version listed on that page (2.24.0)?

2. @DanielOliveira In the GCP console I can see it is running 2.29.0.

3. That's probably the Apache Beam version you're describing, since requests only seems to go up to 2.25.1 at the moment. I was wondering which version you specified in requirements.txt. Actually, it would be more helpful if you could include the contents of your requirements.txt in the question, if you can.

4. I've updated the question, since I'm now getting something different.