#python #multiprocessing
#python #многопроцессорная обработка
Вопрос:
В функции класса у меня есть итеративная функция, которая перебирает все векторы заданной матрицы входных данных. Я делаю это по индексам, поскольку отслеживаю, какие точки были проверены и к какому кластеру они принадлежат. Теперь я попытался распараллелить это, и возникли две проблемы:
[pool.apply(self.expandcluster, args=i) for i in np.where(self._checked == 0)[0]]
Вызывает ошибку:
TypeError: expandcluster() argument after * must be an iterable, not int
Итак, мне интересно, как я могу выполнить итерацию через функцию expandcluster (), передавая ей индексы логического вектора, на который влияют параллельные процессы.
Тогда мне также, вероятно, нужно беспокоиться о том, как параллельные процессы влияют на векторы. Однако учебники / документы по многопроцессорной обработке очень скудны по этой теме. Как вы можете видеть, теперь я пытаюсь выполнить итерацию через функцию, которая не возвращает ничего, просто изменяет два вектора.
class A():
def _init_(self):
init stuff...
def fit(self, x):
self._checked = False * np.array(np.size(X, axis=0))
pool = mp.Pool(8)
[pool.apply(self.expandcluster, args=i) for i in p.where(self._checked
== 0)[0]]
def expandcluster(self, pointIndice):
self._checked[pointIndice] = 1
self._clusterIDs = self.find_points(pointIndice)
Это для специальной реализации DBSCAN, неконтролируемого алгоритма кластеризации. Причина, по которой я структурирую поток данных таким образом, заключается в том, что, когда я проверяю одну точку, и она оказывается центром кластера, я проверяю все окружающие точки в той же области. Теперь мне не нужно проверять их снова.
Сначала я реализовал алгоритм без распараллеливания, который работает нормально. Однако я могу использовать дополнительную вычислительную мощность. Исходный алгоритм выполняется через цикл for с диапазоном (np.size(X, axis=0)).
Комментарии:
1. Следующий пример демонстрирует использование пула: обратите внимание на
pool.apply_async(...
строку.2. Пример не охватывает то, чего я хочу достичь. Этот вопрос остается открытым. Однако я подозреваю, что это невозможно из-за того, что мой вывод представляет собой один вектор, и разные процессы время от времени изменяют определенные значения внутри этого вектора.
3. Сообщение об ошибке должно гласить:
TypeError: expandcluster() argument 'i' must be an iterable, not int
. Сравните с данной ссылкой:.apply_async(f, (10,))