#python #postgresql #flask #socket.io #flask-socketio
Вопрос:
Технический стек
Интерфейс: VueJS Бэкэнд: Колба Python3
Вопрос:
Я использую VueJS на переднем конце, используя клиентскую библиотеку socket io, которая при нажатии кнопки выдает событие на сокете, подключенном к моему бэкэнду.
Как только событие передается и принимается на серверной части, код вводит метод, который имеет цикл while true и прерывается, когда в базе данных присутствует определенное значение; до этого момента ( 20 минут, 4 часа ) он остается в этом цикле и проверяет состояние состояния каждые несколько секунд.
Ошибка, которую я получаю ( см. Ниже), связана со слишком большим количеством подключенных клиентов, которые я не получаю, потому что элемент управления завершает код, выполняющий запрос, и стек должен быть ясным ( соединение с клиентом postgres закрыто ) тот факт, что запрос находится в операторе with, заставляет меня сказать это, но, очевидно, это не так.
Мой вопрос в том, как мне вернуть клиента? и позволить этому выполнить ту же задачу без ошибок и более эффективно?
ошибка:
File "/home/kidcode/.cache/pypoetry/virtualenvs/application-backend-p56sp5Ck-py3.8/lib/python3.8/site-packages/psycopg2/__init__.py", line 122, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: sorry, too many clients already
FATAL: sorry, too many clients already
(Background on this error at: https://sqlalche.me/e/14/e3q8)
код
имя файла(io.py)
from flask_socketio import SocketIO
socketio = SocketIO(cors_allowed_origins="*")
имя файла(socket_listener.py)
from ..io import socketio
from flask_socketio import emit, disconnect
@socketio.on("connect")
def test_connect():
emit("after connect", {"data": "Client has connected!"})
@socketio.on("system_started")
def test_broadcast_message(message):
socketio.start_background_task(target=SystemMonitorHandler(socketio))
@socketio.on("disconnect_request", namespace="/test")
def disconnect_request():
@copy_current_request_context
def can_disconnect():
disconnect()
имя файла(system_monitor.py)
class SystemMonitorHandler:
def __init__(self, socketio):
socketio = socket.io
self.monitor_system_status()
def monitor_system_progress(self) -> None:
"""[summary]"""
# TODO use values from the query to break this loop
while True:
self.socketio.sleep(2)
_system_queries: object = SystemQueries()
_system_status: dict = (
_data_migration_history.get_system_job_status(system_id=1, uuid=2)
)
self.socketio.emit("message", {"data": status})
def get_system_job_status(self, system_id: str, uuid: str) -> dict:
_system_job_schema: object = SystemJobSchema(many=True)
try:
with SqlAlchemyUnitOfWork(
session=request_session_factory, files=SystemJobRepository
) as uow:
return _system_job_schema.dump(
uow.repositories.files.filter(
system_id=system_id, requested_by=uuid
)
.with_entities(
SystemModel.system_status,
SystemModel.job_end_date,
SystemModel.job_start_date,
)
.all()
)
except (DatabaseTransactionError, DatabaseSessionError) as err:
self.logger.exception(
ConsoleColourHelper.FAIL
"Unable to fetch requested resource from database :
{}".format(
err
)
)
import abc
from functools import cached_property
from types import SimpleNamespace
from typing import Any, Callable, Iterable, Union
from sqlalchemy.orm import Session
from types import SimpleNamespace
class AbstractUnitOfWork(abc.ABC):
repositories: SimpleNamespace
def __enter__(self) -> "AbstractUnitOfWork":
return self
def __exit__(self, *args):
pass
@abc.abstractmethod
def commit(self):
raise NotImplementedError
@abc.abstractmethod
def rollback(self):
raise NotImplementedError
============= separate file START ==========================
class AbstractRepository(abc.ABC):
@abc.abstractmethod
def filter(self, **kwargs) -> Iterable[Any]:
raise NotImplementedError
@abc.abstractmethod
def save(self, model: Any):
raise NotImplementedError
@abc.abstractmethod
def get(self, id_: Any) -> Any:
raise NotImplementedError
class SQLAlchemyAbstractRepository(AbstractRepository, abc.ABC):
model = Any
def __init__(self, session: Session):
self.session = session
def filter(self, **kwargs):
return self.session.query(self.model).filter_by(**kwargs)
def save(self, model):
self.session.add(model)
self.session.flush()
def get(self, id_):
return self.session.get(id_)
def delete(self):
# TODO
pass
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
def __init__(
self,
session=Union[Callable[[], Session], Session],
**kwargs: SQLAlchemyAbstractRepository
):
self._session = session
self._repository_config = kwargs
@cached_property
def session(self):
return self._session() if callable(self._session) else self._session
def __enter__(self) -> AbstractUnitOfWork:
repositories = {
name: repository(self.session)
for name, repository in self._repository_config.items()
}
self.repositories = SimpleNamespace(**repositories)
return super().__enter__()
def __exit__(self, *args):
self.session.close()
def commit(self):
self.session.commit()
def rollback(self):
self.session.rollback()
========== separate file END ==========================
Комментарии:
1. Что это
SqlAlchemyUnitOfWork
такое и как это реализуется?2. @Miguel спасибо за ответ, я добавил запрошенную информацию в приведенный выше фрагмент. PS вижу, что вы довольно активны на других форумах спасибо за ваш вклад 🙂