Пользовательский шаблон облачного потока данных Google — только в потоковых конвейерах

#python #google-cloud-platform #google-cloud-dataflow #google-cloud-pubsub

#python #google-cloud-platform #google-cloud-поток данных #google-cloud-pubsub

Вопрос:

Я пытаюсь создать пользовательский шаблон для потока данных Google. Я просто хочу распечатать некоторые сообщения из Pubsub на консоль. Когда я пытаюсь создать свой шаблон, я получаю сообщение об ошибке, что Cloud Pub / Sub доступен только для потоковых конвейеров, в то время как мой конвейер предназначен для потокового конвейера: x. Что я делаю, чтобы мой конвейер пакетный, а не потоковый?

 import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PrintExample(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--welcome', type=str)


TOPIC = ...
PROJECT = ...
BUCKET = ...


pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project=PROJECT,
    job_name='printtemplate01',
    temp_location='gs://{}/temp'.format(BUCKET),
    region='us-central1'
)

with beam.Pipeline(options=pipeline_options) as p:
    options = pipeline_options.view_as(PrintExample)
    (
        p
        | "Extract PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC)
        | "Print" >> beam.Map(print)
    )
    p.run()
  

Затем я запускаю

 python -m PrintTemplate.py 
    --runner DataflowRunner --project [PROJECT] 
    --staging_location gs://[BUCKET]/staging 
    --temp_location gs://[BUCKET]/temp 
    --template_location gs://[BUCKET]/templates/PrintTemplate

  

В результате:

 ValueError: Cloud Pub/Sub is currently available for use only in streaming pipelines.
  

Ответ №1:

Вы были почти на месте. Просто добавьте --streaming в свою команду.

 python -m PrintTemplate.py 
    --runner DataflowRunner --project [PROJECT] 
    --staging_location gs://[BUCKET]/staging 
    --temp_location gs://[BUCKET]/temp 
    --template_location gs://[BUCKET]/templates/PrintTemplate
    --streaming
  

Я вижу, что вы используете PipelineOptions. Вы также можете пройти streaming=True .

 pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project=PROJECT,
    job_name='printtemplate01',
    temp_location='gs://{}/temp'.format(BUCKET),
    region='us-central1',
    streaming=True
)