#scala #apache-spark #dataframe #apache-spark-sql
#scala #apache-spark #фрейм данных #apache-spark-sql
Вопрос:
Получение неожиданного результата при выполнении первой и последней агрегированных функций в Spark Dataframe.
У меня есть фрейм данных spark, содержащий столбцы colA, ColB, ColC, colD, colE, extraCol1, extraCol2
И мне нужно выполнить агрегацию в этом фрейме данных с помощью
группировка -> colA amp; ColB, max -> ColC, max -> colD, first -> colE, extraCol1, extraCol2
Итак, ниже приведен фрейм данных (df), который я использую, и я использую разделение spark (3)
colA colB colC colD colE extraCol1 extracol2
Harshit 23 43 44 A q z
Mohit 24 56 62 B w x
Harshit 23 32 44 C e c
Kali 10 20 460 D r v
Aman 20 30 180 E t b
Ram 30 100 270 F yu n
Kali 10 600 360 G io m
Kali 10 600 460 k p o
Ниже приведен код scala и spark, который я использую для выполнения операции groupBy
val cols = List("colA","colB")
var aggFuncSeq = List(max(`colC`) as colC_new, max(`colD`) as colD_new, first(`colE`,true) as colE, first(`extracol2`,true) as extracol2, first(`extraCol1`,true) as extraCol1)
var aggFuncs = aggFuncSeq.map(e => expr(e))
df = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)
df.show(10)
После выполнения я получаю неожиданный результат, как показано ниже.
colA colB colC_new colD_new colE extracol2 extraCol1
Harshit 23 43 44 C c e
Aman 20 30 180 E b t
Kali 10 600 460 D v r
Ram 30 100 270 F n yu
Mohit 24 56 62 B x w
Но в соответствии с условием группировки и выполненной операцией агрегирования, выходной результат должен иметь первую строку, соответствующую Harshit, для colE, extracol2, extracol1
Итак, ожидаемый результат был следующим
colA colB colC_new colD_new colE extracol2 extraCol1
Harshit 23 43 44 A q z
Aman 20 30 180 E b t
Kali 10 600 460 D v r
Ram 30 100 270 F n yu
Mohit 24 56 62 B x w
Но я не могу понять эту концепцию SQL, как это возможно. Итак, если кто-нибудь может помочь мне разобраться в этой странной проблеме.
Это из-за разделения?
Как это дает этот результат и как исправить его как ожидаемый результат?
Любая помощь приветствуется. Спасибо
Комментарии:
1. Первая — это функция window, пока вы не закажете по ней, она не даст вам ожидаемый результат. Вам нужно сделать что-то вроде Window.partitionBy(colA, ColB).OrderBy(colE))
2. @sp_user123 Тогда я также могу выполнить max (colE), это даст мне тот же результат. Но я хочу получить первый или последний столбец в соответствии с входным фреймом данных, предоставленным пользователем
3. orderby(colE) — это всего лишь пример, в вашем случае, я думаю, вам нужно упорядочить с помощью того же ключа, что и группы coulmns (desc или asc)
4. Возможно, вам захочется добавить столбец
F.monotonically_increasing_id
перед выполнением какой-либо перетасовки5. @TarunKhaneja Пожалуйста, проверьте варианты ниже и не забудьте принять ответ, если он соответствует вашим потребностям / вопросам. Спасибо!
Ответ №1:
Когда вы groupBy
используете Spark, вы можете изменить порядок вашего фрейма данных. Но не всегда (например. если ваши данные содержатся на одном работнике, они не изменятся). Следовательно, просто чтобы убедиться и иметь масштабируемое решение, вам нужно изменить порядок в вашей оконной функции.
В этом случае попробуйте следующее:
val w = Window.partitionBy($"key").orderBy($"value")
df
.withColumn("row_number", row_number.over(w))
.where($"row_number" === 1)
.drop("row_number")
При этом выбирается только первая строка, отфильтрованная по параметру row_number
with row_number
, определяемому как индекс строки после упорядочения. Впоследствии это отбрасывается, потому что становится бесполезным.
Примечание: вы можете заменить $
операторы col
операторами. Это всего лишь ярлык для более сжатого кода.
Комментарии:
1. Только что обновил мой ответ, пожалуйста, дайте мне знать, если это не сработает.
Ответ №2:
import org.apache.spark.sql.functions.{max, _}
import spark.implicits._
val columnsDF = Seq(
("Harshit", 23, 43, 44, "A", "q", "z"),
("Mohit", 24, 56, 62, "B", "w", "x"),
("Harshit", 23, 32, 44, "C", "e", "c"),
("Kali", 10, 20, 460, "D", "r", "v"),
("Aman", 20, 30, 180, "E", "t", "b"),
("Ram", 30, 100, 270, "F", "yu", "n"),
("Kali", 10, 600, 360, "G", "io", "m"),
("Kali", 10, 600, 460, "k", "p", "o")
).toDF("ColA", "ColB", "ColC", "ColD", "ColE", "extraCol1", "extraCol2")
println("Before Aggregation")
columnsDF.show()
val cols = List("colA", "colB")
println("After Aggregation")
val aggSeqFunction = columnsDF.agg(max(columnsDF.columns(2)),
max(columnsDF.columns(3)),
first(columnsDF.columns(4)),
first(columnsDF.columns(6)),
first(columnsDF.columns(5)))
val aggFunction = aggSeqFunction.columns.map(en => expr(en))
columnsDF.groupBy(cols.head, cols.tail: _*).agg(aggFunction.head, aggFunction.tail: _*).show()
/*
------- ---- --------- --------- ------------------ ----------------------- -----------------------
| colA|colB|max(ColC)|max(ColD)|first(ColE, false)|first(extraCol2, false)|first(extraCol1, false)|
------- ---- --------- --------- ------------------ ----------------------- -----------------------
|Harshit| 23| 43| 44| A| z| q|
| Aman| 20| 30| 180| E| b| t|
| Kali| 10| 600| 460| D| v| r|
| Ram| 30| 100| 270| F| n| yu|
| Mohit| 24| 56| 62| B| x| w|
------- ---- --------- --------- ------------------ ----------------------- -----------------------
*/
Я могу получить ожидаемый результат.
Надеюсь, это поможет.
Комментарии:
1. Этот параметр масштабируется в кластере не потому, что он работает на вашем компьютере. После a
groupBy
у вас нет никаких гарантий, что выDataFrame
останетесь отсортированными.