#python #python-3.x #sqlalchemy #background-process
Вопрос:
У меня есть приложение fast API, и я запускаю задачу расписания в фоновом потоке в качестве события запуска в fast API. поэтому , когда я использую асинхронный сеанс SQLAlchemy в области маршрута, например: session: AsyncSession=Depends(instance_manger.db_instance.get_db_session)
все в порядке и выполняется правильно, но когда он запускается в фоновом потоке, у меня возникает следующая ошибка. Я использую модуль python => SQLAlchemy[asyncio] asyncmy pymysql fastapi>
database.py
class DBManager:
def __init__(self):
self.SQLALCHEMY_DATABASE_URL = None
self.config_reader_instance = None
self.engine = None
self._session_factory = None
self.logger_handler_instance = None
self.db = None
def initialize(self, config_reader_instance, logger_handler_instance):
self.logger_handler_instance = logger_handler_instance
self.config_reader_instance = config_reader_instance
self.SQLALCHEMY_DATABASE_URL = "mysql asyncmy://{0}:{1}@{2}:{3}/{4}".format(
self.config_reader_instance.DB_INFO['db_username'], self.config_reader_instance.DB_INFO['db_password'],
self.config_reader_instance.DB_INFO['db_hostname'], self.config_reader_instance.DB_INFO['db_port'],
self.config_reader_instance.DB_INFO['db_name'])
self.engine = create_async_engine(self.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
# self.engine.begi/n()
self._session_factory = async_scoped_session(sessionmaker(
self.engine, class_=AsyncSession, expire_on_commit=False), scopefunc=current_task)
# self._session_factory = orm.scoped_session(
# orm.sessionmaker(
# class_=AsyncSession,
# autoflush=False,
# bind=self.engine,
# ),
# )
async def get_db_session(self) -> AsyncSession:
async with self._session_factory() as session:
try:
yield session
except Exception as e:
self.logger_handler_instance.write_log(__name__, logging.FATAL,
'Session rollback because of exception')
self.logger_handler_instance.write_log(__name__, logging.FATAL, e)
await session.rollback()
raise
finally:
await session.close()
background_thread.py
class BackgroundRunnable:
def __init__(self):
self.instance_manger = None
self.core_process_instance = None
self.conf_reader_instance = None
self.process_id = None
self.process_name = "BTC"
def initialize(self, instance_manager: InstanceManager):
self.instance_manger = instance_manager
return self
def set_process_info(self, process_name):
self.process_id = os.getpid()
self.process_name = process_name
async def run_main(self):
self.instance_manger.logger_handler_instance.write_log(__name__, logging.INFO,
"Background Thread is start")
results = await CryptoCoinService(
CryptoCoinRepository(AsyncSession(self.instance_manger.db_instance.engine))).get_coin()
print(results)
crypto_coin_repository.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import class_mapper
from db.models.models import CryptoCoinModel
class CryptoCoinRepository:
def __init__(self, session: AsyncSession) -> None:
self.session = session
async def get_all(self) -> bool:
results = await self.session.execute(
select(CryptoCoinModel._id).where(CryptoCoinModel._symbol == 'BTC'))
results_ = results.fetchone()
if results_.__len__() == 0:
return False
else:
return True
main.py
from fastapi import APIRouter, Depends, Request, Response, FastAPI, status
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from coin_server.background_thread import BackgroundRunnable
from coin_server.core_process import CoreProcess
from core.instance_manager import InstanceManager
from db.database import DBManager
from db.repository.crypto_coin_repository import CryptoCoinRepository
from db.services.crypto_coin_service import CryptoCoinService
deposit_Router = APIRouter()
instance_manager = InstanceManager()
instance_manager.initialize()
db_instance = DBManager()
db_instance.initialize(instance_manager.config_reader_instance, instance_manager.logger_handler_instance)
@deposit_Router.post('/')
async def index(request: Request, session: AsyncSession = Depends(db_instance.get_db_session)):
results = await CryptoCoinService(CryptoCoinRepository(session)).get_coin()
print(results)
deposit_app = FastAPI()
@deposit_app.on_event('startup')
async def app_startup():
background_runnable = BackgroundRunnable()
background_runnable.initialize(instance_manager)
asyncio.create_task(background_runnable.run_main())
# asyncio.create_task(BackgroundRunnable().initialize(instance_manager).run_main())
deposit_app.include_router(deposit_Router)
when I run fast API app error like belong output.
INFO: Uvicorn running on http://0.0.0.0:5000 (Press CTRL C to quit)
INFO: Started reloader process [176] using watchgod
INFO: Started server process [179]
INFO: Waiting for application startup.
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<BackgroundRunnable.run_main() done, defined at /mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py:48> exception=At
tributeError("'async_generator' object has no attribute 'execute'")>
Traceback (most recent call last):
File "/mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py", line 51, in run_main
results = await CryptoCoinService(
File "/mnt/c/Users/dr_r00t3r/Desktop/main/db/repository/crypto_coin_repository.py", line 17, in get_all
results = await self.session.execute(
AttributeError: 'async_generator' object has no attribute 'execute'
INFO: Application startup complete.