Обработчики колб-носков не работают после некоторого простоя в течение некоторого времени

#flask #websocket #celery #flask-socketio

Вопрос:

У меня есть API REST для колбы, использующий Flask-SocketIO, чтобы обновлять клиента при выполнении фоновой задачи с использованием сельдерея. Некоторое время все работает нормально, но через некоторое время события Flask-SocketIO перестают вызываться.

В журналах не появляется никаких ошибок, сообщения от клиента принимаются, как и ПИНГ-ПОНГ, но события перестают вызываться. Перезапуск приложения Flask временно устраняет проблему, сельдерей или redis не нужно перезапускать, и задачи продолжают выполняться. Как будто события Flask-SocketIO через некоторое время не регистрируются.

Я пробовал с обезьяньим патчем и без него, я пытался зарегистрировать события с помощью socketio.on_event вместо декоратора. Я также удалил все из события, за исключением вызова отладки, но оно никогда не вызывается, что предполагает, что проблема не в выбросах. Клиент тоже не проблема, я попробовал с клиентом barebone socket-IO и получил те же результаты. Перемещение события в событие «подключение» также не работает, клиент подключается, но событие не вызывается. Наконец, я также добавил тайм-аут между подключением и отправкой события get_count, но там тоже ничего не было.

Приложение работает с gunicorn. За nginx на сервере, но я получаю ту же проблему локально без nginx.

Вот фрагменты кода, относящиеся к данной проблеме.

extensions.py

 from celery import Celery
from flask_socketio import SocketIO
from logbook import debug

def init_celery(app, celery):
    """Add flask app context to celery.Task"""

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

celery = Celery("src", config_source="src.setup.celeryconfig")
socketio = SocketIO(cors_allowed_origins="*", logger=True, engineio_logger=True)
 

app.py

 import eventlet
eventlet.monkey_patch()

from src.setup.extensions import (
    celery,
    init_celery,
    socketio
)

def create_app():
    app = Flask(__name__)

    app.config.from_object('src.' os.environ['APP_SETTINGS'])

    register_extensions(app)

    return app

def register_extensions(app):
    socketio.init_app(app, message_queue='redis://localhost:6379/0', )

@socketio.on_error()
def error_handler(e):
    debug("ERROR")
    debug(e)

@socketio.on('get_count')
def get_count(site_id):
    # This is not called after a while.
    debug('count')
    socketio.emit('count')


app = create_app()
celery = init_celery(app, celery=celery)

if __name__ == '__main__':
    socketio.run(app)
 

Package versions:

 celery==5.0.5
Flask==1.1.2
Flask-SocketIO==5.0.1
gunicorn==20.1.0
redis==3.5.3
 

Startup command

 python gunicorn --worker-class 'eventlet' -w 1 -b :8004 -b :3443 --reload --timeout 1200 'src.app'
 

Logs when the problem occurs and with logger=True and socketio_logger=True:

 deyiCxghwRYPLFXgAAAE: Sending packet OPEN data {'sid': 'deyiCxghwRYPLFXgAAAE', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000}
deyiCxghwRYPLFXgAAAE: Received packet MESSAGE data 0
deyiCxghwRYPLFXgAAAE: Sending packet MESSAGE data 0{"sid":"igeaCfCJlLAfvvSjAAAA"}
deyiCxghwRYPLFXgAAAE: Received request to upgrade to websocket
deyiCxghwRYPLFXgAAAE: Upgrade to websocket successful
deyiCxghwRYPLFXgAAAE: Received packet MESSAGE data 2["get_count","426"]
received event "get_count" from igeaCfCJlLAfvvSjAAAA [/]
deyiCxghwRYPLFXgAAAE: Received packet MESSAGE data 2["get_count","427"]
received event "get_count" from igeaCfCJlLAfvvSjAAAA [/]
deyiCxghwRYPLFXgAAAE: Received packet MESSAGE data 2["get_count","428"]
received event "get_count" from igeaCfCJlLAfvvSjAAAA [/]
deyiCxghwRYPLFXgAAAE: Sending packet PING data None
deyiCxghwRYPLFXgAAAE: Received packet PONG data
deyiCxghwRYPLFXgAAAE: Sending packet PING data None
deyiCxghwRYPLFXgAAAE: Received packet PONG data
deyiCxghwRYPLFXgAAAE: Sending packet PING data None
deyiCxghwRYPLFXgAAAE: Received packet PONG data
 

Регистрируется после перезапуска приложения, когда все работает правильно.

 WBPV4M2jRjyx8HHmAAAA: Received packet MESSAGE data 0
WBPV4M2jRjyx8HHmAAAA: Sending packet MESSAGE data 0{"sid":"pv6lRFOFzmisdF0NAAAB"}
WBPV4M2jRjyx8HHmAAAA: Received request to upgrade to websocket
WBPV4M2jRjyx8HHmAAAA: Upgrade to websocket successful
WBPV4M2jRjyx8HHmAAAA: Received packet MESSAGE data 2["get_count","426"]
received event "get_count" from pv6lRFOFzmisdF0NAAAB [/]
WBPV4M2jRjyx8HHmAAAA: Received packet MESSAGE data 2["get_count","427"]
received event "get_count" from pv6lRFOFzmisdF0NAAAB [/]
WBPV4M2jRjyx8HHmAAAA: Received packet MESSAGE data 2["get_count","428"]
received event "get_count" from pv6lRFOFzmisdF0NAAAB [/]
emitting event "count" to all [/]
emitting event "count" to all [/]
emitting event "count" to all [/]
DEBUG: count
pubsub message: emit
WBPV4M2jRjyx8HHmAAAA: Sending packet MESSAGE data 2["count",{"room":"426","count":224,"total":224,"archive_exists":true}]
DEBUG: count
pubsub message: emit
WBPV4M2jRjyx8HHmAAAA: Sending packet MESSAGE data 2["count",{"room":"427","count":108,"total":108,"archive_exists":true}]
DEBUG: count
pubsub message: emit
WBPV4M2jRjyx8HHmAAAA: Sending packet MESSAGE data 2["count",{"room":"428","count":164,"total":165,"archive_exists":true}]
WBPV4M2jRjyx8HHmAAAA: Sending packet PING data None
WBPV4M2jRjyx8HHmAAAA: Received packet PONG data
WBPV4M2jRjyx8HHmAAAA: Sending packet PING data None
WBPV4M2jRjyx8HHmAAAA: Received packet PONG data
 

Спасибо вам за вашу помощь.

Комментарии:

1. См. Раздел Устранение неполадок в документах, чтобы узнать, как включить ведение журнала для Socket.IO. Затем поделитесь журналами для сервера и клиента в то время, когда возникает проблема.

2. Я добавил журналы к своему вопросу. Как вы можете видеть, событие получено, но ответное сообщение не отправлено.

3. Я не уверен, почему обработчик не вызывается. Можете ли вы создать небольшой пример приложения, которое воспроизводит проблему, чтобы я мог ее отладить?

4. Я создал примерное приложение, но через несколько дней оно все еще работает должным образом. Я буду добавлять функции до тех пор, пока они не выйдут из строя, что должно помочь выявить проблему. github.com/AlekLefebvre/socket-io-minimal

Ответ №1:

Проблема, похоже, решена после обновления моих требований до:

 celery==5.1.0
Flask==1.1.4
Flask-SocketIO==5.1.0
gunicorn==20.1.0
redis==3.5.3