Передать разные инициализации другому рабочему в ProcessPoolExecutor

#python #multiprocessing

#python #многопроцессорная обработка

Вопрос:

 In [5]: def fn(x): 
   ...:     os.environ["var_{}".format(x)] = x 
   ...:      
   ...:                                                                                                                                       
[PYFLYBY] import os

In [6]: def gn(x): 
   ...:     return os.environ["var_{}".format(x)] 
   ...:      
   ...:      
   ...:                                                                                                                                     

a = ["1", "2", "3"]

In [8]: with concurrent.futures.ProcessPoolExecutor(max_workers=3, initializer=fn, initargs=a) as e: 
   ...:     r = e.map(gn, a) 
   ...:                                                                                                                                       
Exception in initializer:
Traceback (most recent call last):
  File "/opt/python/python-3.7/lib64/python3.7/concurrent/futures/process.py", line 226, in _process_worker
    initializer(*initargs)
TypeError: fn() takes 1 positional argument but 3 were given
Exception in initializer:
Traceback (most recent call last):
  File "/opt/python/python-3.7/lib64/python3.7/concurrent/futures/process.py", line 226, in _process_worker
    initializer(*initargs)
TypeError: fn() takes 1 positional argument but 3 were given
Exception in initializer:
Traceback (most recent call last):
  File "/opt/python/python-3.7/lib64/python3.7/concurrent/futures/process.py", line 226, in _process_worker
    initializer(*initargs)
TypeError: fn() takes 1 positional argument but 3 were given
  

Итак, в принципе, я хочу, чтобы [0] было передано первому рабочему, [1] второму и так далее… есть ли какой-либо способ добиться этого таким образом? Прямо сейчас весь a передается в fn, что вызывает эту ошибку.

Ответ №1:

Ваш пример не совсем корректен, но что касается вопроса:

Вы можете передать multiprocessing.Queue функцию инициализатора, поместить в нее данные, специфичные для рабочего, и выполнить по одному queue.get() в каждом рабочем процессе:

 import os
import concurrent.futures
import multiprocessing
import time


def fn(q):
  x = q.get()
  os.environ["var_x"] = x


def gn(i):
  time.sleep(0.5) 
  return f"pid={os.getpid()} var_x={os.environ['var_x']}n"

q = multiprocessing.Queue()
a = ["1", "2", "3"]
with concurrent.futures.ProcessPoolExecutor(max_workers=3, initializer=fn, initargs=(q,)) as e:
  [q.put(i) for i in a]
  print(*e.map(gn, a))
  

Вывод:

 pid=1218 var_x=1
pid=1219 var_x=2
pid=1220 var_x=3