Аргумент multiprocess.pool.apply вызывает ошибку не итерируемого int, в то время как это int из цикла

#python #multiprocessing

#python #многопроцессорная обработка

Вопрос:

В функции класса у меня есть итеративная функция, которая перебирает все векторы заданной матрицы входных данных. Я делаю это по индексам, поскольку отслеживаю, какие точки были проверены и к какому кластеру они принадлежат. Теперь я попытался распараллелить это, и возникли две проблемы:

 [pool.apply(self.expandcluster, args=i) for i in np.where(self._checked == 0)[0]]
  

Вызывает ошибку:

 TypeError: expandcluster() argument after * must be an iterable, not int
  

Итак, мне интересно, как я могу выполнить итерацию через функцию expandcluster (), передавая ей индексы логического вектора, на который влияют параллельные процессы.

Тогда мне также, вероятно, нужно беспокоиться о том, как параллельные процессы влияют на векторы. Однако учебники / документы по многопроцессорной обработке очень скудны по этой теме. Как вы можете видеть, теперь я пытаюсь выполнить итерацию через функцию, которая не возвращает ничего, просто изменяет два вектора.

 class A():

    def _init_(self):
         init stuff...

    def fit(self, x):
        self._checked = False * np.array(np.size(X, axis=0))
        pool = mp.Pool(8)
        [pool.apply(self.expandcluster, args=i) for i in p.where(self._checked 
         == 0)[0]]

    def expandcluster(self, pointIndice):
        self._checked[pointIndice] = 1
        self._clusterIDs = self.find_points(pointIndice)
  

Это для специальной реализации DBSCAN, неконтролируемого алгоритма кластеризации. Причина, по которой я структурирую поток данных таким образом, заключается в том, что, когда я проверяю одну точку, и она оказывается центром кластера, я проверяю все окружающие точки в той же области. Теперь мне не нужно проверять их снова.
Сначала я реализовал алгоритм без распараллеливания, который работает нормально. Однако я могу использовать дополнительную вычислительную мощность. Исходный алгоритм выполняется через цикл for с диапазоном (np.size(X, axis=0)).

Комментарии:

1. Следующий пример демонстрирует использование пула: обратите внимание на pool.apply_async(... строку.

2. Пример не охватывает то, чего я хочу достичь. Этот вопрос остается открытым. Однако я подозреваю, что это невозможно из-за того, что мой вывод представляет собой один вектор, и разные процессы время от времени изменяют определенные значения внутри этого вектора.

3. Сообщение об ошибке должно гласить: TypeError: expandcluster() argument 'i' must be an iterable, not int . Сравните с данной ссылкой: .apply_async(f, (10,))