динамические идентификаторы задач в потоке данных

#airflow #google-cloud-composer

# #поток данных #google-cloud-composer

Вопрос:

У меня есть DAG с одним DataflowTemplateOperator , который может работать с разными файлами json. Когда я запускаю dag, я передаю некоторые параметры через {{dag_run.conf['param1']}} и работает нормально.

Проблема, с которой я столкнулся, заключается в попытке переименовать файл на task_id основе param1.

т.е. task_id="df_operator_read_object_json_file_{{dag_run.conf['param1']}}",

он жалуется только на буквенно-цифровые символы или

task_id="df_operator_read_object_json_file_{}".format(dag_run.conf['param1']), он не распознает dag_run плюс альфа-проблему.

Вся идея этого заключается в том, что, когда я вижу в консоли заданий потока данных и задание не выполнено, я знаю, кто является нарушителем, основываясь на param1. Имена заданий потока данных основаны на task_id следующим образом:

df-operator-read-object-json-file-8b9eecec

и что мне нужно, так это:

df-operator-read-object-param1-json-file-8b9eecec

Есть идеи, возможно ли это?

Комментарии:

1. вы имеете в виду, что при каждом запуске ваш DAG будет иметь другой идентификатор task_id на основе вашего файла json, не так ли?

2. param1 может принимать только 5 различных значений идентификатор задачи должен быть строкой param1

3. Является ли это запланированным dag или только ручным с ручным вводом для каждого запуска?

4. как только файл попадает в корзину базы данных, он запускается облачной функцией, которая передает все параметры, необходимые для запуска задания DF

5. Это означает, что если вы вызовете функцию 1000 раз, она создаст 1000 задач, каждая из которых выполнялась один раз и больше никогда не будет выполняться? Это не очень хорошая практика для Airflow. DataflowTemplatedJobStartOperator (обновленный оператор) имеет параметр job_name, который вы можете использовать для настройки по своему усмотрению. Вам не нужно привязывать task_id к job_name

Ответ №1:

Нет необходимости создавать новый оператор для каждого файла. DataflowTemplatedJobStartOperator имеет job_name параметр, который также является шаблоном, поэтому его можно использовать с Jinja.

Я не тестировал это, но это должно сработать:

 from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
op = DataflowTemplatedJobStartOperator(
        task_id="df_operator_read_object_json_file",
        job_name= "df_operator_read_object_json_file_{{dag_run.conf['param1']}}"
        template='gs://dataflow-templates/your_template',
        location='europe-west3',
    )