#scala #apache-spark #databricks
#scala #apache-spark #databricks
Вопрос:
Я начинаю использовать Spark с Scala в записной книжке dataBricks, однако у меня странная ошибка:
SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: t020101)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@1ccc6944)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
...
Код работает нормально, когда я выполняю функцию round непосредственно для значений:
def timeUsageGroupedRound(summed: Dataset[TimeUsageRow]): Dataset[TimeUsageRow] = {
summed.map{
case TimeUsageRow(working, sex, age, primaryNeeds, work, other) =>
TimeUsageRow(working, sex, age, (primaryNeeds* 10).round / 10d, (work* 10).round / 10d, (other* 10).round / 10d)
}
}
val time_Usage_Round_DS = timeUsageGroupedRound(time_Usage_Grouped_DS)
display(time_Usage_Round_DS)
Но, когда я пытаюсь выполнить вспомогательную функцию, я получил ошибку, упомянутую выше:
def timeUsageGroupedRound(summed: Dataset[TimeUsageRow]): Dataset[TimeUsageRow] = {
def round1(d:Double):Double = (d * 10).round / 10d
summed.map{
case TimeUsageRow(working, sex, age, primaryNeeds, work, other) =>
TimeUsageRow(working, sex, age, round1(primaryNeeds), round1(work), round1(other))
}
}
val time_Usage_Round_DS = timeUsageGroupedRound(time_Usage_Grouped_DS)
display(time_Usage_Round_DS)
Кто-нибудь может объяснить, почему это происходит? Большое спасибо!
Ответ №1:
Короткий ответ 1:
Переместите объект round1
из вашего класса в объект (возможно, используйте сопутствующий объект https://docs.scala-lang.org/tour/singleton-objects.html ).
Короткий ответ 2:
В качестве альтернативы переместите все, что не Serializable
находится за пределами вашего класса (см. Длинный ответ) — хотя это может быть болезненным в зависимости от размера класса.
Длинный ответ:
Это интересный вопрос, который несколько раз сбивал меня с толку в прошлом. Во-первых, когда вы выполняете .map в Dataset / DataFrame, под капотом происходит то, что все внутри карты — в вашем случае:
case TimeUsageRow(working, sex, age, primaryNeeds, work, other) =>
TimeUsageRow(working, sex, age, round1(primaryNeeds), round1(work), round1(other))
упаковывается и отправляется из драйвера исполнителям. Из-за способа, которым Spark обменивается данными между драйвером и исполнителем, все, что вы отправляете, должно быть Serializable
. Эта ошибка возникает из-за того, что при round1
включении она также перетаскивает с собой остальную часть класса, и если в классе есть что-то, чего нет Serializable
, тогда возникает эта ошибка.