Xcom возвращает строку в формате списка, а не просто строковое значение?

#airflow

Вопрос:

У меня есть оператор воздушного потока, который возвращает строковое значение, и задача называется «task1». Поэтому после выполнения я захожу в xcom и проверяю значение return_value, а это просто строка (скриншот ниже).

скриншот ключа/значения xcom

Затем у меня есть следующий оператор с именем task2, который принимает входные данные из значения xcom из task1, как показано ниже:

 "{{ ti.xcom_pull(task_ids=['task1'],key='return_value')}}"
 

Проблема в том, что получаемое значение представляет собой список, преобразованный в строку.

Значение в xcom: это просто строка

Значение, возвращаемое xcom pull (версия шаблона jinga): [‘это просто строка’]

Итак, есть ли способ обновить xcom pull (версия jinga), показанную выше, чтобы просто извлечь значение? У меня нет доступа внутри оператора, в который он передается, или я мог бы применить некоторую логику для преобразования строки в список, а затем получить только значение (но это было бы не идеально и в любом случае не вариант).

Кроме того, я думаю, что его работа упоминает, что я пытался сделать что-то подобное, но использовал оператор Python, а затем выполнил вытягивание xcom, используя содержимое внутри кода python, и значение было возвращено просто отлично. Поэтому я не уверен, почему xcom использует шаблон Jinja, и как я могу это обойти? Я надеюсь, что есть что-то, что я могу сделать, о чем я не знаю, чтобы легко получить желаемый результат. Код оператора python, который работает, приведен ниже (просто к вашему сведению…)

 def python_code_task3(**context): 
value = context['ti'].xcom_pull(task_ids='task1', key='return_value') 
logging.info("Value: "   value)
 

И этот код просто выводит значение, как я хочу, это просто строка

Я действительно просто хочу использовать версию шаблона jinga, чтобы она извлекалась и передавалась в строке. Не строковое представление списка со строковым значением в качестве одного элемента в списке.

Ответ №1:

Существует небольшая разница между двумя способами, которыми вы извлекаете XCom фрагменты кода: у одного есть task_ids=["task_1"] (аргумент списка), а у другого есть task_ids="task_1" (аргумент str).

Тип аргумента task_ids имеет значение при использовании xcom_pull() . Воздушный поток сделает вывод, что если вы передадите список идентификаторов задач, то для извлечения должно быть несколько задач XComs , и вернет список, содержащий все извлеченные XComs . В противном случае, если тип является просто строкой, то есть идентификатором одной задачи, возвращается одно XCom значение. Вот ссылка на код, в котором это делается.

Также стоит отметить, что значения, заданные шаблоном Jinja, по умолчанию отображаются в виде строк. Однако с помощью Airflow 2.1 вы можете задать параметр, вызываемый render_template_as_native_obj True на уровне DAG. Это теперь будет отображать значения шаблона Jinja как собственные объекты Python (список, дикт и т. Д.), Когда это применимо. Более подробно об этой концепции здесь.

Комментарии:

1. Это было именно так. Я упустил это из виду! Обновлено до «{{ ti.xcom_pull(task_ids=’задача 1′,ключ=’возвращаемое значение’)}}» и теперь все работает так, как ожидалось!