psycopg2.pool вылетает, когда заканчивается пул потоков

#python #python-3.x #psycopg2 #psql

#python #python-3.x #psycopg2 #psql

Вопрос:

У меня есть N потоки и M psycopg2.pool.ThreadedConnectionPools где N>M . Я хочу, чтобы мои потоки запрашивали соединение psycopg2 и ждали его ответа, вместо этого они выходят из строя, когда нет доступных подключений. Должно быть, я делаю что-то не так, иначе это противоречит цели пула… Пожалуйста, посоветуйте, как это исправить.

MWE:

 import threading
import psycopg2
from psycopg2 import pool

conn_pool = psycopg2.pool.ThreadedConnectionPool (
    1,
    10,
    host='127.0.0.1',
    user='john',
    password='1234',
    dbname='test',
    port=1234)

class Foo (threading.Thread):
    def __init__ (self):
        threading.Thread.__init__(self)

    def run (self):
        global conn_pool

        conn = conn_pool.getconn()
        cur = conn.cursor()

        sql_query="SELECT id from test_table;"

        print(cur.execute (sql_query))

        cur.close ()
        conn_pool.putconn(conn)

num_threads = 20
threads = []

for i in range (num_threads):
    threads.append (Foo())

for i in range (num_threads):
    threads[i].start()

for i in range (num_threads):
    threads[i].join()

conn_pool.closeall ()
  

Ошибка:

 Exception in thread Thread-13:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "test.py", line 22, in run
    conn = conn_pool.getconn()
  File "/usr/local/lib/python3.6/dist-packages/psycopg2/pool.py", line 169, in getconn
    return self._getconn(key)
  File "/usr/local/lib/python3.6/dist-packages/psycopg2/pool.py", line 92, in _getconn
    raise PoolError("connection pool exhausted")
psycopg2.pool.PoolError: connection pool exhausted
  

Ответ №1:

Вам необходимо самостоятельно обработать логику исключения / повторного подключения:

 class Foo (threading.Thread):
    def __init__ (self):
        threading.Thread.__init__(self)

    def run (self):
        conn = self._getConnection()
        cur = conn.cursor()

        sql_query="SELECT id from accounts where id > 10000 and id < 10002;"

        cur.execute (sql_query)
        print(cur.rowcount)

        cur.close ()
        conn_pool.putconn(conn)

    def _getConnection(self):
        global conn_pool
        while 1:
            conn = None
            try:
                conn = conn_pool.getconn()
            except psycopg2.pool.PoolError:
                time.sleep(2)
                print("waiting ...")
            if conn:
                return conn


num_threads = 20
threads = []

for i in range (num_threads):
    threads.append (Foo())

for i in range (num_threads):
    threads[i].start()

for i in range (num_threads):
    threads[i].join()

conn_pool.closeall ()
  

Out:

 1
1
1
1
1
1
1
1
1
1
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
1
1
1
1
1
1
1
1
1
1
  

Комментарии:

1. Похоже, что это было бы очень распространенной функцией, которую psycopg2 должен был бы предоставить