потоковая передача нескольких веб-сайтов с помощью asyncio/aiohttp

#python #asynchronous #websocket #python-asyncio #aiohttp

Вопрос:

Я пытаюсь подписаться на несколько потоковых веб-сайтов, используя asyncio и python aiohttp .

Когда я запускаю приведенный ниже код, он выводит только «a», но больше ничего в консоли в качестве вывода. Он не выдает никаких ошибок, и я не могу отлаживать шаг за шагом, так как это асинхронный код.

Я хотел бы выяснить, в чем проблема, очень признателен, если кто-нибудь сможет помочь.

 import aiohttp
import asyncio

async def coro(event, item1, item2):
    print("a")
    async with aiohttp.ClientSession.ws_connect(url='url') as ws:
        event.set()
        print("b")
        await asyncio.gather(ws.send_json(item1),
                             ws.send_json(item2))
        async for msg in ws:
            print("c")
            print(msg)

async def ws_connect(item1, item2):
    event = asyncio.Event()
    task = asyncio.create_task(coro(event, item1, item2))
    await event.wait()  # wait until the event is set() to True, while waiting, block
    return task

async def main():
    item1 = {
        "method": "subscribe",
        "params": {'channel': "bar"}
    }
    item2 = {
        "method": "subscribe",
        "params": {'channel': "foo"}
    }
    ws_task = await ws_connect(item1, item2)
    await ws_task

asyncio.run(main())

 

Ответ №1:

Вы неправильно вызываете ws_connect . Правильный путь:

 async with aiohttp.ClientSession() as session:
    async with session.ws_connect('url') as was:
        ...
 

Полный пример:

 import aiohttp
import asyncio

async def coro(event, item1, item2):
    print("a")
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('wss://echo.websocket.org') as ws:
            event.set()
            print("b")
            await asyncio.gather(ws.send_json(item1),
                                 ws.send_json(item2))
            async for msg in ws:
                print("c")
                print(msg)


async def ws_connect(item1, item2):
    event = asyncio.Event()
    task = asyncio.create_task(coro(event, item1, item2))
    await event.wait()  # wait until the event is set() to True, while waiting, block
    return task

async def main():
    item1 = {
        "method": "subscribe",
        "params": {'channel': "bar"}
    }
    item2 = {
        "method": "subscribe",
        "params": {'channel': "foo"}
    }
    ws_task = await ws_connect(item1, item2)
    await ws_task

asyncio.run(main())