#scala #apache-spark #apache-spark-sql #apache-spark-dataset
#scala #apache-spark #apache-spark-sql #apache-spark-dataset
Вопрос:
Это небольшой тестовый пример для воспроизведения проблемы, которую я вижу при соединении в моем коде
case class B(val b1:String, val b2: Int)
val B1 = new B("One",1)
val B2 = new B("Two",2)
val dsB = spark.createDataset(Seq(B1, B2))
dsB.show()
--- ---
| b1| b2|
--- ---
|One| 1|
|Two| 2|
--- ---
val m = Map(1->"Van")
val mapget = spark.udf.register("mapget", (b: Int) => m.get(b))
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null")
dsB1.show()
--- ---
| b1| b2|
--- ---
|One|Van|
--- ---
val j = dsB1.joinWith(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
---------- --------
| _1| _2|
---------- --------
|[One, Van]|[One, 1]|
|[One, Van]|[Two, 2]|
---------- --------
joinWith
Результат неправильный. По сути, это перекрестный продукт. Есть какие-нибудь подсказки, в чем проблема? Я убедился, что join
API работает нормально.
val j = dsB1.join(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
--- --- --- ---
| b1| b2| b1| b2|
--- --- --- ---
|One|Van|One| 1|
--- --- --- ---
Комментарии:
1. Я также получаю ошибку, о которой упоминал @Grisha_WeinTraub . Если вы измените свое внутреннее соединение на cross, тогда оно сработает и выдаст результат, который вы показали.
Ответ №1:
Похоже, вы используете довольно старую версию Spark. В Spark 2.4.4 я получаю следующее исключение при запуске вашего примера:
org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
LocalRelation [_1#55]
and
LocalRelation [_2#56]
Join condition is missing or trivial.
Причина в том, что условие соединения фактически сравнивается dsB("b1")
с самим собой, и это всегда верно.
Тривиальным решением было бы переименовать столбец. Что-то вроде этого:
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null").withColumnRenamed("b1", "b1_2")
val j = dsB1.joinWith(dsB, dsB1("b1_2") === dsB("b1"), "inner")
j.show
---------- --------
| _1| _2|
---------- --------
|[One, Van]|[One, 1]|
---------- --------
Комментарии:
1.На самом деле у меня более новая версия
spark version 3.0.2.19
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
2. Также любые комментарии о том, почему
join
работает, а неjoinWith
3. Похоже, что в Spark 3.0 исключение больше не генерируется, и вы просто получаете результат декартового соединения (SPARK-28621). Разница между
join
иjoinWith
заключается только в том, как это реализовано прямо сейчас. Есть много связанных проблем (например, SPARK-6459, SPARK-25150, SPARK-20073), некоторые из них выполнены, некоторые в процессе, надеюсь, это тоже будет исправлено однажды.