#python #google-cloud-platform #google-cloud-dataflow #google-cloud-pubsub
#python #google-cloud-platform #google-cloud-поток данных #google-cloud-pubsub
Вопрос:
Я пытаюсь создать пользовательский шаблон для потока данных Google. Я просто хочу распечатать некоторые сообщения из Pubsub на консоль. Когда я пытаюсь создать свой шаблон, я получаю сообщение об ошибке, что Cloud Pub / Sub доступен только для потоковых конвейеров, в то время как мой конвейер предназначен для потокового конвейера: x. Что я делаю, чтобы мой конвейер пакетный, а не потоковый?
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class PrintExample(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--welcome', type=str)
TOPIC = ...
PROJECT = ...
BUCKET = ...
pipeline_options = PipelineOptions(
runner='DataflowRunner',
project=PROJECT,
job_name='printtemplate01',
temp_location='gs://{}/temp'.format(BUCKET),
region='us-central1'
)
with beam.Pipeline(options=pipeline_options) as p:
options = pipeline_options.view_as(PrintExample)
(
p
| "Extract PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC)
| "Print" >> beam.Map(print)
)
p.run()
Затем я запускаю
python -m PrintTemplate.py
--runner DataflowRunner --project [PROJECT]
--staging_location gs://[BUCKET]/staging
--temp_location gs://[BUCKET]/temp
--template_location gs://[BUCKET]/templates/PrintTemplate
В результате:
ValueError: Cloud Pub/Sub is currently available for use only in streaming pipelines.
Ответ №1:
Вы были почти на месте. Просто добавьте --streaming
в свою команду.
python -m PrintTemplate.py
--runner DataflowRunner --project [PROJECT]
--staging_location gs://[BUCKET]/staging
--temp_location gs://[BUCKET]/temp
--template_location gs://[BUCKET]/templates/PrintTemplate
--streaming
Я вижу, что вы используете PipelineOptions. Вы также можете пройти streaming=True
.
pipeline_options = PipelineOptions(
runner='DataflowRunner',
project=PROJECT,
job_name='printtemplate01',
temp_location='gs://{}/temp'.format(BUCKET),
region='us-central1',
streaming=True
)