#apache-spark #join #apache-spark-dataset #catalyst-optimizer
#apache-искра #Присоединиться #apache-spark-набор данных #катализатор-оптимизатор
Вопрос:
Для следующего фрагмента кода:
case class SomeRow(key: String, value: String) spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) val ds1 = Seq(SomeRow("A", "1")).toDS().repartition(col("key")) val ds2 = Seq(SomeRow("A", "1"), SomeRow("B","2")).toDS().repartition(col("key")) val dataSetJoined = ds1.joinWith(ds2, ds1("key")===ds2("key"), "left") val dataFrameJoined = ds1.join(ds2, ds1("key")===ds2("key"), "left") dataSetJoined.explain(true) dataFrameJoined.explain(true)
Spark генерирует следующий план для набора данных:
== Physical Plan == SortMergeJoin [_1#132.key], [_2#133.key], LeftOuter :- *(2) Sort [_1#132.key ASC NULLS FIRST], false, 0 : - Exchange hashpartitioning(_1#132.key, 2) : - *(1) Project [named_struct(key, key#122, value, value#123) AS _1#132] : - Exchange hashpartitioning(key#122, 2) : - LocalTableScan [key#122, value#123] - *(4) Sort [_2#133.key ASC NULLS FIRST], false, 0 - Exchange hashpartitioning(_2#133.key, 2) - *(3) Project [named_struct(key, key#128, value, value#129) AS _2#133] - Exchange hashpartitioning(key#128, 2) - LocalTableScan [key#128, value#129]
и для фрейма данных:
== Physical Plan == SortMergeJoin [key#122], [key#128], LeftOuter :- *(1) Sort [key#122 ASC NULLS FIRST], false, 0 : - Exchange hashpartitioning(key#122, 2) : - LocalTableScan [key#122, value#123] - *(2) Sort [key#128 ASC NULLS FIRST], false, 0 - Exchange hashpartitioning(key#128, 2) - LocalTableScan [key#128, value#129]
Можно ли избежать другого такого же обмена при соединении двух наборов данных с помощью joinWith?