#python #multithreading #flask #celery
#python #многопоточность #flask #сельдерей
Вопрос:
Я запускаю удаленный celery worker с помощью Flask. Конфигурация в flask — redis используется для серверной части и rabbitmq в качестве посредника сообщений.
Flask выполняется с несколькими потоками, и функция celery get() используется в двух местах.
@app.routes("/route1")
def method1():
result1_obj = remote_method_1.apply_async()
result1 = result1_obj.get()
@app.routes("/route2")
def method2():
result2_obj = remote_method_2.apply_async()
result2 = result2_obj.get()
Поэтому всякий раз, когда get() вызывается двумя разными потоками одновременно в method1() и method2().
Журналы ошибок выглядят следующим образом:-
Traceback (most recent call last):
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesflaskapp.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesflaskapp.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesflask_restplusapi.py", line 325, in wrapper
resp = resource(*args, **kwargs)
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesflaskviews.py", line 89, in view
return self.dispatch_request(*args, **kwargs)
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesflask_restplusresource.py", line 44, in dispatch_request
resp = meth(*args, **kwargs)
File "C:Usersvenugopal.venkateshOneDrive - VeoneerDocumentssprint35fdt_report_gensrcapplicationroutes.py", line 89, in wrap
return f(*args, **kwargs)
File "C:Usersvenugopal.venkateshOneDrive - VeoneerDocumentssprint35fdt_report_gensrcdb__init__.py", line 281, in wrap
return f(*args, **kwargs)
File "C:Usersvenugopal.venkateshOneDrive - VeoneerDocumentssprint35fdt_report_gensrcdoorsroutes.py", line 141, in post
abort_res = abort_obj.get()
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesceleryresult.py", line 237, in get
on_message=on_message,
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagescelerybackendsasynchronous.py", line 200, in wait_for_pending
for _ in self._wait_for_pending(result, **kwargs):
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagescelerybackendsasynchronous.py", line 268, in _wait_for_pending
on_interval=on_interval):
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagescelerybackendsasynchronous.py", line 55, in drain_events_until
yield self.wait_for(p, wait, timeout=interval)
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagescelerybackendsasynchronous.py", line 64, in wait_for
wait(timeout=timeout)
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagescelerybackendsredis.py", line 160, in drain_events
message = self._pubsub.get_message(timeout=timeout)
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesredisclient.py", line 3617, in get_message
response = self.parse_response(block=False, timeout=timeout)
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesredisclient.py", line 3505, in parse_response
response = self._execute(conn, conn.read_response)
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesredisclient.py", line 3479, in _execute
return command(*args, **kwargs)
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesredisconnection.py", line 739, in read_response
response = self._parser.read_response()
File "C:Usersvenugopal.venkatesh.virtualenvsfdt_report_gen-BnOKsvM9libsite-packagesredisconnection.py", line 331, in read_response
raise InvalidResponse("Protocol Error: %r" % raw)
redis.exceptions.InvalidResponse: Protocol Error:
Есть какие-либо указания на то, как устранить эту ошибку?
Комментарии:
1. Показать полную обратную трассировку ошибки!
2. @KlausD. Я добавил журналы ошибок
3. Вы используете одно соединение redis между потоками?
4. @KlausD. да, я использую одно соединение redis в качестве серверной части, должен ли я использовать несколько соединений redis?
Ответ №1:
Это ошибка в библиотеке Celery https://github.com/celery/celery/pull/6416
внутренний объект в настоящее время не является потокобезопасным!
Вам нужно запустить приложение Flask без потоков, например, uwsgi --processes 12 --threads 1 --http 0.0.0.0:5000 --module server:app