#scala #apache-spark #apache-spark-sql #apache-spark-dataset
Вопрос:
Я столкнулся с проблемой совместимости после миграции spark с 2.4.5 на 3.x. Просто проверьте здесь, не сталкивался ли кто-нибудь с такой же проблемой раньше или с какими-либо советами эксперта spark.
Приведенный ниже код отлично работает на spark 2, но он создает исключение на 3.x. Мой результат отладки показывает, что существует проблема с null.asInstanceOf[Порядок]. До тех пор, пока соответствие шаблону не попадет в строки с нулем.asInstanceOf[Порядок], это хорошо. нулевой.asInstanceOf[Порядок] сам по себе в порядке, поэтому я предполагаю, что это кодер результатов, который не может кодировать нуль порядка.
Кроме того, если я изменю тип на метку времени для кодировщика, он будет хорошо работать без каких-либо исключений. Разница здесь только в кодере или типе.
Я провел некоторое исследование, и, похоже, там не так много соответствующей информации.
package mypkg.transformation
import java.sql.Timestamp
import org.apache.spark.sql.{Encoders, SparkSession}
import org.scalatest.{FlatSpec, Matchers}
import scala.beans.BeanProperty
class UserInfo {
@BeanProperty var username: String = ""
@BeanProperty var email: String = ""
override def toString = s"UserInfo: ($username, $email)"
}
class Order {
@BeanProperty var username: String = ""
@BeanProperty var orderId: String = ""
override def toString = s"Order: ($username, $orderId)"
}
class EncoderTest2 extends FlatSpec with Matchers {
protected val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.config("spark.sql.shuffle.partitions", "5")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
"Assemble Profile " should " assemble data" in {
val UserInforEncoder = Encoders.bean(classOf[UserInfo])
val OrderEncoder = Encoders.bean(classOf[Order])
val ResultEncoder = Encoders.tuple(UserInforEncoder, OrderEncoder)
// val ResultEncoder = Encoders.tuple(UserInforEncoder, Encoders.TIMESTAMP)
val dfUserInfo = Seq(("user1", "someone@email.com")).toDF("username", "email")
.as[UserInfo](UserInforEncoder)
val dfOrder = Seq(("user2", "SomeOrderId")).toDF("username", "orderId")
.as[Order](OrderEncoder)
dfUserInfo.show()
dfOrder.show()
val result = dfUserInfo
.joinWith(dfOrder, dfUserInfo("username") === dfOrder("username"), "left_outer")
.map ({
case (userInfo, null) => (userInfo, null.asInstanceOf[Order])
case (userInfo, order) => (userInfo, order)
// case (userInfo, null) => (userInfo, null.asInstanceOf[Timestamp])
// case (userInfo, order) => (userInfo, null.asInstanceOf[Timestamp])
}) (ResultEncoder)
result.show()
}
}