#python #multiprocessing #python-asyncio #python-multiprocessing
#python #многопроцессорная обработка #python-asyncio #python-многопроцессорная обработка
Вопрос:
Я хотел бы объединить asyncio и многопроцессорную обработку, поскольку у меня есть задача, в которой часть связана с вводом-выводом, а другая — с процессором. Сначала я попытался использовать loop.run_in_executor(), но, вероятно, не смог заставить его работать. Вместо этого я пошел с созданием двух процессов, в которых один использует asyncio, а другой — нет.
Код таков, что у меня есть класс с некоторыми неблокирующими функциями и одной блокировкой. У меня есть asyncio.Queue для передачи информации между неблокирующими частями и многопроцессорной обработкой.Очередь для передачи информации между неблокирующей и блокирующей функциями.
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import asyncio
import time
class TestClass:
def __init__(self):
m = mp.Manager()
self.blocking_queue = m.Queue()
async def run(self):
loop = asyncio.get_event_loop()
self.non_blocking_queue = asyncio.Queue() # asyncio Queue must be declared within event loop
task1 = loop.create_task(self.non_blocking1())
task2 = loop.create_task(self.non_blocking2())
task3 = loop.create_task(self.print_msgs())
await asyncio.gather(task1, task2)
task3.cancel()
def blocking(self):
i = 0
while i < 5:
time.sleep(0.6)
i = 1
print("Blocking ", i)
line = self.blocking_queue.get()
print("Blocking: ", line)
print("blocking done")
async def non_blocking1(self):
for i in range(5):
await self.non_blocking_queue.put("Hello")
await asyncio.sleep(0.4)
async def non_blocking2(self):
for i in range(5):
await self.non_blocking_queue.put("World")
await asyncio.sleep(0.5)
async def print_msgs(self):
while True:
line = await self.non_blocking_queue.get()
self.blocking_queue.put(line)
print(line)
test_class = TestClass()
with ProcessPoolExecutor() as pool:
pool.submit(test_class.blocking)
pool.submit(asyncio.run(test_class.run()))
print("done")
Примерно в половине случаев, когда я запускаю это, оно работает нормально и выводит текст в блокирующей и неблокирующей очередях. Другая половина выводит только результаты неблокирующей очереди. Похоже, что процесс блокировки вообще не запускается. Это не является следствием каждый раз. Он может работать пять раз подряд, а затем не работать пять раз подряд.
Что может вызвать такую проблему? Каким лучшим способом я могу это сделать, используя как многопроцессорную обработку, так и asyncio?
Комментарии:
1. Посмотрите на github.com/omnilib/aiomultiprocess
Ответ №1:
запуск асинхронной задачи «внутри» другого процесса работает для меня, например:
def runfn(fn):
return asyncio.run(fn())
with ProcessPoolExecutor() as pool:
pool.submit(test_class.blocking)
pool.submit(runfn, test_class.run)
предположительно, внутри asyncio / задачи есть какое-то состояние, которое должно быть согласованным или нарушается при запуске в другом процессе
Комментарии:
1. Я не думаю, что понимаю, почему это будет иметь значение. Но это так. Теперь это работает каждый раз. Большое вам спасибо.