Запуск задания потока данных при получении уведомления PubSub

# #google-cloud-platform #google-cloud-dataflow #apache-beam #google-cloud-pubsub

Вопрос:

Я написал конвейер потока данных с Apache Beam, чтобы дать вам базовое представление о коде:

 Job= (
    p
    |"cretae">>beam.Create(["message"])
    |"job 1" >> beam.ParDo(dofn1())
    |"job 2" >> beam.ParDo(dofn2())
    |"job 3" >> beam.ParDo(dofn3())
    )
 

В настоящее время я запускаю конвейер потока данных, создавая случайное сообщение, содержание сообщения не имеет значения, так как оно используется только для запуска конвейера. Просто хочу знать, есть ли способ запустить этот конвейер при получении уведомления PubSub. Возможно, с помощью API Apache Beam Pubsub? Может ли кто-нибудь привести такой пример? Спасибо

Комментарии:

1. Вы ознакомились с шаблоном потока данных? cloud.google.com/dataflow/docs/concepts/dataflow-templates

2. Спасибо за ваше предложение, Гийом, я его проверил. Хотя шаблон потока данных способен обрабатывать потоковое задание. Использование beam.io.readfrompubsub лучше подходит для моего варианта использования

Ответ №1:

Вы правы. Вы можете настроить конвейер потока данных для чтения из паба/подраздела GCP. Вы можете прочитать непосредственно из этой темы, но я рекомендую создать подписку и подключить конвейер потока данных к подписке (почему? это предотвратит потерю сообщений, если вы когда-нибудь захотите перезапустить свой конвейер и не пропустите ни одного сообщения, поступающего в тему между его остановкой и повторным запуском).

Вот как вы это делаете, предполагая, что вы уже настроили раздел/подраздел GPC pub и подписку. Вам нужно будет запомнить путь к подписке.

 import apache_beam as beam
import logging

logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

with beam.Pipeline(options=pipeline_options) as pipeline:
    (pipeline
    | "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=input_topic_subscription_path)
    | "Window into fixed intervals" >> beam.WindowInto(beam.FixedWindows(5))
    | "Log the messages" >> beam.Map(lambda message: logging.info(message))
    )
 

Приведенный выше код будет считывать сообщения из паба/подраздела каждые 5 секунд, а затем регистрировать сообщение.