#apache-spark #pyspark #mapreduce #k-means
#apache-spark #pyspark #mapreduce #k-означает
Вопрос:
Здравствуйте, может кто-нибудь помочь мне уменьшить карту с помощью Kmeans с помощью Spark. На самом деле можно использовать Kmeans с помощью spark, но я не знаю, как отобразить и уменьшить его. Спасибо.
Ответ №1:
Вам не нужно писать map-reduce. Вы можете использовать spark dataframe API и использовать библиотеку Spark ML.
Подробнее об этом можно прочитать здесь .
Ответ №2:
Ниже предлагается псевдокод для вашего упражнения:
центроиды = k точек случайной выборки из набора данных
Карта:
- Задана точка и множество центроидов
- Вычислите расстояние между точкой и каждым центроидом
- Выделите точку и ближайший центроид
Уменьшить:
- Учитывая центроид и точки, принадлежащие его кластеру
- Вычислите новый центроид как среднее арифметическое положение точек
- Испускает новый центроид
prev_centroids = центроиды
центроиды = new_centroids
в то время как prev_centroids — центроиды> пороговое значение
Класс mapper вычисляет расстояние между точкой данных и каждым центроидом. Затем выдает индекс ближайшего центроида и точку данных:
class MAPPER
method MAP(file_offset, point)
min_distance = POSITIVE_INFINITY
closest_centroid = -1
for all centroid in list_of_centroids
distance = distance(centroid, point)
if (distance < min_distance)
closest_centroid = index_of(centroid)
min_distance = distance
EMIT(closest_centroid, point)
Редуктор вычисляет новое приближение центроида и выдает его.
class REDUCER
method REDUCER(centroid_index, list_of_point_sums)
number_of_points = partial_sum.number_of_points
point_sum = 0
for all partial_sum in list_of_partial_sums:
point_sum = partial_sum
point_sum.number_of_points = partial_sum.number_of_points
centroid_value = point_sum / point_sum.number_of_points
EMIT(centroid_index, centroid_value)
Фактическая реализация K-Means Spark:
Сначала вы читаете файл с точками и генерируете начальные центроиды с помощью случайной выборки, используя takeSample(False, k): эта функция берет k случайных выборок без замены из RDD; таким образом, приложение генерирует начальные центроиды распределенным образом, избегая перемещения всех данных вдрайвер. Вы можете повторно использовать RDD в итеративном алгоритме, следовательно, кэшировать его в памяти с помощью cache(), чтобы избежать его переоценки при каждом запуске действия:
points = sc.textFile(INPUT_PATH).map(Point).cache()
initial_centroids = init_centroids(points, k=parameters["k"])
def init_centroids(dataset, k):
start_time = time.time()
initial_centroids = dataset.takeSample(False, k)
print("init centroid execution:", len(initial_centroids), "in",
(time.time() - start_time), "s")
return initial_centroids
После этого вы повторяете этапы mapper и reducer до тех пор, пока не будет проверен критерий остановки или не будет достигнуто максимальное количество итераций.
while True:
print("--Iteration n. {itr:d}".format(itr=n 1), end="r",
flush=True)
cluster_assignment_rdd = points.map(assign_centroids)
sum_rdd = cluster_assignment_rdd.reduceByKey(lambda x, y: x.sum(y))
centroids_rdd = sum_rdd.mapValues(lambda x:
x.get_average_point()).sortByKey(ascending=True)
new_centroids = [item[1] for item in centroids_rdd.collect()]
stop = stopping_criterion(new_centroids,parameters["threshold"])
n = 1
if(stop == False and n < parameters["maxiteration"]):
centroids_broadcast = sc.broadcast(new_centroids)
else:
break
Условие остановки вычисляется таким образом:
def stopping_criterion(new_centroids, threshold):
old_centroids = centroids_broadcast.value
for i in range(len(old_centroids)):
check = old_centroids[i].distance(new_centroids[i],
distance_broadcast.value) <= threshold
if check == False:
return False
return True
Для представления точек была определена Точка класса. Он характеризуется следующими полями:
- набор компонентов numpyarray
- количество точек: точку можно рассматривать как совокупность многих точек, поэтому эта переменная используется для отслеживания количества точек, представленных объектом
Он включает в себя следующие операции:
- расстояние (можно передать в качестве параметра тип расстояния)
- сумма
- get_average_point: этот метод возвращает точку, которая имеет в качестве компонентов среднее значение фактических компонентов по количеству точек, представленных объектом
точка класса: def init(self, line): значения = line.split(«,») self.components = np.array([round(float(k), 5) для k в значениях]) self.number_of_points = 1
def sum(self, p): self.components = np.add(self.components, p.components) self.number_of_points = p.number_of_points return self def distance(self, p, h): if (h < 0): h = 2 return linalg.norm(self.components - p.components, h) def get_average_point(self): self.components = np.around(np.divide(self.components, self.number_of_points), 5) return self
Метод mapper вызывается на каждой итерации во входном файле, который содержит точки из набора данных
cluster_assignment_rdd = points.map(assign_centroids)
Функция assign_centroids для каждой вызываемой точки присваивает этой точке ближайший центроид. Центроиды берутся из широковещательной переменной. Функция возвращает результат в виде кортежа (идентификатор центроида, точка)
def assign_centroids(p):
min_dist = float("inf")
centroids = centroids_broadcast.value
nearest_centroid = 0
for i in range(len(centroids)):
distance = p.distance(centroids[i], distance_broadcast.value)
if(distance < min_dist):
min_dist = distance
nearest_centroid = i
return (nearest_centroid, p)
Этап уменьшения выполняется с использованием двух преобразований spark:
- reduceByKey: для каждого кластера вычислите сумму принадлежащих ему точек. Обязательно передавать одну ассоциативную функцию в качестве параметра. Ассоциативная функция (которая принимает два аргумента и возвращает один элемент) должна быть коммутативной и ассоциативной по математической природе
sum_rdd = cluster_assignment_rdd.reduceByKey(лямбда x, y: x.sum(y))
- Значения карты: используется для вычисления средней точки для каждого кластера в конце каждого этапа. Точки уже разделены по ключу. Эта трансформация работает только со значением ключа. Результаты отсортированы для упрощения сравнения.
centroids_rdd = sum_rdd.mapValues(лямбда x: x.get_average_point()).sortBy(лямбда x: x[1].компоненты[0])
Функция get_average_point() возвращает новый вычисленный центроид.
def get_average_point(self):
self.components = np.around(np.divide(self.components,
self.number_of_points), 5)
return self