Искра разнесет массив по столбцам

#java #apache-spark #pyspark #apache-spark-sql #dataset

Вопрос:

Я использую Spark с Java, и у меня есть такой фрейм данных:

 id  | array_column
-------------------
12  | [a:123, b:125, c:456]
13  | [a:443, b:225, c:126]
 

и я хочу взорвать array_column с тем же идентификатором, однако explode этого недостаточно, потому что я хочу, чтобы фрейм данных был:

 id  | a  | b  | c
-------------------
12  |123 |125 | 456 
13  |443 |225 | 126
 

Поэтому нормальное explode включение array_column в этом случае не работает хорошо.

Я был бы рад вашей помощи, спасибо!

Комментарии:

1. У вас есть фиксированное количество элементов на карте или его можно изменить?

2. @ArtemAstashov Номер не фиксирован, но при необходимости его можно заблокировать большим номером

Ответ №1:

Очень похожий подход, подобный ответу ggordon на Java:

 import static org.apache.spark.sql.functions.*;

Dataset<Row> df = ...

df.withColumn("array_column", explode(col("array_column")))
        .withColumn("array_column", split(col("array_column"), ":"))
        .withColumn("key", col("array_column").getItem(0))
        .withColumn("value", col("array_column").getItem(1))
        .groupBy(col("id"))
        .pivot(col("key"))
        .agg(first("value")) //1
        .show();
 

Выход:

  --- --- --- --- 
| id|  a|  b|  c|
 --- --- --- --- 
| 12|456|225|126|
| 11|123|125|456|
 --- --- --- --- 
 

Я предполагаю, что комбинация id и и ключевого поля в массиве уникальна. Вот почему функция агрегирования, используемая в //1 is first . Если эта комбинация не уникальна, можно изменить функцию агрегирования collect_list , чтобы получить массив всех совпадающих значений.

Ответ №2:

Следующий подход будет работать со списками переменной длины в array_column . Этот подход используется explode для расширения списка строковых элементов array_column перед разделением каждого строкового элемента с помощью : на два разных столбца col_name и col_val соответственно. Наконец, сводная таблица используется с группой по для переноса данных в нужный формат.

В следующем примере используется api pyspark, но его можно легко перевести на API java/scala, поскольку они похожи. Я предположил, что ваш набор данных находится в фрейме данных с именем input_df

 from pyspark.sql import functions as F

output_df = (
    input_df.select("id",F.explode("array_column").alias("acol"))
            .select(
                "id",
                F.split("acol",":")[0].alias("col_name"),
                F.split("acol",":")[1].cast("integer").alias("col_val")
            )
            .groupBy("id")
            .pivot("col_name")
            .max("col_val")
)
 

Дайте мне знать, если это сработает для вас.