потоковая передача и обработка нескольких файлов apache beam одновременно и оконные соединения?

# #google-cloud-dataflow #apache-beam #apache-beam-io

Вопрос:

Я только что прочитал эту статью

https://medium.com/bb-tutorials-and-thoughts/how-to-create-a-streaming-job-on-gcp-dataflow-a71b9a28e432

Чего мне здесь действительно не хватает, так это того, что если я удалю 50 файлов, и это потоковое задание, как говорится в статье(всегда в прямом эфире), то разве вывод не будет оконным соединением всех файлов?

Если нет, то как бы это выглядело и как бы изменилось, чтобы стать оконным соединением? Я пытаюсь представить себе в своей голове оба мира

  • Оконное соединение в потоковом задании(вывод 1 файла для всех входных файлов)
  • Неоконное соединение в потоковом задании(вывод 1 файла НА входной файл)

Может ли кто-нибудь пролить свет на эту статью и что изменится?

Я также читал кое-что о «Ограниченных коллекциях ПК». В этом случае, возможно, окно не требуется, так как внутри потока оно похоже на пакет, пока мы не обработаем всю коллекцию ПК, мы не перейдем к следующему этапу? Возможно, если в статье используется ограниченное объединение, то все входные файлы сопоставляются 1 к 1 с выходными файлами?

Как можно определить изнутри функции, получаю ли я данные из ограниченной или неограниченной коллекции? Есть ли какой-то другой способ, которым я могу это сказать? Возможны ли вообще ограниченные коллекции в потоковой работе apache beam?

Ответ №1:

Я постараюсь ответить на некоторые из ваших вопросов.

Чего мне здесь действительно не хватает, так это того, что если я удалю 50 файлов, и это потоковое задание, как говорится в статье(всегда в прямом эфире), то разве вывод не будет оконным соединением всех файлов?

Вход (источник) и выход (приемник) напрямую не связаны. Так что это зависит от того, что вы делаете в своем конвейере. TextIO.watchForNewFiles является преобразованием источника потоковой передачи, которое продолжает наблюдать за заданным местоположением файла и продолжает читать файлы новостей и выводить строки, прочитанные из таких файлов. Следовательно, результатом этого шага будет PCollection<String> поток строк текста, считываемых из таких файлов.

Затем устанавливается окно, это решает, как ваши данные будут объединены в Windows. Для этого конвейера они предпочитают использовать FixedWindows 1 минуту. Отметкой времени будет время, в течение которого файл был замечен.

Преобразование приемника применяется в конце вашего конвейера (иногда приемники также выдают выходные данные, так что на самом деле это может быть не конец). В этом случае они выбирают TextIO.write() , какие строки записываются из входных PCollection<String> текстовых файлов в выходные.

Таким образом, будут ли выходные данные включать данные из всех входных файлов или нет, зависит от того, как обрабатываются ваши входные файлы и как они объединяются в Windows в рамках конвейера.

Я также читал кое-что о «Ограниченных коллекциях ПК». В этом случае, возможно, окно не требуется, так как внутри потока оно похоже на пакет, пока мы не обработаем всю коллекцию ПК, мы не перейдем к следующему этапу? Возможно, если в статье используется ограниченное объединение, то все входные файлы сопоставляются 1 к 1 с выходными файлами?

Вы можете использовать ограниченные входные данные в потоковом конвейере. В потоковом конвейере прогресс отслеживается с помощью функции водяных знаков. Если вы используете ограниченный ввод (например, ограниченный источник), водяной знак будет просто переходить от 0 к бесконечности вместо постепенного продвижения. Следовательно, ваш конвейер может просто закончиться, вместо того чтобы ждать дополнительных данных.

Как можно определить изнутри функции, получаю ли я данные из ограниченной или неограниченной коллекции? Есть ли какой-то другой способ, которым я могу это сказать? Возможны ли вообще ограниченные коллекции в потоковой работе apache beam?

Это определенно возможно, как я уже упоминал выше. Если у вас есть доступ к коллекции входных данных, вы можете использовать функцию isBounded, чтобы определить, ограничена ли она. См.Здесь пример. У вас есть доступ к входным коллекциям ПК при расширении PTransform s (следовательно, во время отправки задания). Я не верю, что у вас есть доступ к этому во время выполнения.