#scala #apache-spark #apache-spark-sql #spark-structured-streaming
Вопрос:
Я получаю ошибку ниже при записи структурированного потокового фрейма данных spark — пожалуйста, скажите мне, где я ошибаюсь при запуске этого кода-
здесь df читает из s3://abc/место тестирования, и я записываю этот фрейм данных в другое место s3 с помощью потоковой передачи spark-
val q = df .writeStream
.trigger(Trigger.Once)
.option("checkpointLocation", "s3://abc/checkpoint")
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF
.write
.mode(SaveMode.Append)
.parquet("s3://abc/demo")
}.start()
q.processAllAvailable()
q.stop()
при запуске выше кода я получаю ошибку ниже —
org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
=== Streaming Query ===
Identifier: [id = 82cae180-6190-499a-99ae, runId = 23aa9dca-c6ef-49ff-b860]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[s3://abc/testing]: {"logOffset":0}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
FileStreamSource[s3://abc/testing]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:379)
at org.apache.spark.sql.execution.streaming.StreamExecution$anon$1.run(StreamExecution.scala:269)
Caused by: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:230)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:116)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:114)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:139)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:200)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$3(SparkPlan.scala:252)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:248)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:158)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:157)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:999)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:999)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:437)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:421)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:294)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:884)
at line7d42fe70c8664871b443fdc5f6bbc35869.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$withCreateExtract$5(command-3858326:61)
at line7d42fe70c8664871b443fdc5f6bbc35869.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$withCreateExtract$5$adapted(command-3858326:56)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:39)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:593)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:591)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:74)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:591)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:231)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:74)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:199)
at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:193)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:358)
at org.apache.spark.sql.execution.streaming.StreamExecution$anon$1.run(StreamExecution.scala:269)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 31 tasks (4.0 GiB) is bigger than spark.driver.maxResultSize 4.0 GiB.
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2339)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2434)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:273)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:308)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:82)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:480)
at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:401)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:127)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:308)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:308)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:307)
at org.apache.spark.sql.execution.SQLExecution$.withOptimisticTransaction(SQLExecution.scala:325)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:306)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68)
at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:54)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:101)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Комментарии:
1. Ну, учитывая, что вы пишете в S3, могут возникнуть проблемы с подключением, вы можете это проверить. Но мне кажется, что основное сообщение об исключении отсутствует, вы уверены, что опубликовали всю трассировку стека?
2. @Filip Я добавил целое сообщение об ошибке, если вы можете мне помочь
3. Если вы нашли мой ответ полезным, пожалуйста, подумайте о том, чтобы принять его, поставив галочку рядом с ответом.
Ответ №1:
Total size of serialized results of 31 tasks (4.0 GiB) is bigger than spark.driver.maxResultSize 4.0 GiB.
означает, что, когда исполнитель пытается отправить свой результат драйверу, он превышает spark.driver.maxResultSize
. Вы можете решить эту проблему, увеличивая ее до тех пор, пока она не заработает, но это не рекомендация, если исполнитель пытается отправить слишком много данных.
Другая вещь, которая может вызвать это, заключается в том, что данные искажены, вы должны проверить, как данные распределяются по рабочим узлам, возможный сценарий заключается в том, что все данные оказываются на одном узле, что приводит к огромному вводу/выводу данных от одного работника. В этом случае вы можете попытаться перераспределить свои данные, чтобы разделить нагрузку между вашими работниками, что будет гораздо лучшим решением, чем увеличение лимита.