#python #airflow
#python #воздушный поток
Вопрос:
Я пытаюсь сгенерировать набор динамических задач из переменной XCOM. В XCOM я сохраняю список, и я хочу использовать каждый элемент списка для динамического создания последующей задачи.
Мой вариант использования заключается в том, что у меня есть вышестоящий оператор, который проверяет sftp-сервер на наличие файлов и возвращает список имен файлов, соответствующих определенным критериям. Я хочу создавать динамические задачи для каждого из возвращаемых имен файлов.
Я упростил его до приведенного ниже, и, хотя он работает, я чувствую, что это не идиоматическое решение для воздушного потока. В моем случае я бы написал функцию python, которая вызывается из оператора python, который извлекает значение из xcom и возвращает его, вместо использования функции pusher .
Я понимаю, что, хотя я могу создать пользовательский оператор, который сочетает в себе оба, я не думаю, что создание одноразового оператора является хорошей практикой, и я надеюсь, что есть другое решение.
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
"owner": "test",
"depends_on_past": False,
"start_date": datetime(2018, 10, 27),
"email": ["test@mctest.com"],
"email_on_failure": False,
"email_on_retry": False,
"email_on_success": False,
"retries": 0,
"provide_context": True
}
dag = DAG("test", default_args=default_args, schedule_interval="@daily", catchup=False)
def pusher(**context):
return ['a', 'b', 'c', 'd', 'e']
pusher_task = PythonOperator(
task_id='pusher_task',
dag=dag,
python_callable=pusher
)
def bash_wrapper(task, **context):
return BashOperator(
task_id='dynamic' task,
dag=dag,
bash_command='date'
)
end = BashOperator(task_id='end', dag=dag, bash_command='echo task has ended')
pusher_task >> [bash_wrapper(task) for task in pusher()] >> end
Ответ №1:
Я бы не стал делать то, чего вы пытаетесь достичь, главным образом потому, что:
- Значение XCOM — это состояние, сгенерированное во время выполнения
- Структура базы данных определяется во время синтаксического анализа
Даже если вы используете что-то вроде следующего, чтобы получить доступ к значениям XCOM, сгенерированным какой-либо вышестоящей задачей:
from airflow.models import TaskInstance
from airflow.utils.db import provide_session
dag = DAG(...)
@provide_session
def get_files_list(session):
execution_date = dag.previous_schedule(datetime.now())
// Find previous task instance:
ti = session.query(TaskInstance).filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.execution_date == execution_date,
TaskInstance.task_id == upstream_task_id).first()
if ti:
files_list = ti.xcom_pull()
if files_list:
return files_list
// Return default state:
return {...}
files_list = get_files_list()
// Generate tasks based on upstream task state:
task = PythonOperator(
...
xcom_push=True,
dag=dag)
Но это будет вести себя очень странно, потому что синтаксический анализ DAG и выполнение задачи не синхронизированы так, как вы хотите.
Если основной причиной, по которой вы хотите это сделать, является распараллеливание обработки файлов, у меня было бы некоторое статическое количество задач обработки (определяемое требуемым параллелизмом), которые считывают список файлов из значения XCOM вышестоящей задачи и работают с соответствующей частью этого списка.
Другой вариант — распараллеливание обработки файлов с использованием какой-либо платформы для распределенных вычислений, такой как Apache Spark.
Комментарии:
1. Основными причинами для этого являются 1. Параллелизм, 2. Я не могу контролировать, сколько файлов / или данных в списке я получаю от вышестоящей задачи, но я хочу убедиться, что все файлы проходят через нижестоящие задачи
2. При первом подходе, о котором вы упомянули, предположим, что имеется 5 файлов и две задачи обработки. После того, как задачи обработки обрабатывают file1 и file2 из списка, как я могу снова запустить ту же задачу для file3 и file4? И, в конце концов, как я могу запустить только 1 задачу обработки для file5? Любой пример в этом направлении полезен
3. @nightgaunt каждая задача обработки занимает часть
files
массива, используя:files[int(task_idx * len(files) / parallelism):int((task_idx 1) * len(files) / parallelism)]
, таким образом, первая задача получает файлы 1-2, а вторая — файлы с 3 по 5.4. Но одна задача должна обрабатывать несколько файлов путем итерации в рамках одного оператора. Правильно? Для этого требуется пользовательский оператор, который выполняет одну и ту же операцию дважды / трижды. Не воспринимайте это плохо. Ваше решение имеет смысл и ближе всего к ответу. Но если есть способ заставить задачу работать только с одним файлом, я готов пойти с этим вариантом.
5. Решение для использования вложенных тегов и динамической генерации задач назревает в моей голове уже некоторое время. Но ваше объяснение о том, что xcom — это время выполнения, а структура — время анализа, заставляет меня колебаться.
Ответ №2:
Самый простой способ, который я могу придумать, это использовать оператор перехода. https://github.com/apache/airflow/blob/master/airflow/example_dags/example_branch_operator.py
Комментарии:
1. Можете ли вы уточнить? Как BranchOperator помогает получить доступ к переменным xcom и установить для них динамические задачи по потоку?
2. В вашем примере показано, как программно создавать ветви — вопрос заключался в том, как динамически создавать задачи в зависимости от результата, полученного предыдущей задачей во время выполнения