# #google-cloud-dataflow #apache-beam #google-cloud-pubsub #dataflow
Вопрос:
Я нахожусь в середине разработки компонента для загрузки CSV-файлов, поступающих из корзины GCS в таблицы BQ. Поскольку наши требования включают вставку дополнительных столбцов,я разработал следующий дизайн с использованием задания потоковой передачи потока данных.
1] CSV-файл поступает в корзину GCS
2] Событие создания объекта GCS отправляется в паб/подраздел. Эти события используются потоковым потоком данных «Загрузчик GCS-BQ».
2a] Просматривая метаданные, вложенные в объект GCS,определите имя таблицы BQ, настройки и т. Д. И Рассчитайте значения для новых столбцов (lot_num и batch)
3] Используя BigqueryAPI, для CSV-файла создается временная внешняя таблица.
4] С помощью BigqueryAPI выполняется запрос для вставки данных из внешней таблицы BQ в конечную таблицу BQ.Этот шаг выполняется,потому что в итоговую таблицу необходимо добавить дополнительные столбцы(lot_num, batch). Наконец, временное внешнее устройство удаляется.
В зависимости от метаданных, вложенных в объект GCS, мы ожидаем, что у нас будет около 1000 таблиц BQ. Эти CSV-файлы варьируются от пары килобайт до ~1 ГБ.
У меня есть следующие вопросы, касающиеся этого дизайна:
- Этот процесс связан с вводом-выводом (а не с процессором), так как вызовы API BQ блокируются. В этом случае эти блокирующие вызовы заблокируют потоки потока данных.Повлияет ли это на производительность?
- Как в этом случае будет работать автоматическое масштабирование потока данных? Из документов:
Потоки данных масштабируются на основе параллелизма конвейера. Параллелизм конвейера-это оценка количества потоков, необходимых для наиболее эффективной обработки данных в любой момент времени.
Какой поток данных метрики используется для автоматического масштабирования для таких процессов, связанных с вводом-выводом?
Это количество неподтвержденных сообщений pub/sub в буфере?
- Подходит ли поток данных для такого рода обработки, связанной с вводом-выводом? Или простое Java-приложение, работающее на GKE(k8s), больше подходит для этого случая ?
Комментарии:
1. интересный вариант использования. как вы определяете значения столбцов lot_num и batch? не могли бы вы сделать это из конвейера потока данных? Копирование данных из одной таблицы в другую в BQ имеет очень низкий предел. Вместо этого вы можете добавить lot_num и пакетные данные в свой конвейер потоков данных и использовать
BigQueryIO.write
преобразование Beam для записи данных в BQ с очень высокой пропускной способностью.2. @Pablo да, эти значения lot_num и «пакетные» значения генерируются внутри задания потока данных при просмотре метаданных объекта GCS. Но при использовании TextIO.read() и BigQueryIO.write содержимое файла CSV проходит через задание потока данных, верно ? Я хотел избежать этого, вот почему я хотел создать внешнюю таблицу и сгенерировать итоговую таблицу с помощью запроса BQ, где содержимое файла перемещается внутри между BQ и GCS.
3. Я понимаю. Да, вы правы, что с TextIO.read все данные каждого файла проходят через конвейер. Я считаю, что в этом случае вам, возможно, будет лучше использовать облачную функцию для выполнения всех вызовов API BQ, поскольку вам, вероятно, не нужны большие возможности масштабирования потока данных, но учтите, что вы можете в конечном итоге сделать слишком много вызовов API BQ, а затем, возможно, придется переключиться на поток данных, чтобы использовать TextIO
4. @Пабло спасибо, я сначала подумал об использовании облачной функции, однако у нее максимальный лимит времени ожидания 7 минут. У нас могут быть некоторые случаи, когда запрос BQ занимает более 7 минут. Однако знаете ли вы, как будет вести себя поток данных, когда потоки блокируются для вызовов ввода-вывода, таких как вызов API блокировки BQ ? как это повлияет на автоматическое масштабирование ?
5. на автоматическое масштабирование влияет отставание в вашей теме pubsub. если отставание невелико, то задание не увидит необходимости в расширении.