Объединить несколько столбцов в один столбец в SPARK

#apache-spark #apache-spark-sql #databricks

#apache-spark #apache-spark-sql #блоки данных

Вопрос:

В моем файле parquet у меня есть сглаженные входящие данные в приведенном ниже формате:

введите описание изображения здесь

Я хочу преобразовать его в приведенный ниже формат, в котором я не сглаживаю свою структуру:

введите описание изображения здесь

Я попробовал следующее:

 Dataset<Row> rows = df.select(col("id"), col("country_cd"),
                explode(array("fullname_1", "fullname_2")).as("fullname"),
                explode(array("firstname_1", "firstname_2")).as("firstname"));
  

Но это выдает следующую ошибку:

Исключение в потоке «main» org.apache.spark.sql.AnalysisException: для каждого предложения select разрешен только один генератор, но найдено 2: explode(array(fullname_1, fullname_2)), explode(array(firstname_1, firstname_2));

Я понимаю, это потому, что вы не можете использовать более 1 разнесения в запросе. Я ищу варианты для выполнения вышеуказанного в Spark Java.

Ответ №1:

Этот тип проблемы проще всего решить с .flatMap() помощью . A .flatMap() похож на a .map() , за исключением того, что он позволяет выводить n записей для каждой входной записи, в отличие от соотношения 1: 1.

 val df = Seq(
    (1, "USA", "Lee M", "Lee", "Dan A White", "Dan"),
    (2, "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
    ).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")

df.flatMap(row => {
    val id = row.getAs[Int]("id")
    val cc = row.getAs[String]("country_code")
    Seq(
        (id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1")),
        (id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1"))
    )
}).toDF("id", "country_code", "fullname", "firstname").show()
  

Это приводит к следующему:

  --- ------------ ----------- --------- 
| id|country_code|   fullname|firstname|
 --- ------------ ----------- --------- 
|  1|         USA|      Lee M|      Lee|
|  1|         USA|      Lee M|      Lee|
|  2|         CAN|Pate Poland|     Pate|
|  2|         CAN|Pate Poland|     Pate|
 --- ------------ ----------- --------- 
  

Комментарии:

1. Это решение находится в scala, но его должно быть легко перенести.

Ответ №2:

Вам нужно обернуть имена и фамилии в массив структур, которые вы позже затем разнесете:

 Dataset<Row> rows = df.select(col("id"), col("country_cd"),
  explode(
    array(
      struct(
        col("firstname_1").as("firstname"), col("fullname_1").as("fullname")),
      struct(
        col("firstname_2").as("firstname"), col("fullname_2").as("fullname"))
    )
  )
)
  

Таким образом, вы получите быстрое узкое преобразование, будете иметь переносимость Scala / Python / R, и оно должно выполняться быстрее, чем df.flatMap решение, которое превратит Dataframe в RDD, который оптимизатор запросов не может улучшить. Может возникнуть дополнительное давление со стороны сборщика мусора Java из-за копирования из небезопасных байтовых массивов в объекты java.

Ответ №3:

Как специалист по базам данных, мне нравится использовать операции на основе наборов для подобных вещей, например union

 val df = Seq(
  ("1", "USA", "Lee M", "Lee", "Dan A White", "Dan"),
  ("2", "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")


val df_new = df
  .select("id", "country_code", "fullname_1", "firstname_1").union(df.select("id", "country_code", "fullname_2", "firstname_2"))
  .orderBy("id")

df_new.show
df.createOrReplaceTempView("tmp")
  

Или эквивалентный SQL:

 %sql
SELECT id, country_code, fullname_1 AS fullname, firstname_1 AS firstname
FROM tmp
UNION
SELECT id, country_code, fullname_2, firstname_2
FROM tmp
  

Мои результаты:

Мои результаты Я полагаю, что одним из преимуществ метода flatMap является то, что вам не нужно указывать типы данных, и на первый взгляд это кажется проще. Конечно, это зависит от вас.