BatchElements() работает на DirectRunner, но не в DataflowRunner, когда перед этим добавляется перестановка()

# #google-cloud-dataflow #apache-beam

Вопрос:

В Apache beam BatchElements() элементы группируются в пакеты заданного размера 100 при работе с DirectRunner. Но тот же код на DataflowRunner группирует элементы в пакеты по 1. Я уже Reshuffle() делал это раньше BatchElements() .

Без Reshuffle() этого он работает, как и ожидалось, на обоих бегунах.

                 p
                | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=subscription)
                | "Parse as List" >> Map(lambda element: json.loads(element))
                | "Flatten" >> FlatMap(lambda elements: elements)
                | Reshuffle()
                | "Typed Element" >> ParDo(TypedElement())
                | "Batch Typed Element"
                >> BatchElements(
                    min_batch_size=50, max_batch_size=50, target_batch_duration_secs=None
                ) 

Кто-нибудь сталкивался с подобной проблемой? Любые предложения будут очень признательны! Я бьюсь об это головой уже несколько дней.

Комментарии:

1. Перестановка использует ключ перестановки и помечена как экспериментальная. BatchElements-это «Преобразование, которое пакует элементы для амортизированной обработки». Какую логику вы пытаетесь здесь применить? Если вы хотите явно контролировать некоторую логику для каждых 100 элементов на ключ в окне, вам, вероятно, следует использовать обработку с отслеживанием состояния: beam.apache.org/blog/stateful-processing

2. В то время как перестановки отмечены как экспериментальные. Поток данных явно поддерживает это и является рекомендуемым способом предотвращения слияния. Без перестановок поток данных объединяет все этапы, и работники не масштабируются. BatchElements() используется, потому что элементы не имеют ключей. Нам нужно пакетировать его для асинхронной выборки HTML, чтобы увеличить пропускную способность. Ожидаемое поведение заключается в том, что пакет элементов (~50) распределяется параллельно DoFn между 100 работниками.

3. Я не уверен, что BatchElements является правильным подходом здесь, так как у вас нет контроля над пакетом. Вот пример элементов дозирования: cloud.google.com/blog/products/data-analytics/. … Ищите Batching through state and timers , и кажется, что это, естественно, требует перетасовки.

4. Спасибо за ресурс! Мы решили использовать перестановку после объединения элементов. У нас там хорошие партии и хороший параллелизм.