Воздушный поток / Python — как возобновить поток DAG на основе внешнего процесса

#python #asynchronous #airflow

#python #асинхронный #воздушный поток

Вопрос:

 Using Airflow 1.8.0 and python 2.7
  

Наличие следующего DAG (упрощенного):

 (Phase 1)-->(Phase 2)
  

На этапе 1 я запускаю асинхронный процесс, который занимает много времени и может выполняться до 2 дней, когда процесс завершается, он записывает некоторую полезную нагрузку на S3. В этот период я хочу, чтобы DAG ждал и переходил к фазе 2 только тогда, когда существует полезная нагрузка S3.

Я подумал о 2 решениях:

  1. При запуске фазы 1 приостановите DAG с помощью экспериментального REST API и возобновите после завершения процесса.
  2. Подождите, используя оператор, который проверяет полезную нагрузку S3 каждые X минут.

Я не могу использовать вариант 1, поскольку мой администратор не разрешает экспериментальное использование API, а вариант 2 кажется плохой практикой (проверка каждого X менуэтов).

Есть ли какие-либо другие варианты решения моей задачи?

Комментарии:

1. Какова разница во времени выполнения асинхронного процесса? Если это всегда занимает около двух дней, вы можете использовать TimeDeltaSensor и просто начать проверять S3 через определенное время.

2. Время выполнения неизвестно и колеблется от нескольких часов до пары дней

Ответ №1:

Я думаю, что вариант (2) является «правильным способом», вы можете немного оптимизировать его:

BaseSensorOperator поддерживает poke_interval , поэтому его следует использовать для S3KeySensor увеличения времени между попытками.

Poke_interval — время в секундах, которое задание должно ждать между каждой попыткой

Кроме того, вы можете попробовать использовать mode и переключить его на reschedule :

режим: Как работает датчик. Параметры: { poke | reschedule } , по умолчанию poke . При установке на poke датчик занимает рабочий слот на все время выполнения и спит между нажатиями. Используйте этот режим, если ожидаемое время выполнения датчика короткое или требуется короткий интервал нажатия. Обратите внимание, что датчик будет удерживать рабочий слот и слот пула на время выполнения датчика в этом режиме. При установке на reschedule задачу датчика освобождает рабочий слот, когда критерии еще не выполнены, и он переносится на более позднее время. Используйте этот режим, если ожидается, что время до выполнения критериев будет довольно долгим. Интервал нажатия должен составлять более одной минуты, чтобы предотвратить слишком большую нагрузку на планировщик.

Не уверен в Airflow 1.8.0 — не удалось найти старую документацию (я предполагаю poke_interval , что поддерживается, но нет mode ).

Комментарии:

1. В дополнение к этому есть S3KeySensor , который можно использовать для определения полезной нагрузки. Также, если вы используете workers в общей среде, где запущено много других процессов (и вам нужно ждать максимум 2 дня), тогда poke режим не рекомендуется. Потому что рабочий будет занят задачей для тыкания, которая, в свою очередь, потребляет ресурсы.