Как правильно использовать сеансы SQLAlchemy с сельдереем?

#python #flask #orm #sqlalchemy #celery

#python #flask #orm #sqlalchemy #сельдерей

Вопрос:

Я пытался разобраться в этом некоторое время, но многие ответы, которые я могу найти сейчас, устарели (сообщения от> 6 лет назад) или менее связаны.

Вопрос в том, как правильно обрабатывать сеансы базы данных в сельдерее. Моя текущая настройка: у меня есть глобальный объект DBEngine, который содержит dsn , engine , и Session . поэтому каждый раз, когда я хочу использовать сеанс, я просто вызываю sess = db.Session() и начинаю использовать их в своем приложении Flask. Это выглядит следующим образом:

 #db.py
class DbEngine:
  def __init__(self, path, ...):
    self.dsn = self.create_dsn_from_file(path)
    self.engine = create_engine(self.dsn)
    self.Session = scoped_session(sessionmaker(bind=self.engine))
  

Пока я не ввел сельдерей в свое приложение, я часто получал различные ошибки (ошибка протокола, ошибки SSLSocket и т. Д.). И я не смог воспроизвести их локально, и это часто исправляется, если я просто добавляю повторные попытки к этим задачам (обычно это выполняется с 3 повторными попытками). Поэтому я подозреваю, что это может быть вызвано общими сеансами.

Затем я решил изменить свой сеанс для приложения celery: каждый раз, когда мне нужен сеанс, я фактически создаю новый движок, создаю новый сеанс и возвращаю вновь созданный Session() . Однако аналогичная, но не совсем та же проблема возникла снова (различные ошибки протокола с разными кодами).

Я видел, что у сельдерея есть свой собственный SessionManager, но я не смог найти пример кода использования. Я думал о следующей структуре:

 # celery_app.py

celery = ... 

session_manager = SessionManager()
engine, Session = session_manager.create_session(dsn)
  

и в задачах, которые я выполняю:

 # task_1.py
from celery_app import celery, Session

@celery.task
def tsk():
  sess = Session()
  sess.query(...)
  ... 
  sess.close()
  

Но на самом деле не уверен, что это предполагаемый подход, поскольку я вызываю только .create_session() один раз, и я не знаю, когда и как я должен вызывать другие функции в SessionManager.

Для фона я использую DB2.

Если кто-нибудь знает, как правильно использовать SessionManager или имеет опыт использования session с SQLAlchemy, мы будем очень признательны, если вы сможете предоставить какие-либо идеи. Заранее спасибо!

Ответ №1:

Возможно, это не тот ответ, который вы ищете, но flask_sqlalchemy он предоставляет дружественный к Flask слой поверх SQLAlchemy, который почти полностью устраняет необходимость управлять сеансами вручную. И он хорошо работает с сельдереем, как только вы настроите ContextTask в соответствии с примером в документах Flask.

Комментарии:

1. Я сталкиваюсь с той же проблемой, что и вы. Вы читали это prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html ? (это старый, 2013)

Ответ №2:

Публикую это в надежде, что это кому-то поможет. В моем случае мы используем Fastapi, Postgres, Sqlalchemy и Celery.

У меня была такая же проблема, и это было очень сложно. Множество загадочных ошибок из нашей базы данных postgres, выглядящих так, как будто соединения были остановлены в середине потока. PG_TUPLES_OK and no message from the libpq , AttributeError("'_NoResultMetaData' object has no attribute '_indexes_for_keys'") … и т.д. и т.п.

TL; DR

Соединения пулов Sqlalchemy по умолчанию не потокобезопасны, по умолчанию Celery разветвляет процессы: один или другой необходимо изменить.

Решение 1) Отключите объединение в пул Sqlalchemy
* в итоге мы пошли на это, чтобы поддерживать лучший параллелизм

Документы по алхимии Sql

 from sqlalchemy.pool import NullPool
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, poolclass=NullPool
)
  

Решение 2) Заставить Сельдерей работать как единый процесс без разветвления

 celery -A celery_worker.celery worker -E --loglevel=info --pool=solo
  

Статья о объединении сельдерея
Это решение сработало, но означало, что все наши задачи celery будут выполняться последовательно, что устранит ошибки, но нам потребуется больше пропускной способности. Это может подойти для определенных приложений.