Как получить последовательный результат от асинхронной многопоточности

#python #python-3.x #multithreading

#python #python-3.x #многопоточность

Вопрос:

Я пытаюсь получить упорядоченный список через некоторый веб-API с помощью многопоточности. Последовательность каждой страницы действительно важна, и я должен их сохранить.

Вот мой последовательный код:

 def get_all_items(max_count):
    """Sequentially fetch pages from a paginated API until it is exhausted.

    Stops when the API raises, returns an empty page, or enough pages to
    cover ``max_count`` items have been fetched.  Returns a flat list of
    items in page order.
    """
    res = []
    curr_page = 1
    per_page = 100
    while True:
        try:
            pagination_list = get_pagination_list(page=curr_page, per_page=per_page)  # Assume this is a 3rd-party API, slow Network IO
        except Exception:
            break
        if not pagination_list:
            break
        if (curr_page - 1) * per_page > max_count:
            break
        # ... and other conditions to break, lets just make it simple

        res.extend(pagination_list)
        # FIX: was `curr_page  = 1` (markdown ate the `+` of `+=`), which
        # re-fetched page 1 forever; advance to the next page instead.
        curr_page += 1

    return res
  

Моя текущая идея состоит в том, чтобы использовать dict { [curr_page]: [pagination_list] }, чтобы упорядочить список, но я не знаю, что сделать, чтобы заменить break логику на ThreadPoolExecutor или threading . Кроме того, созданные потоки должны быть завершены.

Есть идеи? Заранее спасибо.

Ответ №1:

 import concurrent.futures
import time
import math

def get_pagination_list(page, per_page):
    """Fake paginated API: only pages 0-3 contain results, later pages are empty."""
    time.sleep(0.5)  # pretend this is a slow network round-trip
    if page < 4:
        return [f"Result {i} in page {page}." for i in range(per_page)]
    # The imaginary server runs out of data after 4 pages.
    return []


def get_all_items(max_count=999):
    """Fetch pages concurrently and return at most ``max_count`` items in page order.

    All page futures are submitted up front; as each completes, its result is
    stored under its page number so the final list can be assembled in order.
    Once an empty page appears or enough items have accumulated, pending
    futures are cancelled (futures already running cannot be cancelled).
    """
    per_page = 6
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Map each future back to the page number it was submitted for.
        futures = {}
        for i in range(math.ceil(max_count/per_page)):
            future = executor.submit(get_pagination_list, page=i, per_page=per_page)
            futures[future] = i

        results = {}
        count = 0
        for completed_future in concurrent.futures.as_completed(futures):
            try:
                result = completed_future.result()
            except Exception as exc:
                print(exc) # Any exceptions raised in get_pagination_list are raised again when completed_future.result() is called
            else:
                # FIX: was `count  = len(result)` (markdown ate the `+` of `+=`);
                # the sample output ("Count: 6, 12, 18, 24") shows a cumulative total.
                count += len(result)
                print("Count:", count)
                # If the page is empty or we have reached the max number of results we should cancel all the other futures
                if not result or count >= max_count:
                    for future in futures:
                        cancelled = future.cancel()
                        # "If the call is currently being executed or finished running and cannot be cancelled then the method will
                        # return False, otherwise the call will be cancelled and the method will return True."
                        if cancelled:
                            page_number = futures[future]
                            print(f"Successfully cancelled future for page {page_number}", future)
                page_number = futures[completed_future]
                results[page_number] = result

    res = []
    print("These are the results of the completed futures:")
    # Sorting by page number restores the original sequence.
    for key, value in sorted(results.items()):
        print(f"Page number: {key}, result: {value}")
        res.extend(value)

    return res[:max_count] # Only return at most max_count items

# Demo: run the concurrent fetcher with a small and a large item budget.
for max_items in (10, 100):
    items = get_all_items(max_items)
    print(f"get_all_items({max_items}) returned {len(items)} items:")
    for item in items:
        print(item)
  

Вывод:

 Count: 6
Count: 12
These are the results of the completed futures:
Page number: 0, result: ['Result 0 in page 0.', 'Result 1 in page 0.', 'Result 2 in page 0.', 'Result 3 in page 0.', 'Result 4 in page 0.', 'Result 5 in page 0.']
Page number: 1, result: ['Result 0 in page 1.', 'Result 1 in page 1.', 'Result 2 in page 1.', 'Result 3 in page 1.', 'Result 4 in page 1.', 'Result 5 in page 1.']
get_all_items(10) returned 10 items:
Result 0 in page 0.
Result 1 in page 0.
Result 2 in page 0.
Result 3 in page 0.
Result 4 in page 0.
Result 5 in page 0.
Result 0 in page 1.
Result 1 in page 1.
Result 2 in page 1.
Result 3 in page 1.
Count: 6
Count: 12
Count: 18
Count: 24
Count: 24
Successfully cancelled future for page 7 <Future at 0x199ddb2ec50 state=cancelled>
Successfully cancelled future for page 8 <Future at 0x199ddb2e7f0 state=cancelled>
Successfully cancelled future for page 9 <Future at 0x199ddb2e860 state=cancelled>
Successfully cancelled future for page 11 <Future at 0x199ddb2e940 state=cancelled>
Successfully cancelled future for page 12 <Future at 0x199ddb2e198 state=cancelled>
Successfully cancelled future for page 13 <Future at 0x199ddb2e390 state=cancelled>
Successfully cancelled future for page 14 <Future at 0x199ddb2e9e8 state=cancelled>
Successfully cancelled future for page 15 <Future at 0x199ddb2e6d8 state=cancelled>
Successfully cancelled future for page 16 <Future at 0x199ddb2e630 state=cancelled>
Count: 24
Successfully cancelled future for page 7 <Future at 0x199ddb2ec50 state=cancelled>
Successfully cancelled future for page 8 <Future at 0x199ddb2e7f0 state=cancelled>
Successfully cancelled future for page 9 <Future at 0x199ddb2e860 state=cancelled>
Successfully cancelled future for page 11 <Future at 0x199ddb2e940 state=cancelled>
Successfully cancelled future for page 12 <Future at 0x199ddb2e198 state=cancelled>
Successfully cancelled future for page 13 <Future at 0x199ddb2e390 state=cancelled>
Successfully cancelled future for page 14 <Future at 0x199ddb2e9e8 state=cancelled>
Successfully cancelled future for page 15 <Future at 0x199ddb2e6d8 state=cancelled>
Successfully cancelled future for page 16 <Future at 0x199ddb2e630 state=cancelled>



Count: 24
Successfully cancelled future for page 7 <Future at 0x199ddb2ec50 state=cancelled>
Successfully cancelled future for page 8 <Future at 0x199ddb2e7f0 state=cancelled>
Successfully cancelled future for page 9 <Future at 0x199ddb2e860 state=cancelled>
Successfully cancelled future for page 11 <Future at 0x199ddb2e940 state=cancelled>
Successfully cancelled future for page 12 <Future at 0x199ddb2e198 state=cancelled>
Successfully cancelled future for page 13 <Future at 0x199ddb2e390 state=cancelled>
Successfully cancelled future for page 14 <Future at 0x199ddb2e9e8 state=cancelled>
Successfully cancelled future for page 15 <Future at 0x199ddb2e6d8 state=cancelled>
Successfully cancelled future for page 16 <Future at 0x199ddb2e630 state=cancelled>






Count: 24
Successfully cancelled future for page 7 <Future at 0x199ddb2ec50 state=cancelled>
Successfully cancelled future for page 8 <Future at 0x199ddb2e7f0 state=cancelled>
Successfully cancelled future for page 9 <Future at 0x199ddb2e860 state=cancelled>
Successfully cancelled future for page 11 <Future at 0x199ddb2e940 state=cancelled>
Successfully cancelled future for page 12 <Future at 0x199ddb2e198 state=cancelled>
Successfully cancelled future for page 13 <Future at 0x199ddb2e390 state=cancelled>
Successfully cancelled future for page 14 <Future at 0x199ddb2e9e8 state=cancelled>
Successfully cancelled future for page 15 <Future at 0x199ddb2e6d8 state=cancelled>
Successfully cancelled future for page 16 <Future at 0x199ddb2e630 state=cancelled>
These are the results of the completed futures:
Page number: 0, result: ['Result 0 in page 0.', 'Result 1 in page 0.', 'Result 2 in page 0.', 'Result 3 in page 0.', 'Result 4 in page 0.', 'Result 5 in page 0.']
Page number: 1, result: ['Result 0 in page 1.', 'Result 1 in page 1.', 'Result 2 in page 1.', 'Result 3 in page 1.', 'Result 4 in page 1.', 'Result 5 in page 1.']
Page number: 2, result: ['Result 0 in page 2.', 'Result 1 in page 2.', 'Result 2 in page 2.', 'Result 3 in page 2.', 'Result 4 in page 2.', 'Result 5 in page 2.']
Page number: 3, result: ['Result 0 in page 3.', 'Result 1 in page 3.', 'Result 2 in page 3.', 'Result 3 in page 3.', 'Result 4 in page 3.', 'Result 5 in page 3.']
Page number: 4, result: []
Page number: 5, result: []
Page number: 6, result: []
Page number: 10, result: []
get_all_items(100) returned 24 items:
Result 0 in page 0.
Result 1 in page 0.
Result 2 in page 0.
Result 3 in page 0.
Result 4 in page 0.
Result 5 in page 0.
Result 0 in page 1.
Result 1 in page 1.
Result 2 in page 1.
Result 3 in page 1.
Result 4 in page 1.
Result 5 in page 1.
Result 0 in page 2.
Result 1 in page 2.
Result 2 in page 2.
Result 3 in page 2.
Result 4 in page 2.
Result 5 in page 2.
Result 0 in page 3.
Result 1 in page 3.
Result 2 in page 3.
Result 3 in page 3.
Result 4 in page 3.
Result 5 in page 3.
  

Ответ №2:

Вы могли бы использовать concurrent.futures.ThreadPoolExecutor.

Его метод submit позволяет нам запланировать выполнение функции в пуле потоков и вернуть объект Future, который мы можем сохранить в списке, а затем выполнить итерацию по нему упорядоченным образом. Также мы можем использовать контекстный менеджер with, и пул потоков будет автоматически закрыт и очищен после того, как все futures вернут результат.

Пример, иллюстрирующий подход.

 import concurrent.futures
import time
import random


def process_page(index):
    """Simulate processing one page: random delay, failing on every 5th index."""
    delay = random.randint(0, 5)
    time.sleep(delay)
    if not index % 5:
        raise ValueError()  # emulate some failed


def process_all(max_count):
    """Process pages 0..max_count-1 concurrently, preserving page order.

    Submits every page to a thread pool, then consumes the futures in
    submission order so the collected results stay sequential.  After the
    end condition fires, remaining futures are cancelled (cancel() is a
    no-op for futures already running or finished).

    Returns the ordered list of successful results.  With this demo
    ``process_page`` that is a list of Nones; with a real worker it would
    be the page payloads.
    """
    future_list = []
    result = []
    need_break = False
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for index in range(max_count):
            future = executor.submit(process_page, index)
            future_list.append(future)

        # Iterate in submission order (NOT as_completed) to keep sequence.
        for index, future in enumerate(future_list):
            try:
                if need_break:
                    future.cancel()
                else:
                    result.append(future.result())
                    print(f"Page {index} processed")

                    if index >= 22:
                        need_break = True  # emulate some end condition
            except Exception:
                print(f"Page {index} failed to process")
    # FIX: the collected results were previously discarded; return them so
    # callers can actually use the ordered output (backward-compatible —
    # the function used to return None, which callers ignored).
    return result


process_all(100)
  

Вывод:

 Page 0 failed to process
Page 1 processed
Page 2 processed
Page 3 processed
Page 4 processed
Page 5 failed to process
Page 6 processed
Page 7 processed
...
  

Ответ №3:

Я думаю, вы можете создать простой массив, основанный на количестве страниц. вот псевдокод,

 pages[] = new Array[totalPages];
while(pageCounter < totalPages):
     // Initiate an async thread to get the page data, sharing the array and the page index.
     // Add the created thread to the same thread group so that we can close the thread pool after all the threads have finished.

// Inside each thread, after fetching the page content, store it at its own index like below:
       // get the page data
       pages[pageIndex] = pageData;
  

После завершения всех этих потоков ваши страницы будут упорядочены автоматически —
вам не нужно об этом беспокоиться.
Надеюсь, эта идея вам чем-то поможет.
Вместо массива вы можете использовать и список — со вставкой по указанному индексу, как в массиве.