Как я могу выполнить mapreduce для нескольких столбцов в dataframe в scala?

#apache-spark #dataframe #mapreduce

#apache-spark #фрейм данных #mapreduce

Вопрос:

Мой фрейм данных spark выглядит следующим образом:

  ------- ------ ------- ------ ------ 
|userid1|time  |userid2|name1 |name2 |
 ------- ------ ------- ------ ------ 
|23     |1     |33     |user1 |user2 | 
|23     |2     |33     |new   |user2 |
|231    |1     |23     |231n  |new   |
|231    |4     |33     |231n  |user2 |
 ------- ------ ------- ------ ------ 
  

Для каждой строки есть 2 идентификатора пользователя с соответствующими именами, но только один раз.

Я хочу получить последнее имя для каждого пользователя. это похоже на объединение столбцов userid1 и userid2 .

Результат должен быть:

  ------ ----------- 
|userid|latest name|
 ------ ----------- 
|23    |new        |
|33    |user2      |
|231   |231n       |
 ------ ----------- 
  

Как я могу это сделать?

Я подумываю об использовании partitonBy , но я не знаю, как объединить результат столбца userid1 и userid2 и получить последнее имя.

Я также подумываю об использовании rdd.flatMap((row => row._1 -> row._2),(row => row._3 -> row._2)).reduceByKey(_ max _)) , но это dataframe, а не rdd, и я не уверен в синтаксисе. Col и $ в daatframe действительно меня смущают.(извините, я относительно новичок в Spark.)

Ответ №1:

Не могли бы вы, пожалуйста, попробовать это решение?

 import spark.implicits._

val users = Seq(
  (23, 1, 33, "user1", "user2"),
  (23, 2, 33, "new", "user2"),
  (231, 1, 23, "231", "new"),
  (231, 4, 33, "231", "user2")
).toDF("userid1", "time", "userid2", "name1", "name2")

val users1 = users.select(col("userid1").as("userid"), col("name1").as("name"), col("time"))
val users2 = users.select(col("userid2").as("userid"), col("name2").as("name"), col("time"))

val unitedUsers = users1.union(users2)

val resultDf = unitedUsers
  .withColumn("max_time", max("time").over(Window.partitionBy("userid")))
  .where(col("max_time") === col("time"))
  .select(col("userid"), col("name").as("latest_name"))
  .distinct()
  

Комментарии:

1. я бы столкнулся с этой ошибкой <console>:1: error: illegal start of definition .withColumn("max_time", max("time").over(Window.partitionBy("userid"))) ^ похоже, что я не могу использовать withColumn.

2. Вы пытаетесь запустить из оболочки spark?

3. я пытаюсь запустить Zeppline Notebook с помощью интерпретатора spark.

4. Вероятно, вам следует поместить всю команду в одну строку в zeppelin. Я имею в виду значение resultDf = …