вложенные аргументы dag по умолчанию в потоке воздуха

#arguments #airflow #directed-acyclic-graphs

Вопрос:

В настоящее время я использую следующий оператор BigQuery в airflow:

   s10_test = bigquery.BigQueryInsertJobOperator(
            task_id="10_test",
            configuration={
                    "query": {
                                "query": "SELECT CURRENT_DATE",
                                "useLegacySql": False,
                    }
            },
               )
 

Я хотел бы избежать указания useLegacySql в каждой задаче и просто передать этот параметр в качестве аргумента по умолчанию. Требуется ли какой-либо специальный синтаксис для вложенных аргументов?

Ответ №1:

Вы пробовали передать запрос без этого параметра?

Если он выдает ошибку без этого параметра, то у вас может не быть большого выбора. Если это не так, то вы можете попробовать и посмотреть, какой SQL (стандартный/устаревший) он работает по умолчанию?

Если вы не хотите передавать этот параметр, вы всегда можете обернуть эту функцию внутри небольшой служебной функции, которую вы можете использовать где угодно. Нравится:

 from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

def BigQueryInsertJobOperator(params={}):
   task_id = params.get("task_id")
   query = params.get("query")
   response = None
   if (query and task_id):
      response = bigquery.BigQueryInsertJobOperator(
         task_id=task_id,
         configuration={"query": {"query": query, "useLegacySql": False}},
      )
   return response
 

и называйте это везде в своих сценариях, например:

 from utilities import BigQueryInsertJobOperator

params = {"query": "SELECT CURRENT_DATE", "task_id": "test_1"}
response = BigQueryInsertJobOperator(params=params)