#java #apache-spark #pyspark #apache-spark-sql #dataset
Вопрос:
Я использую Spark с Java, и у меня есть такой фрейм данных:
id | array_column
-------------------
12 | [a:123, b:125, c:456]
13 | [a:443, b:225, c:126]
и я хочу взорвать array_column с тем же идентификатором, однако explode
этого недостаточно, потому что я хочу, чтобы фрейм данных был:
id | a | b | c
-------------------
12 |123 |125 | 456
13 |443 |225 | 126
Поэтому нормальное explode
включение array_column
в этом случае не работает хорошо.
Я был бы рад вашей помощи, спасибо!
Комментарии:
1. У вас есть фиксированное количество элементов на карте или его можно изменить?
2. @ArtemAstashov Номер не фиксирован, но при необходимости его можно заблокировать большим номером
Ответ №1:
Очень похожий подход, подобный ответу ggordon на Java:
import static org.apache.spark.sql.functions.*;
Dataset<Row> df = ...
df.withColumn("array_column", explode(col("array_column")))
.withColumn("array_column", split(col("array_column"), ":"))
.withColumn("key", col("array_column").getItem(0))
.withColumn("value", col("array_column").getItem(1))
.groupBy(col("id"))
.pivot(col("key"))
.agg(first("value")) //1
.show();
Выход:
--- --- --- ---
| id| a| b| c|
--- --- --- ---
| 12|456|225|126|
| 11|123|125|456|
--- --- --- ---
Я предполагаю, что комбинация id
и и ключевого поля в массиве уникальна. Вот почему функция агрегирования, используемая в //1
is first
. Если эта комбинация не уникальна, можно изменить функцию агрегирования collect_list
, чтобы получить массив всех совпадающих значений.
Ответ №2:
Следующий подход будет работать со списками переменной длины в array_column
. Этот подход используется explode
для расширения списка строковых элементов array_column
перед разделением каждого строкового элемента с помощью :
на два разных столбца col_name
и col_val
соответственно. Наконец, сводная таблица используется с группой по для переноса данных в нужный формат.
В следующем примере используется api pyspark, но его можно легко перевести на API java/scala, поскольку они похожи. Я предположил, что ваш набор данных находится в фрейме данных с именем input_df
from pyspark.sql import functions as F
output_df = (
input_df.select("id",F.explode("array_column").alias("acol"))
.select(
"id",
F.split("acol",":")[0].alias("col_name"),
F.split("acol",":")[1].cast("integer").alias("col_val")
)
.groupBy("id")
.pivot("col_name")
.max("col_val")
)
Дайте мне знать, если это сработает для вас.