#scala #apache-spark #mapreduce #rdd
#scala #apache-spark #mapreduce #rdd
Вопрос:
У меня есть большие данные, и я хочу использовать mapRuduce для этих данных, но я ничего не нахожу для этой задачи. (Язык: Scala)
Данные для этого процесса являются:
Y,20,01
G,18,40
J,19,10
D,50,10
R,20,01
Z,18,40
T,19,10
Q,50,10
... (2.000 )
Для всех этих данных я хочу загрузить на карты: (например)
Y,20,01
G,18,40
J,19,10 MAP 1
D,50,10
---------------------
R,20,01
Z,18,40 MAP 2
T,19,10
Q,50,10
... (2.000 )
На всех картах локально я хочу найти минимальные столбцы.
После отправки всех карт в reduce локальные данные и в reduce находят глобальные минимальные столбцы.
Можете ли вы мне помочь? Как я это сделаю?
Ответ №1:
Если я правильно понимаю, вы хотите прочитать эти данные в несколько разделов и вызвать некоторую функцию в каждом разделе, а не выполнять итерации по каждому элементу.
Давайте предположим, что вы хотите найти максимальное значение для каждого из заданных разделов.
Вы можете либо использовать mapPartitions
, либо mapPartitionsWithIndex
оба варианта, которые будут выполняться для каждого из разделов.
mapPartitions
примет входные данные, которые будут итератором и mapPartitionsWithIndex
должны принимать 2 параметра, а именно index
и iterator
.
Давайте определим функцию, которая получает максимум для данного итератора.
//Function to find the max for an iterator and return back an iterator with only the max element
def findMax(numbers :Iterator[Int]) : Iterator[Int] = {
val max = numbers.max;
Iterator(max)
}
findMax(Iterator(7,8,9,2,3)).next
//9: Int
Давайте создадим rdd, состоящий из 2 разделов, и распечатаем элементы каждого раздела.
val rdd = sc.parallelize(1 to 30, 2)
val mapped = rdd.mapPartitionsWithIndex{ (index, iterator) => {
val myList = iterator.toList
val item = Map(index -> myList)
item.iterator
}
}
mapped.collect().foreach(println)
/*
(0,List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
(1,List(16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30))
*/
Теперь мы видим, что есть 2 раздела — 0
и 1
.
Далее мы найдем максимальное значение для каждого раздела, используя нашу findMax
функцию, определенную выше.
val maxByPartitions = rdd.mapPartitions(findMax).collect()
maxByPartitions: Array[Int] = Array(15, 30)