Испытывает медленную потоковую запись в BigQuery из конвейера потока данных?

#python #google-cloud-dataflow #apache-beam

#python #google-облако-поток данных #apache-beam

Вопрос:

Я испытываю неожиданные проблемы с производительностью при записи в BigQuery с потоковыми вставками и Python SDK 2.23.

Без шага записи конвейер выполняется на одном работнике с ~ 20-30% ЦП. Добавление шага BigQuery конвейер масштабируется до 6 рабочих, все на 70-90% процессора.

Я довольно новичок в потоке данных и Beam, и, вероятно, такое поведение является нормальным, или я делаю что-то неправильно, но мне кажется, что использование 6 машин для записи 250 строк в секунду в BigQuery немного тяжеловато. Мне интересно, как вообще возможно достичь квоты вставки в 100 тыс. строк в секунду.

Мой конвейер выглядит так:

 p
    | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=options.pubsub_subscription) # ~40/s
    | "Split messages" >> beam.FlatMap(split_messages) # ~ 400/s
    | "Prepare message for BigQuery" >> beam.Map(prepare_row)
    | "Filter known message types" >> beam.Filter(filter_message_types) # ~ 250/s
    | "Write to BigQuery" >> beam.io.WriteToBigQuery(
          table=options.table_spec_position,
          schema=table_schema,
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
          create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
          additional_bq_parameters=additional_bq_parameters,
      )
  

Конвейер работает с этим параметром, хотя я испытывал подобное поведение без использования механизма потоковой передачи.

 --enable_streaming_engine 
--autoscaling_algorithm=THROUGHPUT_BASED 
--max_num_workers=15 
--machine_type=n1-standard-2 
--disk_size_gb=30 
  

Скриншот показателей:
введите описание изображения здесь

Мой вопрос в том, является ли такое поведение нормальным или я могу что-нибудь сделать, чтобы уменьшить количество необходимых работников для этого конвейера. Спасибо!

Обновление: Вот изображение последнего шага графика потока данных с временными интервалами. (взято после того, как задание выполнялось в течение 1 часа). Все остальные предыдущие шаги имеют очень низкое время выполнения — всего несколько секунд.

введите описание изображения здесь

Комментарии:

1. Вы пытались использовать специальное решение для вашего сценария, которое является шаблоном потока данных: cloud.google.com/dataflow/docs/guides/templates/… Дайте мне знать, если вы пробовали это.

2. Спасибо за ваш ответ! Я видел шаблоны, но мы не можем их использовать, потому что наше входящее сообщение PubSub содержит несколько сообщений, которые необходимо извлечь (фактически разделить на ‘ n’). Также мы хотим создать и понять пользовательский конвейер, чтобы лучше понять Beam и поток данных.

3. Это на удивление медленно. Можете ли вы посмотреть на свои счетчики мсек и посмотреть, на каких этапах конвейер тратит свое время?

4. Спасибо за ваш ответ! Я не уверен, что вы имеете в виду под счетчиками мсек. Я обновил вопрос и добавил изображение последнего шага с соответствующими временными интервалами, надеясь, что это то, о чем вы просили. Похоже, что окончательная запись вызывает задержку. Но я не уверен, как продолжить расследование отсюда.

Ответ №1:

После небольшой отладки я обнаружил, что были некоторые недопустимые сообщения, которые не могли быть записаны в BigQuery (и не регистрировали ошибку). Итак, для всех, кто сталкивается с подобной проблемой:

После изменения insert_retry_strategy of beam.io.WriteToBigQuery на RETRY_NEVER и распечатки сборника deadletter PCollection я исправил сообщения с неправильным форматированием, и производительность улучшилась. Я предполагаю, что некоторые недопустимые сообщения застряли из-за стратегии по умолчанию RETRY_ALWAYS .

Комментарии:

1. Вы также пробовали RETRY_ON_TRANSIENT_ERROR? Он должен игнорировать сбои с недопустимыми сообщениями, но все равно повторять попытку при возникновении другой ошибки.

2. Нет, пока нет, но похоже, что это стратегия повторных попыток, которую я действительно хочу использовать. Спасибо, что указали на это!

3. Рад помочь. Возможно, вы захотите записать в журнал сбойные строки, вот введение о том, как это сделать. beam.apache.org/documentation/patterns/bigqueryio /…