#apache-spark #catalyst-optimizer
Вопрос:
Я ответил на вопрос здесь на ЭТОМ и, будучи прилежным человеком, посмотрел на физический план.
Код:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}
val df = sc.parallelize(Seq( (1.0, 2.0, 1), (0.0, -1.0, 1), (3.0, 4.0, 1), (6.0, -2.3, 4))).toDF("x", "y", "z")
val newSchema = StructType(df.schema.fields Array(StructField("rowid", LongType, false)))
val rddWithId = df.rdd.zipWithIndex
val dfZippedWithId = spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq Array(index))}, newSchema)
dfZippedWithId.show(false)
dfZippedWithId.printSchema()
val res = dfZippedWithId.as("dfZ1").join(dfZippedWithId.as("dfZ2"), $"dfZ1.z" === $"dfZ2.z" amp;amp;
$"dfZ1.rowid" === $"dfZ2.rowid" -1
,"inner")
.withColumn("newx", $"dfZ2.x" - $"dfZ1.x").explain(true)
Физический план:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
- Project [x#1223, y#1224, z#1225, rowid#1226L, x#1248, y#1249, z#1250, rowid#1251L, (x#1248 - x#1223) AS newx#1268]
- SortMergeJoin [z#1225, rowid#1226L], [z#1250, (rowid#1251L - 1)], Inner
:- Sort [z#1225 ASC NULLS FIRST, rowid#1226L ASC NULLS FIRST], false, 0
: - Exchange hashpartitioning(z#1225, rowid#1226L, 200), ENSURE_REQUIREMENTS, [id=#1761]
: - Scan ExistingRDD[x#1223,y#1224,z#1225,rowid#1226L]
- Sort [z#1250 ASC NULLS FIRST, (rowid#1251L - 1) ASC NULLS FIRST], false, 0
- Exchange hashpartitioning(z#1250, (rowid#1251L - 1), 200), ENSURE_REQUIREMENTS, [id=#1762]
- Scan ExistingRDD[x#1248,y#1249,z#1250,rowid#1251L]
По моему образу мыслей:
- должна быть только копия или повторное использование DF
- и идентификатор строки -1 должен быть проверен в том же разделе.
Не уверен, как интерпретировать эту часть:
Exchange hashpartitioning(z#1250, (rowid#1251L - 1), 200)
Ладно, не в масштабах, но, похоже, для имхо были выбраны некоторые ненужные умные решения (rowid#1251L - 1)
. Если только сортировка каким-то образом не компенсирует, но я в этом как-то не убежден. Кто может сказать, что на самом деле означает идея оптимизатора? Я думаю, что могу догадаться об этом, но это кажется излишним афаикой.