Ввод-вывод сокетов и зависания postgresql в цикле while

#python #postgresql #flask #socket.io #flask-socketio

Вопрос:

Технический стек

Интерфейс: VueJS Бэкэнд: Колба Python3

Вопрос:

Я использую VueJS на переднем конце, используя клиентскую библиотеку socket io, которая при нажатии кнопки выдает событие на сокете, подключенном к моему бэкэнду.

Как только событие передается и принимается на серверной части, код вводит метод, который имеет цикл while true и прерывается, когда в базе данных присутствует определенное значение; до этого момента ( 20 минут, 4 часа ) он остается в этом цикле и проверяет состояние состояния каждые несколько секунд.

Ошибка, которую я получаю ( см. Ниже), связана со слишком большим количеством подключенных клиентов, которые я не получаю, потому что элемент управления завершает код, выполняющий запрос, и стек должен быть ясным ( соединение с клиентом postgres закрыто ) тот факт, что запрос находится в операторе with, заставляет меня сказать это, но, очевидно, это не так.

Мой вопрос в том, как мне вернуть клиента? и позволить этому выполнить ту же задачу без ошибок и более эффективно?

ошибка:

 File "/home/kidcode/.cache/pypoetry/virtualenvs/application-backend-p56sp5Ck-py3.8/lib/python3.8/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL:  sorry, too many clients already
FATAL:  sorry, too many clients already

(Background on this error at: https://sqlalche.me/e/14/e3q8)
 

код

имя файла(io.py)

 from flask_socketio import SocketIO
socketio = SocketIO(cors_allowed_origins="*")
 

имя файла(socket_listener.py)

     from ..io import socketio
    from flask_socketio import emit, disconnect
     
     
    @socketio.on("connect")
    def test_connect():
       emit("after connect", {"data": "Client has connected!"})
     
     
    @socketio.on("system_started")
    def test_broadcast_message(message):
       socketio.start_background_task(target=SystemMonitorHandler(socketio))
     
     
    @socketio.on("disconnect_request", namespace="/test")
    def disconnect_request():
       @copy_current_request_context
       def can_disconnect():
           disconnect()
     
 

имя файла(system_monitor.py)

 class SystemMonitorHandler:
    def __init__(self, socketio):
        socketio = socket.io
        self.monitor_system_status()

def monitor_system_progress(self) -> None:
       """[summary]"""
       # TODO use values from the query to break this loop
       while True:
           self.socketio.sleep(2)
           _system_queries: object = SystemQueries()
           _system_status: dict = (
               _data_migration_history.get_system_job_status(system_id=1, uuid=2)
           )
           self.socketio.emit("message", {"data": status})



   def get_system_job_status(self, system_id: str, uuid: str) -> dict:

       _system_job_schema: object = SystemJobSchema(many=True)
       try:
           with SqlAlchemyUnitOfWork(
               session=request_session_factory, files=SystemJobRepository
           ) as uow:
               return _system_job_schema.dump(
                   uow.repositories.files.filter(
                       system_id=system_id, requested_by=uuid
                   )
                   .with_entities(
                       SystemModel.system_status,
                       SystemModel.job_end_date,
                       SystemModel.job_start_date,
                   )
                   .all()
               )
       except (DatabaseTransactionError, DatabaseSessionError) as err:
           self.logger.exception(
               ConsoleColourHelper.FAIL
                 "Unable to fetch requested resource from database : 
                   {}".format(
                   err
               )
           )




import abc
from functools import cached_property
from types import SimpleNamespace
from typing import Any, Callable, Iterable, Union
 
from sqlalchemy.orm import Session
from types import SimpleNamespace
 
 
class AbstractUnitOfWork(abc.ABC):
   repositories: SimpleNamespace
 
   def __enter__(self) -> "AbstractUnitOfWork":
       return self
 
   def __exit__(self, *args):
       pass
 
   @abc.abstractmethod
   def commit(self):
       raise NotImplementedError
 
   @abc.abstractmethod
   def rollback(self):
       raise NotImplementedError
 
 
 
 
============= separate file START ==========================
 
class AbstractRepository(abc.ABC):
   @abc.abstractmethod
   def filter(self, **kwargs) -> Iterable[Any]:
       raise NotImplementedError
 
   @abc.abstractmethod
   def save(self, model: Any):
       raise NotImplementedError
 
   @abc.abstractmethod
   def get(self, id_: Any) -> Any:
       raise NotImplementedError
 
 
class SQLAlchemyAbstractRepository(AbstractRepository, abc.ABC):
   model = Any
 
   def __init__(self, session: Session):
       self.session = session
 
   def filter(self, **kwargs):
       return self.session.query(self.model).filter_by(**kwargs)
 
   def save(self, model):
       self.session.add(model)
       self.session.flush()
 
   def get(self, id_):
       return self.session.get(id_)
 
   def delete(self):
       # TODO
       pass
 
 
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
   def __init__(
       self,
       session=Union[Callable[[], Session], Session],
       **kwargs: SQLAlchemyAbstractRepository
   ):
       self._session = session
       self._repository_config = kwargs
 
   @cached_property
   def session(self):
       return self._session() if callable(self._session) else self._session
 
 
 
 
   def __enter__(self) -> AbstractUnitOfWork:
       repositories = {
           name: repository(self.session)
           for name, repository in self._repository_config.items()
       }
       self.repositories = SimpleNamespace(**repositories)
       return super().__enter__()
 
   def __exit__(self, *args):
       self.session.close()
 
   def commit(self):
       self.session.commit()
 
   def rollback(self):
       self.session.rollback()
 
========== separate file END ==========================
 

Комментарии:

1. Что это SqlAlchemyUnitOfWork такое и как это реализуется?

2. @Miguel спасибо за ответ, я добавил запрошенную информацию в приведенный выше фрагмент. PS вижу, что вы довольно активны на других форумах спасибо за ваш вклад 🙂