#python #google-cloud-dataflow #apache-beam
#python #google-облако-поток данных #apache-beam
Вопрос:
Я испытываю неожиданные проблемы с производительностью при записи в BigQuery с потоковыми вставками и Python SDK 2.23.
Без шага записи конвейер выполняется на одном работнике с ~ 20-30% ЦП. Добавление шага BigQuery конвейер масштабируется до 6 рабочих, все на 70-90% процессора.
Я довольно новичок в потоке данных и Beam, и, вероятно, такое поведение является нормальным, или я делаю что-то неправильно, но мне кажется, что использование 6 машин для записи 250 строк в секунду в BigQuery немного тяжеловато. Мне интересно, как вообще возможно достичь квоты вставки в 100 тыс. строк в секунду.
Мой конвейер выглядит так:
p
| "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=options.pubsub_subscription) # ~40/s
| "Split messages" >> beam.FlatMap(split_messages) # ~ 400/s
| "Prepare message for BigQuery" >> beam.Map(prepare_row)
| "Filter known message types" >> beam.Filter(filter_message_types) # ~ 250/s
| "Write to BigQuery" >> beam.io.WriteToBigQuery(
table=options.table_spec_position,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
additional_bq_parameters=additional_bq_parameters,
)
Конвейер работает с этим параметром, хотя я испытывал подобное поведение без использования механизма потоковой передачи.
--enable_streaming_engine
--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=15
--machine_type=n1-standard-2
--disk_size_gb=30
Мой вопрос в том, является ли такое поведение нормальным или я могу что-нибудь сделать, чтобы уменьшить количество необходимых работников для этого конвейера. Спасибо!
Обновление: Вот изображение последнего шага графика потока данных с временными интервалами. (взято после того, как задание выполнялось в течение 1 часа). Все остальные предыдущие шаги имеют очень низкое время выполнения — всего несколько секунд.
Комментарии:
1. Вы пытались использовать специальное решение для вашего сценария, которое является шаблоном потока данных: cloud.google.com/dataflow/docs/guides/templates/… Дайте мне знать, если вы пробовали это.
2. Спасибо за ваш ответ! Я видел шаблоны, но мы не можем их использовать, потому что наше входящее сообщение PubSub содержит несколько сообщений, которые необходимо извлечь (фактически разделить на ‘ n’). Также мы хотим создать и понять пользовательский конвейер, чтобы лучше понять Beam и поток данных.
3. Это на удивление медленно. Можете ли вы посмотреть на свои счетчики мсек и посмотреть, на каких этапах конвейер тратит свое время?
4. Спасибо за ваш ответ! Я не уверен, что вы имеете в виду под счетчиками мсек. Я обновил вопрос и добавил изображение последнего шага с соответствующими временными интервалами, надеясь, что это то, о чем вы просили. Похоже, что окончательная запись вызывает задержку. Но я не уверен, как продолжить расследование отсюда.
Ответ №1:
После небольшой отладки я обнаружил, что были некоторые недопустимые сообщения, которые не могли быть записаны в BigQuery (и не регистрировали ошибку). Итак, для всех, кто сталкивается с подобной проблемой:
После изменения insert_retry_strategy
of beam.io.WriteToBigQuery
на RETRY_NEVER
и распечатки сборника deadletter PCollection я исправил сообщения с неправильным форматированием, и производительность улучшилась. Я предполагаю, что некоторые недопустимые сообщения застряли из-за стратегии по умолчанию RETRY_ALWAYS
.
Комментарии:
1. Вы также пробовали RETRY_ON_TRANSIENT_ERROR? Он должен игнорировать сбои с недопустимыми сообщениями, но все равно повторять попытку при возникновении другой ошибки.
2. Нет, пока нет, но похоже, что это стратегия повторных попыток, которую я действительно хочу использовать. Спасибо, что указали на это!
3. Рад помочь. Возможно, вы захотите записать в журнал сбойные строки, вот введение о том, как это сделать. beam.apache.org/documentation/patterns/bigqueryio /…