Проблема с синтаксическим анализом и записью в BigQuery огромного файла json с потоком данных Google Cloud

#python #json #google-cloud-dataflow #dataflow

#python #json #google-cloud-поток данных #поток данных

Вопрос:

Я пытаюсь создать поток данных (пакетный), который считывает файл в час из облачного хранилища Google, анализирует его и записывает запись в таблицу BigQuery. Файл представляет собой .json, в котором каждая строка содержит сложный json.

Я создал простой конвейер:

 (p 
| "Read file" >> beam.io.ReadFromText(cusom_options.file_name)
| "Parse line json" >> beam.Map(parse)
| "Write in BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        table=cusom_options.table))
  

Функция синтаксического анализа выглядит следующим образом:

 def parse(input_elem):
   log = json.loads(input_elem)
   result = {}

   ... # Get some fields from input and add to "result"

   return result
  

Конвейер отлично работает с размером файла 100 Мб и 70 Тыс. строк (примерно 5 минут на задание). Но когда файл увеличивается, поток данных занимает больше времени (15 минут, 200-300 Мб) или не завершается и завершается ошибкой (более 1,5 Гб и 350 Тыс. строк).

Я провел некоторый тест, когда я создал образец json в функции parse, но не использовал input_elem, поток данных работает нормально и создает строку для каждой записи за 7-8 минут.

Я не знаю, в чем проблема с конвейером, у кого-нибудь есть похожая проблема?

Подробная информация

  • Я использую beam.io.gcp.bigquery.WriteToBigQuery для автоматического определения схемы из таблицы BQ, но если я использую beam.io.WriteToBigQuery, возникает та же проблема.
  • Имя таблицы предварительно вычисляется вне потока данных и является входным
  • Параметры, используемые в потоке данных —experiment=use_beam_bq_sink, —subnetwork=MY_SUBNET, —region=MY_REGION

Комментарии:

1. Откуда вы знаете, что он не завершается (или недостаточно долго ждал)?

2. Поток данных занимает несколько часов и возвращает сбой из-за таймаута в конце: сервер не отвечает (ошибка ping: превышен крайний срок, {«создано»: «@1597627419.455096108», «описание»:»Крайний срок Exceeded»,»file»:»third_party/grpc/src/core/ext/filters/deadline/deadline_filter.cc»,»file_line»:69,»grpc_status»:4}). Обычно с этой проблемой можно справиться самостоятельно, пожалуйста, прочитайте: cloud.google.com/dataflow/docs/guides /…

3. В документации говорится о добавлении дополнительных ресурсов, но поток данных застревает на подэтапе «DestinationFileUnion» WriteToBigQuery и не использует никаких ресурсов

4. Вероятно, для этого вам нужно больше памяти. Поскольку вы не используете groupby или shuffle, весь ваш код будет выполняться внутри одного экземпляра с несколькими контейнерами. Попробуйте запустить задание с параметром nouse_multiple_containers.

5. Привет! спасибо, я искал опцию nouse_multiple_containers, но я не нашел в документации. cloud.google.com/dataflow/docs/guides /…

Ответ №1:

Мы, наконец, решили проблему. Параллельно с работой с потоком данных в приложении были созданы некоторые сети VPC, и правила брандмауэра были неправильно настроены.

Этот случай похож на тот, который описан в документации (сеть VPC, используемая для вашей работы, может отсутствовать). Правила существовали, но не были правильно настроены

Спасибо!