#scala #apache-spark #rdd #amazon-emr
Question:
I'm using Spark on AWS EMR and trying to load data from a database into an RDD. Once the data is loaded, and after a very simple transformation, I call RDD.count() on the newly created RDD to print the number of records, and that is where I hit an exception; its stack trace, as shown in the console, is quoted below.
I first got this error in Zeppelin, but then tried spark-shell (thinking that maybe some classpath setting was wrong) and still got the same thing. The output is below:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6-amzn-0
      /_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_265)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import java.sql.{Connection, DriverManager, ResultSet}
import java.sql.{Connection, DriverManager, ResultSet}
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}
scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext
scala>
scala> val verticesTable = "..."
verticesTable: String = ...
scala> val config = new SparkConf().setAppName("Read JDBC Data: " + verticesTable)
config: org.apache.spark.SparkConf = org.apache.spark.SparkConf@24f8ff82
scala> config.set("spark.driver.allowMultipleContexts","true")
res0: org.apache.spark.SparkConf = org.apache.spark.SparkConf@24f8ff82
scala> print("Started.......\n")
Started.......
scala> // JDBC connection details
scala> val driver = "com.mysql.jdbc.Driver"
driver: String = com.mysql.jdbc.Driver
scala> val url = "jdbc:mysql://...."
url: String = jdbc:mysql://....
scala> val user = "..."
scala> val pass = "..."
scala> // JDBC Connection and load table in Dataframe
scala> val verticesDf = spark.read.format("jdbc").option("driver", driver).option("url", url).option("dbtable", verticesTable).option("user", user).option("password", pass).load()
Sun Oct 18 12:29:55 UTC 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45 , 5.6.26 and 5.7.6 requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
verticesDf: org.apache.spark.sql.DataFrame = [id: int, url: string ... 7 more fields]
scala> verticesDf.printSchema
root
|-- id: integer (nullable = true)
|-- url: string (nullable = true)
|-- a1: string (nullable = true)
|-- batch_id: integer (nullable = true)
|-- x: double (nullable = true)
|-- y: double (nullable = true)
|-- z: double (nullable = true)
|-- size: double (nullable = true)
|-- sR: integer (nullable = true)
scala> verticesDf.cache
res4: verticesDf.type = [id: int, url: string ... 7 more fields]
scala> verticesDf.show
COMMENT: table output goes here - seems like data is loaded and correct
only showing top 20 rows
scala> val vDf = verticesDf.map(
| row=>{
| (
| (row.getAs[Integer](0)).toLong,
| (
| row.getAs[String](1),
| row.getAs[Integer](3),
| row.getAs[Double](4),
| row.getAs[Double](5),
| row.getAs[Double](6),
| row.getAs[Double](7)
| )
| )
|
| })
vDf: org.apache.spark.sql.Dataset[(Long, (String, Integer, Double, Double, Double, Double))] = [_1: bigint, _2: struct<_1: string, _2: int ... 4 more fields>]
scala> vDf.cache
res8: vDf.type = [_1: bigint, _2: struct<_1: string, _2: int ... 4 more fields>]
scala> vDf.count
20/10/18 12:38:13 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, ip-172-31-14-11.ec2.internal, executor 1): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.deserializetoobject_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:115)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:107)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:222)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1164)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[Stage 2:> (0 + 1) / 1]20/10/18 12:38:13 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, ip-172-31-14-11.ec2.internal, executor 1): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.deserializetoobject_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:115)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:107)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:222)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1164)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:146)
at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88)
at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$finalPhysicalPlan$1.apply(AdaptiveSparkPlanExec.scala:128)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$finalPhysicalPlan$1.apply(AdaptiveSparkPlanExec.scala:127)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:778)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.finalPhysicalPlan(AdaptiveSparkPlanExec.scala:127)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:134)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2838)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2837)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2837)
... 55 elided
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.deserializetoobject_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:115)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:107)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:222)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1164)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Answer #1:
Inside your map function, the row itself, or any of its columns, may be null; try handling nulls inside the map function. Below is an example for the case where the row value is null.
Change the code below
scala> val vDf = verticesDf.map(
| row=>{
| (
| (row.getAs[Integer](0)).toLong,
| (
| row.getAs[String](1),
| row.getAs[Integer](3),
| row.getAs[Double](4),
| row.getAs[Double](5),
| row.getAs[Double](6),
| row.getAs[Double](7)
| )
| )
|
| })
to:
val vDf = verticesDf.map { row =>
  row match {
    // if the whole row is null, return placeholders with the same tuple shape
    case null => (0L, (null: String, null: Integer, 0.0, 0.0, 0.0, 0.0))
    case _    => (row.getAs[Integer](0).toLong, (row.getAs[String](1), row.getAs[Integer](3), row.getAs[Double](4), row.getAs[Double](5), row.getAs[Double](6), row.getAs[Double](7)))
  }
}
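Note that in a JDBC-backed DataFrame the NullPointerException more often comes from a null column value than from a null row: getAs[Double] unboxes a nullable column into a primitive Double, which throws when the value is NULL in MySQL. Below is a minimal sketch of per-column null handling, assuming the same column positions as in the question and substituting placeholder defaults (0, 0.0, "") of your choice; the local helpers dbl and int are hypothetical names, not part of the Spark API.
val vDf = verticesDf.map { row =>
  // hypothetical helpers: read a possibly-null numeric column with a default
  def dbl(i: Int): Double = if (row.isNullAt(i)) 0.0 else row.getDouble(i)
  def int(i: Int): Int    = if (row.isNullAt(i)) 0   else row.getInt(i)

  (
    int(0).toLong,                                  // id
    (
      Option(row.getAs[String](1)).getOrElse(""),   // url
      int(3),                                       // batch_id
      dbl(4), dbl(5), dbl(6), dbl(7)                // x, y, z, size
    )
  )
}
vDf.count
Alternatively, if the placeholder defaults are acceptable for your data, you can avoid the per-row handling entirely by filling nulls before the map, e.g. verticesDf.na.fill(0.0), or by filtering out rows with null keys first.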