Сбой задания DataflowTemplateOperator в Cloud Composer после обновления до Airflow 2

#google-cloud-composer

#google-cloud-composer

Вопрос:

Мы запускаем несколько заданий DataflowTemplateOperator (от JDBC до шаблона BigQuery) в нашей текущей среде composer 1.16.0 с помощью airflow 1.10.15. Однако при попытке запустить тот же DAG в composer 1.17.6 airflow 2.1.4 мы получаем следующую ошибку:

 [2021-12-07 03:08:56,478] {taskinstance.py:1465} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/dataflow.py", line 682, in execute
    job = self.hook.start_template_dataflow(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 99, in inner_wrapper
    raise AirflowException(
airflow.exceptions.AirflowException: The mutually exclusive parameter `location` and `region` key in `variables` parameter are both present. Please remove one.
 

Мы устанавливаем параметр region в dataflow_default_options, и они отображаются в airflow 1 следующим образом:

 {'gcpTempLocation': 'gs://us-east1-xxxx/tmp/',
 'machineType': 'n2-standard-2',
 'project': 'xxxx',
 'region': 'us-east1',
 'runner': 'DataflowRunner'}
 

Но похоже, что параметр region больше не может быть установлен с помощью dataflow_default_options в разделе airflow 2. Попытка установить «местоположение» вместо «регион» не имеет никакого эффекта, и задание по умолчанию используется us-central1.

Обе среды используют один и тот же шаблон, и это было проверено в заданиях потока данных.

Причина, по которой мы устанавливаем регион, заключается в том, что мы запускаем несколько задач потока данных, и если мы их не устанавливаем, то квоты процессора попадают. У нас была увеличена квота процессора us-east1.

Приветствуются любые указания.

Спасибо.

Комментарии:

1. Вы проверили эту документацию . Если да, то при попытке запустить оператор DataflowTemplatedJobStartOperator , заполняющий значение местоположения, вы получили ошибку?

2. Мне удалось успешно выполнить, используя параметр location, но я заметил, что документация, на которую вы указываете, относится к airflow.providers.google.cloud.operators.dataflow. DataflowTemplatedJobStartOperator , пока мы используем airflow.contrib.operators.dataflow_operator. DataflowTemplateOperator Как мы выбираем одно против другого?

3. Это указывает на основной раздел документации об операторах облачных потоков данных Google. Я предполагаю, что вы можете использовать DataflowTemplatedJobStartOperator версию 2, которая доступна. Нет никаких DataflowTemplateOperator документов или где вы можете запустить его в версии 2? (возможно, но я думаю, вам следует выбирать последние операторы, чтобы избежать конфликтов в будущем)

Ответ №1:

Приятно знать, что вы смогли решить свою проблему. Я оставляю этот ответ для ознакомления сообщества с текущими версиями DataflowTemplateOperator . Не стесняйтесь обновлять ответ, если сочтете нужным.

Кроме того, здесь вы можете найти официальный пример использования DataflowTemplateOperator для обеих версий.