Соединение набора данных Spark с API дает неправильные результаты

#scala #apache-spark #apache-spark-sql #apache-spark-dataset

#scala #apache-spark #apache-spark-sql #apache-spark-dataset

Вопрос:

Это небольшой тестовый пример для воспроизведения проблемы, которую я вижу при соединении в моем коде

 case class B(val b1:String, val b2: Int)
val B1 = new B("One",1)
val B2 = new B("Two",2)
val dsB = spark.createDataset(Seq(B1, B2))
dsB.show()
 --- --- 
| b1| b2|
 --- --- 
|One|  1|
|Two|  2|
 --- --- 
val m = Map(1->"Van")
val mapget = spark.udf.register("mapget",  (b: Int) => m.get(b))
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null")
dsB1.show()
 --- --- 
| b1| b2|
 --- --- 
|One|Van|
 --- --- 
val j = dsB1.joinWith(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
 ---------- -------- 
|        _1|      _2|
 ---------- -------- 
|[One, Van]|[One, 1]|
|[One, Van]|[Two, 2]|
 ---------- -------- 
 

joinWith Результат неправильный. По сути, это перекрестный продукт. Есть какие-нибудь подсказки, в чем проблема? Я убедился, что join API работает нормально.

 val j = dsB1.join(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
 --- --- --- --- 
| b1| b2| b1| b2|
 --- --- --- --- 
|One|Van|One|  1|
 --- --- --- --- 
 

Комментарии:

1. Я также получаю ошибку, о которой упоминал @Grisha_WeinTraub . Если вы измените свое внутреннее соединение на cross, тогда оно сработает и выдаст результат, который вы показали.

Ответ №1:

Похоже, вы используете довольно старую версию Spark. В Spark 2.4.4 я получаю следующее исключение при запуске вашего примера:

 org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
LocalRelation [_1#55]
and
LocalRelation [_2#56]
Join condition is missing or trivial.
 

Причина в том, что условие соединения фактически сравнивается dsB("b1") с самим собой, и это всегда верно.

Тривиальным решением было бы переименовать столбец. Что-то вроде этого:

 val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null").withColumnRenamed("b1", "b1_2")
val j = dsB1.joinWith(dsB, dsB1("b1_2") === dsB("b1"), "inner")
j.show
 ---------- -------- 
|        _1|      _2|
 ---------- -------- 
|[One, Van]|[One, 1]|
 ---------- -------- 
 

Комментарии:

1.На самом деле у меня более новая версия spark version 3.0.2.19 Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)

2. Также любые комментарии о том, почему join работает, а не joinWith

3. Похоже, что в Spark 3.0 исключение больше не генерируется, и вы просто получаете результат декартового соединения (SPARK-28621). Разница между join и joinWith заключается только в том, как это реализовано прямо сейчас. Есть много связанных проблем (например, SPARK-6459, SPARK-25150, SPARK-20073), некоторые из них выполнены, некоторые в процессе, надеюсь, это тоже будет исправлено однажды.