Как исправить ошибку, применяющую функцию round в столбцах набора данных (исключение SparkException: задача не сериализуема)

#scala #apache-spark #databricks

#scala #apache-spark #databricks

Вопрос:

Я начинаю использовать Spark с Scala в записной книжке dataBricks, однако у меня странная ошибка:

  SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.Column
 Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: t020101)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@1ccc6944)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
 ...
  

Код работает нормально, когда я выполняю функцию round непосредственно для значений:

  def timeUsageGroupedRound(summed: Dataset[TimeUsageRow]): Dataset[TimeUsageRow] = {

  summed.map{
       case TimeUsageRow(working, sex, age, primaryNeeds, work, other) => 
       TimeUsageRow(working, sex, age, (primaryNeeds* 10).round / 10d, (work* 10).round / 10d, (other* 10).round / 10d)
     }
   }

 val time_Usage_Round_DS = timeUsageGroupedRound(time_Usage_Grouped_DS)
 display(time_Usage_Round_DS)
  

Но, когда я пытаюсь выполнить вспомогательную функцию, я получил ошибку, упомянутую выше:

  def timeUsageGroupedRound(summed: Dataset[TimeUsageRow]): Dataset[TimeUsageRow] = {

  def round1(d:Double):Double = (d * 10).round / 10d

  summed.map{
       case TimeUsageRow(working, sex, age, primaryNeeds, work, other) => 
       TimeUsageRow(working, sex, age, round1(primaryNeeds), round1(work), round1(other))
     }
   }
 val time_Usage_Round_DS = timeUsageGroupedRound(time_Usage_Grouped_DS)
 display(time_Usage_Round_DS)
  

Кто-нибудь может объяснить, почему это происходит? Большое спасибо!

Ответ №1:

Короткий ответ 1:

Переместите объект round1 из вашего класса в объект (возможно, используйте сопутствующий объект https://docs.scala-lang.org/tour/singleton-objects.html ).

Короткий ответ 2:

В качестве альтернативы переместите все, что не Serializable находится за пределами вашего класса (см. Длинный ответ) — хотя это может быть болезненным в зависимости от размера класса.

Длинный ответ:

Это интересный вопрос, который несколько раз сбивал меня с толку в прошлом. Во-первых, когда вы выполняете .map в Dataset / DataFrame, под капотом происходит то, что все внутри карты — в вашем случае:

 case TimeUsageRow(working, sex, age, primaryNeeds, work, other) => 
   TimeUsageRow(working, sex, age, round1(primaryNeeds), round1(work), round1(other))
  

упаковывается и отправляется из драйвера исполнителям. Из-за способа, которым Spark обменивается данными между драйвером и исполнителем, все, что вы отправляете, должно быть Serializable . Эта ошибка возникает из-за того, что при round1 включении она также перетаскивает с собой остальную часть класса, и если в классе есть что-то, чего нет Serializable , тогда возникает эта ошибка.