K означает кластеризацию с уменьшением карты в spark

#apache-spark #pyspark #mapreduce #k-means

#apache-spark #pyspark #mapreduce #k-означает

Вопрос:

Здравствуйте, может кто-нибудь помочь мне уменьшить карту с помощью Kmeans с помощью Spark. На самом деле можно использовать Kmeans с помощью spark, но я не знаю, как отобразить и уменьшить его. Спасибо.

Ответ №1:

Вам не нужно писать map-reduce. Вы можете использовать spark dataframe API и использовать библиотеку Spark ML.

Подробнее об этом можно прочитать здесь .

https://spark.apache.org/docs/latest/ml-clustering.html

Ответ №2:

Ниже предлагается псевдокод для вашего упражнения:

центроиды = k точек случайной выборки из набора данных

Карта:

  • Задана точка и множество центроидов
  • Вычислите расстояние между точкой и каждым центроидом
  • Выделите точку и ближайший центроид

Уменьшить:

  • Учитывая центроид и точки, принадлежащие его кластеру
  • Вычислите новый центроид как среднее арифметическое положение точек
  • Испускает новый центроид

prev_centroids = центроиды

центроиды = new_centroids

в то время как prev_centroids — центроиды> пороговое значение

Класс mapper вычисляет расстояние между точкой данных и каждым центроидом. Затем выдает индекс ближайшего центроида и точку данных:

 class MAPPER
method MAP(file_offset, point)
    min_distance = POSITIVE_INFINITY
    closest_centroid = -1
    for all centroid in list_of_centroids
        distance = distance(centroid, point)
        if (distance < min_distance)
            closest_centroid = index_of(centroid)
            min_distance = distance
    EMIT(closest_centroid, point) 
 

Редуктор вычисляет новое приближение центроида и выдает его.

 class REDUCER
method REDUCER(centroid_index, list_of_point_sums)
    number_of_points = partial_sum.number_of_points
    point_sum = 0
    for all partial_sum in list_of_partial_sums:
        point_sum  = partial_sum
        point_sum.number_of_points  = partial_sum.number_of_points
    centroid_value = point_sum / point_sum.number_of_points
    EMIT(centroid_index, centroid_value)
 

Фактическая реализация K-Means Spark:

Сначала вы читаете файл с точками и генерируете начальные центроиды с помощью случайной выборки, используя takeSample(False, k): эта функция берет k случайных выборок без замены из RDD; таким образом, приложение генерирует начальные центроиды распределенным образом, избегая перемещения всех данных вдрайвер. Вы можете повторно использовать RDD в итеративном алгоритме, следовательно, кэшировать его в памяти с помощью cache(), чтобы избежать его переоценки при каждом запуске действия:

 points = sc.textFile(INPUT_PATH).map(Point).cache()
initial_centroids = init_centroids(points, k=parameters["k"])

def init_centroids(dataset, k):
    start_time = time.time()
    initial_centroids = dataset.takeSample(False, k)
    print("init centroid execution:", len(initial_centroids), "in", 
    (time.time() - start_time), "s")
    return initial_centroids
 

После этого вы повторяете этапы mapper и reducer до тех пор, пока не будет проверен критерий остановки или не будет достигнуто максимальное количество итераций.

 while True:
    print("--Iteration n. {itr:d}".format(itr=n 1), end="r", 
    flush=True)
    cluster_assignment_rdd = points.map(assign_centroids)
    sum_rdd = cluster_assignment_rdd.reduceByKey(lambda x, y: x.sum(y))
    centroids_rdd = sum_rdd.mapValues(lambda x: 
    x.get_average_point()).sortByKey(ascending=True)

    new_centroids = [item[1] for item in centroids_rdd.collect()]
    stop = stopping_criterion(new_centroids,parameters["threshold"])

    n  = 1
    if(stop == False and n < parameters["maxiteration"]):
        centroids_broadcast = sc.broadcast(new_centroids)
    else:
        break
 

Условие остановки вычисляется таким образом:

 def stopping_criterion(new_centroids, threshold):
    old_centroids = centroids_broadcast.value
    for i in range(len(old_centroids)):
        check = old_centroids[i].distance(new_centroids[i], 
        distance_broadcast.value) <= threshold
        if check == False:
            return False
    return True
 

Для представления точек была определена Точка класса. Он характеризуется следующими полями:

  • набор компонентов numpyarray
  • количество точек: точку можно рассматривать как совокупность многих точек, поэтому эта переменная используется для отслеживания количества точек, представленных объектом

Он включает в себя следующие операции:

  • расстояние (можно передать в качестве параметра тип расстояния)
  • сумма
  • get_average_point: этот метод возвращает точку, которая имеет в качестве компонентов среднее значение фактических компонентов по количеству точек, представленных объектом

    точка класса: def init(self, line): значения = line.split(«,») self.components = np.array([round(float(k), 5) для k в значениях]) self.number_of_points = 1

       def sum(self, p):
          self.components = np.add(self.components, p.components)
          self.number_of_points  = p.number_of_points
          return self
    
      def distance(self, p, h):
          if (h < 0):
             h = 2
          return linalg.norm(self.components - p.components, h)
    
      def get_average_point(self):
          self.components = np.around(np.divide(self.components, 
          self.number_of_points), 5)
          return self
     

Метод mapper вызывается на каждой итерации во входном файле, который содержит точки из набора данных

 cluster_assignment_rdd = points.map(assign_centroids)
 

Функция assign_centroids для каждой вызываемой точки присваивает этой точке ближайший центроид. Центроиды берутся из широковещательной переменной. Функция возвращает результат в виде кортежа (идентификатор центроида, точка)

  def assign_centroids(p):
     min_dist = float("inf")
     centroids = centroids_broadcast.value
     nearest_centroid = 0
     for i in range(len(centroids)):
         distance = p.distance(centroids[i], distance_broadcast.value)
         if(distance < min_dist):
             min_dist = distance
             nearest_centroid = i
     return (nearest_centroid, p)
 

Этап уменьшения выполняется с использованием двух преобразований spark:

  • reduceByKey: для каждого кластера вычислите сумму принадлежащих ему точек. Обязательно передавать одну ассоциативную функцию в качестве параметра. Ассоциативная функция (которая принимает два аргумента и возвращает один элемент) должна быть коммутативной и ассоциативной по математической природе

    sum_rdd = cluster_assignment_rdd.reduceByKey(лямбда x, y: x.sum(y))

  • Значения карты: используется для вычисления средней точки для каждого кластера в конце каждого этапа. Точки уже разделены по ключу. Эта трансформация работает только со значением ключа. Результаты отсортированы для упрощения сравнения.

    centroids_rdd = sum_rdd.mapValues(лямбда x: x.get_average_point()).sortBy(лямбда x: x[1].компоненты[0])

Функция get_average_point() возвращает новый вычисленный центроид.

  def get_average_point(self):
     self.components = np.around(np.divide(self.components, 
     self.number_of_points), 5)
     return self