Apache Beam не запускается повторно для сеансовых окон в потоке данных Google

#python #python-3.x #google-cloud-dataflow #apache-beam

#python #python-3.x #google-cloud-поток данных #apache-beam

Вопрос:

Следующий конвейер запускается рано после каждого элемента при локальном запуске с использованием DirectRunner, но при запуске в потоке данных Google Cloud ранних триггеров нет. В потоке данных он запускается только после закрытия окна сеанса.

 ( p
        | 'read'   >> beam.io.ReadFromPubSub(subscription = 'projects/xxx/subscriptions/xxx-sub')
        | 'json'   >> beam.Map(lambda x: json.loads(x.decode('utf-8')))
        | 'kv'     >> beam.Map(lambda x: (x['id'], x['amount']))
        | 'window' >> beam.WindowInto(window.Sessions(15*60), trigger=trigger.Repeatedly(trigger.AfterCount(1)), accumulation_mode=AccumulationMode.ACCUMULATING)
        | 'group'  >> beam.GroupByKey()
        | 'log'    >> beam.Map(lambda x: logging.info(x))
)
 

Я попробовал это с помощью Apache Beam 2.23 и 2.28. Согласно матрице совместимости, окна сеанса и триггеры полностью поддерживаются в потоке данных. Он также запускается рано при использовании фиксированного окна.

Есть какие-нибудь идеи, чего мне не хватает?

Комментарии:

1. Это звучит как ошибка, была ли у вас возможность открыть выпуск JIRA для beam dataflow-runner?

2. Отправлено issues.apache.org/jira/browse/BEAM-11906 для этого, @YichiZhang

3. Я попытался воспроизвести вашу проблему, но, похоже, что ранний триггер работает, и он срабатывает каждый раз, когда поступает новый ввод. Хотя вывод, похоже, не соответствует желаемому режиму накопления. Не могли бы вы подробнее рассказать о том, каков ваш ожидаемый результат и что отображается в вашем задании потока данных?

4. Спасибо за создание заявки. Он запускается локально после каждого ввода и накапливает значения. ИНФОРМАЦИЯ: root:(‘123’, [10020.3]) ИНФОРМАЦИЯ: корень:(‘123’, [10020.3, 10020.3]) ИНФОРМАЦИЯ: корень:(‘123’, [10020.3, 10020.3, 10020.3]) Но в потоке данных он срабатывает только после перерыва в сеансе.

5. После второй попытки с потоком данных вы правы, он срабатывает, но накопление не работает! «(‘123’, [10020.3])» «(‘123’, [10020.3])» «(‘123’, [10020.3])»