Является ли `concurrent.futures.map` потокобезопасным?

#python-3.x #concurrent.futures

#python-3.x #concurrent.futures

Вопрос:

Я считаю, что это не упоминается в документе:https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures .Executor.map

Ответ №1:

Ссылка:

Согласно ThreadPoolExecutor

Взаимоблокировки могут возникать, когда вызываемый объект, связанный с будущим, ожидает результатов другого будущего.

Приведенные там 2 примера показывают, как может произойти взаимоблокировка. Попробуйте заменить .submit() на .map() и внесите другие необходимые изменения.`

Под капотом:

Согласно python3.6/concurrent/futures/thread.py модулю Python (найдите этот файл в вашей системе), класс ThreadPoolExecutor фактически использует queue.Queue() (см. Строку 107) для реализации потоковой обработки python и использует примитив threading.Lock() (см. строку 110) для блокировки потоков.

Объяснение:

Если для вас «потокобезопасный» означает несколько потоков в программе, каждый из которых пытается получить доступ к общей структуре данных или местоположению в памяти, то вы должны знать, что concurrent.futures.ThreadPoolExecutor разрешить только одному потоку одновременно обращаться к общей структуре данных или местоположению в памяти; threading.Lock() для управления этим используется примитив. И когда функции в одном из ваших потоков необходимо дождаться результатов в другом потоке, может возникнуть взаимоблокировка, и ваш код не будет работать; этого вам следует избегать.

Ответ №2:

 from concurrent.futures import ThreadPoolExecutor, as_completed
import time

resultDict = {0: [], 1: [], 2: [], 3: [], 4: []}
resultDict1 = {0: [], 1: [], 2: [], 3: [], 4: []}

def appendResult(a, b):
  result = a ** b
  resultDict[result % 5].append(result)
  time.sleep(1)

def appendResult1(a, b):
  result = a ** b
  resultDict1[result % 5].append(result)
  time.sleep(1)

startTime = time.time()
processes = []
with ThreadPoolExecutor(max_workers=12) as executor:
  for i in range(100):
    processes.append(executor.submit(appendResult, i, 2))

for task in as_completed(processes):
  task.result()

print("Cost", time.time() - startTime, "s")


startTime = time.time()
for i in range(100):
  appendResult1(i, 2)
print("Cost", time.time() - startTime, "s")
  

Да, это потокобезопасно. Таким образом, resultDict и resultDict1 будут одинаковыми в конце приведенного выше кода.