#python #airflow #google-cloud-composer
#python #воздушный поток #google-cloud-composer
Вопрос:
У меня есть скрипт на python, который динамически создает task (оператор воздушного потока) и DAG на основе файла JSON, который отображает все желаемые параметры. Скрипт также выделил функцию для создания любого необходимого оператора. Иногда я хочу активировать некоторые условные параметры на основе сопоставления… например, в bigqueryOperator иногда мне нужны time_partitioning и destination_table, но я не хочу устанавливать для каждой отображенной задачи.
Я пытался прочитать документацию о BaseOperator, но я не вижу никакого java-подобного метода set.
Функция, возвращающая оператор, например, BigQuery
def bqOperator(mappedTask):
try:
return BigQueryOperator(
task_id=mappedTask.get('task_id'),
sql=mappedTask.get('sql'),
##destination_dataset_table=project '.' dataset '.' mappedTask.get('target'),
write_disposition=mappedTask.get('write_disposition'),
allow_large_results=mappedTask.get('allow_large_results'),
##time_partitioning=mappedTask.get('time_partitioning'),
use_legacy_sql=mappedTask.get('use_legacy_sql'),
dag=dag,
)
except Exception as e:
error = 'Error creating BigQueryOperator for task : ' mappedTask.get('task_id')
logger.error(error)
raise Exception(error)
mappedTask внутри файла json без разделения
{
"task_id": "TEST_TASK_ID",
"sql": "some fancy query",
"type": "bqOperator",
"dependencies": [],
"write_disposition": "WRITE_APPEND",
"allow_large_results": true,
"createDisposition": "CREATE_IF_NEEDED",
"use_legacy_sql": false
},
mappedTask внутри файла json с разделением
{
"task_id": "TEST_TASK_ID_PARTITION",
"sql": "some fancy query",
"type": "bqOperator",
"dependencies": [],
"write_disposition": "WRITE_APPEND",
"allow_large_results": true,
"createDisposition": "CREATE_IF_NEEDED",
"use_legacy_sql": false,
"targetTable": "TARGET_TABLE",
"time_partitioning": {
"field": "DATE_TO_PART",
"type": "DAY"
}
},
Ответ №1:
Измените bqOperator
, как показано ниже, чтобы обработать этот случай, в основном он не передаст None, когда не найдет это поле в вашем json:
def bqOperator(mappedTask):
try:
return BigQueryOperator(
task_id=mappedTask.get('task_id'),
sql=mappedTask.get('sql'),
destination_dataset_table="{}.{}.{}".format(project, dataset, mappedTask.get('target')) if mappedTask.get('target', None) else None,
write_disposition=mappedTask.get('write_disposition'),
allow_large_results=mappedTask.get('allow_large_results'),
time_partitioning=mappedTask.get('time_partitioning', None),
use_legacy_sql=mappedTask.get('use_legacy_sql'),
dag=dag,
)
except Exception as e:
error = 'Error creating BigQueryOperator for task : ' mappedTask.get('task_id')
logger.error(error)
raise Exception(error)
Комментарии:
1. Спасибо за ответ, он дал мне некоторый синтаксис pythonic, которого я до сих пор не получал, и, вероятно, решил проблему, я попробую это!
Ответ №2:
В python нет закрытых методов или полей, поэтому вы можете напрямую устанавливать и получать поля типа
op.use_legacy_sql = True
Учитывая, что я настоятельно не рекомендую этого делать, поскольку это пахнет настоящим кодом. Вместо этого вы могли бы изменить свой заводской класс, чтобы применить некоторые значения по умолчанию к вашим данным json.
Или, что еще лучше, примените значения по умолчанию к самому json. Чем сохранять и использовать обновленный json. Это сделает вещи более предсказуемыми.
Комментарии:
1. Спасибо за ответ, он дал мне синтаксис для рассмотрения, но я думаю, что последую ответу kaxil, потому что я не хочу, чтобы json сопоставления был слишком сложным для редактирования. Конечная цель — предоставить людям, не являющимся разработчиками, простой инструмент для создания новой задачи в этом потоке composer. Спасибо.