Объяснение физического плана Spark для самостоятельного соединения DF и дополнительного условия

#apache-spark #catalyst-optimizer

Вопрос:

Я ответил на вопрос здесь на ЭТОМ и, будучи прилежным человеком, посмотрел на физический план.

Код:

 import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}

val df = sc.parallelize(Seq( (1.0, 2.0, 1), (0.0, -1.0, 1), (3.0, 4.0, 1), (6.0, -2.3, 4))).toDF("x", "y", "z")
val newSchema = StructType(df.schema.fields    Array(StructField("rowid", LongType, false)))

val rddWithId = df.rdd.zipWithIndex
val dfZippedWithId =  spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq    Array(index))}, newSchema)

dfZippedWithId.show(false)
dfZippedWithId.printSchema()

val res = dfZippedWithId.as("dfZ1").join(dfZippedWithId.as("dfZ2"), $"dfZ1.z" ===  $"dfZ2.z" amp;amp;
                                                                    $"dfZ1.rowid" ===  $"dfZ2.rowid" -1
                                                                   ,"inner")
                        .withColumn("newx", $"dfZ2.x" - $"dfZ1.x").explain(true) 
 

Физический план:

 == Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
 - Project [x#1223, y#1224, z#1225, rowid#1226L, x#1248, y#1249, z#1250, rowid#1251L, (x#1248 - x#1223) AS newx#1268]
    - SortMergeJoin [z#1225, rowid#1226L], [z#1250, (rowid#1251L - 1)], Inner
      :- Sort [z#1225 ASC NULLS FIRST, rowid#1226L ASC NULLS FIRST], false, 0
      :   - Exchange hashpartitioning(z#1225, rowid#1226L, 200), ENSURE_REQUIREMENTS, [id=#1761]
      :      - Scan ExistingRDD[x#1223,y#1224,z#1225,rowid#1226L]
       - Sort [z#1250 ASC NULLS FIRST, (rowid#1251L - 1) ASC NULLS FIRST], false, 0
          - Exchange hashpartitioning(z#1250, (rowid#1251L - 1), 200), ENSURE_REQUIREMENTS, [id=#1762]
             - Scan ExistingRDD[x#1248,y#1249,z#1250,rowid#1251L]
 

По моему образу мыслей:

  1. должна быть только копия или повторное использование DF
  2. и идентификатор строки -1 должен быть проверен в том же разделе.

Не уверен, как интерпретировать эту часть:

 Exchange hashpartitioning(z#1250, (rowid#1251L - 1), 200)
 

Ладно, не в масштабах, но, похоже, для имхо были выбраны некоторые ненужные умные решения (rowid#1251L - 1) . Если только сортировка каким-то образом не компенсирует, но я в этом как-то не убежден. Кто может сказать, что на самом деле означает идея оптимизатора? Я думаю, что могу догадаться об этом, но это кажется излишним афаикой.