#json #scala #apache-spark
#json #scala #apache-spark
Вопрос:
у меня есть файл json с журналами:
{"a": "cat1", "b": "name", "c": "Caesar", "d": "2016-10-01"}
{"a": "cat1", "b": "legs", "c": "4", "d": "2016-10-01"}
{"a": "cat1", "b": "color", "c": "black", "d": "2016-10-01"}
{"a": "cat1", "b": "tail", "c": "20cm", "d": "2016-10-01"}
{"a": "cat2", "b": "name", "c": "Dickens", "d": "2016-10-02"}
{"a": "cat2", "b": "legs", "c": "4", "d": "2016-10-02"}
{"a": "cat2", "b": "color", "c": "red", "d": "2016-10-02"}
{"a": "cat2", "b": "tail", "c": "15cm", "d": "2016-10-02"}
{"a": "cat2", "b": "ears", "c": "5cm", "d": "2016-10-02"}
{"a": "cat1", "b": "tail", "c": "10cm", "d": "2016-10-10"}
желаемый результат:
("id": "cat1", "name": "Caesar", "legs": "4", "color": "black", "tail": "10cm", "day": "2016-10-10")
("id": "cat2", "name": "Dickens", "legs": "4", "color": "red", "tail": "10cm", "ears": "5cm", "day": "2016-10-02")
я могу сделать это шаг за шагом, используя циклы for и collections, но мне нужно сделать это надлежащим образом, используя maps, flatmaps, aggregatebykey и другую магию spark
case class cat_input(a: String, b:String, c:String, d: String)
case class cat_output(id: String, name: String, legs: String, color: String, tail: String, day: String, ears: String, claws: String)
object CatLog {
def main(args: Array[String]) {
val sconf = new SparkConf().setAppName("Cat log")
val sc = new SparkContext(sconf)
sc.setLogLevel("WARN")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df = sqlContext.read.json("cats1.txt").as[cat_input]
val step1 = df.rdd.groupBy(_.a)
//step1 = (String, Iterator[cat_input]) = (cat1, CompactBuffer(cat_input( "cat1", "name", "Caesar", "2016-10-01"), ... ) )
val step2 = step1.map(x => x._2)
//step2 = Iterator[cat_input]
val step3 = step2.map(y => (y.b,y.c))
//step3 = ("name", "Caesar")
val step4 = step3.map( case(x,y) => { cat_output(x) = y })
// it should return cat_output(id: "cat1", name: "Caesar", legs: "4", color: "black", tail: "10cm", day: NULL, ears: NULL, claws: NULL)
- шаг 4, очевидно, не работает
- как вернуть хотя бы этот cat_output (идентификатор: «cat1», имя: «Цезарь», ноги: «4», цвет: «черный», хвост: «10 см», день: НОЛЬ, уши: НОЛЬ, когти: НОЛЬ)
- как проверить значения по общему столбцу и выбрать самое новое из них, а также ввести самую новую дату в cat_output (дата)?
Ответ №1:
Предполагается, что данные обладают уникальными свойствами для каждого cat (cat1, cat2). Примените некоторую логику для дубликатов. Вы можете попробовать что-то подобное для своего класса case:
#method to reduce 2 cat_output objects to one
def makeFinalRec(a: cat_output, b:cat_output): cat_output ={ return cat_output( a.id,
if(a.name=="" amp;amp; b.name!="") b.name else a.name,
if(a.legs=="" amp;amp; b.legs!="") b.legs else a.legs,
if(a.color=="" amp;amp; b.color!="") b.color else a.color,
if(a.tail=="" amp;amp; b.tail!="") b.tail else a.tail,
if(a.day=="" amp;amp; b.day!="") b.day else a.day,
if(a.ears=="" amp;amp; b.ears!="") b.ears else a.ears,
if(a.claws=="" amp;amp; b.claws!="") b.claws else a.claws ); }
dt.map(x => (x(0), x(1), x(2))).map(x => (x._1.toString,
cat_output(x._1.toString,
(x._2.toString match { case "name" => x._3.toString case _ => ""}),
(x._2.toString match { case "legs" => x._3.toString case _ => ""}),
(x._2.toString match { case "color" => x._3.toString case _ => ""}),
(x._2.toString match { case "tail" => x._3.toString case _ => ""}),
(x._2.toString match { case "day" => x._3.toString case _ => ""}),
(x._2.toString match { case "ears" => x._3.toString case _ => ""}),
(x._2.toString match { case "claws" => x._3.toString case _ => ""})
) )).reduceByKey((a,b) => makeFinalRec(a,b)).map(x=>x._2).toDF().toJSON.foreach(println)
Output:
{"id":"cat2","name":"Dickens","legs":"4","color":"red","tail":"15cm","day":"","ears":"5cm","claws":""}
{"id":"cat1","name":"Caesar","legs":"4","color":"black","tail":"20cm","day":"","ears":"","claws":""}
Также обратите внимание, что я не применил фактическую «дату», потому что есть дубликаты. Для получения максимального значения для каждого ключа, а затем объединения обоих наборов данных, требуется другая логика map() amp; max.
Ответ №2:
Один из способов — использовать функцию aggregateByKey и сохранить ответ в изменяемой карте.
//case class defined outside main()
case class cat_input(a: String, b:String, c:String, d: String)
val df = sqlContext.read.json("cats1.txt").as[cat_input]
val add_to_map = (a: scala.collection.mutable.Map[String,String], x: cat_input) => {
val ts = x.d
if(a contains "date"){
if((a contains x.b) amp;amp; (ts>=a("date")))
{
a(x.b) = x.c
a("date")=ts
}
else if (!(a contains x.b))
{
a(x.b) = x.c
if(a("date")<ts){
a("date")=ts
}
}
}
else
{
a(x.b) = x.c
a("date")=ts
}
a
}
val merge_maps = (a:scala.collection.mutable.Map[String,String], b:scala.collection.mutable.Map[String,String]) => {
if( a("date") > b("date") ) {
a.keys.map( k => b(k) = a(k) )
a
} else {
b.keys.map( k => a(k) = b(k) )
b
}
}
val step3 = df.map(x=> (x.a, x)).aggregateByKey( scala.collection.mutable.Map[String,String]() )(add_to_map, merge_maps)