#apache-spark #dataframe #mapreduce
#apache-spark #фрейм данных #mapreduce
Вопрос:
Мой фрейм данных spark выглядит следующим образом:
------- ------ ------- ------ ------
|userid1|time |userid2|name1 |name2 |
------- ------ ------- ------ ------
|23 |1 |33 |user1 |user2 |
|23 |2 |33 |new |user2 |
|231 |1 |23 |231n |new |
|231 |4 |33 |231n |user2 |
------- ------ ------- ------ ------
Для каждой строки есть 2 идентификатора пользователя с соответствующими именами, но только один раз.
Я хочу получить последнее имя для каждого пользователя. это похоже на объединение столбцов userid1
и userid2
.
Результат должен быть:
------ -----------
|userid|latest name|
------ -----------
|23 |new |
|33 |user2 |
|231 |231n |
------ -----------
Как я могу это сделать?
Я подумываю об использовании partitonBy
, но я не знаю, как объединить результат столбца userid1
и userid2
и получить последнее имя.
Я также подумываю об использовании rdd.flatMap((row => row._1 -> row._2),(row => row._3 -> row._2)).reduceByKey(_ max _))
, но это dataframe, а не rdd, и я не уверен в синтаксисе. Col и $ в daatframe действительно меня смущают.(извините, я относительно новичок в Spark.)
Ответ №1:
Не могли бы вы, пожалуйста, попробовать это решение?
import spark.implicits._
val users = Seq(
(23, 1, 33, "user1", "user2"),
(23, 2, 33, "new", "user2"),
(231, 1, 23, "231", "new"),
(231, 4, 33, "231", "user2")
).toDF("userid1", "time", "userid2", "name1", "name2")
val users1 = users.select(col("userid1").as("userid"), col("name1").as("name"), col("time"))
val users2 = users.select(col("userid2").as("userid"), col("name2").as("name"), col("time"))
val unitedUsers = users1.union(users2)
val resultDf = unitedUsers
.withColumn("max_time", max("time").over(Window.partitionBy("userid")))
.where(col("max_time") === col("time"))
.select(col("userid"), col("name").as("latest_name"))
.distinct()
Комментарии:
1. я бы столкнулся с этой ошибкой
<console>:1: error: illegal start of definition .withColumn("max_time", max("time").over(Window.partitionBy("userid"))) ^
похоже, что я не могу использовать withColumn.2. Вы пытаетесь запустить из оболочки spark?
3. я пытаюсь запустить Zeppline Notebook с помощью интерпретатора spark.
4. Вероятно, вам следует поместить всю команду в одну строку в zeppelin. Я имею в виду значение resultDf = …