Как обрабатывать пул подключений к базе данных с помощью ThreadPoolExecutor в peewee-orm?

#python #python-3.x #database #multithreading #peewee

#python #python-3.x #База данных #многопоточность #peewee

Вопрос:

Я использую peewee и базу данных mariadb в скрипте, скрипт просматривает таблицу БД и отправляет данные из этого запроса в ThreadPoolExecutor. Сам рабочий также должен запрашивать базу данных. После завершения всех фьючерсов скрипт снова запускается с самого начала.

Когда я использую одно соединение, все работает, но поскольку моя рабочая работа в основном связана с сетевым вводом-выводом, я чувствую, что одно соединение для всего рабочего потока станет узким местом.

Если я перейду на пул баз данных, я могу отслеживать, что количество подключений увеличивается, пока не получу сообщение об ошибке «слишком много открытых подключений» от peewee. Сами соединения никогда не закрываются. Это соответствует документации peewee

Однако я понятия не имею, как вручную открывать и закрывать db-соединение изнутри моей рабочей функции.

Я попытался создать переменную базы данных в моделях.py global тогда я мог получить доступ к этому объекту в моем worker, но наблюдение за всеми открытыми соединениями в моей БД заставило меня понять .close() /.open() в этом случае не имело никакого эффекта.

Я также вставил все в один файл, я все еще не мог открыть / закрыть соединение вручную.

В документации приведены только примеры использования пула с различными веб-фреймами.

Мое приложение упростилось

 #app.py
from models.models import MyTableA, MyTableB

def do_work(data):
    MyTableB.create(name="foo")

def main()
    logger = logging.getLogger()
    data = MyTableA.select().dicts()

    with ThreadPoolExecutor(8) as executor:
        future_to_system = {executor.submit(do_work, d): d.id for d in data}
        
        for future in as_completed(future_to_system):
            system = future_to_system[future]
            try:
                future.result()
            except Exception as exc:
                logger.info('%r generated an exception: %s' % (system, exc))

            else:
                logger.debug('%r result is %s' % (system, "ok"))

if __name__ == '__main__':
    main()
  

models.py

 from peewee import *
from playhouse.pool import PooledMySQLDatabase

#uncomment this line to use pool
#database = PooledMySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass', 'max_connections': 32, 'stale_timeout': 300})

#comment that line to use pool
database = PooledMySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass'})




class UnknownField(object):
    def __init__(self, *_, **__): pass

class BaseModel(Model):
    class Meta:
        database = database

class MyTableA(BaseModel):
    name = CharField()

    class Meta:
        table_name = 'my_table'

class MyTableB(BaseModel):
    name = CharField()


    class Meta:
        table_name = 'my_table'
  

Если у кого-нибудь есть идея о том, как использовать пул подключений peewee вместе с Threadpoolexecutor, я был бы очень благодарен.

Ответ №1:

Я нашел решение здесь

models.py

 from peewee import *


class UnknownField(object):
    def __init__(self, *_, **__): pass

class BaseModel(Model):
    class Meta:
        database = None

class MyTableA(BaseModel):
    name = CharField()

    class Meta:
        table_name = 'my_table'

class MyTableB(BaseModel):
    name = CharField()

    class Meta:
        table_name = 'my_table'
  

app.py

 from playhouse.pool import PooledMySQLDatabase
from models.models import MyTableA, MyTableB

def do_work(data, db):
    with db:
        MyTableB.create(name="foo")

def main()
    logger = logging.getLogger()
    

    database = PooledMySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass', 'max_connections': 32, 'stale_timeout': 300})

    database.bind(
        [
            MyTableA, MyTableB
        ]
    )

    with database:
        data = MyTableA.select().dicts()


    with ThreadPoolExecutor(8) as executor:
        future_to_system = {executor.submit(do_work, d): d.id for d in data}
        
        for future in as_completed(future_to_system):
            system = future_to_system[future]
            try:
                future.result()
            except Exception as exc:
                logger.info('%r generated an exception: %s' % (system, exc))

            else:
                logger.debug('%r result is %s' % (system, "ok"))

if __name__ == '__main__':
    main()