#flask #websocket #celery #flask-socketio
Вопрос:
У меня есть API REST для колбы, использующий Flask-SocketIO, чтобы обновлять клиента при выполнении фоновой задачи с использованием сельдерея. Некоторое время все работает нормально, но через некоторое время события Flask-SocketIO перестают вызываться.
В журналах не появляется никаких ошибок, сообщения от клиента принимаются, как и ПИНГ-ПОНГ, но события перестают вызываться. Перезапуск приложения Flask временно устраняет проблему, сельдерей или redis не нужно перезапускать, и задачи продолжают выполняться. Как будто события Flask-SocketIO через некоторое время не регистрируются.
Я пробовал с обезьяньим патчем и без него, я пытался зарегистрировать события с помощью socketio.on_event вместо декоратора. Я также удалил все из события, за исключением вызова отладки, но оно никогда не вызывается, что предполагает, что проблема не в выбросах. Клиент тоже не проблема, я попробовал с клиентом barebone socket-IO и получил те же результаты. Перемещение события в событие «подключение» также не работает, клиент подключается, но событие не вызывается. Наконец, я также добавил тайм-аут между подключением и отправкой события get_count, но там тоже ничего не было.
Приложение работает с gunicorn. За nginx на сервере, но я получаю ту же проблему локально без nginx.
Вот фрагменты кода, относящиеся к данной проблеме.
extensions.py
from celery import Celery
from flask_socketio import SocketIO
from logbook import debug
def init_celery(app, celery):
"""Add flask app context to celery.Task"""
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
celery = Celery("src", config_source="src.setup.celeryconfig")
socketio = SocketIO(cors_allowed_origins="*", logger=True, engineio_logger=True)
app.py
import eventlet
eventlet.monkey_patch()
from src.setup.extensions import (
celery,
init_celery,
socketio
)
def create_app():
app = Flask(__name__)
app.config.from_object('src.' os.environ['APP_SETTINGS'])
register_extensions(app)
return app
def register_extensions(app):
socketio.init_app(app, message_queue='redis://localhost:6379/0', )
@socketio.on_error()
def error_handler(e):
debug("ERROR")
debug(e)
@socketio.on('get_count')
def get_count(site_id):
# This is not called after a while.
debug('count')
socketio.emit('count')
app = create_app()
celery = init_celery(app, celery=celery)
if __name__ == '__main__':
socketio.run(app)
Package versions:
celery==5.0.5
Flask==1.1.2
Flask-SocketIO==5.0.1
gunicorn==20.1.0
redis==3.5.3
Startup command
python gunicorn --worker-class 'eventlet' -w 1 -b :8004 -b :3443 --reload --timeout 1200 'src.app'
Logs when the problem occurs and with logger=True and socketio_logger=True:
deyiCxghwRYPLFXgAAAE: Sending packet OPEN data {'sid': 'deyiCxghwRYPLFXgAAAE', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000}
deyiCxghwRYPLFXgAAAE: Received packet MESSAGE data 0
deyiCxghwRYPLFXgAAAE: Sending packet MESSAGE data 0{"sid":"igeaCfCJlLAfvvSjAAAA"}
deyiCxghwRYPLFXgAAAE: Received request to upgrade to websocket
deyiCxghwRYPLFXgAAAE: Upgrade to websocket successful
deyiCxghwRYPLFXgAAAE: Received packet MESSAGE data 2["get_count","426"]
received event "get_count" from igeaCfCJlLAfvvSjAAAA [/]
deyiCxghwRYPLFXgAAAE: Received packet MESSAGE data 2["get_count","427"]
received event "get_count" from igeaCfCJlLAfvvSjAAAA [/]
deyiCxghwRYPLFXgAAAE: Received packet MESSAGE data 2["get_count","428"]
received event "get_count" from igeaCfCJlLAfvvSjAAAA [/]
deyiCxghwRYPLFXgAAAE: Sending packet PING data None
deyiCxghwRYPLFXgAAAE: Received packet PONG data
deyiCxghwRYPLFXgAAAE: Sending packet PING data None
deyiCxghwRYPLFXgAAAE: Received packet PONG data
deyiCxghwRYPLFXgAAAE: Sending packet PING data None
deyiCxghwRYPLFXgAAAE: Received packet PONG data
Регистрируется после перезапуска приложения, когда все работает правильно.
WBPV4M2jRjyx8HHmAAAA: Received packet MESSAGE data 0
WBPV4M2jRjyx8HHmAAAA: Sending packet MESSAGE data 0{"sid":"pv6lRFOFzmisdF0NAAAB"}
WBPV4M2jRjyx8HHmAAAA: Received request to upgrade to websocket
WBPV4M2jRjyx8HHmAAAA: Upgrade to websocket successful
WBPV4M2jRjyx8HHmAAAA: Received packet MESSAGE data 2["get_count","426"]
received event "get_count" from pv6lRFOFzmisdF0NAAAB [/]
WBPV4M2jRjyx8HHmAAAA: Received packet MESSAGE data 2["get_count","427"]
received event "get_count" from pv6lRFOFzmisdF0NAAAB [/]
WBPV4M2jRjyx8HHmAAAA: Received packet MESSAGE data 2["get_count","428"]
received event "get_count" from pv6lRFOFzmisdF0NAAAB [/]
emitting event "count" to all [/]
emitting event "count" to all [/]
emitting event "count" to all [/]
DEBUG: count
pubsub message: emit
WBPV4M2jRjyx8HHmAAAA: Sending packet MESSAGE data 2["count",{"room":"426","count":224,"total":224,"archive_exists":true}]
DEBUG: count
pubsub message: emit
WBPV4M2jRjyx8HHmAAAA: Sending packet MESSAGE data 2["count",{"room":"427","count":108,"total":108,"archive_exists":true}]
DEBUG: count
pubsub message: emit
WBPV4M2jRjyx8HHmAAAA: Sending packet MESSAGE data 2["count",{"room":"428","count":164,"total":165,"archive_exists":true}]
WBPV4M2jRjyx8HHmAAAA: Sending packet PING data None
WBPV4M2jRjyx8HHmAAAA: Received packet PONG data
WBPV4M2jRjyx8HHmAAAA: Sending packet PING data None
WBPV4M2jRjyx8HHmAAAA: Received packet PONG data
Спасибо вам за вашу помощь.
Комментарии:
1. См. Раздел Устранение неполадок в документах, чтобы узнать, как включить ведение журнала для Socket.IO. Затем поделитесь журналами для сервера и клиента в то время, когда возникает проблема.
2. Я добавил журналы к своему вопросу. Как вы можете видеть, событие получено, но ответное сообщение не отправлено.
3. Я не уверен, почему обработчик не вызывается. Можете ли вы создать небольшой пример приложения, которое воспроизводит проблему, чтобы я мог ее отладить?
4. Я создал примерное приложение, но через несколько дней оно все еще работает должным образом. Я буду добавлять функции до тех пор, пока они не выйдут из строя, что должно помочь выявить проблему. github.com/AlekLefebvre/socket-io-minimal
Ответ №1:
Проблема, похоже, решена после обновления моих требований до:
celery==5.1.0
Flask==1.1.4
Flask-SocketIO==5.1.0
gunicorn==20.1.0
redis==3.5.3