Отсутствие элементов в PCollection не влияет на количество выделенных рабочих мест

#google-cloud-platform #google-cloud-dataflow #apache-beam #dataflow

#google-облачная платформа #google-облако-поток данных #apache-beam #поток данных #google-cloud-platform #google-cloud-поток данных

Вопрос:

У меня есть конвейер, который состоит из трех этапов. На первом шаге, который является ParDo, который принимает 5 URL-адресов в PCollection. И каждый из 5 элементов генерирует тысячи URL-адресов каждый и выводит его. Таким образом, ввод второго шага — это еще одна коллекция PC, которая может иметь размер 100-400k. На последнем шаге очищенный вывод каждого URL-адреса сохраняется в службе хранения.

Я заметил, что на первом шаге, который генерирует список URL-адресов из 5 входных URL-адресов, выделено 5 рабочих и генерируется новый набор URL-адресов. Но как только первый шаг завершен, ни один работник не сокращается и не достигает 1. И пока выполняется второй шаг, он выполняется только на 1 работнике (с 1 работником мой поток данных выполняется в течение последних 2 дней, поэтому, просматривая журналы, я делаю логическое предположение, что первый шаг завершен).

Итак, мой вопрос в том, что, несмотря на большой размер PCollection, почему он не распределяется между работниками или почему не выделяется больше работников? Шаг 2 — это простой веб-скребок, который очищает заданный URL-адрес и выводит строку. Который затем сохраняется в службе хранения

Ответ №1:

Поток данных пытается соединить шаги вместе, чтобы создать объединенные шаги. Таким образом, даже если у вас ParDo в конвейере несколько s, они будут объединены и будут выполнены как один шаг.

Кроме того, после объединения масштабирование потока данных ограничено шагом в начале объединенного шага.

Я подозреваю, что у вас есть Create преобразование, состоящее из нескольких элементов в верхней части вашего конвейера. В этом случае поток данных может масштабироваться только до количества элементов в этом Create преобразовании.

Одним из способов предотвращения такого поведения является объединение прерываний после одного (или нескольких) ваших ParDo преобразований с высокой разветвленностью. Это можно сделать, добавив после него преобразование Reshuffle.viaRandomKey() (которое содержит a GroupByKey ). Учитывая, что Reshuffle это преобразование идентификатора, ваш конвейер не должен требовать дополнительных изменений.

Смотрите Здесь для получения дополнительной информации о слиянии и способах его предотвращения.