#java #scala #apache-spark
Question:
def URLEnc(input: String): String = {
  URLEncoder.encode(input, "UTF-8")
}

val URLEncUDF: UserDefinedFunction = udf(URLEnc(_: String))

val file = spark.read.format("xml")
  .option("rootTag", "channel").option("rowTag", "item")
  .load("path")

where file is a DataFrame read from an XML source

val file1 = file.withColumn("description", URLEncUDF(col("g:description")))
The logs look like this:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:434)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at org.apache.spark.sql.Dataset.show(Dataset.scala:826)
at org.apache.spark.sql.Dataset.show(Dataset.scala:803)
at AIFeed.<init>(AIFeed.scala:16)
at AIFeed$.delayedEndpoint$AIFeed$1(AIFeed.scala:113)
at AIFeed$delayedInit$body.apply(AIFeed.scala:112)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at scala.Function0.apply$mcV$sp$(Function0.scala:39)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
at scala.App.$anonfun$main$1$adapted(App.scala:80)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.App.main(App.scala:80)
at scala.App.main$(App.scala:78)
at AIFeed$.main(AIFeed.scala:112)
at AIFeed.main(AIFeed.scala)
Caused by: java.io.NotSerializableException: AIFeed
Serialization stack:
- object not serializable (class: AIFeed, value: AIFeed@5bccef9f)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class FeedFunction, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic FeedFunction.$anonfun$URLEncUDF$1:(LFeedFunction;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class FeedFunction$$Lambda$275/1443173326, FeedFunction$$Lambda$275/1443173326@51e94b7d)
- element of array (index: 5)
- array (class [Ljava.lang.Object;, size 6)
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2116/996471089, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2116/996471089@565a6af)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
... 45 more
20/12/16 17:55:15 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.1.4:34511 in memory (size: 2.9 KiB, free: 1407.3 MiB)
20/12/16 17:55:15 INFO SparkContext: Invoking stop() from shutdown hook
20/12/16 17:55:15 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.4:34511 in memory (size: 23.7 KiB, free: 1407.3 MiB)
20/12/16 17:55:15 INFO SparkUI: Stopped Spark web UI at http://192.168.1.4:4040
20/12/16 17:55:15 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/12/16 17:55:15 INFO MemoryStore: MemoryStore cleared
20/12/16 17:55:15 INFO BlockManager: BlockManager stopped
20/12/16 17:55:15 INFO BlockManagerMaster: BlockManagerMaster stopped
20/12/16 17:55:15 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/12/16 17:55:15 INFO SparkContext: Successfully stopped SparkContext
20/12/16 17:55:15 INFO ShutdownHookManager: Shutdown hook called
Answer 1:
From the stack trace you can see that Spark fails to serialize AIFeed, which is presumably the class containing the code you posted. This happens because your UDF relies on URLEnc, which is a method of that class and therefore carries a reference to the enclosing class. Spark then tries to serialize your entire driver class, which is not serializable. Sometimes you can fix this simply by marking the class Serializable.
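As a minimal sketch of that workaround (assuming AIFeed is the enclosing class named in the trace, and that everything it holds is itself serializable):

import java.net.URLEncoder

// Marking the enclosing class Serializable lets Spark ship the captured
// reference to the executors along with the UDF closure.
class AIFeed extends Serializable {
  def URLEnc(input: String): String = URLEncoder.encode(input, "UTF-8")
}

This can be fragile, though: if the class also holds non-serializable state (a SparkSession, connections, etc.), serialization will fail elsewhere.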
Usually the cleaner fix is to make URLEnc a function rather than a method. For example, instead of:

def someMeth(a: Int): Int = a + 1

use:

val someFunc = (a: Int) => a + 1
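Applied to the code in the question, the fix could look like the sketch below (imports added for completeness; a function literal that references no class members compiles to a lambda with no captured outer reference, so the closure serializes cleanly):

import java.net.URLEncoder
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

// Function value instead of a method: no hidden `this` reference is captured,
// so Spark can serialize the UDF closure and ship it to the executors.
val URLEnc: String => String = (input: String) => URLEncoder.encode(input, "UTF-8")
val URLEncUDF: UserDefinedFunction = udf(URLEnc)

val file1 = file.withColumn("description", URLEncUDF(col("g:description")))

Note that udf(URLEnc) now takes the function value directly, with no need for the URLEnc(_: String) eta-expansion that created the capturing lambda in the first place.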