#python #json #google-cloud-dataflow #dataflow
#python #json #google-cloud-поток данных #поток данных
Вопрос:
Я пытаюсь создать поток данных (пакетный), который считывает файл в час из облачного хранилища Google, анализирует его и записывает запись в таблицу BigQuery. Файл представляет собой .json, в котором каждая строка содержит сложный json.
Я создал простой конвейер:
(p
| "Read file" >> beam.io.ReadFromText(cusom_options.file_name)
| "Parse line json" >> beam.Map(parse)
| "Write in BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
table=cusom_options.table))
Функция синтаксического анализа выглядит следующим образом:
def parse(input_elem):
log = json.loads(input_elem)
result = {}
... # Get some fields from input and add to "result"
return result
Конвейер отлично работает с размером файла 100 Мб и 70 Тыс. строк (примерно 5 минут на задание). Но когда файл увеличивается, поток данных занимает больше времени (15 минут, 200-300 Мб) или не завершается и завершается ошибкой (более 1,5 Гб и 350 Тыс. строк).
Я провел некоторый тест, когда я создал образец json в функции parse, но не использовал input_elem, поток данных работает нормально и создает строку для каждой записи за 7-8 минут.
Я не знаю, в чем проблема с конвейером, у кого-нибудь есть похожая проблема?
Подробная информация
- Я использую beam.io.gcp.bigquery.WriteToBigQuery для автоматического определения схемы из таблицы BQ, но если я использую beam.io.WriteToBigQuery, возникает та же проблема.
- Имя таблицы предварительно вычисляется вне потока данных и является входным
- Параметры, используемые в потоке данных —experiment=use_beam_bq_sink, —subnetwork=MY_SUBNET, —region=MY_REGION
Комментарии:
1. Откуда вы знаете, что он не завершается (или недостаточно долго ждал)?
2. Поток данных занимает несколько часов и возвращает сбой из-за таймаута в конце: сервер не отвечает (ошибка ping: превышен крайний срок, {«создано»: «@1597627419.455096108», «описание»:»Крайний срок Exceeded»,»file»:»third_party/grpc/src/core/ext/filters/deadline/deadline_filter.cc»,»file_line»:69,»grpc_status»:4}). Обычно с этой проблемой можно справиться самостоятельно, пожалуйста, прочитайте: cloud.google.com/dataflow/docs/guides /… ‘
3. В документации говорится о добавлении дополнительных ресурсов, но поток данных застревает на подэтапе «DestinationFileUnion» WriteToBigQuery и не использует никаких ресурсов
4. Вероятно, для этого вам нужно больше памяти. Поскольку вы не используете groupby или shuffle, весь ваш код будет выполняться внутри одного экземпляра с несколькими контейнерами. Попробуйте запустить задание с параметром nouse_multiple_containers.
5. Привет! спасибо, я искал опцию nouse_multiple_containers, но я не нашел в документации. cloud.google.com/dataflow/docs/guides /…
Ответ №1:
Мы, наконец, решили проблему. Параллельно с работой с потоком данных в приложении были созданы некоторые сети VPC, и правила брандмауэра были неправильно настроены.
Этот случай похож на тот, который описан в документации (сеть VPC, используемая для вашей работы, может отсутствовать). Правила существовали, но не были правильно настроены
Спасибо!