Как получить результаты sql-запроса в airflow с помощью оператора JDBC

#jdbc #airflow

#jdbc #воздушный поток

Вопрос:

Я настроил соединение JDBC в соединениях Airflow. Моя часть задачи DAG выглядит так, как показано ниже, которая содержит оператор select. При запуске DAG выполняется успешно, но мои результаты запроса не печатаются в журнале. Как получить результаты запроса с помощью оператора JDBC.

dag = DAG(dag_id=’test_azure_sqldw_v1′, default_args=default_args,schedule_interval=None,dagrun_timeout=timedelta(секунды = 120),)

sql=»выберите count(*) из tablename»

azure_sqldw=JdbcOpetask_id=’azure_sqldw’, sql= sql,jdbc_conn_id=»cdf_sqldw», autocommit = True, dag = dag)

Комментарии:

1. почему вы ожидаете, что результат будет напечатан в журналах?

Ответ №1:

Оператор не печатает в журнале. Он просто запускает запрос. Если вы хотите получить результаты, чтобы что-то с ними сделать, вам нужно использовать хук.

 from airflow.providers.jdbc.hooks.jdbc import JdbcHook

def func(jdbc_conn_id, sql, **kwargs):
    """Print df from JDBC """
    pprint(kwargs)
    hook = JdbcHook(jdbc_conn_id=jdbc_conn_id)
    df = hook.get_pandas_df(sql=sql,autocommit=True)
    print(df.to_string())


run_this = PythonOperator(
    task_id='task',
    python_callable=func,
    op_kwargs={'jdbc_conn_id': 'cdf_sqldw', 'sql': 'select count(*) from tablename' }, 
    dag=dag,
)
 

Вы также можете создать пользовательский оператор, который выполняет требуемое действие, которое вы ищете.