Заполнение недостающих значений в строках с помощью Apache spark

#scala #apache-spark #dataframe #apache-spark-sql

#scala #apache-spark #фрейм данных #apache-spark-sql

Вопрос:

У меня есть конкретное требование заполнить все значения (категории) в столбце. Например, как показано в таблице ниже. Мне нужен способ заполнить категорию «НЕВИДИМЫЙ» и «НАЗНАЧЕННЫЙ» для кода HL_14108.

 val df = Seq(
("HL_13203","DELIVERED",3226), 
("HL_13203","UNSEEN",249),     
("HL_13203","UNDELIVERED",210),
("HL_13203","ASSIGNED",2),    
("HL_14108","DELIVERED",3083), 
("HL_14108","UNDELIVERED",164),
("HL_14108","PICKED",1)).toDF("code","status","count")
  

Ввод:

  -------- ----------- ----- 
|    code|     status|count|
 -------- ----------- ----- 
|HL_13203|  DELIVERED| 3226|
|HL_13203|     UNSEEN|  249|
|HL_13203|UNDELIVERED|  210|
|HL_13203|   ASSIGNED|    2|
|HL_14108|  DELIVERED| 3083|
|HL_14108|UNDELIVERED|  164|
|HL_14108|     PICKED|    1|
 -------- ----------- ----- 
  

Ожидаемый результат:

  -------- ----------- ----- 
|    code|     status|count|
 -------- ----------- ----- 
|HL_13203|  DELIVERED| 3226|
|HL_13203|     UNSEEN|  249|
|HL_13203|UNDELIVERED|  210|
|HL_13203|   ASSIGNED|    2|
|HL_13203|     PICKED|    0|
|HL_14108|  DELIVERED| 3083|
|HL_14108|UNDELIVERED|  164|
|HL_14108|     PICKED|    1|
|HL_14108|     UNSEEN|    0|
|HL_14108|   ASSIGNED|    0|
 -------- ----------- ----- 
  

Я хочу добавить недостающие строки категории для каждого кода. Какой был бы правильный подход для этого в Apache spark?

Ответ №1:

Сначала создайте новый фрейм данных со всеми возможными комбинациями столбцов code и status . Это можно сделать разными способами, но наиболее простым является перекрестное соединение:

 val states = df.select("status").dropDuplicates()
val codes = df.select("code").dropDuplicates()
val df2 = codes.crossJoin(states)
  

Лучшим подходом было бы сначала определить все возможные состояния, а затем использовать explode и typedLit (доступно с версии Spark 2.2 ). Это приведет к тому же фрейму данных:

 val states = df.select("status").dropDuplicates().as[String].collect()
val codes = df.select("code").dropDuplicates()
val df2 = codes.withColumn("status", explode(typedLit(states)))
  

Для более старых версий Spark та же функциональность, что typedLit может быть достигнута с помощью array(states.map(lit(_)): _*) .


Затем, join этот новый фрейм данных со старым, чтобы получить count столбец. Строки без count значения будут NaN , поэтому na.fill(0) используется для установки их в 0:

 df2.join(df, Seq("code", "status"), "left").na.fill(0)
  

Результирующий фрейм данных:

  -------- ----------- ----- 
|    code|     status|count|
 -------- ----------- ----- 
|HL_13203|UNDELIVERED|  210|
|HL_13203|   ASSIGNED|    2|
|HL_13203|     UNSEEN|  249|
|HL_13203|     PICKED|    0|
|HL_13203|  DELIVERED| 3226|
|HL_14108|UNDELIVERED|  164|
|HL_14108|   ASSIGNED|    0|
|HL_14108|     UNSEEN|    0|
|HL_14108|     PICKED|    1|
|HL_14108|  DELIVERED| 3083|
 -------- ----------- -----