#scala #apache-spark #apache-spark-sql #bigdata
#scala #apache-spark #apache-spark-sql #bigdata
Вопрос:
Мне нужно добавить несколько столбцов к существующему фрейму данных spark, где имена столбцов указаны в списке, предполагая, что значения для новых столбцов постоянны, например, заданные входные столбцы и фрейм данных
val columnsNames=List("col1","col2")
val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4))
и после добавления обоих столбцов, предполагая, что постоянными значениями являются «val1» для col1 и «val2» для col2, выходной фрейм данных должен быть
----- --- ------- ------
| _1| _2|col1 |col2|
----- --- ------- ------
| one| 1|val1 |val2|
| two| 2|val1 |val2|
|three| 3|val1 |val2|
| four| 4|val1 |val2|
----- --- ------- ------
я написал функцию для добавления столбцов
def appendColumns (cols: List[String], ds: DataFrame): DataFrame = {
cols match {
case Nil => ds
case h :: Nil => appendColumns(Nil, ds.withColumn(h, lit(h)))
case h :: tail => appendColumns(tail, ds.withColumn(h, lit(h)))
}
}
Есть ли лучший способ и более функциональный способ сделать это.
Спасибо
Комментарии:
1. Просто чтобы уточнить, в
appendColumns
имени столбца совпадает со значением столбца, в то время как в ожидаемом выходном фрейме данных значение для, напримерcol1
val1
, может быть одинаковым (имя столбца и значение) или вы хотите, чтобы они были отдельными?2. имя столбца и значение столбца могут быть одинаковыми.
3. Странная причина закрытия.
4. Привет, вы нашли ответ на свой вопрос? Или что-то еще неясно?
5. Спасибо, Оли, да, предложенный подход был очень хорошим.
Ответ №1:
Да, есть лучший и более простой способ. По сути, вы делаете столько вызовов withColumn
, сколько у вас есть столбцов. При большом количестве столбцов catalyst, движок, оптимизирующий запросы spark, может показаться немного перегруженным (у меня был опыт в прошлом с аналогичным вариантом использования). Я даже видел, как это вызывало ООМ в драйвере при экспериментировании с тысячами столбцов. Чтобы избежать нагрузки на catalyst (и писать меньше кода ;-)), вы можете просто использовать select
, как показано ниже, чтобы сделать это одной командой spark:
val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF
// let's assume that we have a map that associates column names to their values
val columnMap = Map("col1" -> "val1", "col2" -> "val2")
// Let's create the new columns from the map
val newCols = columnMap.keys.map(k => lit(columnMap(k)) as k)
// selecting the old columns the new ones
data.select(data.columns.map(col) newCols : _*).show
----- --- ---- ----
| _1| _2|col1|col2|
----- --- ---- ----
| one| 1|val1|val2|
| two| 2|val1|val2|
|three| 3|val1|val2|
| four| 4|val1|val2|
----- --- ---- ----
Ответ №2:
В отличие от рекурсии, более общий подход с использованием foldLeft, я думаю, будет более общим для ограниченного числа столбцов. Использование блокнота Databricks:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import spark.implicits._
val columnNames = Seq("c3","c4")
val df = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("c1", "c2")
def addCols(df: DataFrame, columns: Seq[String]): DataFrame = {
columns.foldLeft(df)((acc, col) => {
acc.withColumn(col, lit(col)) })
}
val df2 = addCols(df, columnNames)
df2.show(false)
ВОЗВРАТ:
----- --- --- ---
|c1 |c2 |c3 |c4 |
----- --- --- ---
|one |1 |c3 |c4 |
|two |2 |c3 |c4 |
|three|3 |c3 |c4 |
|four |4 |c3 |c4 |
----- --- --- ---
Пожалуйста, остерегайтесь следующего: https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015 хотя и в несколько ином контексте, и другой ответ ссылается на это с помощью подхода select.