#python #flask #amazon-ec2 #celery
Вопрос:
Я создаю веб-сайт с задачей, обработка которой занимает довольно много времени и требует больших вычислений. Я написал код так, чтобы задача могла давать частичный вывод (давать результат). Я использую celery worker для запуска задачи на другой машине и задаю задачу для отправки обновления с помощью update_state()
.
Мое celery_tasks.py
досье выглядит примерно так
@celery.task(name="my_task") def my_task(): final_output = [] for output_part in long_process(): my_task.update_state(state="PROGRESS", meta={"progress": output_part}) final_output.append(output_part) return final_output
В Flask я создаю маршрут для SSE, чтобы интерфейс мог получать часть вывода, как только это будет сделано. Я написал два маршрута — один для создания задачи, а другой для получения потока результатов.
@app.route('/gen-task', methods=['POST']) def gen_task(): if request.method == 'POST': r = my_task.apply_async() return make_response(jsonify({"task_id": r.id}), 200) @app.route('/task-output-stream/lt;task_idgt;') def task_output_stream(task_id): task = AsyncResult(task_id, app=celery) def generate_stream(): while not task.ready(): print(f'State={task.state}, info type={type(task.info)}') info = task.info if info: output_part = info['progress'] json_data = json.dumps({'output': output_part}) yield f"data:{json_data}nn" time.sleep(2) return Response(generate_stream(), mimetype='text/event-stream')
Я тестирую приложение flask на своем локальном компьютере (терминал macOS). Результат задачи отправляется на интерфейс каждые 2 секунды, как и ожидалось. Журнал колбы выглядит примерно так:
State=PENDING, info type=lt;class 'NoneType'gt; State=PENDING, info type=lt;class 'NoneType'gt; ... State=PROGRESS, info type=lt;class 'dict'gt; State=PROGRESS, info type=lt;class 'dict'gt; ...
Тем не менее, я пытаюсь развернуть свое приложение flask на EC2, но task.info
всегда None
и task.state
всегда PENDING
. Я проверяю работника сельдерея, и задача выполняется нормально и, наконец, завершена, но веб-сервер не может получить обновление и результат. Таким образом, на переднем конце нет события.
Итак, я написал еще один тестовый api для задачи сельдерея, и он может отлично выполнить задачу на EC2. Задача.ready() заключается в True
@app.route('/get-task', methods=['POST']) def get_task(): if request.method == 'POST': r = my_task.apply_async() output = r.get() print('task.ready()', r.ready()) return make_response(jsonify({"output": output}), 200)
Для настройки сельдерея я использую RabbitMQ в качестве брокера и RPC в качестве бэкенда.
В чем может быть причина этой проблемы? И как я могу это исправить?
Обновление: Я заменил серверную часть RPC на серверную часть Redis, и теперь она может работать так, как ожидалось. Я думаю, что проблема связана с ограничением серверной части RPC.