Apache airflow — не удается шаблонизировать имя очереди, унаследованное от baseOperator

#jinja2 #airflow

#jinja2 #воздушный поток

Вопрос:

У меня есть пользовательский оператор, который наследует baseoperator . Я пытаюсь шаблонизировать имя ‘queue’, чтобы задача могла быть подобрана другим работником Celery.

Но он использует необработанную строку шаблона (не отображенную строку jinja) в качестве имени очереди вместо отображаемой строки.

Тот же поток работает, если я указываю предполагаемое имя очереди непосредственно в виде простой строки.

 from airflow import DAG
from operators.check_operator import CheckQueueOperator
from datetime import datetime, timedelta
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'schedule_interval': None,  # exclusively “externally triggered” DAG
    'owner': 'admin',
    'description': 'This helps to quickly check queue templatization',
    'start_date': days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'provide_context': True
}


# this goes to wrong queue --> {{ dag_run.conf["queue"]}}
with DAG('test_queue', default_args=default_args) as dag:
    t1 = CheckQueueOperator(task_id='check_q',
        queue='{{ dag_run.conf["queue"]}}'
    )

  

В приведенном выше сценарии :

  • В RabbitMQ я вижу, что задача ставится в очередь под именем очереди ‘{{ dag_run.conf[«queue»]}}’ (строка необработанного шаблона )
  • В Airflow, под Rendered template я могу видеть правильно отображаемое значение для queue поля
  • На скриншоте мы видим docker-desktop как имя очереди. Это моя тестовая очередь, а также моя очередь потока по умолчанию. Это отлично работает, если я даю это имя очереди в виде прямой строки.

введите описание изображения здесь

 #this goes to right queue --> my_target_queue
with DAG('test_queue', default_args=default_args) as dag:
    t1 = CheckQueueOperator(task_id='check_q',
        queue='my_target_queue'
    )

  

Код CheckQueueOperator :

 from airflow.models.baseoperator import BaseOperator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

'''
Validate if queue can be templatized in base operator
'''
class CheckQueueOperator(BaseOperator):
    template_fields = ['queue']

    @apply_defaults
    def __init__(
        self,
        *args,
        **kwargs
    ):
        super(CheckQueueOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        self.log.info('*******************************')
        self.log.info('Queue name %s', self.queue)
        return
  

Сведения о стеке:

  • Версия Apache Airflow — 1.10.12
  • Использование CeleryExecutor
  • Использование RabbitMQ

Ответ №1:

queue Атрибут зарезервирован (возможно, не официально, но на практике) BaseOperator и, хотя вы можете обмануть веб-сервер при отображении атрибута, части Airflow, которые обрабатывают планирование и выполнение задач, не выполняют отображение до чтения queue атрибута.