Не могу понять, как правильно планировать мои функции asyncio

#python-asyncio

#python-asyncio

Вопрос:

Я довольно новичок в asyncio, и я прочитал некоторую документацию, примеры и другие вопросы, но я не могу найти никаких ответов относительно того, что я здесь делаю неправильно. Я недостаточно разбираюсь в asyncio, чтобы понять, почему это происходит. У меня есть класс с функциями asyncio, один из которых открывает websocket, а затем создает три задачи asyncio, которые он выполняет. Первый выполняется полностью нормально. Меня смущает выполнение второй и третьей задач. Даже после того, как вторая из них выполнила asyncio.sleep, третья задача, похоже, не выполняет websock.recv() , и поэтому вторая задача выполняет websocket.recv() и получает данные, которые я имел в виду для получения третьей задачи. Что я здесь делаю не так? Заранее спасибо за любую помощь.

Редактировать: Извините, код недоступен для выполнения, если вы не создаете или не используете discord-бота и не используете его токен в коде.

 import websockets
import requests
import asyncio
import json

class GatewayConnection:
    def __init__(self):
        self.heartbeat = None
        self.s = None
        self.info = None
        self.connected = False

        self.uri = f"{self.get_gateway()}/?v=8amp;encoding=json"

    @staticmethod
    def get_gateway():
        endpoint = requests.get(OAUTH 'gateway')
        return endpoint.json()['url']

    async def get_gateway_info(self, websock):
        recv = json.loads(await websock.recv())
        self.heartbeat = recv['d']['heartbeat_interval']
        self.s = recv['s']

    async def finish_gateway_connect(self, websock):
        await websock.send(json.dumps({
            'op': 2,
            'd': {
                'token': TOKEN,
                'intents': 513,
                'properties': {
                    '$os': 'windows',
                    '$browser': 'Sheepp',
                    '$device': 'Sheepp'
                }
            }
        }))

        print("waiting to received ready info")
        self.info = json.loads(await websock.recv())
        print("Ready info received")
        print(self.info)

    async def communicate(self):
        async with websockets.connect(self.uri) as websock:
            self.connected = True

            get = asyncio.create_task(self.get_gateway_info(websock))
            heartbeat = asyncio.create_task(self.send_heartbeat(websock))
            finish = asyncio.create_task(self.finish_gateway_connect(websock))

            await get  # First task
            await heartbeat  # Second task
            await finish  # Third task
        
    async def send_heartbeat(self, websock):
        while self.connected:
            await websock.send(json.dumps({'op': 1, 'd': self.s}))
            print("Sent heartbeat")
            response = json.loads(await websock.recv())
            print("Received 'heartbeat'")
            if response['op'] != 11:
                self.connected = False  # Discord api says to terminate connection upon not receiving a valid heartbeat response
                print(f"The server did not send back a valid heartbeat response, but instead: {response}")
            await asyncio.sleep(self.heartbeat/1000)

gateway = GatewayConnection()
try:
    loop.run_until_complete(gateway.communicate())
except KeyboardInterrupt:
    print("Keyboard Interruption")
finally:
    loop.close()
 

Комментарии:

1. Вы имели в виду get , heartbeat и finish чтобы их казнили одного за другим? Потому что, как вы закодировали их ожидания, они будут выполняться параллельно , несмотря на внешний вид.

2. Теперь, когда я думаю об этом, я, вероятно, должен get сначала выполнить, а не как задачу. Но я имею в виду, чтобы вторая и третья задачи выполнялись одновременно. Мой план состоит в том, что сердцебиение постоянно выполняется, в то время как другие вещи также выполняются, потому что сердцебиение должно поддерживаться. Мне также нужно выполнить heartbeat , прежде чем finish

3. Тогда, возможно, вы можете использовать что-то вроде await self.get_gateway_info(websock); await asyncio.gather(self.send_heartbeat(websock), self.finish_gateway_connect(websock))

4. Я получаю ошибку времени выполнения : cannot call recv while another coroutine is already waiting for the next message . Вывод из операторов печати выполняется в следующем порядке: Sent heartbeat , waiting to receive ready info , received 'heartbeat' , sleep . Я добавил оператор th at last print прямо перед asyncio.sleep in send_heartbeat . Также ошибка возникает из recv finish_gateway_connect

5. @user4815162342 как вы думаете, почему get, heartbeat и finish будут выполняться параллельно? У них есть ожидание, поэтому следующая строка не будет выполняться до завершения первого ожидания. await one(), await two() запустят два после завершения одного.

Ответ №1:

Чтобы исправить ошибку, которую я получал, я создал отдельный класс для обработки websocket, который гарантирует, что функции recv и send выполняются вовремя правильно. Я не уверен, насколько хорошо я создал класс, но вот он:

 class WebsocketHandler:
    def __init__(self, websocket):
        self.ws = websocket
        self.sending = False

        self.sends = []
        self.recvs = []

    async def send(self, data):
        if self.sending:
            self.sends.append(json.dumps(data))
        else:
            self.sending = True
            await self.ws.send(json.dumps(data))
            while self.sends:
                data = self.sends[0]
                self.sends.pop(0)
                await self.ws.send(json.dumps(data))
            self.sending = False

    async def recv(self):
        event = asyncio.Event()
        self.recvs.append(event)
        while self.recvs[0] != event:
            await event.wait()
        try:
            data = await asyncio.wait_for(self.ws.recv(), timeout=1)
        except asyncio.exceptions.TimeoutError:
            self.recvs.pop(0)
            raise asyncio.exceptions.TimeoutError
        self.recvs.pop(0)
        if len(self.recvs) > 0:
            self.recvs[0].set()
        return json.loads(data)
 

Ошибка, которую я получал, была RuntimeError: cannot call recv while another coroutine is already waiting for the next message
Редактировать: не стесняйтесь критиковать мой код.

Редактировать 2: я внес пару изменений по мере необходимости, чтобы исправить некоторые ошибки, которые я получал, а также для удобства.

Комментарии:

1. Вы могли бы использовать asyncio.Event вместо цикла со сном.