#python #google-cloud-dataflow #apache-beam
#python #google-облако-поток данных #apache-beam
Вопрос:
Я просматривал график заданий моего недавно развернутого конвейера и заметил, что, казалось бы, очень простой DoFn, похоже, требует огромного времени обработки по сравнению с другими задачами.
Конвейер принимает сообщения о данных сеанса потоковой передачи, которые имеют идентификаторы как project_id
, так и a user_id
, и генерирует ежечасные сводки, которые агрегируются на уровне проекта и сохраняются в BigQuery. Конвейер обрабатывает ~ 1 тыс. сообщений JSON в секунду и работает на 10 n2-standard
машинах. Краткое описание шагов:
- Читать из PubSub
- Окно — 5-дневные фиксированные окна с повторяющимся 3-часовым запуском и
AccumulationMode.ACCUMULATING
. - Агрегирование по пользователю куча других полей (хотя мы в конечном итоге выполняем свертки на уровне проекта, этот шаг должен выполняться первым, чтобы избежать перерасчета определенных полей)
- Простой DoFn, который отображает из старого
(keys), value
в новый(keys), value
(удаляет user_id из списка ключей, так что теперь мы агрегируем на уровне проекта)
class CollapseKeys(beam.DoFn):
"""
Re-map data to the hour/org/project/release/env level (remove distinct_id key)
"""
def process(self, elem, window=beam.DoFn.WindowParam):
(timestamp, org_id, project_id, release, env, sdk, distinct_id), vals = elem
yield (timestamp, org_id, project_id, release, env, sdk), vals
- Вычисление почасовых накоплений на уровне проекта
- Сохранить в BQ
Как вы можете видеть из рисунка, время обработки (время, затраченное на step) кажется чрезвычайно высоким для DoFn, который просто отображает один набор ключей на другой. Я совершаю здесь какой-то мерзкий грех луча / потока данных? Это просто артефакт пользовательского интерфейса потока данных? Я новичок как в Beam, так и в Dataflow и был бы признателен за любой совет.
Комментарии:
1. Привет, я думаю, это может быть нормально. Вероятно, в операции «Свернуть в проект» есть операция «группировать по», которая запускает перетасовку между машинами. Это означает, что сетевой трафик является дорогостоящим по времени, и это может увеличить время обработки шага.
2. привет @jszule спасибо за ответ! Я должен был быть более ясен в вопросе, но в операции свертывания в проект не выполняется операция «группировать по». «Свернуть в проект» — это просто запуск DoFn, который я изложил в своем вопросе:
| "Collapse to project" >> beam.ParDo(CollapseKeys())
3. @user3637516 Я вижу, что вы установили количество рабочих на 10. Однако, похоже, что вашему процессу CollapseKeys() может потребоваться более 10 рабочих. Рассматривали ли вы возможность использования автозапуска с установкой максимального количества рабочих вместо использования 10 рабочих на протяжении всего процесса?
4. Если это буквально ваш DoFn, это удивительно. (Одно место, где это может возникнуть, — это если ваш DoFn выполняет итерацию по (возможно, большому) набору значений после groupByKey, что вызывает фактические чтения GBK, но вы, похоже, даже не делаете этого здесь …)
Ответ №1:
Вы можете использовать параметры на https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L890 чтобы получить профиль и выяснить, на что тратится ваше время в конкретных DOFN.