Ошибка протокола при использовании celery worker с колбой

#python #multithreading #flask #celery

#python #многопоточность #flask #сельдерей

Вопрос:

Я запускаю удаленный celery worker с помощью Flask. Конфигурация в flask — redis используется для серверной части и rabbitmq в качестве посредника сообщений.

Flask выполняется с несколькими потоками, и функция celery get() используется в двух местах.

 @app.routes("/route1")
def method1():
    result1_obj = remote_method_1.apply_async()
    result1 = result1_obj.get()

@app.routes("/route2")
def method2():
    result2_obj = remote_method_2.apply_async()
    result2 = result2_obj.get()
  

Поэтому всякий раз, когда get() вызывается двумя разными потоками одновременно в method1() и method2().

Журналы ошибок выглядят следующим образом:-

 Traceback (most recent call last):
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesflaskapp.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesflaskapp.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesflask_restplusapi.py", line 325, in wrapper
    resp = resource(*args, **kwargs)
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesflaskviews.py", line 89, in view
    return self.dispatch_request(*args, **kwargs)
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesflask_restplusresource.py", line 44, in dispatch_request
    resp = meth(*args, **kwargs)
  File "C:Usersvenugopal.venkateshOneDrive - VeoneerDocumentssprint35fdt_report_gensrcapplicationroutes.py", line 89, in wrap
    return f(*args, **kwargs)
  File "C:Usersvenugopal.venkateshOneDrive - VeoneerDocumentssprint35fdt_report_gensrcdb__init__.py", line 281, in wrap
    return f(*args, **kwargs)
  File "C:Usersvenugopal.venkateshOneDrive - VeoneerDocumentssprint35fdt_report_gensrcdoorsroutes.py", line 141, in post
    abort_res = abort_obj.get()
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesceleryresult.py", line 237, in get
    on_message=on_message,
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagescelerybackendsasynchronous.py", line 200, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagescelerybackendsasynchronous.py", line 268, in _wait_for_pending
    on_interval=on_interval):
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagescelerybackendsasynchronous.py", line 55, in drain_events_until
    yield self.wait_for(p, wait, timeout=interval)
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagescelerybackendsasynchronous.py", line 64, in wait_for
    wait(timeout=timeout)
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagescelerybackendsredis.py", line 160, in drain_events
    message = self._pubsub.get_message(timeout=timeout)
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesredisclient.py", line 3617, in get_message
    response = self.parse_response(block=False, timeout=timeout)
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesredisclient.py", line 3505, in parse_response
    response = self._execute(conn, conn.read_response)
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesredisclient.py", line 3479, in _execute
    return command(*args, **kwargs)
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesredisconnection.py", line 739, in read_response
    response = self._parser.read_response()
  File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesredisconnection.py", line 331, in read_response
    raise InvalidResponse("Protocol Error: %r" % raw)
redis.exceptions.InvalidResponse: Protocol Error:
  

Есть какие-либо указания на то, как устранить эту ошибку?

Комментарии:

1. Показать полную обратную трассировку ошибки!

2. @KlausD. Я добавил журналы ошибок

3. Вы используете одно соединение redis между потоками?

4. @KlausD. да, я использую одно соединение redis в качестве серверной части, должен ли я использовать несколько соединений redis?

Ответ №1:

Это ошибка в библиотеке Celery https://github.com/celery/celery/pull/6416

внутренний объект в настоящее время не является потокобезопасным!

Вам нужно запустить приложение Flask без потоков, например, uwsgi --processes 12 --threads 1 --http 0.0.0.0:5000 --module server:app