Обработка данных MapReduce Spark с помощью rdd (scala)

#scala #apache-spark #mapreduce #rdd

#scala #apache-spark #mapreduce #rdd

Вопрос:

У меня есть большие данные, и я хочу использовать mapRuduce для этих данных, но я ничего не нахожу для этой задачи. (Язык: Scala)

Данные для этого процесса являются:

 Y,20,01
G,18,40
J,19,10
D,50,10
R,20,01
Z,18,40
T,19,10
Q,50,10
... (2.000 )
  

Для всех этих данных я хочу загрузить на карты: (например)

 Y,20,01
G,18,40
J,19,10     MAP 1
D,50,10
---------------------
R,20,01
Z,18,40     MAP 2
T,19,10
Q,50,10
... (2.000 )
  

На всех картах локально я хочу найти минимальные столбцы.

После отправки всех карт в reduce локальные данные и в reduce находят глобальные минимальные столбцы.

Можете ли вы мне помочь? Как я это сделаю?

Ответ №1:

Если я правильно понимаю, вы хотите прочитать эти данные в несколько разделов и вызвать некоторую функцию в каждом разделе, а не выполнять итерации по каждому элементу.

Давайте предположим, что вы хотите найти максимальное значение для каждого из заданных разделов.

Вы можете либо использовать mapPartitions , либо mapPartitionsWithIndex оба варианта, которые будут выполняться для каждого из разделов.

mapPartitions примет входные данные, которые будут итератором и mapPartitionsWithIndex должны принимать 2 параметра, а именно index и iterator .

Давайте определим функцию, которая получает максимум для данного итератора.

 //Function to find the max for an iterator and return back an iterator with only the max element
def findMax(numbers :Iterator[Int]) : Iterator[Int] = {
  val max = numbers.max;
  Iterator(max)
}
findMax(Iterator(7,8,9,2,3)).next
//9: Int
  

Давайте создадим rdd, состоящий из 2 разделов, и распечатаем элементы каждого раздела.

 val rdd = sc.parallelize(1 to 30, 2)

val mapped = rdd.mapPartitionsWithIndex{ (index, iterator) => { 
  val myList = iterator.toList
  val item = Map(index -> myList)
  item.iterator
  }
}

mapped.collect().foreach(println)
/*
(0,List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
(1,List(16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30))
*/
  

Теперь мы видим, что есть 2 раздела — 0 и 1 .

Далее мы найдем максимальное значение для каждого раздела, используя нашу findMax функцию, определенную выше.

 val maxByPartitions = rdd.mapPartitions(findMax).collect()
maxByPartitions: Array[Int] = Array(15, 30)