Первая и последняя агрегированные функции Spark SQL — неожиданный результат

#scala #apache-spark #dataframe #apache-spark-sql

#scala #apache-spark #фрейм данных #apache-spark-sql

Вопрос:

Получение неожиданного результата при выполнении первой и последней агрегированных функций в Spark Dataframe.

У меня есть фрейм данных spark, содержащий столбцы colA, ColB, ColC, colD, colE, extraCol1, extraCol2

И мне нужно выполнить агрегацию в этом фрейме данных с помощью

группировка -> colA amp; ColB, max -> ColC, max -> colD, first -> colE, extraCol1, extraCol2

Итак, ниже приведен фрейм данных (df), который я использую, и я использую разделение spark (3)

 colA    colB    colC    colD    colE    extraCol1   extracol2
Harshit 23        43    44         A           q    z
Mohit   24        56    62         B           w    x
Harshit 23        32    44         C           e    c
Kali    10        20    460        D           r    v
Aman    20        30    180        E           t    b
Ram     30        100   270        F          yu    n
Kali    10        600   360        G          io    m
Kali    10        600   460        k           p    o
 

Ниже приведен код scala и spark, который я использую для выполнения операции groupBy

  val cols = List("colA","colB")

 var  aggFuncSeq = List(max(`colC`) as colC_new, max(`colD`) as colD_new, first(`colE`,true) as colE, first(`extracol2`,true) as extracol2, first(`extraCol1`,true) as extraCol1)

 var aggFuncs = aggFuncSeq.map(e => expr(e))

 df = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

 df.show(10)
 

После выполнения я получаю неожиданный результат, как показано ниже.

 colA    colB    colC_new    colD_new    colE    extracol2   extraCol1
Harshit     23      43            44        C       c       e
Aman        20      30            180       E       b       t
Kali        10      600           460       D       v       r
Ram         30      100           270       F       n       yu
Mohit       24      56            62        B       x       w
 

Но в соответствии с условием группировки и выполненной операцией агрегирования, выходной результат должен иметь первую строку, соответствующую Harshit, для colE, extracol2, extracol1

Итак, ожидаемый результат был следующим

 colA    colB    colC_new    colD_new    colE    extracol2   extraCol1
Harshit     23      43            44        A       q       z
Aman        20      30            180       E       b       t
Kali        10      600           460       D       v       r
Ram         30      100           270       F       n       yu
Mohit       24      56            62        B       x       w
 

Но я не могу понять эту концепцию SQL, как это возможно. Итак, если кто-нибудь может помочь мне разобраться в этой странной проблеме.

Это из-за разделения?

Как это дает этот результат и как исправить его как ожидаемый результат?

Любая помощь приветствуется. Спасибо

Комментарии:

1. Первая — это функция window, пока вы не закажете по ней, она не даст вам ожидаемый результат. Вам нужно сделать что-то вроде Window.partitionBy(colA, ColB).OrderBy(colE))

2. @sp_user123 Тогда я также могу выполнить max (colE), это даст мне тот же результат. Но я хочу получить первый или последний столбец в соответствии с входным фреймом данных, предоставленным пользователем

3. orderby(colE) — это всего лишь пример, в вашем случае, я думаю, вам нужно упорядочить с помощью того же ключа, что и группы coulmns (desc или asc)

4. Возможно, вам захочется добавить столбец F.monotonically_increasing_id перед выполнением какой-либо перетасовки

5. @TarunKhaneja Пожалуйста, проверьте варианты ниже и не забудьте принять ответ, если он соответствует вашим потребностям / вопросам. Спасибо!

Ответ №1:

Когда вы groupBy используете Spark, вы можете изменить порядок вашего фрейма данных. Но не всегда (например. если ваши данные содержатся на одном работнике, они не изменятся). Следовательно, просто чтобы убедиться и иметь масштабируемое решение, вам нужно изменить порядок в вашей оконной функции.

В этом случае попробуйте следующее:

 val w = Window.partitionBy($"key").orderBy($"value")
df
  .withColumn("row_number", row_number.over(w))
  .where($"row_number" === 1)
  .drop("row_number")
 

При этом выбирается только первая строка, отфильтрованная по параметру row_number with row_number , определяемому как индекс строки после упорядочения. Впоследствии это отбрасывается, потому что становится бесполезным.

Примечание: вы можете заменить $ операторы col операторами. Это всего лишь ярлык для более сжатого кода.

Комментарии:

1. Только что обновил мой ответ, пожалуйста, дайте мне знать, если это не сработает.

Ответ №2:

  import org.apache.spark.sql.functions.{max, _}
    import spark.implicits._

    val columnsDF = Seq(
      ("Harshit", 23, 43, 44, "A", "q", "z"),
      ("Mohit", 24, 56, 62, "B", "w", "x"),
      ("Harshit", 23, 32, 44, "C", "e", "c"),
      ("Kali", 10, 20, 460, "D", "r", "v"),
      ("Aman", 20, 30, 180, "E", "t", "b"),
      ("Ram", 30, 100, 270, "F", "yu", "n"),
      ("Kali", 10, 600, 360, "G", "io", "m"),
      ("Kali", 10, 600, 460, "k", "p", "o")
    ).toDF("ColA", "ColB", "ColC", "ColD", "ColE", "extraCol1", "extraCol2")


    println("Before Aggregation")
    columnsDF.show()

    val cols = List("colA", "colB")

    println("After Aggregation")
    val aggSeqFunction = columnsDF.agg(max(columnsDF.columns(2)),
      max(columnsDF.columns(3)),
      first(columnsDF.columns(4)),
      first(columnsDF.columns(6)),
      first(columnsDF.columns(5)))

    val aggFunction = aggSeqFunction.columns.map(en => expr(en))


    columnsDF.groupBy(cols.head, cols.tail: _*).agg(aggFunction.head, aggFunction.tail: _*).show()

    /*
             ------- ---- --------- --------- ------------------ ----------------------- ----------------------- 
            |   colA|colB|max(ColC)|max(ColD)|first(ColE, false)|first(extraCol2, false)|first(extraCol1, false)|
             ------- ---- --------- --------- ------------------ ----------------------- ----------------------- 
            |Harshit|  23|       43|       44|                 A|                      z|                      q|
            |   Aman|  20|       30|      180|                 E|                      b|                      t|
            |   Kali|  10|      600|      460|                 D|                      v|                      r|
            |    Ram|  30|      100|      270|                 F|                      n|                     yu|
            |  Mohit|  24|       56|       62|                 B|                      x|                      w|
             ------- ---- --------- --------- ------------------ ----------------------- ----------------------- 
     */
 

Я могу получить ожидаемый результат.

Надеюсь, это поможет.

Комментарии:

1. Этот параметр масштабируется в кластере не потому, что он работает на вашем компьютере. После a groupBy у вас нет никаких гарантий, что вы DataFrame останетесь отсортированными.