#python #airflow
Вопрос:
У меня возникли проблемы с оператором ветки Airflow 1.10 Python. У меня есть dag, который сканирует облачное хранилище и обрабатывает файлы, если они найдены. Если файл отсутствует, он попадает в фиктивный оператор no_file_found и завершается, в противном случае он переходит к некоторым этапам синтаксического анализа.
С одним файлом этот рабочий процесс отлично работает. Моя проблема возникает, когда я добавляю ту же логику для второго файла. В настоящее время check_for_Post_Performance возвращает задачу cleans_headers_for_gcm, и я в полной растерянности, как это происходит. Из приведенного ниже плана следует, что у него должно быть только два пути вперед: clean_headers_Post_Perfromance или no_file_found.
Я создаю эти задачи динамически из списка имен файлов. Я перебираю каждое имя файла и создаю следующие операторы:
def build_check(filename): return BranchPythonOperator( task_id=f'check_for_{file_name}'.replace(' ', '_'), python_callable=check_file_exists, op_kwargs={'filename': filename}, provide_context=True, dag=dag ) def check_file_exists(filename, **context): xcom_value = context['ti'].xcom_pull(task_ids=f'list_files') if any(filename in s for s in xcom_value): return f'clean_headers_for_{file_name}'.replace(' ', '_') else: return 'no_file_found'
Я проверил шаблон отрисованной задачи, чтобы подтвердить, что «Производительность после публикации» передается для переменной имени файла
но, просматривая журналы, я вижу следующее:
[2021-12-02 20:15:56,742] {logging_mixin.py:120} INFO - Running lt;TaskInstance: example_dag.check_for_Post_Performance 2021-12-02T20:14:50.724084 00:00 [running]gt; on host 21d0393eb686 [2021-12-02 20:15:56,766] {python_operator.py:114} INFO - Done. Returned value was: clean_headers_for_GCM [2021-12-02 20:15:56,767] {skipmixin.py:122} INFO - Following branch clean_headers_for_GCM [2021-12-02 20:15:56,773] {skipmixin.py:158} INFO - Skipping tasks ['no_file_found', 'clean_headers_for_Post_Performance']
Мое лучшее предположение заключается в том, что функция не создается каждый цикл, как я думаю, или какое-то правило триггера сбивает меня с толку. Как я могу сделать так, чтобы каждый файл в моем списке источников достиг задачи no_file_found или clean_headers независимо друг от друга?
РЕДАКТИРОВАТЬ Вот код, который я использую для создания задач из статического списка:
for file_name, table_name in FILES().items(): import_to_bq = import_file(file_name, table_name) clean_headers_task = clean_headers(file_name) start_import gt;gt; list_files gt;gt; build_check(file_name) gt;gt; [clean_headers_task, no_file] clean_headers_task gt;gt; import_to_bq gt;gt; archive_file(file_name)
Комментарии:
1. Вы не можете создавать задачи во время выполнения. Задачи создаются при анализе кода/DAG, и в это время нет контекста, в том числе относительно того, что
list_files
будет отображаться. Есть ли только 2 файла, которые можно было бы перечислить ? Если их больше, рассмотрите возможность создания одной задачи, которая обрабатывает несколько файлов вfor
цикле в aPythonOperator
. Кроме того, ваш фрагмент кода неполон — как бы/build_check
называется. Публикация полного кода получила бы большую популярность.2. Я использую словарь, который заполняю любыми файлами, которые хотел бы захватить с помощью этого dag. listFiles использует GoogleCloudStorageListOperator, который перечислит все файлы в корзине и отправит их в xcom. Затем я анализирую этот xcom на предмет совпадений. Приведенный ниже ответ навел меня на правильный путь, но спасибо за уточняющие вопросы.
Ответ №1:
Может быть, в этом и заключается разница между file_name
и filename
? Похоже, идентификаторы задач используются file_name
, пока аргумент есть filename
. Следует ли использовать обе эти функции filename
?
def build_check(filename): return BranchPythonOperator( task_id=f'check_for_{filename}'.replace(' ', '_'), python_callable=check_file_exists, op_kwargs={'filename': filename}, provide_context=True, dag=dag ) def check_file_exists(filename, **context): xcom_value = context['ti'].xcom_pull(task_ids=f'list_files') if any(filename in s for s in xcom_value): return f'clean_headers_for_{filename}'.replace(' ', '_') else: return 'no_file_found'
Комментарии:
1. Не думаю, что в этом проблема, но код OP действительно потерпит неудачу как есть, как
file_name
не определено.2. Хороший улов. Я просмотрел и исправил все варианты имени файла и имени файла… и это сработало. Я обновил свою оригинальную публикацию своим циклом, но это, по крайней мере, вывело меня на правильный путь, даже если я все еще не на 100% в корне проблемы. Спасибо!