Python — объединение многопроцессорной обработки с asyncio работает только иногда

#python #multiprocessing #python-asyncio #python-multiprocessing

#python #многопроцессорная обработка #python-asyncio #python-многопроцессорная обработка

Вопрос:

Я хотел бы объединить asyncio и многопроцессорную обработку, поскольку у меня есть задача, в которой часть связана с вводом-выводом, а другая — с процессором. Сначала я попытался использовать loop.run_in_executor(), но, вероятно, не смог заставить его работать. Вместо этого я пошел с созданием двух процессов, в которых один использует asyncio, а другой — нет.

Код таков, что у меня есть класс с некоторыми неблокирующими функциями и одной блокировкой. У меня есть asyncio.Queue для передачи информации между неблокирующими частями и многопроцессорной обработкой.Очередь для передачи информации между неблокирующей и блокирующей функциями.

 import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import asyncio
import time


class TestClass:
    def __init__(self):
        m = mp.Manager()
        self.blocking_queue = m.Queue()

    async def run(self):
        loop = asyncio.get_event_loop()
        self.non_blocking_queue = asyncio.Queue() # asyncio Queue must be declared within event loop
        task1 = loop.create_task(self.non_blocking1())
        task2 = loop.create_task(self.non_blocking2())
        task3 = loop.create_task(self.print_msgs())
        await asyncio.gather(task1, task2)
        task3.cancel()

    def blocking(self):
        i = 0
        while i < 5:
            time.sleep(0.6)
            i  = 1
            print("Blocking ", i)
            line = self.blocking_queue.get()
            print("Blocking: ", line)
        print("blocking done")

    async def non_blocking1(self):
        for i in range(5):
            await self.non_blocking_queue.put("Hello")
            await asyncio.sleep(0.4)

    async def non_blocking2(self):
        for i in range(5):
            await self.non_blocking_queue.put("World")
            await asyncio.sleep(0.5)

    async def print_msgs(self):
        while True:
            line = await self.non_blocking_queue.get()
            self.blocking_queue.put(line)
            print(line)


test_class = TestClass()
with ProcessPoolExecutor() as pool:
    pool.submit(test_class.blocking)
    pool.submit(asyncio.run(test_class.run()))
print("done")
  

Примерно в половине случаев, когда я запускаю это, оно работает нормально и выводит текст в блокирующей и неблокирующей очередях. Другая половина выводит только результаты неблокирующей очереди. Похоже, что процесс блокировки вообще не запускается. Это не является следствием каждый раз. Он может работать пять раз подряд, а затем не работать пять раз подряд.

Что может вызвать такую проблему? Каким лучшим способом я могу это сделать, используя как многопроцессорную обработку, так и asyncio?

Комментарии:

1. Посмотрите на github.com/omnilib/aiomultiprocess

Ответ №1:

запуск асинхронной задачи «внутри» другого процесса работает для меня, например:

 def runfn(fn):
    return asyncio.run(fn())

with ProcessPoolExecutor() as pool:
    pool.submit(test_class.blocking)
    pool.submit(runfn, test_class.run)
  

предположительно, внутри asyncio / задачи есть какое-то состояние, которое должно быть согласованным или нарушается при запуске в другом процессе

Комментарии:

1. Я не думаю, что понимаю, почему это будет иметь значение. Но это так. Теперь это работает каждый раз. Большое вам спасибо.