#json #mongodb #apache-spark #bson
#json #mongodb #apache-spark #bson
Вопрос:
Я загружаю bson
дамп из Mongo в Spark, как описано здесь . Это работает, но я получаю:
org.apache.spark.rdd.RDD[(Object, org.bson.BSONObject)]
В основном это должен быть просто JSON со всеми String
полями. Остальная часть моего кода требует объекта DataFrame для обработки данных. Но, конечно, toDF
сбой в этом RDD. Как я могу преобразовать его в Spark DataFrame со всеми полями как String
? spark.read.json
Было бы здорово иметь что-то похожее на.
Ответ №1:
val datapath = "path_to_bson_file.bson"
import org.apache.hadoop.conf.Configuration
// Set up the configuration for reading from bson dump.
val bsonConfig = new Configuration()
bsonConfig.set("mongo.job.input.format", "com.mongodb.hadoop.BSONFileInputFormat")
// given with your spark session
implicit lazy val sparkSession = initSpark()
// read the RDD[org.bson.BSONObject]
val bson_data_as_json_string = sparkSession.sparkContext.newAPIHadoopFile(datapath,
classOf[com.mongodb.hadoop.BSONFileInputFormat].
asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[Object, org.bson.BSONObject]]),
classOf[Object],
classOf[org.bson.BSONObject],
bsonConfig).
map{row => {
// map BSON object to JSON string
val json = com.mongodb.util.JSON.serialize(row._2)
json
}
}
// read into JSON spark Dataset:
val bson_data_as_json_dataset = sparkSession.sqlContext.read.json(bson_data_as_json_string)
// eval the schema:
bson_data_as_json_dataset.printSchema()
Ответ №2:
Попробуйте использовать приведенный ниже код
def parseData(s:String)={
val doc=org.bson.Document.parse(s)
val jsonDoc=com.mongodb.util.JSON.serialize(doc)
jsonDoc
val df=spark.read.json(spark.sparkContext.newAPIHadoopFile("src//main//resources//MyDummyData",classOf[BSONFileInputFormat].asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[Object,BSONObject]]), classOf[Object], classOf[BSONObject]).map(x=>x._2).map(x=>parseData(x.toString)))