Воздушный поток PostgresOperator :Задача не выполнена с исключением при использовании postgres_conn_id=»красное смещение»

#airflow #postgres-operator

Вопрос:

~$ версия воздушного потока 2.1.2 python 3.8

Я пытаюсь выполнить некоторые основные запросы в своем кластере redshift с помощью dag, но задача не выполняется с исключением(не показано в журналах).

 import datetime
import logging

from airflow import DAG
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator

import sql_statements

  def load_data_to_redshift(*args, **kwargs):
    aws_hook = AwsHook("aws_credentials")
    credentials = aws_hook.get_credentials()
    redshift_hook = PostgresHook("redshift")
    sql_stmt = sql_statements.COPY_ALL_data_SQL.format(
        credentials.access_key,
        credentials.secret_key,
    )
    redshift_hook.run(sql_stmt)

    dag = DAG(
    'exercise1',
    start_date=datetime.datetime.now()
    )

create_t1_table = PostgresOperator(
    task_id="create_t1_table",
    dag=dag,
    postgres_conn_id="redshift_default",
    sql=sql_statements.CREATE_t1_TABLE_SQL
    )
    
create_t2_table = PostgresOperator(
    task_id="create_t2_table",
    dag=dag,
    postgres_conn_id="redshift_default",
    sql=sql_statements.CREATE_t2_TABLE_SQL,
)

create_t1_table >> create_t2_table
 

следующее является исключением

 [2021-09-17 05:23:33,902] {base.py:69} INFO - Using connection to: id: redshift_default. Host: rdscluster.123455.us-west-2.redshift.amazonaws.com, Port: 5439, Schema: udac, Login: ***, Password: ***, extra: {}
[2021-09-17 05:23:33,903] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
   self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
File "/home/8085/.local/lib/python3.8/site-packages/airflow/providers/postgres/operators/postgres.py", line 70, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
File "/home/8085/.local/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 177, in run
    with closing(self.get_conn()) as conn:
File "/home/8085/.local/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py", line 115, in get_conn
    self.conn = psycopg2.connect(**conn_args)
File "/home/8085/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 124, in connect
    conn = psycopg2.connect("dbname=airflow user=abc password=ubantu host=127.0.0.1 port=5432")
File "/home/8085/.local/lib/python3.8/site-packages/psycopg2abc/__init__.py", line 124, in connect
    conn = psycopg2.connect("dbname=airflow user=abc password=abc host=127.0.0.1 port=5432")
File "/home/8085/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 124, in connect
    conn = psycopg2.connect("dbname=airflow user=abc password=abc host=127.0.0.1 port=5432")
  [Previous line repeated 974 more times]
RecursionError: maximum recursion depth exceeded
[2021-09-17 05:23:33,907] {taskinstance.py:1544} INFO - Marking task as FAILED. dag_id=exercise1, task_id=create_t1_table, execution_date=20210917T092331, start_date=20210917T092333, end_date=20210917T092333
[2021-09-17 05:23:33,953] {local_task_job.py:149} INFO - Task exited with return code 1
 

Я не могу сказать из журналов, что здесь происходит не так, похоже, что даже после предоставления идентификатора подключения redshift оператор Postgres использует соединение Postgres по умолчанию, настроенное при установке веб-сервера Airflow, но я могу ошибаться.
Есть идеи, как мне решить эту проблему или получить больше выхода из воздушного потока? (обратите внимание, что я уже пробовал использовать разные уровни журнала воздушного потока из конфигурации воздушного потока, но это тоже не помогло)

redshift — соединение определено правильно, и я могу подключиться к redshift с помощью другой автономной утилиты python, а также plsql, поэтому с кластером Redshift проблем нет.

-Спасибо,

Комментарии:

1. можете ли вы предоставить полную информацию, пожалуйста.

2. @floating_hammer: Добавлен полный dag в вопрос. Спасибо.

Ответ №1:

Решено: Каким-то образом следующий файл ссылался на базу данных airflow postgres, созданную во время установки Airflow, а не на подключение к локальному postgres.

 File "/home/8085/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 124, in connect
    **conn = psycopg2.connect("dbname=airflow user=abc password=abc host=127.0.0.1 port=5432")**
 

Пришлось воссоздать базу данных воздушного потока с нуля, чтобы решить проблему.