#airflow-scheduler #airflow
Вопрос:
мой код dag
dag = DAG(
"ETL_s21_{}_impression_product_test".format("every_minute"),
default_args=default_args,
schedule_interval="* * * * *",
max_active_runs=10,
concurrency=5
)
def export_impression_log(table, item):
export_impression_log_task = PythonOperator(
task_id=f"{table}_{item}_export_impression_log",
python_callable=S3.export_impression_log_from_ad_s3,
op_kwargs={
"execution_date": '{{execution_date.in_timezone("Asia/Seoul").strftime("%Y-%m-%d-%H-%M")}}',
'table': table,
'item': item
},
dag=dag,
queue='s21',
task_concurrency=5,
provide_context=False
)
return export_impression_log_task
def add_partition(table, item):
add_partition_task = PythonOperator(
task_id=f"{table}_{item}_add_partition",
python_callable=Athena.add_partition,
op_kwargs={
"glue_table_name": 'amplitude_impression_test',
"execution_date": '{{execution_date.in_timezone("UTC").strftime("%Y-%m-%d-%H-%M")}}'
},
dag=dag,
queue='s21',
task_concurrency=5,
provide_context=False
)
воздушный поток.cfg
dag_concurrency = 16
worker_concurrency = 16
parallelism = 32
max_active_runs_per_dag = 16
dag работает каждую минуту и начинает с прошлого. затем я ожидал, что он начнется с start_date и будет создан максимум 5 dag сразу, а 5 dag будут продолжаться.
но максимальное количество активных экземпляров dag для запуска равно 2, и оно не увеличивается.
я думаю, что уже протестирован вариант параллелизма / параллелизма. но не смог решить эту проблему.
что я должен проверить?
Комментарии:
1. какого исполнителя вы используете?
2. @SaverioGuzzo я использую CeleryExecutor
3. у вас такая же проблема с использованием LocalExecutor?
4. я никогда не пробовал. должен ли я ?