Python нарезает массив для многопоточности и собирает результаты по порядку?

#python #multithreading

#python #многопоточность

Вопрос:

Возможно, этот вопрос уже задавался здесь, но я не смог подобрать правильные ключевые слова для поиска.

У меня есть массив, который я хотел бы разбить на куски и передать их потокам, чтобы они проделали некоторую работу над каждым фрагментом и выгрузили результат.

Однако мне нужно повторно собрать результаты из каждого потока по порядку.

Я попытался передать блокировку для каждого потока, чтобы заблокировать и выгрузить результат в другой массив, но порядок неправильный. Я предполагаю, потому что каждый поток завершается в разное время.

Какой был бы лучший способ сделать это в Python 3?

 import threading        
import numpy as np
from queue import Queue

def add(lock, work):
    value = 0
    for v in work:
        #Do some work!
    lock.acquire()  
    result.append(value)
    lock.release()  

a = np. arange(0,100)
result = []
lock = threading.Lock()    
q = Queue()
for i in range(0,a.shape[0],10):
    work = a[i:i 10]
    t = threading.Thread(target=add, args=(lock,work))
    t.start()
    q.put(t)

while q.empty() == False:
    q.get().join()

value = 0
for v in result:
    #Assemble
print(value)
  

Ответ №1:

Вы получаете свои результаты в перепутанном порядке, потому что append каждый результат помещается в конец списка при его поступлении, который может быть не в том порядке, в котором были запущены потоки. Лучшим подходом могло бы быть передать каждому рабочему файлу индекс в список надлежащего размера и позволить ему присваивать свои результаты там всякий раз, когда он завершается. Списки достаточно потокобезопасны, поэтому вам не понадобится блокировка для этого (ваша Queue также совершенно не нужна, поскольку с ней взаимодействует только основной поток).

 def add(work, result_index):
    value = 0
    for v in work:
        #Do some work!
    result[result_index] = value

a = np.arange(0,100)
results = [] 
threads = []
for i in range(0,a.shape[0],10):
    work = a[i:i 10]
    results.append(None) # enlarge the results list, so we have room for this thread's result
    t = threading.Thread(target=add, args=(work, i//10))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
  

Я бы предупредил вас, что если ваш #Do some work! код ограничен процессором, вы вряд ли получите много пользы от использования нескольких потоков. Интерпретатор CPython имеет глобальную блокировку интерпретатора, которая предотвращает одновременное выполнение кода Python несколькими потоками (так что состояние интерпретатора, подобное количеству ссылок на объекты, может оставаться согласованным, не требуя для каждого из них собственной блокировки). Многопоточность действительно полезна только для заданий с ограниченным вводом-выводом (например, для извлечения большого количества документов из Интернета.

Для ограниченной работы процессора вы обычно хотите использовать multiprocessing вместо этого. Если это то, что вам нужно, посмотрите на multiprocessing.map , который может обрабатывать передачу объектов между процессами и автоматическую сборку результатов в упорядоченный список.

Комментарии:

1. Спасибо! В итоге использовал multiprocessing.map!