#arguments #airflow #directed-acyclic-graphs
Вопрос:
В настоящее время я использую следующий оператор BigQuery в airflow:
s10_test = bigquery.BigQueryInsertJobOperator(
task_id="10_test",
configuration={
"query": {
"query": "SELECT CURRENT_DATE",
"useLegacySql": False,
}
},
)
Я хотел бы избежать указания useLegacySql в каждой задаче и просто передать этот параметр в качестве аргумента по умолчанию. Требуется ли какой-либо специальный синтаксис для вложенных аргументов?
Ответ №1:
Вы пробовали передать запрос без этого параметра?
Если он выдает ошибку без этого параметра, то у вас может не быть большого выбора. Если это не так, то вы можете попробовать и посмотреть, какой SQL (стандартный/устаревший) он работает по умолчанию?
Если вы не хотите передавать этот параметр, вы всегда можете обернуть эту функцию внутри небольшой служебной функции, которую вы можете использовать где угодно. Нравится:
from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs
def BigQueryInsertJobOperator(params={}):
task_id = params.get("task_id")
query = params.get("query")
response = None
if (query and task_id):
response = bigquery.BigQueryInsertJobOperator(
task_id=task_id,
configuration={"query": {"query": query, "useLegacySql": False}},
)
return response
и называйте это везде в своих сценариях, например:
from utilities import BigQueryInsertJobOperator
params = {"query": "SELECT CURRENT_DATE", "task_id": "test_1"}
response = BigQueryInsertJobOperator(params=params)