#apache-spark #serialization
#apache-spark #сериализация
Вопрос:
Это не работает и жалуется на проблемы с сериализацией:
import java.nio.charset.StandardCharsets
import scala.util.Try
import java.net.URLDecoder
import spark.implicits._
val df = List("http://forum.krasmama.ru/viewforum.php?f=247").toDF("URL")
def parseURL(s: String): String = {
Try(URLDecoder.decode(s, StandardCharsets.UTF_8.name())).toOption.getOrElse(null)
}
val parseURLudf = udf[String, String](parseURL)
val myCond = col("URL").startsWith("http")
val df2 = df.filter(myCond)
val dfWithParsedUrl = df2.withColumn("URL", parseURLudf(col("URL")))
dfWithParsedUrl.show(5, truncate=30)
но если я удалю myCond
переменную и вставлю col("URL").startsWith("http")
непосредственно в filter
нее, это сработает, но почему?
Вот журнал ошибок:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
at org.apache.spark.sql.Dataset.show(Dataset.scala:661)
... 53 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: startswith(URL, http))
- field (class: $iw, name: myCond, type: class org.apache.spark.sql.Column)
- object (class $iw, $iw@720343cf)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[0, string, false]))
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 3)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
... 82 more
Кстати, я не знаю, можно ли это воспроизвести локально, потому что в этом случае Spark не обязательно что-либо сериализует? (В случае, если он сериализуется только тогда, когда ему нужно отправить код из драйвера исполнителям, извините, я не знаю подробностей о том, как это работает).
Комментарии:
1. Какое сообщение об исключении сериализации возвращается вам?
2. На первый взгляд, я бы сказал
val myCond = col("URL").startsWith("http")
, используетval
объявление. Вы предполагаете, что получаете логическое значение, но это не так? Каково содержимоеmyCond
переменной и ее тип, если вы печатаете их на экране?3. Тип myCond — столбец. Сообщение об ошибке, которое я добавлю к своему вопросу