Airflow PythonBranchOperator возвращает недопустимое значение

#python #airflow

Вопрос:

У меня возникли проблемы с оператором ветки Airflow 1.10 Python. У меня есть dag, который сканирует облачное хранилище и обрабатывает файлы, если они найдены. Если файл отсутствует, он попадает в фиктивный оператор no_file_found и завершается, в противном случае он переходит к некоторым этапам синтаксического анализа.

С одним файлом этот рабочий процесс отлично работает. Моя проблема возникает, когда я добавляю ту же логику для второго файла. В настоящее время check_for_Post_Performance возвращает задачу cleans_headers_for_gcm, и я в полной растерянности, как это происходит. Из приведенного ниже плана следует, что у него должно быть только два пути вперед: clean_headers_Post_Perfromance или no_file_found.

Графическое представление рабочего процесса

Я создаю эти задачи динамически из списка имен файлов. Я перебираю каждое имя файла и создаю следующие операторы:

 def build_check(filename):  return BranchPythonOperator(  task_id=f'check_for_{file_name}'.replace(' ', '_'),  python_callable=check_file_exists,  op_kwargs={'filename': filename},  provide_context=True,  dag=dag  )  def check_file_exists(filename, **context):  xcom_value = context['ti'].xcom_pull(task_ids=f'list_files')  if any(filename in s for s in xcom_value):  return f'clean_headers_for_{file_name}'.replace(' ', '_')   else:  return 'no_file_found'  

Я проверил шаблон отрисованной задачи, чтобы подтвердить, что «Производительность после публикации» передается для переменной имени файла Шаблон Визуализации Производительности Воздушного потока После Публикации

но, просматривая журналы, я вижу следующее:

 [2021-12-02 20:15:56,742] {logging_mixin.py:120} INFO - Running lt;TaskInstance: example_dag.check_for_Post_Performance 2021-12-02T20:14:50.724084 00:00 [running]gt; on host 21d0393eb686 [2021-12-02 20:15:56,766] {python_operator.py:114} INFO - Done. Returned value was: clean_headers_for_GCM [2021-12-02 20:15:56,767] {skipmixin.py:122} INFO - Following branch clean_headers_for_GCM [2021-12-02 20:15:56,773] {skipmixin.py:158} INFO - Skipping tasks ['no_file_found', 'clean_headers_for_Post_Performance']  

Мое лучшее предположение заключается в том, что функция не создается каждый цикл, как я думаю, или какое-то правило триггера сбивает меня с толку. Как я могу сделать так, чтобы каждый файл в моем списке источников достиг задачи no_file_found или clean_headers независимо друг от друга?

РЕДАКТИРОВАТЬ Вот код, который я использую для создания задач из статического списка:

 for file_name, table_name in FILES().items():  import_to_bq = import_file(file_name, table_name)  clean_headers_task = clean_headers(file_name)   start_import gt;gt; list_files gt;gt; build_check(file_name) gt;gt; [clean_headers_task, no_file]   clean_headers_task gt;gt; import_to_bq gt;gt; archive_file(file_name)  

Комментарии:

1. Вы не можете создавать задачи во время выполнения. Задачи создаются при анализе кода/DAG, и в это время нет контекста, в том числе относительно того, что list_files будет отображаться. Есть ли только 2 файла, которые можно было бы перечислить ? Если их больше, рассмотрите возможность создания одной задачи, которая обрабатывает несколько файлов в for цикле в a PythonOperator . Кроме того, ваш фрагмент кода неполон — как бы/ build_check называется. Публикация полного кода получила бы большую популярность.

2. Я использую словарь, который заполняю любыми файлами, которые хотел бы захватить с помощью этого dag. listFiles использует GoogleCloudStorageListOperator, который перечислит все файлы в корзине и отправит их в xcom. Затем я анализирую этот xcom на предмет совпадений. Приведенный ниже ответ навел меня на правильный путь, но спасибо за уточняющие вопросы.

Ответ №1:

Может быть, в этом и заключается разница между file_name и filename ? Похоже, идентификаторы задач используются file_name , пока аргумент есть filename . Следует ли использовать обе эти функции filename ?

 def build_check(filename):  return BranchPythonOperator(  task_id=f'check_for_{filename}'.replace(' ', '_'),  python_callable=check_file_exists,  op_kwargs={'filename': filename},  provide_context=True,  dag=dag  )  def check_file_exists(filename, **context):  xcom_value = context['ti'].xcom_pull(task_ids=f'list_files')  if any(filename in s for s in xcom_value):  return f'clean_headers_for_{filename}'.replace(' ', '_')   else:  return 'no_file_found'  

Комментарии:

1. Не думаю, что в этом проблема, но код OP действительно потерпит неудачу как есть, как file_name не определено.

2. Хороший улов. Я просмотрел и исправил все варианты имени файла и имени файла… и это сработало. Я обновил свою оригинальную публикацию своим циклом, но это, по крайней мере, вывело меня на правильный путь, даже если я все еще не на 100% в корне проблемы. Спасибо!