Проблема с равенством типов данных Spark для встроенных типов Spark

#scala #apache-spark #apache-spark-sql

#scala #apache-spark #apache-spark-sql

Вопрос:

При запуске приложения spark я получаю ошибки глубоко внутри catalyst.

Например:

 java.lang.RuntimeException: scala.MatchError: LongType (of class org.apache.spark.sql.types.LongType$)
org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$nullSafeCastFunction(Cast.scala:637)
org.apache.spark.sql.catalyst.expressions.Cast.doGenCode(Cast.scala:625)
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
  

Я сузил это до следующего в плане spark:

 Project [if (isnull(_rawTime#348L)) null else UDF(toTime(_rawTime#348L)) AS _time#438,
  

(обратите внимание, что я не могу контролировать, чтобы схема была нулевой, поскольку я получаю этот базовый фрейм данных из соединителя spark hbase)

Где toTime UDF занимает много времени и выдает временную метку. Кажется, что catalyst не может соответствовать LongType , даже если оператор match имеет:

  case LongType => castToLongCode(from, ctx)
  

Интересно то, что когда я запускаю это в первый раз, все работает нормально. При втором запуске возникает эта проблема.

Обратите внимание, что это выполняется через apache Livy, поэтому базовый сеанс spark должен быть одинаковым между исполнениями.

Я разместил следующий код в начале своей работы.

   logger.info("----------")
  logger.info(LongType   " "   System.identityHashCode(LongType))
  logger.info(DataTypes.LongType   " "   System.identityHashCode(DataTypes.LongType))
  logger.info("Equal "   (DataTypes.LongType == LongType))
  logger.info("----------")

  

И затем, запустив его, я вижу:

 first run:
----------
LongType 1044985410
LongType 1044985410
Equal true
----------
second run:
----------
LongType 355475697
LongType 1044985410
Equal false
----------
  

Вы можете видеть при запуске 2, что объектный вызов LongType не совпадает с идентификатором, который был запущен при первом запуске.

Комментарий Spark предполагает, что люди используют синглтоны, которые находятся в типах данных. Например .. DataTypes.LongType что имеет смысл, поскольку кажется, что они остаются неизменными. Однако собственный код spark использует не-синглтон.

LongType определяется как

 /**
 * @since 1.3.0
 */
@InterfaceStability.Stable
case object LongType extends LongType
  

В то время DataTypes.LongType как

 public static final DataType LongType = LongType$.MODULE$;
  

Который ссылается на первый (объект case). Имеет смысл, что синглтон останется постоянным. На самом деле в коде spark указаны Please use the singleton типы данных.LongType . .. несмотря на то, что множество внутреннего кода spark этого не делает. Для меня это похоже на ошибку.

Кажется очень странным, что код Scala в Spark будет нормально компилироваться, а затем завершится неудачей с этим внезапным изменением идентификатора типов.

Итак, мои вопросы:

  • Каковы рекомендации по использованию DataType в Spark? Должен ли я использовать синглтоны или не синглтоны?
  • Что может привести к изменению этого идентификатора подо мной?

Ответ №1:

Я решил проблему.

В основном все экземпляры типов данных определяются в Scala как:

  * @since 1.3.0
 */
@InterfaceStability.Stable
case object LongType extends LongType
  

Но … во многих местах Spark использует Java-код, который получает типы данных с использованием одиночек:

  * Gets the LongType object.
 */
public static final DataType LongType = LongType$.MODULE$;
  

Это то LongType$.MODULE$; , как вызвать объект case из java land.

Но я сериализовал a DataType в Livy с помощью Kryo, и Kryo повторно инициализирует внутренне LongType$.MODULE$; . В Scala ссылка, которую вы получаете при получении объекта case, привязана не к первому созданному экземпляру, а к последнему созданному экземпляру.

Таким образом, временная шкала:

  • время 0: имело значение 1, также имеет значение 1. DataTypes.LongType LongType (где ref просто указывает на ссылку)
  • время 1: Kryo десериализует и, таким образом, переустанавливает объект. Однако одноэлементные типы данных.LongType указывает на первый экземпляр. ie DataTypes.LongType имел ссылку 1, LongType имеет ссылку 2
  • время> = 2: наступает хаос — типы данных не проходят проверку на равенство.

Решение состоит в том, чтобы не передавать объекты case в Kryo таким образом. Возможно, по какой-то причине мы неправильно используем Kryo, или нам нужно использовать twitter / chill.