Есть ли какой-либо способ отредактировать оператор воздушного потока после создания?

#python #airflow #google-cloud-composer

#python #воздушный поток #google-cloud-composer

Вопрос:

У меня есть скрипт на python, который динамически создает task (оператор воздушного потока) и DAG на основе файла JSON, который отображает все желаемые параметры. Скрипт также выделил функцию для создания любого необходимого оператора. Иногда я хочу активировать некоторые условные параметры на основе сопоставления… например, в bigqueryOperator иногда мне нужны time_partitioning и destination_table, но я не хочу устанавливать для каждой отображенной задачи.

Я пытался прочитать документацию о BaseOperator, но я не вижу никакого java-подобного метода set.

Функция, возвращающая оператор, например, BigQuery

 def bqOperator(mappedTask):
    try:
        return BigQueryOperator(
        task_id=mappedTask.get('task_id'),
        sql=mappedTask.get('sql'),  
##destination_dataset_table=project '.' dataset '.' mappedTask.get('target'),
        write_disposition=mappedTask.get('write_disposition'),
        allow_large_results=mappedTask.get('allow_large_results'),
        ##time_partitioning=mappedTask.get('time_partitioning'),
        use_legacy_sql=mappedTask.get('use_legacy_sql'),
        dag=dag,
        )
    except Exception as e:
        error = 'Error creating BigQueryOperator for task : '   mappedTask.get('task_id')
        logger.error(error)
        raise Exception(error) 
  

mappedTask внутри файла json без разделения

         {
            "task_id": "TEST_TASK_ID",
            "sql": "some fancy query",
            "type": "bqOperator",
            "dependencies": [],
            "write_disposition": "WRITE_APPEND",
            "allow_large_results": true,
            "createDisposition": "CREATE_IF_NEEDED",
            "use_legacy_sql": false
        },
  

mappedTask внутри файла json с разделением

         {
            "task_id": "TEST_TASK_ID_PARTITION",
            "sql": "some fancy query",
            "type": "bqOperator",
            "dependencies": [],
            "write_disposition": "WRITE_APPEND",
            "allow_large_results": true,
            "createDisposition": "CREATE_IF_NEEDED",
            "use_legacy_sql": false,
                        "targetTable": "TARGET_TABLE",
            "time_partitioning": {
                "field": "DATE_TO_PART",
                "type": "DAY"
            }
        },
  

Ответ №1:

Измените bqOperator , как показано ниже, чтобы обработать этот случай, в основном он не передаст None, когда не найдет это поле в вашем json:

 def bqOperator(mappedTask):
    try:
        return BigQueryOperator(
        task_id=mappedTask.get('task_id'),
        sql=mappedTask.get('sql'),  
        destination_dataset_table="{}.{}.{}".format(project, dataset, mappedTask.get('target')) if mappedTask.get('target', None)  else None,
        write_disposition=mappedTask.get('write_disposition'),
        allow_large_results=mappedTask.get('allow_large_results'),
        time_partitioning=mappedTask.get('time_partitioning', None),
        use_legacy_sql=mappedTask.get('use_legacy_sql'),
        dag=dag,
        )
    except Exception as e:
        error = 'Error creating BigQueryOperator for task : '   mappedTask.get('task_id')
        logger.error(error)
        raise Exception(error) 
  

Комментарии:

1. Спасибо за ответ, он дал мне некоторый синтаксис pythonic, которого я до сих пор не получал, и, вероятно, решил проблему, я попробую это!

Ответ №2:

В python нет закрытых методов или полей, поэтому вы можете напрямую устанавливать и получать поля типа

 op.use_legacy_sql = True
  

Учитывая, что я настоятельно не рекомендую этого делать, поскольку это пахнет настоящим кодом. Вместо этого вы могли бы изменить свой заводской класс, чтобы применить некоторые значения по умолчанию к вашим данным json.
Или, что еще лучше, примените значения по умолчанию к самому json. Чем сохранять и использовать обновленный json. Это сделает вещи более предсказуемыми.

Комментарии:

1. Спасибо за ответ, он дал мне синтаксис для рассмотрения, но я думаю, что последую ответу kaxil, потому что я не хочу, чтобы json сопоставления был слишком сложным для редактирования. Конечная цель — предоставить людям, не являющимся разработчиками, простой инструмент для создания новой задачи в этом потоке composer. Спасибо.