Конвейер потока данных Apache Beam — простой DoFn с большим временем ожидания

#python #google-cloud-dataflow #apache-beam

#python #google-облако-поток данных #apache-beam

Вопрос:

Я просматривал график заданий моего недавно развернутого конвейера и заметил, что, казалось бы, очень простой DoFn, похоже, требует огромного времени обработки по сравнению с другими задачами.

Конвейер принимает сообщения о данных сеанса потоковой передачи, которые имеют идентификаторы как project_id , так и a user_id , и генерирует ежечасные сводки, которые агрегируются на уровне проекта и сохраняются в BigQuery. Конвейер обрабатывает ~ 1 тыс. сообщений JSON в секунду и работает на 10 n2-standard машинах. Краткое описание шагов:

  1. Читать из PubSub
  2. Окно — 5-дневные фиксированные окна с повторяющимся 3-часовым запуском и AccumulationMode.ACCUMULATING .
  3. Агрегирование по пользователю куча других полей (хотя мы в конечном итоге выполняем свертки на уровне проекта, этот шаг должен выполняться первым, чтобы избежать перерасчета определенных полей)
  4. Простой DoFn, который отображает из старого (keys), value в новый (keys), value (удаляет user_id из списка ключей, так что теперь мы агрегируем на уровне проекта)
 class CollapseKeys(beam.DoFn):
    """
    Re-map data to the hour/org/project/release/env level (remove distinct_id key)
    """
    def process(self, elem, window=beam.DoFn.WindowParam):
        (timestamp, org_id, project_id, release, env, sdk, distinct_id), vals = elem
        yield (timestamp, org_id, project_id, release, env, sdk), vals
  
  1. Вычисление почасовых накоплений на уровне проекта
  2. Сохранить в BQ

Как вы можете видеть из рисунка, время обработки (время, затраченное на step) кажется чрезвычайно высоким для DoFn, который просто отображает один набор ключей на другой. Я совершаю здесь какой-то мерзкий грех луча / потока данных? Это просто артефакт пользовательского интерфейса потока данных? Я новичок как в Beam, так и в Dataflow и был бы признателен за любой совет.

График заданий (простой DoFn - это этап перехода к проекту

Комментарии:

1. Привет, я думаю, это может быть нормально. Вероятно, в операции «Свернуть в проект» есть операция «группировать по», которая запускает перетасовку между машинами. Это означает, что сетевой трафик является дорогостоящим по времени, и это может увеличить время обработки шага.

2. привет @jszule спасибо за ответ! Я должен был быть более ясен в вопросе, но в операции свертывания в проект не выполняется операция «группировать по». «Свернуть в проект» — это просто запуск DoFn, который я изложил в своем вопросе: | "Collapse to project" >> beam.ParDo(CollapseKeys())

3. @user3637516 Я вижу, что вы установили количество рабочих на 10. Однако, похоже, что вашему процессу CollapseKeys() может потребоваться более 10 рабочих. Рассматривали ли вы возможность использования автозапуска с установкой максимального количества рабочих вместо использования 10 рабочих на протяжении всего процесса?

4. Если это буквально ваш DoFn, это удивительно. (Одно место, где это может возникнуть, — это если ваш DoFn выполняет итерацию по (возможно, большому) набору значений после groupByKey, что вызывает фактические чтения GBK, но вы, похоже, даже не делаете этого здесь …)

Ответ №1:

Вы можете использовать параметры на https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L890 чтобы получить профиль и выяснить, на что тратится ваше время в конкретных DOFN.