airflow — создание dag и задачи динамически создают конвейер для одного объекта

#python #airflow

#python #воздушный поток

Вопрос:

В airflow я хочу экспортировать некоторые таблицы из pg в BQ.

 task1: get the max id from BQ
task2: export the data from PG (id>maxid)
task3: GCS to BQ stage
task4: BQ stage to BQ main
  

Но есть небольшая проблема, интервал расписания отличается. Итак, я создал файл JSON, чтобы указать интервал синхронизации. Поэтому, если это 2 минуты, тогда он будет использовать DAG upsert_2mins else 10mins interval ( upsert_10mins ) . Я использовал этот синтаксис для его динамической генерации.

Конфигурационный файл JSON:

 {
    "tbl1": ["update_timestamp", "2mins", "stg"],
    "tbl2": ["update_timestamp", "2mins", "stg"]
}
  

Код:

 import json
from airflow import DAG
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from customoperator.custom_PostgresToGCSOperator import  custom_PostgresToGCSOperator
from airflow.contrib.operators.gcs_to_bq import custom_PostgresToGoogleCloudStorageOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator


table_list = ['tbl1','tbl2']

#DAG details
docs = """test"""
# Add args and Dag
default_args = {
    'owner': 'DBteam',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
    }

 

with open('/home/airflow/gcs/dags/upsert_dag/config.json','r') as conf:
    config = json.loads(conf.read())

def get_max_ts(dag,tablename,**kwargs):
    code for find the max record
    return records[0][0]

def pgexport(dag,tablename, **kwargs):
    code for exporting the data PGtoGCS
    export_tables.execute(None)


def stg_bqimport(dag,tablename, **kwargs):
    code to import GCS to BQ
    bqload.execute(None)

def prd_merge(dag,tablename, **kwargs):
    code to merge bq to main bq table
    bqmerge.execute(context=kwargs)

for table_name in table_list:
    
    sync_interval = config[table_name][1]
    cron_time = ''
    if sync_interval == '2mins':
        cron_time = '*/20 * * * *'
    else:
        cron_time = '*/10 * * * *'
    
    dag = DAG(
    'upsert_every_{}'.format(sync_interval),
    default_args=default_args,
    description='Incremental load - Every 10mins',
    schedule_interval=cron_time,
    catchup=False,
    max_active_runs=1,
    doc_md = docs
    )
    
    max_ts = PythonOperator(
        task_id="get_maxts_{}".format(table_name),
        python_callable=get_max_ts,
        op_kwargs={'tablename':table_name, 'dag': dag},
        provide_context=True,
        dag=dag
    )
   
    export_gcs = PythonOperator(
    task_id='export_gcs_{}'.format(table_name),
    python_callable=pgexport,
    op_kwargs={'tablename':table_name, 'dag': dag},
    provide_context=True,
    dag=dag
    )

    stg_load = PythonOperator(
    task_id='stg_load_{}'.format(table_name),
    python_callable=stg_bqimport,
    op_kwargs={'tablename':table_name, 'dag': dag},
    provide_context=True,
    dag=dag
    )    
    merge = PythonOperator(
    task_id='merge_{}'.format(table_name),
    python_callable=prd_merge,
    op_kwargs={'tablename':table_name, 'dag': dag},
    provide_context=True,
    dag=dag
    )
    
    globals()[sync_interval] = dag
    max_ts >> export_gcs >> stg_load >> merge
  

На самом деле он создал базу данных, но проблема в том, что из веб-интерфейса я могу видеть задачу для последней таблицы.Но он должен показывать задачи для 2 таблиц.
введите описание изображения здесь

Ответ №1:

Ваш код создает 2 базы данных, по одной для каждой таблицы, но перезаписывает первую со второй.

Я предлагаю изменить формат файла JSON на:

 {
    "2mins": [
                "tbl1": ["update_timestamp", "stg"],
                "tbl2": ["update_timestamp", "stg"]
             ],
    "10mins": [
                "tbl3": ["update_timestamp", "stg"],
                "tbl4": ["update_timestamp", "stg"]
             ]
}
  

И пусть ваш код повторит расписания и создаст необходимые задачи для каждой таблицы (вам понадобится два цикла):

 # looping on the schedules to create two dags
for schedule, tables in config.items():

cron_time = '*/10 * * * *'

if schedule== '2mins':
    cron_time = '*/20 * * * *'

dag_id = 'upsert_every_{}'.format(schedule)

dag = DAG(
    dag_id ,
    default_args=default_args,
    description='Incremental load - Every 10mins',
    schedule_interval=cron_time,
    catchup=False,
    max_active_runs=1,
    doc_md = docs
)

# Looping over the tables to create the tasks for 
# each table in the current schedule
for table_name, table_config in tables.items():
    max_ts = PythonOperator(
        task_id="get_maxts_{}".format(table_name),
        python_callable=get_max_ts,
        op_kwargs={'tablename':table_name, 'dag': dag},
        provide_context=True,
        dag=dag
    )

    export_gcs = PythonOperator(
        task_id='export_gcs_{}'.format(table_name),
        python_callable=pgexport,
        op_kwargs={'tablename':table_name, 'dag': dag},
        provide_context=True,
        dag=dag
    )

    stg_load = PythonOperator(
        task_id='stg_load_{}'.format(table_name),
        python_callable=stg_bqimport,
        op_kwargs={'tablename':table_name, 'dag': dag},
        provide_context=True,
        dag=dag
    )    

    merge = PythonOperator(
        task_id='merge_{}'.format(table_name),
        python_callable=prd_merge,
        op_kwargs={'tablename':table_name, 'dag': dag},
        provide_context=True,
        dag=dag
    )
    
    # Tasks for the same table will be chained
    max_ts >> export_gcs >> stg_load >> merge

# DAG is created among the global objects
globals()[dag_id] = dag
  

Комментарии:

1. Спасибо, позвольте мне попробовать этот метод

2. Он не выбирает базу данных за 2 минуты. Я внес эти изменения. Синтаксическая ошибка Json, поэтому — pastebin.com/ynHtMmmX и cron_time = '*/10 * * * *' отступ (одна вкладка), и name 'dag_id' is not defined поэтому я использовал dag_id=(instead of ==)

3. Да, извините, я написал код в редакторе StackOverflow, могут быть и другие проблемы, подобные этой. Это было просто для того, чтобы дать вам представление.