Что может ограничить конвейер потока данных использованием только одного работника?

#python #google-cloud-platform #google-cloud-dataflow #apache-beam

# #python #google-облачная платформа #google-cloud-поток данных #apache-луч

Вопрос:

Я пытался отладить конвейер, который принимает входной параметр, который управляет последующими операциями ParDo. По причинам, которые я не могу понять, конвейер не будет масштабироваться дальше одного рабочего, даже если я отключил автоматическое масштабирование и установил количество рабочих. К сожалению, ужасный интерфейс потока данных в GCP мало что делает, чтобы осветить невозможность масштабирования. Кто-нибудь может посоветовать, в чем может быть проблема или как эффективно отлаживать?

 with beam.Pipeline(options=opts) as p:
  result = (
      p | "Initialize Pipeline" >> beam.Create(
          [(f'gs://data/']) |
      "Scan for extraction tasks" >> beam.ParDo(scanner.ScanForTasks()) |
      "Extract data" >> beam.ParDo(worker.TaskToData()))
 

Ответ №1:

Оказалось, что проблема связана с оптимизацией в потоке данных, называемой «слияние«, в которой смежные операции объединяются вместе, предположительно, для того, чтобы они могли беспрепятственно выполняться на одном и том же рабочем. Проблема в том, что если конвейер заполняется одним элементом, который генерирует большое количество последующих задач, все эти задачи будут обрабатываться тем же рабочим, который обрабатывал начальную задачу заполнения.

Решение состоит в том, чтобы напрямую заполнять конвейер задачами, чтобы эта «оптимизация» не снижала производительность

 def scan_for_tasks():
  tasks = []
  # Build your task list here
  return tasks

with beam.Pipeline(options=opts) as p:
  result = (
    p | "Initialize Pipeline" >> beam.Create(scan_for_tasks()) |
    "Extract data" >> beam.ParDo(worker.TaskToData()))
 

Комментарии:

1. спасибо, что поделились! Вы также можете добавить beam.Reshuffle() преобразование, которое должно позволить вам при необходимости перераспределять задачи между работниками.

2. Привет, Пабло. Было бы действительно полезно, если бы пользовательский интерфейс потока данных мог визуально донести до пользователя, какие шаги были объединены, и что коэффициент ветвления для каждого шага равен. Это позволило бы пользователям легко видеть, когда происходит что-то подобное. Я действительно изо всех сил пытался определить, что происходит с текущим пользовательским интерфейсом…

3. Спасибо. Это разумная обратная связь. Некоторые шаги отмечены «этапами», которые являются объединенными единицами выполнения, но я согласен, что это недостаточно четко