#python #python-3.x #psycopg2 #psql
#python #python-3.x #psycopg2 #psql
Вопрос:
У меня есть N
потоки и M
psycopg2.pool.ThreadedConnectionPools где N>M
. Я хочу, чтобы мои потоки запрашивали соединение psycopg2 и ждали его ответа, вместо этого они выходят из строя, когда нет доступных подключений. Должно быть, я делаю что-то не так, иначе это противоречит цели пула… Пожалуйста, посоветуйте, как это исправить.
MWE:
import threading
import psycopg2
from psycopg2 import pool
conn_pool = psycopg2.pool.ThreadedConnectionPool (
1,
10,
host='127.0.0.1',
user='john',
password='1234',
dbname='test',
port=1234)
class Foo (threading.Thread):
def __init__ (self):
threading.Thread.__init__(self)
def run (self):
global conn_pool
conn = conn_pool.getconn()
cur = conn.cursor()
sql_query="SELECT id from test_table;"
print(cur.execute (sql_query))
cur.close ()
conn_pool.putconn(conn)
num_threads = 20
threads = []
for i in range (num_threads):
threads.append (Foo())
for i in range (num_threads):
threads[i].start()
for i in range (num_threads):
threads[i].join()
conn_pool.closeall ()
Ошибка:
Exception in thread Thread-13:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "test.py", line 22, in run
conn = conn_pool.getconn()
File "/usr/local/lib/python3.6/dist-packages/psycopg2/pool.py", line 169, in getconn
return self._getconn(key)
File "/usr/local/lib/python3.6/dist-packages/psycopg2/pool.py", line 92, in _getconn
raise PoolError("connection pool exhausted")
psycopg2.pool.PoolError: connection pool exhausted
Ответ №1:
Вам необходимо самостоятельно обработать логику исключения / повторного подключения:
class Foo (threading.Thread):
def __init__ (self):
threading.Thread.__init__(self)
def run (self):
conn = self._getConnection()
cur = conn.cursor()
sql_query="SELECT id from accounts where id > 10000 and id < 10002;"
cur.execute (sql_query)
print(cur.rowcount)
cur.close ()
conn_pool.putconn(conn)
def _getConnection(self):
global conn_pool
while 1:
conn = None
try:
conn = conn_pool.getconn()
except psycopg2.pool.PoolError:
time.sleep(2)
print("waiting ...")
if conn:
return conn
num_threads = 20
threads = []
for i in range (num_threads):
threads.append (Foo())
for i in range (num_threads):
threads[i].start()
for i in range (num_threads):
threads[i].join()
conn_pool.closeall ()
Out:
1
1
1
1
1
1
1
1
1
1
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
1
1
1
1
1
1
1
1
1
1
Комментарии:
1. Похоже, что это было бы очень распространенной функцией, которую psycopg2 должен был бы предоставить