Результат задачи сельдерея не может получить update_state и результат с помощью Flask с wsgi и Apache на AWS EC2, но он отлично работает в локальной разработке Flask

#python #flask #amazon-ec2 #celery

Вопрос:

Я создаю веб-сайт с задачей, обработка которой занимает довольно много времени и требует больших вычислений. Я написал код так, чтобы задача могла давать частичный вывод (давать результат). Я использую celery worker для запуска задачи на другой машине и задаю задачу для отправки обновления с помощью update_state() .

Мое celery_tasks.py досье выглядит примерно так

 @celery.task(name="my_task") def my_task():  final_output = []  for output_part in long_process():  my_task.update_state(state="PROGRESS", meta={"progress": output_part})  final_output.append(output_part)  return final_output  

В Flask я создаю маршрут для SSE, чтобы интерфейс мог получать часть вывода, как только это будет сделано. Я написал два маршрута — один для создания задачи, а другой для получения потока результатов.

 @app.route('/gen-task', methods=['POST']) def gen_task():  if request.method == 'POST':  r = my_task.apply_async()  return make_response(jsonify({"task_id": r.id}), 200)  @app.route('/task-output-stream/lt;task_idgt;') def task_output_stream(task_id):  task = AsyncResult(task_id, app=celery)  def generate_stream():  while not task.ready():  print(f'State={task.state}, info type={type(task.info)}')  info = task.info  if info:  output_part = info['progress']  json_data = json.dumps({'output': output_part})  yield f"data:{json_data}nn"  time.sleep(2)  return Response(generate_stream(), mimetype='text/event-stream')  

Я тестирую приложение flask на своем локальном компьютере (терминал macOS). Результат задачи отправляется на интерфейс каждые 2 секунды, как и ожидалось. Журнал колбы выглядит примерно так:

 State=PENDING, info type=lt;class 'NoneType'gt; State=PENDING, info type=lt;class 'NoneType'gt; ... State=PROGRESS, info type=lt;class 'dict'gt; State=PROGRESS, info type=lt;class 'dict'gt; ...  

Тем не менее, я пытаюсь развернуть свое приложение flask на EC2, но task.info всегда None и task.state всегда PENDING . Я проверяю работника сельдерея, и задача выполняется нормально и, наконец, завершена, но веб-сервер не может получить обновление и результат. Таким образом, на переднем конце нет события.

Итак, я написал еще один тестовый api для задачи сельдерея, и он может отлично выполнить задачу на EC2. Задача.ready() заключается в True

 @app.route('/get-task', methods=['POST']) def get_task():  if request.method == 'POST':  r = my_task.apply_async()  output = r.get()  print('task.ready()', r.ready())  return make_response(jsonify({"output": output}), 200)  

Для настройки сельдерея я использую RabbitMQ в качестве брокера и RPC в качестве бэкенда.

В чем может быть причина этой проблемы? И как я могу это исправить?

Обновление: Я заменил серверную часть RPC на серверную часть Redis, и теперь она может работать так, как ожидалось. Я думаю, что проблема связана с ограничением серверной части RPC.