Не удается использовать переменную python в шаблоне jinja с помощью Airflow

#python #amazon-web-services #airflow #amazon-emr #mwaa

#python #amazon-web-services #воздушный поток #amazon-emr #mwaa

Вопрос:

Я пытаюсь использовать Airflow для запуска шага 11 на AWS EMR и следую этому коду в качестве ссылки. Поскольку использование EmrAddStepsOperator и EmrStepSensor для 11 шагов было бы слишком большим повторением. Поэтому я пытаюсь перебрать его. Я использовал приведенный ниже код в своей базе данных.

 step_adder = list()
step_checker = list()
steps = ['step1', 'step2', 'step3', 'step4', 'step5', 'step6'...till step11]

# @evalcontextfilter
# def dangerous_render(context, value):
#     return Markup(Template(value).render(context)).render()

for i in range(0,len(steps)):
        #Add step
    step_adder.append(EmrAddStepsOperator(
        task_id=steps[i],
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=eval('step_' str(i 1)),
    ))
    print(step_adder)
        #Step Sensor for checking
    step_checker.append(EmrStepSensor(
        task_id=steps[i] '_check',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        #step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
        step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}").render({'params': {'step': steps[i]}}))',
        aws_conn_id='aws_default',
    ))
 

Я столкнулся с ошибкой здесь, EmrStepSensor ожидает, что step_id из EMR будет введен здесь, и он генерируется из xcom (я думаю, я не уверен на 100%, как работает этот код). Но мой шаг хранится в списке шагов, поэтому я не могу указать статическое значение здесь в task_id в step_id, как указано в справочном коде, и я не могу понять, как использовать шаблон jinja со значением переменной python для размещения значений здесь из списка шагов.

Я использовал оба приведенных ниже способа, чтобы step_id мог получить правильный шаг из EMR в соответствии с именем шага в шагах [i]

 step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",

step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}")
 

Однако оба они завершились ошибкой синтаксиса в Airflow. Так что, если кто-нибудь может указать мне правильное направление для этого, я был бы очень признателен. Я использую Airflow 1.10.12 (это версия Airflow по умолчанию в управляемом Apache Airflow на AWS).

Ответ №1:

Я не уверен, что это уже решено, поэтому:

Использование f-строк:

f"{{{{ task_instance.xcom_pull(task_ids='{steps[i]}', key='return_value')[0] }}}}"

Использование .format : "{{{{ task_instance.xcom_pull(task_ids='{}', key='return_value')[0] }}}}".format(steps[i])

Обратите внимание, что вы должны убедиться, что значение ключа task_ids заключено в одинарные кавычки. Кроме того, возврат из xcom_pull представляет собой список, поэтому индекс [0] в конце o

Комментарии:

1. Это было то, что я искал, [0] помогло, иначе код не получал правильный идентификатор шага.