asyncio.run (и другие ароматизаторы) не могут восстановиться после исключения в сельдерее

#python #celery #python-asyncio

Вопрос:

Я использую сельдерей в качестве рабочего для спорадических задач. Библиотеки, которые мне нужно вызвать из моей задачи, все асинхронны, и я пробовал различные способы заставить это работать.

В основном это работает, если только выполняемая мной задача не создает исключение. Он регистрирует исключение просто отлично, но в следующий раз, когда он захватывает задачу, он выдает ошибку, сообщая, что «цикл событий закрыт» (этого не происходит, если задача выполняется правильно).

Я попробовал следующие ароматы, все с тем же результатом;

 @app.task
def score_survey(sku, auto_push=False):
    async_to_sync(score_survey_handler)(sku, auto_push)
 
 @app.task
def score_survey(sku, auto_push=False):
    asyncio.run(score_survey_handler(sku, auto_push))
 
 @app.task
def score_survey(sku, auto_push=False):
    loop = asyncio.new_event_loop()
    task = loop.create_task(score_survey_handler(sku, auto_push))
    try:
        loop.run_until_complete(task)
    except:
        task.exception()
        task.cancel()
        raise
    finally:
        loop.stop()
        loop.close()
 
 @app.task
def score_survey(sku, auto_push=False):
    loop = asyncio.new_event_loop()
    loop.run_until_complete(score_survey_handler(sku, auto_push))
 

Все точно такое же поведение; он работает нормально, пока score_survey_handler завершается без каких-либо проблем, но в тот момент, когда он создает исключение, следующая задача, принятая Сельдереем, создает исключение, сообщающее, что цикл событий закрыт.

Я также пытался просто использовать исключения и не вызывать их в задаче Сельдерея, и это тоже работает, но мне нужны исключения, зарегистрированные в бэкэнде результатов Сельдерея, так что это далеко не идеально.

Есть идеи, как я могу заставить это работать? Или почему это не работает, даже если я позвоню new_event_loop() ?