Проблема совместимости кодировщика искры после преобразования искры в 3.X

#scala #apache-spark #apache-spark-sql #apache-spark-dataset

Вопрос:

Я столкнулся с проблемой совместимости после миграции spark с 2.4.5 на 3.x. Просто проверьте здесь, не сталкивался ли кто-нибудь с такой же проблемой раньше или с какими-либо советами эксперта spark.

Приведенный ниже код отлично работает на spark 2, но он создает исключение на 3.x. Мой результат отладки показывает, что существует проблема с null.asInstanceOf[Порядок]. До тех пор, пока соответствие шаблону не попадет в строки с нулем.asInstanceOf[Порядок], это хорошо. нулевой.asInstanceOf[Порядок] сам по себе в порядке, поэтому я предполагаю, что это кодер результатов, который не может кодировать нуль порядка.

Кроме того, если я изменю тип на метку времени для кодировщика, он будет хорошо работать без каких-либо исключений. Разница здесь только в кодере или типе.

Я провел некоторое исследование, и, похоже, там не так много соответствующей информации.

 package mypkg.transformation
import java.sql.Timestamp
import org.apache.spark.sql.{Encoders, SparkSession}
import org.scalatest.{FlatSpec, Matchers}

import scala.beans.BeanProperty

class UserInfo {
  @BeanProperty var username: String = ""
  @BeanProperty var email: String = ""
  override def toString = s"UserInfo: ($username, $email)"
}
class Order {
  @BeanProperty var username: String = ""
  @BeanProperty var orderId: String = ""
  override def toString = s"Order: ($username, $orderId)"
}

class EncoderTest2 extends FlatSpec with Matchers {
  protected val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "5")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")
  import spark.implicits._

  "Assemble Profile " should " assemble data" in {

    val UserInforEncoder = Encoders.bean(classOf[UserInfo])
    val OrderEncoder = Encoders.bean(classOf[Order])
    val ResultEncoder = Encoders.tuple(UserInforEncoder, OrderEncoder)
    // val ResultEncoder = Encoders.tuple(UserInforEncoder, Encoders.TIMESTAMP)

    val dfUserInfo = Seq(("user1", "someone@email.com")).toDF("username", "email")
      .as[UserInfo](UserInforEncoder)
    val dfOrder = Seq(("user2", "SomeOrderId")).toDF("username", "orderId")
      .as[Order](OrderEncoder)
    dfUserInfo.show()
    dfOrder.show()

    val result = dfUserInfo
      .joinWith(dfOrder, dfUserInfo("username") === dfOrder("username"), "left_outer")
      .map ({
        case (userInfo, null) => (userInfo, null.asInstanceOf[Order])
        case (userInfo, order) => (userInfo, order)
        // case (userInfo, null) => (userInfo, null.asInstanceOf[Timestamp])
        // case (userInfo, order) => (userInfo, null.asInstanceOf[Timestamp])
      }) (ResultEncoder)
      result.show()
  }
}