#python #flask #orm #sqlalchemy #celery
#python #flask #orm #sqlalchemy #сельдерей
Вопрос:
Я пытался разобраться в этом некоторое время, но многие ответы, которые я могу найти сейчас, устарели (сообщения от> 6 лет назад) или менее связаны.
Вопрос в том, как правильно обрабатывать сеансы базы данных в сельдерее. Моя текущая настройка: у меня есть глобальный объект DBEngine, который содержит dsn
, engine
, и Session
. поэтому каждый раз, когда я хочу использовать сеанс, я просто вызываю sess = db.Session()
и начинаю использовать их в своем приложении Flask. Это выглядит следующим образом:
#db.py
class DbEngine:
def __init__(self, path, ...):
self.dsn = self.create_dsn_from_file(path)
self.engine = create_engine(self.dsn)
self.Session = scoped_session(sessionmaker(bind=self.engine))
Пока я не ввел сельдерей в свое приложение, я часто получал различные ошибки (ошибка протокола, ошибки SSLSocket и т. Д.). И я не смог воспроизвести их локально, и это часто исправляется, если я просто добавляю повторные попытки к этим задачам (обычно это выполняется с 3 повторными попытками). Поэтому я подозреваю, что это может быть вызвано общими сеансами.
Затем я решил изменить свой сеанс для приложения celery: каждый раз, когда мне нужен сеанс, я фактически создаю новый движок, создаю новый сеанс и возвращаю вновь созданный Session()
. Однако аналогичная, но не совсем та же проблема возникла снова (различные ошибки протокола с разными кодами).
Я видел, что у сельдерея есть свой собственный SessionManager, но я не смог найти пример кода использования. Я думал о следующей структуре:
# celery_app.py
celery = ...
session_manager = SessionManager()
engine, Session = session_manager.create_session(dsn)
и в задачах, которые я выполняю:
# task_1.py
from celery_app import celery, Session
@celery.task
def tsk():
sess = Session()
sess.query(...)
...
sess.close()
Но на самом деле не уверен, что это предполагаемый подход, поскольку я вызываю только .create_session()
один раз, и я не знаю, когда и как я должен вызывать другие функции в SessionManager.
Для фона я использую DB2.
Если кто-нибудь знает, как правильно использовать SessionManager или имеет опыт использования session с SQLAlchemy, мы будем очень признательны, если вы сможете предоставить какие-либо идеи. Заранее спасибо!
Ответ №1:
Возможно, это не тот ответ, который вы ищете, но flask_sqlalchemy
он предоставляет дружественный к Flask слой поверх SQLAlchemy, который почти полностью устраняет необходимость управлять сеансами вручную. И он хорошо работает с сельдереем, как только вы настроите ContextTask в соответствии с примером в документах Flask.
Комментарии:
1. Я сталкиваюсь с той же проблемой, что и вы. Вы читали это prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html ? (это старый, 2013)
Ответ №2:
Публикую это в надежде, что это кому-то поможет. В моем случае мы используем Fastapi, Postgres, Sqlalchemy и Celery.
У меня была такая же проблема, и это было очень сложно. Множество загадочных ошибок из нашей базы данных postgres, выглядящих так, как будто соединения были остановлены в середине потока. PG_TUPLES_OK and no message from the libpq
, AttributeError("'_NoResultMetaData' object has no attribute '_indexes_for_keys'")
… и т.д. и т.п.
TL; DR
Соединения пулов Sqlalchemy по умолчанию не потокобезопасны, по умолчанию Celery разветвляет процессы: один или другой необходимо изменить.
Решение 1) Отключите объединение в пул Sqlalchemy
* в итоге мы пошли на это, чтобы поддерживать лучший параллелизм
from sqlalchemy.pool import NullPool
engine = create_engine(
SQLALCHEMY_DATABASE_URL, poolclass=NullPool
)
Решение 2) Заставить Сельдерей работать как единый процесс без разветвления
celery -A celery_worker.celery worker -E --loglevel=info --pool=solo
Статья о объединении сельдерея
Это решение сработало, но означало, что все наши задачи celery будут выполняться последовательно, что устранит ошибки, но нам потребуется больше пропускной способности. Это может подойти для определенных приложений.