#python-3.x #google-cloud-dataflow #apache-beam
# #python-3.x #google-облако-поток данных #apache-beam
Вопрос:
У меня есть конвейер Beam, который считывает данные из раздела pubsub, выполняет некоторые небольшие преобразования, а затем записывает события в некоторые таблицы BigQuery.
Преобразования легки в обработке, возможно, удаляют поле или что-то еще, но, как вы можете видеть на изображении ниже, время на стенке очень велико для некоторых шагов. Что на самом деле может вызвать это?
Каждый элемент на самом деле является кортежем формы ((str, str, str), {**dict with data})
. С помощью этого ключа мы фактически пытаемся выполнить наивную дедупликацию, используя последнее событие по этому ключу. В основном все, что я добавляю после этого Get latest element per key
, происходит медленно, и пометка также происходит медленно, даже если она просто добавляет тег к элементу.
Ответ №1:
Я полагаю, вы имеете в виду, сколько элементов он обрабатывает в секунду?
Здесь происходят две вещи. Во-первых, я предполагаю, что Get latest element per key
он содержит своего рода groupByKey. Это включает в себя глобальную перетасовку, когда все элементы отправляются по сети другим элементам, чтобы гарантировать, что все элементы с заданным ключом сгруппированы на одном и том же рабочем элементе. Этот ввод-вывод может быть дорогостоящим, по крайней мере, с точки зрения времени работы.
Во-вторых, шаги, которые не требуют взаимодействия между рабочими, «объединяются», что связывает их пропускную способность. Например, если за одним DoFnA
из них следует DoFnB
followed by DoFnC
, обработка продолжается путем прохождения первого элемента DoFnA
, затем передачи этих выходных данных DoFnB
и впоследствии DoFnC
, прежде чем передать второй элемент DoFnA
. Это означает, что если один из Fns (или для чтения или записи) имеет ограниченную пропускную способность, все они будут.
Комментарии:
1. Спасибо за объяснение. Да, эта группировка выполняется
combiners.Latest.PerKey()
и, по моим наблюдениям, работает вполне нормально с точки зрения количества элементов в секунду. Основное узкое место — это, по сути, все, что происходит после этого шага. Теперь это та самая перестановка, до того, как это было помечено. У меня только один вопрос: — Как я узнаю, что некоторые шаги объединены вместе? Не знаю, поможет ли это, ошибка здесь — это идентификатор задания2020-12-09_08_32_30-16230074399558339654
🙂2. На самом деле я не уверен, что есть место, куда можно пойти, чтобы посмотреть, какие этапы слились, но обычно это все между двумя GBK. Обычно это не то, о чем вам нужно беспокоиться.
3. Обновление: если вы нажмете на шаг в пользовательском интерфейсе потока данных, в нижней части правой панели StepInfo появится раздел под названием «Оптимизированные этапы». Шаги, которые объединены вместе, будут находиться на одной и той же оптимизированной стадии (например, F__). Обратите внимание, что некоторые шаги могут охватывать оптимизированные этапы.
4. Понял. А как насчет времени на Стене? Должен ли я беспокоиться об этом? Я имею в виду, что задание выполняется в течение 48 часов, и у него есть какое-то сумасшедшее время, например, этап перетасовки длится 50 дней, а значение тега — 13 дней… Какие-то сумасшедшие цифры, и я не уверен, стоит ли мне беспокоиться об этом 🙂
5. Настенное время может быть полезным сигналом, но особенно, если речь идет о вводе-выводе, я бы не стал беспокоиться об этом. (Потоковые конвейеры очень многопоточны, и это сумма времени, затрачиваемого на каждый поток.)