Зачем spark разделять все данные в одном исполнителе?

#apache-spark #spark-graphx

#apache-spark #spark-graphx

Вопрос:

Я работаю с Spark GraphX. Я строю график из файла (около 620 мб, 50 Тыс. вершин и почти 50 миллионов ребер). Я использую кластер spark с: 4 рабочими, каждый с 8 ядрами и 13,4 гб оперативной памяти, 1 драйвером с теми же характеристиками. Когда я отправляю свой файл .jar в кластер, случайно один из рабочих загружает все данные на него. Все задачи, необходимые для вычислений, запрашиваются у этого исполнителя. Во время вычислений остальные три ничего не делают. Я перепробовал все, и я не нашел ничего, что могло бы заставить вычислять во всех workers.

Когда Spark строит график, и я ищу количество разделов RDD вершин, скажем, 5, но если я перераспределю этот RDD, например, на 32 (общее количество ядер), Spark загружает данные в каждого рабочего, но замедляет вычисления.

Я запускаю отправку spark таким образом:

 spark-submit --master spark://172.30.200.20:7077 --driver-memory 12g --executor-memory 12g --class interscore.InterScore /root/interscore/interscore.jar hdfs://172.30.200.20:9000/user/hadoop/interscore/network.dat hdfs://172.30.200.20:9000/user/hadoop/interscore/community.dat 111
  

Код здесь:

 object InterScore extends App{
  val sparkConf = new SparkConf().setAppName("Big-InterScore")
  val sc = new SparkContext(sparkConf)

  val t0 = System.currentTimeMillis
  runInterScore(args(0), args(1), args(2))
  println("Running time "   (System.currentTimeMillis - t0).toDouble / 1000)

  sc.stop()

  def runInterScore(netPath:String, communitiesPath:String, outputPath:String) = {
    val communities = sc.textFile(communitiesPath).map(x => {
      val a = x.split('t')
      (a(0).toLong, a(1).toInt)
    }).cache

    val graph = GraphLoader.edgeListFile(sc, netPath, true)
      .partitionBy(PartitionStrategy.RandomVertexCut)
      .groupEdges(_   _)
      .joinVertices(communities)((_, _, c) => c)
      .cache

    val lvalues = graph.aggregateMessages[Double](
      m => {
          m.sendToDst(if (m.srcAttr != m.dstAttr) 1 else 0)
          m.sendToSrc(if (m.srcAttr != m.dstAttr) 1 else 0)
      }, _   _)

    val communitiesIndices = communities.map(x => x._2).distinct.collect
    val verticesWithLValue = graph.vertices.repartition(32).join(lvalues).cache
    println("K = "   communitiesIndices.size)
    graph.unpersist()
    graph.vertices.unpersist()
    communitiesIndices.foreach(c => {
    //COMPUTE c
      }
    })
  }
}
  

Комментарии:

1. Привет @jmachin не могли бы вы предоставить свою базу данных из Spark UI и план выполнения из df.explain()?