#google-cloud-dataflow
#google-облако-поток данных
Вопрос:
У нас есть шаблон потока данных GCP из PubSub в текстовый файл на GCS.
Перед записью в GCS я хотел бы обработать каждое сообщение из pubsub перед записью в GCS.
Я наткнулся на этот код на GitHub , но я не уверен, где мне обновить код и есть ли способ распечатать каждое сообщение из PubSub?
Комментарии:
1. Какой язык вы используете для построения своего конвейера?
Ответ №1:
Код из Github представляет собой конвейер потока данных. Вы можете запустить конвейер как задание без шаблона, обычно для проверки того, что конвейер работает.
Если вы хотите запустить его как шаблон, вам нужно будет создать и установить шаблон
Для создания шаблона вам нужно будет его скомпилировать, рекомендуется использовать maven для его компиляции.
mvn compile exec:java
-Dexec.mainClass=com.example.myclass
-Dexec.args="--runner=DataflowRunner
--project=YOUR_PROJECT_ID
--stagingLocation=gs://YOUR_BUCKET_NAME/staging
--templateLocation=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
Ответ №2:
Вы можете изменить код, применив DoFn сразу после того, как PubSub прочитает этот код, например
pipeline
.apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
.apply(ParDo.of([YourMessageProcessingDoFnHere]))
.apply(
options.getWindowDuration() " Window",