#apache-spark #apache-spark-sql #databricks
#apache-spark #apache-spark-sql #блоки данных
Вопрос:
В моем файле parquet у меня есть сглаженные входящие данные в приведенном ниже формате:
Я хочу преобразовать его в приведенный ниже формат, в котором я не сглаживаю свою структуру:
Я попробовал следующее:
Dataset<Row> rows = df.select(col("id"), col("country_cd"),
explode(array("fullname_1", "fullname_2")).as("fullname"),
explode(array("firstname_1", "firstname_2")).as("firstname"));
Но это выдает следующую ошибку:
Исключение в потоке «main» org.apache.spark.sql.AnalysisException: для каждого предложения select разрешен только один генератор, но найдено 2: explode(array(fullname_1, fullname_2)), explode(array(firstname_1, firstname_2));
Я понимаю, это потому, что вы не можете использовать более 1 разнесения в запросе. Я ищу варианты для выполнения вышеуказанного в Spark Java.
Ответ №1:
Этот тип проблемы проще всего решить с .flatMap()
помощью . A .flatMap()
похож на a .map()
, за исключением того, что он позволяет выводить n записей для каждой входной записи, в отличие от соотношения 1: 1.
val df = Seq(
(1, "USA", "Lee M", "Lee", "Dan A White", "Dan"),
(2, "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")
df.flatMap(row => {
val id = row.getAs[Int]("id")
val cc = row.getAs[String]("country_code")
Seq(
(id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1")),
(id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1"))
)
}).toDF("id", "country_code", "fullname", "firstname").show()
Это приводит к следующему:
--- ------------ ----------- ---------
| id|country_code| fullname|firstname|
--- ------------ ----------- ---------
| 1| USA| Lee M| Lee|
| 1| USA| Lee M| Lee|
| 2| CAN|Pate Poland| Pate|
| 2| CAN|Pate Poland| Pate|
--- ------------ ----------- ---------
Комментарии:
1. Это решение находится в scala, но его должно быть легко перенести.
Ответ №2:
Вам нужно обернуть имена и фамилии в массив структур, которые вы позже затем разнесете:
Dataset<Row> rows = df.select(col("id"), col("country_cd"),
explode(
array(
struct(
col("firstname_1").as("firstname"), col("fullname_1").as("fullname")),
struct(
col("firstname_2").as("firstname"), col("fullname_2").as("fullname"))
)
)
)
Таким образом, вы получите быстрое узкое преобразование, будете иметь переносимость Scala / Python / R, и оно должно выполняться быстрее, чем df.flatMap
решение, которое превратит Dataframe в RDD, который оптимизатор запросов не может улучшить. Может возникнуть дополнительное давление со стороны сборщика мусора Java из-за копирования из небезопасных байтовых массивов в объекты java.
Ответ №3:
Как специалист по базам данных, мне нравится использовать операции на основе наборов для подобных вещей, например union
val df = Seq(
("1", "USA", "Lee M", "Lee", "Dan A White", "Dan"),
("2", "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")
val df_new = df
.select("id", "country_code", "fullname_1", "firstname_1").union(df.select("id", "country_code", "fullname_2", "firstname_2"))
.orderBy("id")
df_new.show
df.createOrReplaceTempView("tmp")
Или эквивалентный SQL:
%sql
SELECT id, country_code, fullname_1 AS fullname, firstname_1 AS firstname
FROM tmp
UNION
SELECT id, country_code, fullname_2, firstname_2
FROM tmp
Мои результаты:
Я полагаю, что одним из преимуществ метода flatMap является то, что вам не нужно указывать типы данных, и на первый взгляд это кажется проще. Конечно, это зависит от вас.