#python #postgresql #amazon-web-services #airflow #psycopg2
#python #postgresql #amazon-веб-сервисы #поток воздуха #psycopg2
Вопрос:
У нас есть экземпляр Airflow, работающий в AWS Fargate. Он подключается к локальному серверу Postgres (в Windows) и пытается загрузить данные из (сложного) представления. PostgresHook
Для этого используется a . Однако задача в DAG завершается сбоем в потоке данных с этой ошибкой:
File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi_hook.py", line 120, in get_records
cur.execute(sql)
psycopg2.OperationalError: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
Некоторое время назад ошибка возникла примерно через 10-15 минут. Теперь это происходит быстрее, через 5 минут или даже быстрее.
Я просмотрел журналы Postgres, которые показывают (смутно), что именно клиент закрыл соединение:
LOG: could not send data to client: An existing connection was forcibly closed by the remote host.
FATAL: connection to client lost
Я уже перепробовал кучу потенциальных решений.
Без подключения Airflow к серверу за пределами Airflow, используя psycopg2
напрямую: работает (с использованием сложного представления).
Другая таблица Попытка загрузить данные из другой таблицы из Airflow в облаке: работает, тоже быстро завершается. Таким образом, этот «тайм-аут» возникает только потому, что запрос занимает некоторое время.
Сначала я мог воспроизвести эту проблему локально, но я (думаю, что я) решил ее, добавив некоторые дополнительные параметры в строку подключения postgres : keepalives=1amp;keepalives_idle=60amp;keepalives_interval=60
. Однако я не могу воспроизвести это исправление в воздушном потоке в облаке, потому что, когда я добавляю туда эти параметры, ошибка остается.
Увеличение времени ожидания См. Выше, я добавил keepalives, но я также попытался рассуждать о других потенциальных таймаутах. Я добавил тайм-аут execution_timeout
к аргументам DAG, но безрезультатно. Мы также проверили сетевые тайм-ауты, но, учитывая нерегулярный характер сбоев соединения, на самом деле это не похоже на такой жесткий тайм-аут…
Я здесь в недоумении. Есть предложения?
Ответ №1:
Обновление: мы решили эту проблему с помощью обходного пути. Вместо того, чтобы держать соединение открытым во время запроса сложного представления, мы превратили соединение в асинхронное соединение (т. Е. aconn = psycopg2.connect(database='test', async=1)
Из документов psycopg). Кроме того, мы превратили представление в материализованное представление, так что мы вызываем только REFRESH MATERIALIZED VIEW
через асинхронное соединение, а затем мы можем просто SELECT *
использовать материализованное представление некоторое время спустя, что происходит очень быстро.