#python #google-bigquery #google-cloud-dataflow #apache-beam
#python #google-bigquery #google-облако-поток данных #apache-beam
Вопрос:
Я разработал конвейер beam / dataflow, используя библиотеку beam python. Конвейер примерно выполняет следующее:
- ParDo: сбор данных JSON из API
- ParDo: преобразование данных JSON
- Ввод-вывод: запись преобразованных данных в таблицу BigQuery
Как правило, код выполняет то, что он должен делать. Однако при сборе большого набора данных из API (около 500 000 файлов JSON) задание вставки bigquery останавливается сразу (= в течение одной секунды) после его запуска без конкретного сообщения об ошибке при использовании DataflowRunner (он работает с DirectRunner, выполняемым на моем компьютере). При использовании меньшего набора данных все работает просто отлично.
Журнал потока данных выглядит следующим образом:
2019-04-22 (00:41:29) Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the...
Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the bq tool: "bq show -j --project_id=X dataflow_job_14675275193414385105".
2019-04-22 (00:41:29) Workflow failed. Causes: S01:Create Dummy Element/Read Call API Transform JSON Write to Bigquery /Wr...
Workflow failed. Causes: S01:Create Dummy Element/Read Call API Transform JSON Write to Bigquery /WriteToBigQuery/NativeWrite failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:
beamapp-X-04212005-04211305-sf4k-harness-lqjg,
beamapp-X-04212005-04211305-sf4k-harness-lgg2,
beamapp-X-04212005-04211305-sf4k-harness-qn55,
beamapp-X-04212005-04211305-sf4k-harness-hcsn
Использование инструмента bq cli, как было предложено, для получения дополнительной информации о задании загрузки BQ, не работает. Задание не может быть найдено (и я сомневаюсь, что оно вообще было создано из-за мгновенного сбоя).
Я полагаю, что я столкнулся с каким-то ограничением квоты / bq или даже с проблемой нехватки памяти (см.: https://beam.apache.org/documentation/io/built-in/google-bigquery /)
Ограничения В настоящее время BigQueryIO имеет следующие ограничения.
Вы не можете чередовать завершение записи BigQuery с другими шагами вашего конвейера.
Если вы используете Beam SDK для Python, у вас могут возникнуть проблемы с квотой размера импорта>, если вы пишете очень большой набор данных. В качестве обходного пути вы можете разделить набор данных (например, с помощью преобразования разделов Beam) и записать в несколько таблиц BigQuery. Beam SDK для Java не имеет этого ограничения, поскольку он разделяет ваш набор данных для вас.
Я был бы признателен за любой намек на то, как сузить основную причину этой проблемы.
Я также хотел бы опробовать раздел Fn, но не нашел ни одного примера исходного кода python, как записать секционированную коллекцию pc в таблицы BigQuery.
Ответ №1:
Одна вещь, которая может помочь в отладке, — это просмотр журналов Stackdriver.
Если вы откроете задание потока данных в консоли Google и нажмете LOGS
в правом верхнем углу панели графиков, это должно открыть панель журналов внизу. В правом верхнем углу панели ЖУРНАЛОВ есть ссылка на Stackdriver. Это даст вам много информации о регистрации ваших работников / перемещений / и т.д. Для этого конкретного задания.
В нем много всего, и может быть трудно отфильтровать то, что имеет отношение к делу, но, надеюсь, вы сможете найти что-то более полезное, чем A work item was attempted 4 times without success
. Например, каждый рабочий иногда регистрирует, сколько памяти он использует, что можно сравнить с объемом памяти, который есть у каждого рабочего (в зависимости от типа машины), чтобы увидеть, действительно ли у них заканчивается память, или ваша ошибка происходит в другом месте.
Удачи!
Комментарии:
1. пока спасибо. Я взглянул на stackdriver. Возможно, я плохо справился с фильтрами, но даже без каких-либо фильтров, кроме одного, фильтрующего шаги потока данных, последние сообщения журнала имеют временную метку не позднее 00:38. Ошибка произошла в 00:41. Однако, согласно более ранним сообщениям журнала, использование памяти, похоже, не является проблемой.
2. Хорошо, теперь выясняется, что это проблема с памятью. Я подключился к рабочей виртуальной машине и мог буквально видеть, как у нее закончилась память непосредственно перед запуском механизма перезапуска потока данных. Я смог исправить утечку памяти в моем коде python, которая была основной причиной. Однако мне интересно, почему в stackdriver не было сообщений журнала, в которых говорилось бы, что на самом деле произошла ситуация с повторной попыткой из-за ошибки нехватки памяти. Есть ли правильный способ обнаружения ситуаций с повторными попытками в журналах stackdriver?
3. Насколько я знаю, нет. Я также хотел бы видеть сообщения об ошибках ООМ на верхнем уровне из потока данных, поскольку журналы Stackdriver сложно анализировать.
Ответ №2:
Насколько я знаю, нет доступной опции для диагностики ООМ в облачном потоке данных и Python SDK Apache Beam (это возможно с Java SDK). Я рекомендую вам открыть запрос функции в Cloud Dataflow issue tracker, чтобы получить более подробную информацию о проблемах такого рода.
В дополнение к проверке файлов журнала заданий потока данных я рекомендую вам отслеживать ваш конвейер с помощью инструмента мониторинга Stackdriver, который обеспечивает использование ресурсов для каждого задания (как общее время использования памяти).
Что касается использования функции разделения в Python SDK, следующий код (основанный на примере, представленном в документации Apache Beam) разделяет данные на 3 задания загрузки BigQuery:
def partition_fn(input_data, num_partitions):
return int(get_percentile(lines) * num_partitions / 100)
partition = input_data | beam.Partition(partition_fn, 3)
for x in range(3):
partition[x] | 'WritePartition %s' % x >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)