#docker #concurrency #celery
#docker #параллелизм #сельдерей
Вопрос:
При запуске Celery в контейнере Docker, который получает RestAPI из других контейнеров, я получаю вызов RuntimeError: concurrent poll() .
Кто-нибудь сталкивался с подобной ошибкой?
Я прикрепляю обратную трассировку.
Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/opt/www/api/api/training_call.py", line 187, in start_process
result_state.get(on_message=self._on_raw_message, propagate=False)
File "/usr/local/lib/python3.5/dist-packages/celery/result.py", line 226, in get
on_message=on_message,
File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 188, in wait_for_pending
for _ in self._wait_for_pending(result, **kwargs):
File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 255, in _wait_for_pending
on_interval=on_interval):
File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 56, in drain_events_until
yield self.wait_for(p, wait, timeout=1)
File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 65, in wait_for
wait(timeout=timeout)
File "/usr/local/lib/python3.5/dist-packages/celery/backends/redis.py", line 127, in drain_events
message = self._pubsub.get_message(timeout=timeout)
File "/usr/local/lib/python3.5/dist-packages/redis/client.py", line 3135, in get_message
response = self.parse_response(block=False, timeout=timeout)
File "/usr/local/lib/python3.5/dist-packages/redis/client.py", line 3034, in parse_response
if not block and not connection.can_read(timeout=timeout):
File "/usr/local/lib/python3.5/dist-packages/redis/connection.py", line 628, in can_read
return self._parser.can_read() or self._selector.can_read(timeout)
File "/usr/local/lib/python3.5/dist-packages/redis/selector.py", line 28, in can_read
return self.check_can_read(timeout)
File "/usr/local/lib/python3.5/dist-packages/redis/selector.py", line 156, in check_can_read
events = self.read_poller.poll(timeout)
RuntimeError: concurrent poll() invocation
Комментарии:
1. Проблема возникает из-за исключения в основной библиотеке Python через Redis. Вот где это обсуждалось bugs.python.org/issue8865 и добавлен в основной код python hg.python.org/cpython/rev/4543408e2ba6
2. @Gianfranco: я действительно сталкиваюсь с подобной ошибкой (тот же stacktrace, начинающийся с «celery/result.py «, строка 226»)
3. @Laizer: ошибка, о которой вы упомянули, была исправлена в 2013 году; Джанфранко, похоже, использует python3.5, а я использую python 3.6.6 — похоже, что обе версии включают это исправление. Есть еще идеи?
Ответ №1:
Соединение брокера не является потокобезопасным, поэтому вам необходимо обеспечить потокобезопасность в коде вашего приложения. @Laizer упомянул тикет, в котором эта ошибка была введена в основную библиотеку python
Один из способов сделать это — обернуть все вызовы, которые блокируют до завершения задачи, в общую блокировку:
import celery
import threading
@celery.shared_task
def debug_task(self):
print('Hello, world')
def boom(nb_tasks):
""" not thread safe - raises RuntimeError during concurrent executions """
tasks = celery.group([debug_task.s() for _ in range(nb_tasks)])
pool = tasks.apply_async()
pool.join() # raised from here
CELERY_POLL_LOCK = threading.Lock()
def safe(nb_tasks):
tasks = celery.group([debug_task.s() for _ in range(nb_tasks)])
pool = tasks.apply_async()
with CELERY_POLL_LOCK: # prevents concurrent calls to poll()
pool.join()
def main(nb_threads, nb_tasks_per_thread):
for func in (safe, boom):
threads = [threading.Thread(target=func, args=(nb_tasks_per_thread, )) for _ in range(nb_threads)]
for a_thread in threads:
a_thread.start()
for a_thread in threads:
a_thread.join()
main(10, 100)
Это наивный подход, который подходит для меня, потому что я не ожидаю большого параллелизма, и все задачи выполняются относительно быстро (~ 10 секунд).
Если у вас другой «профиль», вам может понадобиться что-то более запутанное (например, одна задача опроса, которая периодически опрашивает все ожидающие группы / задачи).
Ответ №2:
У меня была такая же ошибка с приложением, которое напрямую использовало Redis pub / sub. Запуск множества вызовов redis.client.PubSub.getMessage
в быстрой последовательности привел к этому состоянию гонки. Мое решение состояло в том, чтобы замедлить скорость опроса новых сообщений.
Комментарии:
1. Привет @laizer, как ты это сделал?
2. Я вызывал
getMessage
напрямую при каждом запросе к моему веб-приложению. Я ввел глобальный счетчик, который запускал каждые 100 запросов.
Ответ №3:
Я столкнулся с той же проблемой и решил ее
pip install -U "celery[redis]"
надеюсь, это полезно для вас
https://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html