Асинхронный с двумя петлями, лучшая практика

#python-3.x #python-asyncio

Вопрос:

У меня есть два бесконечных цикла. Их обработка является легкой. Я не хочу, чтобы они блокировали друг друга. Является ли использование await asyncio.sleep(0) хорошей практикой?

Это мой код

 import asyncio

async def loop1():
    while True:
        print("loop1")
        # pull data from kafka
        await asyncio.sleep(0)


async def loop2():
    while True:
        print("loop2")
        # send data to all clients using asyncio stream api
        await asyncio.sleep(0)


async def main():
    await asyncio.gather(loop1(), loop2())


asyncio.run(main())
 

Комментарии:

1. Ваши сопрограммы действительно что-то делают с asyncio? Если нет, то потоки (ввод-вывод) или процессы (процессор) могут быть лучшим решением.

2. Да, loop1 извлекает данные из кафки с помощью aiokafka, а loop2 отправляет эти данные через сокет с помощью API asyncio stream. Возможно, облегченная обработка не была подходящим описанием

Ответ №1:

Две (и многие другие) asyncio задачи не будут блокировать друг друга до тех пор, пока одна из задач не выполнит длительную синхронизацию внутри.

В обеих ваших задачах есть только сетевые операции внутри (запросы Кафки и API), поэтому ни одна из них не заблокирует другую задачу.

Когда вы должны использовать asyncio.sleep(0) ?

Представьте, что у вас есть несколько длительных операций синхронизации — вычислений. Вычисления — это не операция ввода-вывода.

Этот пример больше похож на то, что полезно знать, если у вас есть такие операции в реальном приложении, вы должны перенести их loop.run_in_executor и использовать concurrent.futures.ProcessPoolExecutor в качестве исполнителя. Пример:

 import asyncio


async def long_calc():
    """
    Some Heavy CPU bound task.
    Better make it sync function and move to ProcessPoolExecutor
    """
    s = 0

    for _ in range(100):
        for i in range(1_000_000):
            s  = i**2

        # comment the line and watch result
        # you'll get no working messages
        # that's why I use sleep(0.0) here
        await asyncio.sleep(0.0)

    return s


async def pinger():
    """Task which shows that app is alive"""
    n = 0
    
    while True:
        await asyncio.sleep(1)
        print(f"Working {n}")
        n  = 1


async def amain():
    """Main async function  in this app"""
    # run in asyncio.create_task since we want the task
    # to run in parallel with long_calc  
    # we do not want to wait till it will be finished
    # If it were thread it would be called daemon thread
    asyncio.create_task(pinger())
    # await results of long task
    s = await long_calc()
    print(f"Done: {s}")


if __name__ == '__main__':
    asyncio.run(amain())

 

Если вам нужно, чтобы я привел вам run_in_executor пример — дайте мне знать.

Комментарии:

1. Но «в то время как» в моей функции может (и я думаю, что это так) занять все процессорное время, если мои задачи ввода-вывода являются суперреактивными? Я думаю, смысл в том, чтобы предположить, что в loop1 и loop2 есть ключевое слово «ожидание» для моих задач ввода-вывода, в этом случае цикл событий знает, как переключить задачу, верно?

2. @nicolascarrara Проблема не в том, что вы используете while в своих асинхронных задачах, потенциальная проблема заключается в том, что у вас внутри while цикла. Если у вас есть асинхронные операции ввода — вывода в задачах-все в порядке, переключайтесь с одной задачи на другую. Если у вас есть задачи, связанные с процессором, — это проблема. Дополнительная информация и рабочий пример в моем ответе.