Задание вставки большого запроса потока данных мгновенно завершается с большим набором данных

#python #google-bigquery #google-cloud-dataflow #apache-beam

#python #google-bigquery #google-облако-поток данных #apache-beam

Вопрос:

Я разработал конвейер beam / dataflow, используя библиотеку beam python. Конвейер примерно выполняет следующее:

  1. ParDo: сбор данных JSON из API
  2. ParDo: преобразование данных JSON
  3. Ввод-вывод: запись преобразованных данных в таблицу BigQuery

Как правило, код выполняет то, что он должен делать. Однако при сборе большого набора данных из API (около 500 000 файлов JSON) задание вставки bigquery останавливается сразу (= в течение одной секунды) после его запуска без конкретного сообщения об ошибке при использовании DataflowRunner (он работает с DirectRunner, выполняемым на моем компьютере). При использовании меньшего набора данных все работает просто отлично.

Журнал потока данных выглядит следующим образом:

 2019-04-22 (00:41:29) Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the...
Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the bq tool: "bq show -j --project_id=X dataflow_job_14675275193414385105". 
2019-04-22 (00:41:29) Workflow failed. Causes: S01:Create Dummy Element/Read Call API Transform JSON Write to Bigquery /Wr...
Workflow failed. Causes: S01:Create Dummy Element/Read Call API Transform JSON Write to Bigquery /WriteToBigQuery/NativeWrite failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on: 
beamapp-X-04212005-04211305-sf4k-harness-lqjg,
beamapp-X-04212005-04211305-sf4k-harness-lgg2,
beamapp-X-04212005-04211305-sf4k-harness-qn55,
beamapp-X-04212005-04211305-sf4k-harness-hcsn
  

Использование инструмента bq cli, как было предложено, для получения дополнительной информации о задании загрузки BQ, не работает. Задание не может быть найдено (и я сомневаюсь, что оно вообще было создано из-за мгновенного сбоя).

Я полагаю, что я столкнулся с каким-то ограничением квоты / bq или даже с проблемой нехватки памяти (см.: https://beam.apache.org/documentation/io/built-in/google-bigquery /)

Ограничения В настоящее время BigQueryIO имеет следующие ограничения.

Вы не можете чередовать завершение записи BigQuery с другими шагами вашего конвейера.

Если вы используете Beam SDK для Python, у вас могут возникнуть проблемы с квотой размера импорта>, если вы пишете очень большой набор данных. В качестве обходного пути вы можете разделить набор данных (например, с помощью преобразования разделов Beam) и записать в несколько таблиц BigQuery. Beam SDK для Java не имеет этого ограничения, поскольку он разделяет ваш набор данных для вас.

Я был бы признателен за любой намек на то, как сузить основную причину этой проблемы.

Я также хотел бы опробовать раздел Fn, но не нашел ни одного примера исходного кода python, как записать секционированную коллекцию pc в таблицы BigQuery.

Ответ №1:

Одна вещь, которая может помочь в отладке, — это просмотр журналов Stackdriver.

Если вы откроете задание потока данных в консоли Google и нажмете LOGS в правом верхнем углу панели графиков, это должно открыть панель журналов внизу. В правом верхнем углу панели ЖУРНАЛОВ есть ссылка на Stackdriver. Это даст вам много информации о регистрации ваших работников / перемещений / и т.д. Для этого конкретного задания.

В нем много всего, и может быть трудно отфильтровать то, что имеет отношение к делу, но, надеюсь, вы сможете найти что-то более полезное, чем A work item was attempted 4 times without success . Например, каждый рабочий иногда регистрирует, сколько памяти он использует, что можно сравнить с объемом памяти, который есть у каждого рабочего (в зависимости от типа машины), чтобы увидеть, действительно ли у них заканчивается память, или ваша ошибка происходит в другом месте.

Удачи!

Комментарии:

1. пока спасибо. Я взглянул на stackdriver. Возможно, я плохо справился с фильтрами, но даже без каких-либо фильтров, кроме одного, фильтрующего шаги потока данных, последние сообщения журнала имеют временную метку не позднее 00:38. Ошибка произошла в 00:41. Однако, согласно более ранним сообщениям журнала, использование памяти, похоже, не является проблемой.

2. Хорошо, теперь выясняется, что это проблема с памятью. Я подключился к рабочей виртуальной машине и мог буквально видеть, как у нее закончилась память непосредственно перед запуском механизма перезапуска потока данных. Я смог исправить утечку памяти в моем коде python, которая была основной причиной. Однако мне интересно, почему в stackdriver не было сообщений журнала, в которых говорилось бы, что на самом деле произошла ситуация с повторной попыткой из-за ошибки нехватки памяти. Есть ли правильный способ обнаружения ситуаций с повторными попытками в журналах stackdriver?

3. Насколько я знаю, нет. Я также хотел бы видеть сообщения об ошибках ООМ на верхнем уровне из потока данных, поскольку журналы Stackdriver сложно анализировать.

Ответ №2:

Насколько я знаю, нет доступной опции для диагностики ООМ в облачном потоке данных и Python SDK Apache Beam (это возможно с Java SDK). Я рекомендую вам открыть запрос функции в Cloud Dataflow issue tracker, чтобы получить более подробную информацию о проблемах такого рода.

В дополнение к проверке файлов журнала заданий потока данных я рекомендую вам отслеживать ваш конвейер с помощью инструмента мониторинга Stackdriver, который обеспечивает использование ресурсов для каждого задания (как общее время использования памяти).

Что касается использования функции разделения в Python SDK, следующий код (основанный на примере, представленном в документации Apache Beam) разделяет данные на 3 задания загрузки BigQuery:

 def partition_fn(input_data, num_partitions):
      return int(get_percentile(lines) * num_partitions / 100)

    partition = input_data | beam.Partition(partition_fn, 3)

    for x in range(3):
      partition[x] | 'WritePartition %s' % x >> beam.io.WriteToBigQuery(
        table_spec,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)