#jinja2 #airflow
#jinja2 #воздушный поток
Вопрос:
У меня есть пользовательский оператор, который наследует baseoperator
. Я пытаюсь шаблонизировать имя ‘queue’, чтобы задача могла быть подобрана другим работником Celery.
Но он использует необработанную строку шаблона (не отображенную строку jinja) в качестве имени очереди вместо отображаемой строки.
Тот же поток работает, если я указываю предполагаемое имя очереди непосредственно в виде простой строки.
from airflow import DAG
from operators.check_operator import CheckQueueOperator
from datetime import datetime, timedelta
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.dates import days_ago
default_args = {
'schedule_interval': None, # exclusively “externally triggered” DAG
'owner': 'admin',
'description': 'This helps to quickly check queue templatization',
'start_date': days_ago(1),
'retries': 0,
'retry_delay': timedelta(minutes=5),
'provide_context': True
}
# this goes to wrong queue --> {{ dag_run.conf["queue"]}}
with DAG('test_queue', default_args=default_args) as dag:
t1 = CheckQueueOperator(task_id='check_q',
queue='{{ dag_run.conf["queue"]}}'
)
В приведенном выше сценарии :
- В RabbitMQ я вижу, что задача ставится в очередь под именем очереди ‘{{ dag_run.conf[«queue»]}}’ (строка необработанного шаблона )
- В Airflow, под
Rendered template
я могу видеть правильно отображаемое значение дляqueue
поля - На скриншоте мы видим
docker-desktop
как имя очереди. Это моя тестовая очередь, а также моя очередь потока по умолчанию. Это отлично работает, если я даю это имя очереди в виде прямой строки.
#this goes to right queue --> my_target_queue
with DAG('test_queue', default_args=default_args) as dag:
t1 = CheckQueueOperator(task_id='check_q',
queue='my_target_queue'
)
Код CheckQueueOperator :
from airflow.models.baseoperator import BaseOperator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
'''
Validate if queue can be templatized in base operator
'''
class CheckQueueOperator(BaseOperator):
template_fields = ['queue']
@apply_defaults
def __init__(
self,
*args,
**kwargs
):
super(CheckQueueOperator, self).__init__(*args, **kwargs)
def execute(self, context):
self.log.info('*******************************')
self.log.info('Queue name %s', self.queue)
return
Сведения о стеке:
- Версия Apache Airflow — 1.10.12
- Использование CeleryExecutor
- Использование RabbitMQ
Ответ №1:
queue
Атрибут зарезервирован (возможно, не официально, но на практике) BaseOperator
и, хотя вы можете обмануть веб-сервер при отображении атрибута, части Airflow, которые обрабатывают планирование и выполнение задач, не выполняют отображение до чтения queue
атрибута.