Spark: задача не сериализуема странная ошибка

#apache-spark #serialization

#apache-spark #сериализация

Вопрос:

Это не работает и жалуется на проблемы с сериализацией:

 import java.nio.charset.StandardCharsets
import scala.util.Try
import java.net.URLDecoder

import spark.implicits._

val df = List("http://forum.krasmama.ru/viewforum.php?f=247").toDF("URL")

def parseURL(s: String): String = {
    Try(URLDecoder.decode(s, StandardCharsets.UTF_8.name())).toOption.getOrElse(null)
}

val parseURLudf = udf[String, String](parseURL)

val myCond = col("URL").startsWith("http")
val df2 = df.filter(myCond)
val dfWithParsedUrl = df2.withColumn("URL", parseURLudf(col("URL")))
dfWithParsedUrl.show(5, truncate=30)
  

но если я удалю myCond переменную и вставлю col("URL").startsWith("http") непосредственно в filter нее, это сработает, но почему?

Вот журнал ошибок:

 org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:661)
  ... 53 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.Column, value: startswith(URL, http))
    - field (class: $iw, name: myCond, type: class org.apache.spark.sql.Column)
    - object (class $iw, $iw@720343cf)
    - field (class: $anonfun$1, name: $outer, type: class $iw)
    - object (class $anonfun$1, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[0, string, false]))
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
  ... 82 more
  

Кстати, я не знаю, можно ли это воспроизвести локально, потому что в этом случае Spark не обязательно что-либо сериализует? (В случае, если он сериализуется только тогда, когда ему нужно отправить код из драйвера исполнителям, извините, я не знаю подробностей о том, как это работает).

Комментарии:

1. Какое сообщение об исключении сериализации возвращается вам?

2. На первый взгляд, я бы сказал val myCond = col("URL").startsWith("http") , использует val объявление. Вы предполагаете, что получаете логическое значение, но это не так? Каково содержимое myCond переменной и ее тип, если вы печатаете их на экране?

3. Тип myCond — столбец. Сообщение об ошибке, которое я добавлю к своему вопросу