Операторы обращения в Spark

#scala #apache-spark #spark-dataframe #rdd

#scala #apache-spark #spark-dataframe #rdd

Вопрос:

Я пишу искровой код, в котором мне нужно преобразовать RDD типа (String,(String,String)) в ((String,String),String ).

У меня есть следующий входной текстовый файл:

 Language,Language-code,TotalViewsInThatLang
English,en,10965376,"Main_Page",2938355
Russian,ru,1925718,"Загл,915495
Spanish,es,1010810,"Wikipedia:Portada",13603
 

Я создал RDD следующим образом:

 val line = sc.textFile(inputFile)
val nrdd = line.map(x=>(x.split(",")(0),(x.split(",")(1),x.split(",")(2))))
nrdd: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[2] at map at <console>:26
 

Исходя из этого, я хочу использовать case функцию для создания RDD типа ((String,String),String) .

Как я могу сделать это с case помощью операторов в map ?
Редактировать
Я получаю следующую ошибку, когда пытаюсь использовать функцию case:

 scala> val frdd = nrdd.map( {case(x,(y,z))=>((x,y),z))}) 
<console>:1: error: ';' expected but ')' found.
val frdd = nrdd.map({case(x,(y,z))=>((x,y),z))})
                                             ^      
 

Ответ №1:

Если я не неправильно понял ваш вопрос, вы хотите этого:

 val list: List[((String, String), String)] = List((("a1", "b1"), "c1"), (("a2", "b2"), "c2"))
val res = list.map { case ((a, b), c) => (a, (b, c)) }

println(res) // List((a1,(b1,c1)), (a2,(b2,c2)))
 

Комментарии:

1. да …. но могу ли я сделать то же самое с rdd nrdd в вопросе выше?

2. если вы спрашиваете case , будет ли работать использование сопоставления, тогда да. какая часть, по вашему мнению, является проблематичной?

Ответ №2:

Поскольку ваш RDD является парным RDD, вы могли бы использовать swap from Keyed RDD .

Пример кода:

 val keyRDD = sc.parallelize(List((("a1", "b1"), "c1"), (("a2", "b2"), "c2")), 2)
val swappedRDD = keyRDD.map(_.swap)
swappedRDD.foreach(x => println(x))