как сделать генератор асинхронного сеанса sqlalchemy(Асинхронный сеанс) в качестве базового класса?

#python #python-3.x #sqlalchemy #background-process

Вопрос:

У меня есть приложение fast API, и я запускаю задачу расписания в фоновом потоке в качестве события запуска в fast API. поэтому , когда я использую асинхронный сеанс SQLAlchemy в области маршрута, например: session: AsyncSession=Depends(instance_manger.db_instance.get_db_session) все в порядке и выполняется правильно, но когда он запускается в фоновом потоке, у меня возникает следующая ошибка. Я использую модуль python => SQLAlchemy[asyncio] asyncmy pymysql fastapi>

database.py

 class DBManager:
    def __init__(self):
        self.SQLALCHEMY_DATABASE_URL = None
        self.config_reader_instance = None
        self.engine = None
        self._session_factory = None
        self.logger_handler_instance = None
        self.db = None

    def initialize(self, config_reader_instance, logger_handler_instance):
        self.logger_handler_instance = logger_handler_instance
        self.config_reader_instance = config_reader_instance
        self.SQLALCHEMY_DATABASE_URL = "mysql asyncmy://{0}:{1}@{2}:{3}/{4}".format(
            self.config_reader_instance.DB_INFO['db_username'], self.config_reader_instance.DB_INFO['db_password'],
            self.config_reader_instance.DB_INFO['db_hostname'], self.config_reader_instance.DB_INFO['db_port'],
            self.config_reader_instance.DB_INFO['db_name'])
        self.engine = create_async_engine(self.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
        # self.engine.begi/n()
        self._session_factory = async_scoped_session(sessionmaker(
            self.engine, class_=AsyncSession, expire_on_commit=False), scopefunc=current_task)
        # self._session_factory = orm.scoped_session(
        #     orm.sessionmaker(
        #         class_=AsyncSession,
        #         autoflush=False,
        #         bind=self.engine,
        #     ),
        # )

    async def get_db_session(self) -> AsyncSession:
        async with self._session_factory() as session:
            try:
                yield session
            except Exception as e:
                self.logger_handler_instance.write_log(__name__, logging.FATAL,
                                                       'Session rollback because of exception')
                self.logger_handler_instance.write_log(__name__, logging.FATAL, e)
                await session.rollback()
                raise
            finally:
                await session.close()
 

background_thread.py

 class BackgroundRunnable:
    def __init__(self):
        self.instance_manger = None
        self.core_process_instance = None
        self.conf_reader_instance = None
        self.process_id = None
        self.process_name = "BTC"

    def initialize(self, instance_manager: InstanceManager):
        self.instance_manger = instance_manager
        return self

    def set_process_info(self, process_name):
        self.process_id = os.getpid()
        self.process_name = process_name

    async def run_main(self):
        self.instance_manger.logger_handler_instance.write_log(__name__, logging.INFO,
                                                               "Background Thread is start")
        results = await CryptoCoinService(
            CryptoCoinRepository(AsyncSession(self.instance_manger.db_instance.engine))).get_coin()
        print(results)

 

crypto_coin_repository.py

 from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import class_mapper

from db.models.models import CryptoCoinModel


class CryptoCoinRepository:
    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def get_all(self) -> bool:
        results = await self.session.execute(
            select(CryptoCoinModel._id).where(CryptoCoinModel._symbol == 'BTC'))
        results_ = results.fetchone()
        if results_.__len__() == 0:
            return False
        else:
            return True
 

main.py

 from fastapi import APIRouter, Depends, Request, Response, FastAPI, status
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession

from coin_server.background_thread import BackgroundRunnable
from coin_server.core_process import CoreProcess
from core.instance_manager import InstanceManager
from db.database import DBManager
from db.repository.crypto_coin_repository import CryptoCoinRepository
from db.services.crypto_coin_service import CryptoCoinService

deposit_Router = APIRouter()

instance_manager = InstanceManager()
instance_manager.initialize()
db_instance = DBManager()
db_instance.initialize(instance_manager.config_reader_instance, instance_manager.logger_handler_instance)


@deposit_Router.post('/')
async def index(request: Request, session: AsyncSession = Depends(db_instance.get_db_session)):
    results = await CryptoCoinService(CryptoCoinRepository(session)).get_coin()
    print(results)

deposit_app = FastAPI()

@deposit_app.on_event('startup')
async def app_startup():
    background_runnable = BackgroundRunnable()
    background_runnable.initialize(instance_manager)
    asyncio.create_task(background_runnable.run_main())
    # asyncio.create_task(BackgroundRunnable().initialize(instance_manager).run_main())

deposit_app.include_router(deposit_Router)

 

when I run fast API app error like belong output.

 INFO:     Uvicorn running on http://0.0.0.0:5000 (Press CTRL C to quit)
INFO:     Started reloader process [176] using watchgod
INFO:     Started server process [179]
INFO:     Waiting for application startup.
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<BackgroundRunnable.run_main() done, defined at /mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py:48> exception=At
tributeError("'async_generator' object has no attribute 'execute'")>
Traceback (most recent call last):
  File "/mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py", line 51, in run_main
    results = await CryptoCoinService(
  File "/mnt/c/Users/dr_r00t3r/Desktop/main/db/repository/crypto_coin_repository.py", line 17, in get_all
    results = await self.session.execute(
AttributeError: 'async_generator' object has no attribute 'execute'
INFO:     Application startup complete.