psycopg2.OperationalError: сервер неожиданно закрыл соединение (поток в AWS, разрыв соединения с обеих сторон)

#python #postgresql #amazon-web-services #airflow #psycopg2

#python #postgresql #amazon-веб-сервисы #поток воздуха #psycopg2

Вопрос:

У нас есть экземпляр Airflow, работающий в AWS Fargate. Он подключается к локальному серверу Postgres (в Windows) и пытается загрузить данные из (сложного) представления. PostgresHook Для этого используется a . Однако задача в DAG завершается сбоем в потоке данных с этой ошибкой:

   File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi_hook.py", line 120, in get_records
    cur.execute(sql)
psycopg2.OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 

Некоторое время назад ошибка возникла примерно через 10-15 минут. Теперь это происходит быстрее, через 5 минут или даже быстрее.

Я просмотрел журналы Postgres, которые показывают (смутно), что именно клиент закрыл соединение:

 LOG:  could not send data to client: An existing connection was forcibly closed by the remote host.
FATAL:  connection to client lost
 

Я уже перепробовал кучу потенциальных решений.

Без подключения Airflow к серверу за пределами Airflow, используя psycopg2 напрямую: работает (с использованием сложного представления).

Другая таблица Попытка загрузить данные из другой таблицы из Airflow в облаке: работает, тоже быстро завершается. Таким образом, этот «тайм-аут» возникает только потому, что запрос занимает некоторое время.

Сначала я мог воспроизвести эту проблему локально, но я (думаю, что я) решил ее, добавив некоторые дополнительные параметры в строку подключения postgres : keepalives=1amp;keepalives_idle=60amp;keepalives_interval=60 . Однако я не могу воспроизвести это исправление в воздушном потоке в облаке, потому что, когда я добавляю туда эти параметры, ошибка остается.

Увеличение времени ожидания См. Выше, я добавил keepalives, но я также попытался рассуждать о других потенциальных таймаутах. Я добавил тайм-аут execution_timeout к аргументам DAG, но безрезультатно. Мы также проверили сетевые тайм-ауты, но, учитывая нерегулярный характер сбоев соединения, на самом деле это не похоже на такой жесткий тайм-аут…

Я здесь в недоумении. Есть предложения?

Ответ №1:

Обновление: мы решили эту проблему с помощью обходного пути. Вместо того, чтобы держать соединение открытым во время запроса сложного представления, мы превратили соединение в асинхронное соединение (т. Е. aconn = psycopg2.connect(database='test', async=1) Из документов psycopg). Кроме того, мы превратили представление в материализованное представление, так что мы вызываем только REFRESH MATERIALIZED VIEW через асинхронное соединение, а затем мы можем просто SELECT * использовать материализованное представление некоторое время спустя, что происходит очень быстро.