Как избежать ошибки 429 (слишком много запросов) python с помощью Asyncio

#python #python-asyncio #http-status-code-429

#python #python-asyncio #http-status-code-429

Вопрос:

Я использую следующий код для выполнения запросов с помощью клиента aiohttp. Сервер, на который я пытаюсь отправить запрос, имеет ограничение в 30 тыс. запросов в час на IP. Итак, я получаю ошибку 429 слишком много запросов. Я хочу переводить задание в спящий режим всякий раз, когда оно достигает предела.

Я могу извлечь x_rateLimit_reset из заголовка, поэтому я подумал, что могу использовать его, чтобы перевести задание в режим ожидания, но я заметил очень странное поведение. Иногда время ожидания задания становится отрицательным, а иногда оно застревает в спящем режиме.

Например, в последний раз, когда я запускал задание, оно сначала спало в течение 2000 секунд, а затем по истечении времени снова попыталось перейти в спящий режим еще на 2500 секунд и застряло в спящем режиме. Я думаю, что, возможно, другие параллельные процессы вызвали проблему, поэтому было интересно, как справиться со слишком большим количеством сообщений об ошибках запросов при использовании Asyncio.

 @backoff.on_exception(backoff.expo, (asyncio.TimeoutError, aiohttp.client_exceptions.ServerDisconnectedError,TooManyRequests),
                          max_time=300)
    async def fetch(self, url, session, params):
        try:
            async with session.get(url, params=params) as response:
                now = int(time.time())
                print(response)
                output = await response.read()
                output = json.loads(output)

                if 'X-RateLimit-Remaining' in response.headers:
                    rate = response.headers['X-RateLimit-Remaining']

                if 'status' in output and output['status'] == 429:
                    x_rateLimit_reset = int(response.headers['X-RateLimit-Reset'])
                    print("sleep mode")
                    seconds = x_rateLimit_reset - now
                    LOGGER.info("The job will sleep for {} seconds".format(seconds))
                    time.sleep(max(seconds,0))
                    raise TooManyRequests()



            return output

        except (asyncio.TimeoutError, TypeError, json.decoder.JSONDecodeError,
                aiohttp.client_exceptions.ServerDisconnectedError) as e:
            print(str(e))

    async def bound_fetch(self, sem, url, session, params):
        # Getter function with semaphore.
        async with sem:
            output = await self.fetch(url, session, params)
        return {"url": url, "output": output}
  

Отредактировано:
Вот как я инициирую bound_fetch и определяю URL-адреса:

 def get_responses(self, urls, office_token, params=None):   
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(self.run(office_token, urls, params))
    responses = loop.run_until_complete(future)
    return responses

async def run(self, office_token, urls, params):
        tasks = []
        # create instance of Semaphore
        sem = asyncio.BoundedSemaphore(200)
        timeout = ClientTimeout(total=1000)

        async with ClientSession(auth=BasicAuth(office_token, password=' '), timeout=timeout,
                                 connector=TCPConnector(ssl=False)) as session:
            for url in urls:
                # pass Semaphore and session to every GET request
                task = asyncio.ensure_future(self.bound_fetch(sem, url, session, params))
                tasks.append(task)

            responses = await asyncio.gather(*tasks)
            return responses

urls = [
                        "{}/{}".format(self.base_url, "{}?page={}amp;api_key={}".format(object_name, page_number, self.api_keys))
                        for page_number in range(batch * chunk_size   1, chunk_size * (1   batch)   1)]
  

Комментарии:

1. Пожалуйста, покажите код, в котором вы определяете список URL-адресов и инициируете bound_fetch

2. Пожалуйста, смотрите отредактированный раздел

3. Пожалуйста, смотрите отредактированный ответ

Ответ №1:

Основная причина, по которой вы используете time.sleep() вместо await asyncio.sleep() этого.

Обновить

Вот минимальное рабочее решение и некоторые комментарии о том, как оно работает.

Пожалуйста, используйте его для принятия вашего решения.

Взгляните на asyncio-throttle

 import aiohttp
import asyncio
from datetime import datetime


async def fetch(session, task):  # fetching urls and mark result of execution
    async with session.get(task['url']) as response:
        if response.status != 200:
            # response.raise_for_status()
            # Here you need to somehow  handle 429 code if it acquired
            # In my example I just skip it.
            task['result'] = response.status
            task['status'] = 'done'
        await response.text()  # just to be sure we acquire data
        print(f"{str(datetime.now())}: Got result of {task['url']}")  # logging
        task['result'] = response.status
        task['status'] = 'done'


async def fetch_all(session, urls, persecond):
    # convert to list of dicts
    url_tasks = [{'url': i, 'result': None, 'status': 'new'} for i in urls]
    n = 0  # counter
    while True:
        # calc how many tasks are fetching right now
        running_tasks = len([i for i in url_tasks if i['status'] in ['fetch']])
        # calc how many tasks are still need to be executed
        is_tasks_to_wait = len([i for i in url_tasks if i['status'] != 'done'])
        # check we are not in the end of list n < len()
        # check we have room for one more task
        if n < len(url_tasks) and running_tasks < persecond:
            url_tasks[n]['status'] = 'fetch'
            #
            # Here is main trick
            # If you schedule task inside running loop
            # it will start to execute sync code until find some await
            #
            asyncio.create_task(fetch(session, url_tasks[n]))
            n  = 1
            print(f'Schedule tasks {n}. '
                  f'Running {running_tasks} '
                  f'Remain {is_tasks_to_wait}')
        # Check persecond constrain and wait a sec (or period)
        if running_tasks >= persecond:
            print('Throttling')
            await asyncio.sleep(1)
        #
        # Here is another main trick
        # To keep asyncio.run (or loop.run_until_complete) executing
        # we need to wait a little than check that all tasks are done and
        # wait and so on
        if is_tasks_to_wait != 0:
            await asyncio.sleep(0.1)  # wait all tasks done
        else:
            # All tasks done
            break
    return url_tasks


async def main():
    urls = ['http://google.com/?1',
            'http://google.com/?2',
            'http://google.com/?3']*3
    async with aiohttp.ClientSession() as session:
        res = await fetch_all(session, urls, 3)
        print(res)

if __name__ == '__main__':
    asyncio.run(main())
    # (asyncio.run) do cancel all pending tasks (we do not have them,
    #  because we check all task done)
    # (asyncio.run) do await canceling all tasks
    # (asyncio.run) do stop loop
    # exit program
  

Комментарии:

1. Есть идеи, как обрабатывать код 429? Можем ли мы поставить URL-адреса 429 в очередь и повторно отправить их?

2. Легко, его сильная сторона — асинхронность. Таким образом, нет необходимости сериализовать данные, и все переменные доступны. Кроме того, вы не столкнетесь с состоянием гонки в общем случае. В моем примере в fetch функции просто установите статус для задачи как «новый» после получения !=200 условия. Эта задача будет перенесена в main во время выполнения функции fetch_all.