Параллельный Python.Исполнитель пула процессов фьючерсов неверный вывод

#python #submit #wait #concurrent.futures

Вопрос:

Я пытаюсь использовать ProcessPoolExecutor() для запуска некоторых функций, но мне не удается понять, как получить возврат функций с помощью.

  def threaded_upload(i):
    time.sleep(2)
    if i == 0:
        k = 10
    elif i == 2:
        k = i*i
    else:
        k = -99
    return [k]

 def controller():
    if __name__ == "__main__":
        futures = []
        with ProcessPoolExecutor() as pool:
            for paso in range(4):
                futuro_i = pool.submit(threaded_upload,paso)
                wth=[futuro_i.result()]
                futures.append(futuro_i)

            wait(futures, return_when=ALL_COMPLETED)

            merged_list = []
            for future in futures:
                for valor in future.result():
                    merged_list.append(valor)
            Lista_Final = merged_list
            wait(futures, return_when=ALL_COMPLETED)
        return Lista_Final

  print(controller())
 

Вывод кода выглядит следующим образом:
Нет
[10, -99, 4, -99]

Я не уверен, почему? «Ожидание», похоже, тоже не ждет, пока не будут выполнены все функции.

Честно говоря, я читаю и читаю уже несколько дней, но описание concurrent.futures или многопроцессорной обработки более продвинуто, чем мои текущие знания.

Любые разъяснения будут оценены по достоинству. Заранее спасибо.

Ответ №1:

Сначала вы отправляете задания, а затем ждете результатов. Вы также можете вернуть целое число вместо a list , а затем пропустить внутреннее loop :

test.py:

 import random
import time

from concurrent.futures import ProcessPoolExecutor, wait


def worker(i):
    t = random.uniform(1, 5)

    print(f"START: {i} ({t:.3f}s)")

    time.sleep(t)

    if i == 0:
        k = 10
    elif i == 2:
        k = i * i
    else:
        k = -99

    print(f"END: {i}")

    return k


def main():
    futures = []

    with ProcessPoolExecutor() as pool:
        for i in range(4):
            future = pool.submit(worker, i)
            futures.append(future)

        results = []

        done, pending = wait(futures) # ALL_COMPLETED is the default value

        for future in done:
            results.append(future.result())

        print(results)


if __name__ == "__main__":
    main()
 

Тест:

 $ python test.py
START: 0 (1.608s)
START: 1 (1.718s)
START: 2 (1.545s)
START: 3 (1.588s)
END: 2
END: 3
END: 0
END: 1
[10, -99, 4, -99]