#apache-spark #rdd #partitioning
#apache-spark #rdd #разделение
Вопрос:
Меня смущает операция перераспределения. Пожалуйста, смотрите приведенный ниже код
import org.apache.spark._
import org.apache.log4j._
object FriendsByAge {
def parseLine(line: String)={
val fields = line.split(",")
val age = fields(2).toInt
val numFriends = fields(3).toInt
(age, numFriends)
}
def main(args: Array[String]) = {
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = new SparkContext("local[*]", "FriendsByAge")
val lines = sc.textFile("./data/fakefriends-noheader.csv").repartition(1000)
val rdd = lines.map(parseLine)
println(rdd.getNumPartitions)
val totalsByAge = rdd.mapValues(x=> (x,1)).reduceByKey((x, y) => (x._1 y._1, x._2 y._2))
println(totalsByAge.getNumPartitions)
val averagesByAges = totalsByAge.mapValues(x => x._1/x._2)
println(averagesByAges.getNumPartitions)
val results = averagesByAges.collect()
results.sortWith(_._2> _._2).foreach(println)
}
}
Здесь я перераспределяю rdd после чтения файла на 1000 разделов. Поскольку операция сопоставления создает новый RDD, разделение не сохраняется. Я все еще вижу то же количество разделов.
Вопрос в том, как я узнаю, сохранит ли дочерний RDD родительские разделы RDD? Каковы критерии, когда перераспределение будет признано недействительным дочерним RDD.
Комментарии:
1. сколько записей у вас на самом деле?
2. @thebluephantom У меня 1856 записей. Я пытаюсь понять раздел spark, поэтому я использовал небольшие данные.
3. Пожалуйста, добавьте количество разделов за итерацию.
Ответ №1:
mapValues
не изменяет уже действующее разделение, это narrow
преобразование. У вас их два.
reduceByKey
является ассоциативным. Spark агрегирует локально и отправляет эти результаты в драйвер или в соответствующие разделы — в вашем случае. Если вы не используете параметр on reduceByKey
для number of partitions
, вы сохраняете то же количество разделов для нового RDD, хотя и с другим распределением.