#apache-spark #spark-graphx
#apache-spark #spark-graphx
Вопрос:
Я работаю с Spark GraphX. Я строю график из файла (около 620 мб, 50 Тыс. вершин и почти 50 миллионов ребер). Я использую кластер spark с: 4 рабочими, каждый с 8 ядрами и 13,4 гб оперативной памяти, 1 драйвером с теми же характеристиками. Когда я отправляю свой файл .jar в кластер, случайно один из рабочих загружает все данные на него. Все задачи, необходимые для вычислений, запрашиваются у этого исполнителя. Во время вычислений остальные три ничего не делают. Я перепробовал все, и я не нашел ничего, что могло бы заставить вычислять во всех workers.
Когда Spark строит график, и я ищу количество разделов RDD вершин, скажем, 5, но если я перераспределю этот RDD, например, на 32 (общее количество ядер), Spark загружает данные в каждого рабочего, но замедляет вычисления.
Я запускаю отправку spark таким образом:
spark-submit --master spark://172.30.200.20:7077 --driver-memory 12g --executor-memory 12g --class interscore.InterScore /root/interscore/interscore.jar hdfs://172.30.200.20:9000/user/hadoop/interscore/network.dat hdfs://172.30.200.20:9000/user/hadoop/interscore/community.dat 111
Код здесь:
object InterScore extends App{
val sparkConf = new SparkConf().setAppName("Big-InterScore")
val sc = new SparkContext(sparkConf)
val t0 = System.currentTimeMillis
runInterScore(args(0), args(1), args(2))
println("Running time " (System.currentTimeMillis - t0).toDouble / 1000)
sc.stop()
def runInterScore(netPath:String, communitiesPath:String, outputPath:String) = {
val communities = sc.textFile(communitiesPath).map(x => {
val a = x.split('t')
(a(0).toLong, a(1).toInt)
}).cache
val graph = GraphLoader.edgeListFile(sc, netPath, true)
.partitionBy(PartitionStrategy.RandomVertexCut)
.groupEdges(_ _)
.joinVertices(communities)((_, _, c) => c)
.cache
val lvalues = graph.aggregateMessages[Double](
m => {
m.sendToDst(if (m.srcAttr != m.dstAttr) 1 else 0)
m.sendToSrc(if (m.srcAttr != m.dstAttr) 1 else 0)
}, _ _)
val communitiesIndices = communities.map(x => x._2).distinct.collect
val verticesWithLValue = graph.vertices.repartition(32).join(lvalues).cache
println("K = " communitiesIndices.size)
graph.unpersist()
graph.vertices.unpersist()
communitiesIndices.foreach(c => {
//COMPUTE c
}
})
}
}
Комментарии:
1. Привет @jmachin не могли бы вы предоставить свою базу данных из Spark UI и план выполнения из df.explain()?