поток воздуха max_active_run не увеличивается

#airflow-scheduler #airflow

Вопрос:

мой код dag

 dag = DAG(
    "ETL_s21_{}_impression_product_test".format("every_minute"),
    default_args=default_args,
    schedule_interval="* * * * *",
    max_active_runs=10,
    concurrency=5
)


def export_impression_log(table, item):
    export_impression_log_task = PythonOperator(
            task_id=f"{table}_{item}_export_impression_log",
            python_callable=S3.export_impression_log_from_ad_s3,
            op_kwargs={
                "execution_date": '{{execution_date.in_timezone("Asia/Seoul").strftime("%Y-%m-%d-%H-%M")}}',
                'table': table,
                'item': item
            },
            dag=dag,
            queue='s21',
            task_concurrency=5,
            provide_context=False
        )

    return export_impression_log_task


def add_partition(table, item):
    add_partition_task = PythonOperator(
        task_id=f"{table}_{item}_add_partition",
        python_callable=Athena.add_partition,
        op_kwargs={
            "glue_table_name": 'amplitude_impression_test',
            "execution_date": '{{execution_date.in_timezone("UTC").strftime("%Y-%m-%d-%H-%M")}}'
        },
        dag=dag,
        queue='s21',
        task_concurrency=5,
        provide_context=False
    )
 

воздушный поток.cfg

 dag_concurrency = 16
worker_concurrency = 16
parallelism = 32
max_active_runs_per_dag = 16
 

dag работает каждую минуту и начинает с прошлого. затем я ожидал, что он начнется с start_date и будет создан максимум 5 dag сразу, а 5 dag будут продолжаться.

но максимальное количество активных экземпляров dag для запуска равно 2, и оно не увеличивается.

введите описание изображения здесь

введите описание изображения здесь

я думаю, что уже протестирован вариант параллелизма / параллелизма. но не смог решить эту проблему.

что я должен проверить?

Комментарии:

1. какого исполнителя вы используете?

2. @SaverioGuzzo я использую CeleryExecutor

3. у вас такая же проблема с использованием LocalExecutor?

4. я никогда не пробовал. должен ли я ?