#python #python-3.x #python-asyncio
#python #python-3.x #python-asyncio
Вопрос:
import queue
qq = queue.Queue()
qq.put('hi')
class MyApp():
def __init__(self, q):
self._queue = q
def _process_item(self, item):
print(f'Processing this item: {item}')
def get_item(self):
try:
item = self._queue.get_nowait()
self._process_item(item)
except queue.Empty:
pass
async def listen_for_orders(self):
'''
Asynchronously check the orders queue for new incoming orders
'''
while True:
self.get_item()
await asyncio.sleep(0)
a = MyApp(qq)
loop = asyncio.get_event_loop()
loop.run_until_complete(a.listen_for_orders())
С использованием Python 3.6.
Я пытаюсь написать обработчик событий, который постоянно прослушивает сообщения в queue
и обрабатывает их (в данном случае печатает их). Но он должен быть асинхронным — мне нужно иметь возможность запускать его в терминале (IPython) и вручную передавать данные в queue
(по крайней мере, изначально, для тестирования).
Этот код не работает — он блокируется навсегда.
Как мне заставить это выполняться вечно, но возвращать управление после каждой итерации while
цикла?
Спасибо.
примечание: Чтобы цикл событий работал с IPython (версия 7.2), я использую этот код из ib_insync
библиотеки, я использую эту библиотеку для решения реальной проблемы в примере выше.
Комментарии:
1. наличие
while True
в целом плохо, вместо этого вам следует сделать следующее: когда вы добавляете что-то в очередь (используя метод), вы должны вызвать другой метод, чтобы взять элемент из очереди, выполнить его действия и в конце снова проверить очередь на наличие дополнительных элементов, если таковых нет, завершить. Также проще использовать потоки для подобных вещей2.Я не совсем понимаю, что вы пытаетесь сделать. Ваш метод действительно выглядит синхронным, хотя, конечно,
loop.run_until_complete()
будет блокироваться. Я не вижу, где сообщения помещаются в очередь, кроме как вверху. Вы упоминаете, что отправляете сообщения вручную… что это значит?3. @NikolasStevenson-Molnar У меня есть другой поток, который опрашивает внешний сетевой ресурс на предмет данных (интенсивный ввод-вывод) и сбрасывает входящие сообщения в этот поток. Это обработчик для получения входящих данных.
4. В этом случае кажется, что вы используете asyncio не в том месте. Это потрясающе для сетевого ввода-вывода, но, похоже, не приносит никакой пользы при вашем использовании здесь (по крайней мере, из того, что показано в вашем примере кода); вы в основном создали асинхронную процедуру, которая, по сути, действует как синхронная .
Ответ №1:
Вам нужно создать свою очередь asyncio.Queue
и добавлять элементы в очередь потокобезопасным способом. Например:
qq = asyncio.Queue()
class MyApp():
def __init__(self, q):
self._queue = q
def _process_item(self, item):
print(f'Processing this item: {item}')
async def get_item(self):
item = await self._queue.get()
self._process_item(item)
async def listen_for_orders(self):
'''
Asynchronously check the orders queue for new incoming orders
'''
while True:
await self.get_item()
a = MyApp(qq)
loop = asyncio.get_event_loop()
loop.run_until_complete(a.listen_for_orders())
Ваш другой поток должен помещать данные в очередь следующим образом:
loop.call_soon_threadsafe(qq.put_nowait, <item>)
call_soon_threadsafe
обеспечит правильную блокировку, а также то, что цикл событий запускается, когда новый элемент очереди готов.
Комментарии:
1. Это хорошо, но когда я вызываю
loop.run_until_complete(a.listen_for_orders())
, это блокирует ввод дополнительных команд в консоль python. Мне нужно, чтобы он не блокировал2. Затем @JoshD запускается
run_until_complete
в фоновом потоке и считывается с консоли в основном потоке. Код должен работать независимо от того, в каком потоке выполняется цикл событий, при условии, что вы используетеcall_soon_threadsafe
для добавления материала в очередь.3. Спасибо. Хотя это не совсем решает мою проблему, я подозреваю, что у меня что-то не так с моей общей архитектурой, если я объединяю потоки и асинхронность. Я вижу из ваших других ответов, что вы отлично разбираетесь в асинхронности, вы не возражаете, если я свяжусь с вами в автономном режиме для помощи с этим? Вы доступны? Спасибо.
Ответ №2:
Это не асинхронная очередь. Вам нужно использовать asyncio.Queue
qq = queue.Queue()
Асинхронность — это цикл событий. Вы вызываете цикл, передавая ему управление, и он выполняет цикл до завершения вашей функции, чего никогда не происходит:
loop.run_until_complete(a.listen_for_orders())
Вы прокомментировали:
У меня есть другой поток, который опрашивает внешний сетевой ресурс на предмет данных (интенсивный ввод-вывод) и сбрасывает входящие сообщения в этот поток.
Напишите этот код асинхронно, чтобы у вас был:
async def run():
while 1:
item = await get_item_from_network()
process_item(item)
loop = asyncio.get_event_loop()
loop.run_until_complete( run() )
Если вы не хотите этого делать, то вы можете выполнить пошаговое выполнение цикла, хотя вы и не хотите этого делать.
import asyncio
def run_once(loop):
loop.call_soon(loop.stop)
loop.run_forever()
loop = asyncio.get_event_loop()
for x in range(100):
print(x)
run_once(loop)
Затем вы просто вызываете свою функцию async, и каждый раз, когда вы вызываете run_once, она проверяет вашу (очередь asyncio) и передает управление вашей функции listen for orders, если в очереди есть элемент.
Комментарии:
1. Спасибо, но у меня
get_item_from_network()
нетawaitable
— это вызов Python SDK для получения сообщений из очереди Azure Storage Queue для получения сообщений из очереди, и я полагаю, что он построен поверхrequests
который блокирует. Вот почему я запускаю код, который получает эти сообщения в отдельном потоке и передает вqueue.Queue