#scala #apache-spark
Вопрос:
Полная ошибка заключается в:
Исключение org.apache.spark.SparkException: В этом RDD отсутствует текст SparkContext. Это может произойти в следующих случаях: (1) Преобразования и действия RDD вызываются НЕ драйвером, а внутри других преобразований; например, rdd1.map(x => rdd2.values.count() * x) недопустимо, поскольку преобразование значений и действие подсчета не могут быть выполнены внутри преобразования rdd1.map. Для получения дополнительной информации см. SPARK-5063. (2) Когда потоковое задание Spark восстанавливается из контрольной точки, это исключение будет устранено, если в операциях DStream используется ссылка на RDD, не определенный потоковым заданием. Для получения дополнительной информации См. SPARK-13758.
но я думаю, что я не использовал вложенное преобразование rdd в своем коде.
как это решить?
мой код scala:
stream.foreachRDD { rdd => {
val nRDD = rdd.map(item => item.value())
val oldRDD = sc.textFile("hdfs://localhost:9011/recData/miniApp/mall")
val top = oldRDD.sortBy(item => {
val arr = item.split(' ')
arr(0)
}, ascending = false).take(200)
val topRDD = sc.makeRDD(top)
val unionRDD = topRDD.union(nRDD)
val validRDD = unionRDD.map(item => {
val arr = item.split(' ')
((arr(1), arr(2)), arr(3).toDouble)
})
.reduceByKey((f, s) => {
if (f > s) f else s
})
.distinct()
val ratings = validRDD.map(item => {
Rating(item._1._2.toInt, item._1._1.toInt, item._2)
})
val rank = 10
val numIterations = 5
val model = ALS.train(ratings, rank, numIterations, 0.01)
nRDD.map(item => {
val arr = item.split(' ')
arr(2)
}).toDS()
.distinct()
.foreach(item=>{
println("als recommending for user " item)
val recommendRes = model.recommendProducts(item.toInt, 10)
for (elem <- recommendRes) {
println(elem)
}
})
nRDD.saveAsTextFile("hdfs://localhost:9011/recData/miniApp/mall")
}
}
Ответ №1:
Ошибка говорит вам, что вы пропускаете a SparkContext
. Я предполагаю, что программа завершится неудачей в этой строке:
val oldRDD = sc.textFile("hdfs://localhost:9011/recData/miniApp/mall")
В документации приведен пример создания a SparkContext
для использования в этой ситуации.
Из документов:
val stream: DStream[String] = ...
stream.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Do things...
}
Хотя вы используете RDD
s вместо DataFrame
s, должны применяться те же принципы.