#airflow
Вопрос:
У меня есть оператор воздушного потока, который возвращает строковое значение, и задача называется «task1». Поэтому после выполнения я захожу в xcom и проверяю значение return_value, а это просто строка (скриншот ниже).
Затем у меня есть следующий оператор с именем task2, который принимает входные данные из значения xcom из task1, как показано ниже:
"{{ ti.xcom_pull(task_ids=['task1'],key='return_value')}}"
Проблема в том, что получаемое значение представляет собой список, преобразованный в строку.
Значение в xcom: это просто строка
Значение, возвращаемое xcom pull (версия шаблона jinga): [‘это просто строка’]
Итак, есть ли способ обновить xcom pull (версия jinga), показанную выше, чтобы просто извлечь значение? У меня нет доступа внутри оператора, в который он передается, или я мог бы применить некоторую логику для преобразования строки в список, а затем получить только значение (но это было бы не идеально и в любом случае не вариант).
Кроме того, я думаю, что его работа упоминает, что я пытался сделать что-то подобное, но использовал оператор Python, а затем выполнил вытягивание xcom, используя содержимое внутри кода python, и значение было возвращено просто отлично. Поэтому я не уверен, почему xcom использует шаблон Jinja, и как я могу это обойти? Я надеюсь, что есть что-то, что я могу сделать, о чем я не знаю, чтобы легко получить желаемый результат. Код оператора python, который работает, приведен ниже (просто к вашему сведению…)
def python_code_task3(**context):
value = context['ti'].xcom_pull(task_ids='task1', key='return_value')
logging.info("Value: " value)
И этот код просто выводит значение, как я хочу, это просто строка
Я действительно просто хочу использовать версию шаблона jinga, чтобы она извлекалась и передавалась в строке. Не строковое представление списка со строковым значением в качестве одного элемента в списке.
Ответ №1:
Существует небольшая разница между двумя способами, которыми вы извлекаете XCom
фрагменты кода: у одного есть task_ids=["task_1"]
(аргумент списка), а у другого есть task_ids="task_1"
(аргумент str).
Тип аргумента task_ids
имеет значение при использовании xcom_pull()
. Воздушный поток сделает вывод, что если вы передадите список идентификаторов задач, то для извлечения должно быть несколько задач XComs
, и вернет список, содержащий все извлеченные XComs
. В противном случае, если тип является просто строкой, то есть идентификатором одной задачи, возвращается одно XCom
значение. Вот ссылка на код, в котором это делается.
Также стоит отметить, что значения, заданные шаблоном Jinja, по умолчанию отображаются в виде строк. Однако с помощью Airflow 2.1 вы можете задать параметр, вызываемый render_template_as_native_obj
True
на уровне DAG. Это теперь будет отображать значения шаблона Jinja как собственные объекты Python (список, дикт и т. Д.), Когда это применимо. Более подробно об этой концепции здесь.
Комментарии:
1. Это было именно так. Я упустил это из виду! Обновлено до «{{ ti.xcom_pull(task_ids=’задача 1′,ключ=’возвращаемое значение’)}}» и теперь все работает так, как ожидалось!