#google-cloud-platform #google-cloud-dataflow #apache-beam #dataflow
#google-облачная платформа #google-облако-поток данных #apache-beam #поток данных #google-cloud-platform #google-cloud-поток данных
Вопрос:
У меня есть конвейер, который состоит из трех этапов. На первом шаге, который является ParDo, который принимает 5 URL-адресов в PCollection. И каждый из 5 элементов генерирует тысячи URL-адресов каждый и выводит его. Таким образом, ввод второго шага — это еще одна коллекция PC, которая может иметь размер 100-400k. На последнем шаге очищенный вывод каждого URL-адреса сохраняется в службе хранения.
Я заметил, что на первом шаге, который генерирует список URL-адресов из 5 входных URL-адресов, выделено 5 рабочих и генерируется новый набор URL-адресов. Но как только первый шаг завершен, ни один работник не сокращается и не достигает 1. И пока выполняется второй шаг, он выполняется только на 1 работнике (с 1 работником мой поток данных выполняется в течение последних 2 дней, поэтому, просматривая журналы, я делаю логическое предположение, что первый шаг завершен).
Итак, мой вопрос в том, что, несмотря на большой размер PCollection, почему он не распределяется между работниками или почему не выделяется больше работников? Шаг 2 — это простой веб-скребок, который очищает заданный URL-адрес и выводит строку. Который затем сохраняется в службе хранения
Ответ №1:
Поток данных пытается соединить шаги вместе, чтобы создать объединенные шаги. Таким образом, даже если у вас ParDo
в конвейере несколько s, они будут объединены и будут выполнены как один шаг.
Кроме того, после объединения масштабирование потока данных ограничено шагом в начале объединенного шага.
Я подозреваю, что у вас есть Create
преобразование, состоящее из нескольких элементов в верхней части вашего конвейера. В этом случае поток данных может масштабироваться только до количества элементов в этом Create
преобразовании.
Одним из способов предотвращения такого поведения является объединение прерываний после одного (или нескольких) ваших ParDo
преобразований с высокой разветвленностью. Это можно сделать, добавив после него преобразование Reshuffle.viaRandomKey() (которое содержит a GroupByKey
). Учитывая, что Reshuffle
это преобразование идентификатора, ваш конвейер не должен требовать дополнительных изменений.
Смотрите Здесь для получения дополнительной информации о слиянии и способах его предотвращения.