исключение org.apache.spark.SparkException: В этом RDD отсутствует ошибка SparkContext

#scala #apache-spark

Вопрос:

Полная ошибка заключается в:

Исключение org.apache.spark.SparkException: В этом RDD отсутствует текст SparkContext. Это может произойти в следующих случаях: (1) Преобразования и действия RDD вызываются НЕ драйвером, а внутри других преобразований; например, rdd1.map(x => rdd2.values.count() * x) недопустимо, поскольку преобразование значений и действие подсчета не могут быть выполнены внутри преобразования rdd1.map. Для получения дополнительной информации см. SPARK-5063. (2) Когда потоковое задание Spark восстанавливается из контрольной точки, это исключение будет устранено, если в операциях DStream используется ссылка на RDD, не определенный потоковым заданием. Для получения дополнительной информации См. SPARK-13758.

но я думаю, что я не использовал вложенное преобразование rdd в своем коде.

как это решить?

мой код scala:

 stream.foreachRDD { rdd => {
      val nRDD = rdd.map(item => item.value())
      val oldRDD = sc.textFile("hdfs://localhost:9011/recData/miniApp/mall")
      val top = oldRDD.sortBy(item => {
        val arr = item.split(' ')
        arr(0)
      }, ascending = false).take(200)
      val topRDD = sc.makeRDD(top)
      val unionRDD = topRDD.union(nRDD)
      val validRDD = unionRDD.map(item => {
          val arr = item.split(' ')
          ((arr(1), arr(2)), arr(3).toDouble)
        })
        .reduceByKey((f, s) => {
          if (f > s) f else s
        })
        .distinct()

      val ratings = validRDD.map(item => {
        Rating(item._1._2.toInt, item._1._1.toInt, item._2)
      })
      val rank = 10
      val numIterations = 5
      val model = ALS.train(ratings, rank, numIterations, 0.01)

      nRDD.map(item => {
        val arr = item.split(' ')
        arr(2)
      }).toDS()
        .distinct()
        .foreach(item=>{
          println("als recommending for user " item)
          val recommendRes = model.recommendProducts(item.toInt, 10)
          for (elem <- recommendRes) {
            println(elem)
          }
      })
      nRDD.saveAsTextFile("hdfs://localhost:9011/recData/miniApp/mall")
    }
    }
    
 

Ответ №1:

Ошибка говорит вам, что вы пропускаете a SparkContext . Я предполагаю, что программа завершится неудачей в этой строке:

 val oldRDD = sc.textFile("hdfs://localhost:9011/recData/miniApp/mall")
 

В документации приведен пример создания a SparkContext для использования в этой ситуации.

Из документов:

 val stream: DStream[String] = ...

stream.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Do things...
}
 

Хотя вы используете RDD s вместо DataFrame s, должны применяться те же принципы.