можно ли избежать второго обмена, когда spark соединяет два набора данных с помощью joinWith?

#apache-spark #join #apache-spark-dataset #catalyst-optimizer

#apache-искра #Присоединиться #apache-spark-набор данных #катализатор-оптимизатор

Вопрос:

Для следующего фрагмента кода:

 case class SomeRow(key: String, value: String)  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) val ds1 = Seq(SomeRow("A", "1")).toDS().repartition(col("key")) val ds2 = Seq(SomeRow("A", "1"), SomeRow("B","2")).toDS().repartition(col("key"))  val dataSetJoined = ds1.joinWith(ds2, ds1("key")===ds2("key"), "left") val dataFrameJoined = ds1.join(ds2, ds1("key")===ds2("key"), "left") dataSetJoined.explain(true) dataFrameJoined.explain(true)  

Spark генерирует следующий план для набора данных:

 == Physical Plan == SortMergeJoin [_1#132.key], [_2#133.key], LeftOuter :- *(2) Sort [_1#132.key ASC NULLS FIRST], false, 0 :  - Exchange hashpartitioning(_1#132.key, 2) :  - *(1) Project [named_struct(key, key#122, value, value#123) AS _1#132] :  - Exchange hashpartitioning(key#122, 2) :  - LocalTableScan [key#122, value#123]  - *(4) Sort [_2#133.key ASC NULLS FIRST], false, 0   - Exchange hashpartitioning(_2#133.key, 2)   - *(3) Project [named_struct(key, key#128, value, value#129) AS _2#133]   - Exchange hashpartitioning(key#128, 2)   - LocalTableScan [key#128, value#129]  

и для фрейма данных:

 == Physical Plan == SortMergeJoin [key#122], [key#128], LeftOuter :- *(1) Sort [key#122 ASC NULLS FIRST], false, 0 :  - Exchange hashpartitioning(key#122, 2) :  - LocalTableScan [key#122, value#123]  - *(2) Sort [key#128 ASC NULLS FIRST], false, 0   - Exchange hashpartitioning(key#128, 2)   - LocalTableScan [key#128, value#129]  

Можно ли избежать другого такого же обмена при соединении двух наборов данных с помощью joinWith?