#airflow #postgres-operator
Вопрос:
~$ версия воздушного потока 2.1.2 python 3.8
Я пытаюсь выполнить некоторые основные запросы в своем кластере redshift с помощью dag, но задача не выполняется с исключением(не показано в журналах).
import datetime
import logging
from airflow import DAG
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
import sql_statements
def load_data_to_redshift(*args, **kwargs):
aws_hook = AwsHook("aws_credentials")
credentials = aws_hook.get_credentials()
redshift_hook = PostgresHook("redshift")
sql_stmt = sql_statements.COPY_ALL_data_SQL.format(
credentials.access_key,
credentials.secret_key,
)
redshift_hook.run(sql_stmt)
dag = DAG(
'exercise1',
start_date=datetime.datetime.now()
)
create_t1_table = PostgresOperator(
task_id="create_t1_table",
dag=dag,
postgres_conn_id="redshift_default",
sql=sql_statements.CREATE_t1_TABLE_SQL
)
create_t2_table = PostgresOperator(
task_id="create_t2_table",
dag=dag,
postgres_conn_id="redshift_default",
sql=sql_statements.CREATE_t2_TABLE_SQL,
)
create_t1_table >> create_t2_table
следующее является исключением
[2021-09-17 05:23:33,902] {base.py:69} INFO - Using connection to: id: redshift_default. Host: rdscluster.123455.us-west-2.redshift.amazonaws.com, Port: 5439, Schema: udac, Login: ***, Password: ***, extra: {}
[2021-09-17 05:23:33,903] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
result = task_copy.execute(context=context)
File "/home/8085/.local/lib/python3.8/site-packages/airflow/providers/postgres/operators/postgres.py", line 70, in execute
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
File "/home/8085/.local/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 177, in run
with closing(self.get_conn()) as conn:
File "/home/8085/.local/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py", line 115, in get_conn
self.conn = psycopg2.connect(**conn_args)
File "/home/8085/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 124, in connect
conn = psycopg2.connect("dbname=airflow user=abc password=ubantu host=127.0.0.1 port=5432")
File "/home/8085/.local/lib/python3.8/site-packages/psycopg2abc/__init__.py", line 124, in connect
conn = psycopg2.connect("dbname=airflow user=abc password=abc host=127.0.0.1 port=5432")
File "/home/8085/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 124, in connect
conn = psycopg2.connect("dbname=airflow user=abc password=abc host=127.0.0.1 port=5432")
[Previous line repeated 974 more times]
RecursionError: maximum recursion depth exceeded
[2021-09-17 05:23:33,907] {taskinstance.py:1544} INFO - Marking task as FAILED. dag_id=exercise1, task_id=create_t1_table, execution_date=20210917T092331, start_date=20210917T092333, end_date=20210917T092333
[2021-09-17 05:23:33,953] {local_task_job.py:149} INFO - Task exited with return code 1
Я не могу сказать из журналов, что здесь происходит не так, похоже, что даже после предоставления идентификатора подключения redshift оператор Postgres использует соединение Postgres по умолчанию, настроенное при установке веб-сервера Airflow, но я могу ошибаться.
Есть идеи, как мне решить эту проблему или получить больше выхода из воздушного потока? (обратите внимание, что я уже пробовал использовать разные уровни журнала воздушного потока из конфигурации воздушного потока, но это тоже не помогло)
redshift — соединение определено правильно, и я могу подключиться к redshift с помощью другой автономной утилиты python, а также plsql, поэтому с кластером Redshift проблем нет.
-Спасибо,
Комментарии:
1. можете ли вы предоставить полную информацию, пожалуйста.
2. @floating_hammer: Добавлен полный dag в вопрос. Спасибо.
Ответ №1:
Решено: Каким-то образом следующий файл ссылался на базу данных airflow postgres, созданную во время установки Airflow, а не на подключение к локальному postgres.
File "/home/8085/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 124, in connect
**conn = psycopg2.connect("dbname=airflow user=abc password=abc host=127.0.0.1 port=5432")**
Пришлось воссоздать базу данных воздушного потока с нуля, чтобы решить проблему.