#scala #apache-spark #nullpointerexception #databricks
Question:
The dataframe consists of two columns (s3ObjectName, batchName) and tens of thousands of rows such as:
s3ObjectName | batchName |
---|---|
a1.json | 45 |
b2.json | 45 |
c3.json | 45 |
d4.json | 46 |
e5.json | 46 |
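For reference, a minimal sketch of a dataframe with this shape, built from the sample rows above (the real dataframe is loaded elsewhere; both columns are assumed to be strings, matching the `row.getString(...)` calls further down):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the real dataframe: the five sample rows
// from the table, with both columns typed as strings
val dataframe = Seq(
  ("a1.json", "45"),
  ("b2.json", "45"),
  ("c3.json", "45"),
  ("d4.json", "46"),
  ("e5.json", "46")
).toDF("s3ObjectName", "batchName")
```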
The goal is to fetch the objects from the S3 bucket and write them to the data lake, using the two columns from each row of the dataframe, via the foreachPartition() and foreach() functions:
```scala
import java.io.{BufferedWriter, File, FileWriter}

import org.apache.commons.io.IOUtils

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

// dbutils handle for compiled (non-notebook) code, matching the
// DBUtilsHolder frames in the stack trace below
import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

// S3 connector details defined as an object so it can be serialized and
// made available on all executors in the cluster
object container1 {
  def getDataSource1(): AmazonS3 = {
    val AccessKey = dbutils.secrets.get(scope = "My_Scope", key = "AccessKey-ID")
    val SecretKey = dbutils.secrets.get(scope = "My_Scope", key = "AccessKey-Secret")
    val creds = new BasicAWSCredentials(AccessKey, SecretKey)
    val clientRegion: Regions = Regions.US_EAST_1
    AmazonS3ClientBuilder.standard()
      .withRegion(clientRegion)
      .withCredentials(new AWSStaticCredentialsProvider(creds))
      .build()
  }
}

dataframe.rdd.foreachPartition(partition => {
  // Initialize the S3 connection once per partition
  val client: AmazonS3 = container1.getDataSource1()
  partition.foreach(row => {
    val s3ObjectName = row.getString(0)
    val batchname = row.getString(1)
    // Read the S3 object into a string
    val inputS3Stream = client.getObject("s3bucketname", s3ObjectName).getObjectContent
    val inputS3String = IOUtils.toString(inputS3Stream, "UTF-8")
    // Write it out to the mounted data lake path
    val filePath = s"/dbfs/mnt/test/${batchname}/${s3ObjectName}"
    val file = new File(filePath)
    val fileWriter = new FileWriter(file)
    val bw = new BufferedWriter(fileWriter)
    bw.write(inputS3String)
    bw.close()
    fileWriter.close()
  })
})
```
The process above fails with the following error:

```
java.lang.NullPointerException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.databricks.dbutils_v1.DBUtilsHolder$anon$1.invoke(DBUtilsHolder.scala:17)
at com.sun.proxy.$Proxy28.secrets(Unknown Source)
at com.passionbytes1.container1$.getDataSource1(<notebook>:15)
at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$4(command-4241228593676676:37)
at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$4$adapted(command-4241228593676676:36)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025)
at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2514)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2783)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2730)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2724)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2724)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1260)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1260)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1260)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2991)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2932)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2920)
at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1033)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2474)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2457)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2495)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2514)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2539)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1025)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:419)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1023)
at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$3(command-4241228593676676:36)
at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa45.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$retry$1(command-4241228593676670:4)
at scala.util.Try$.apply(Try.scala:213)
at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa45.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.retry(command-4241228593676670:4)
at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa45.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.retry(command-4241228593676670:37)
at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$2(command-4241228593676676:10)
at scala.util.Try$.apply(Try.scala:213)
at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$1(command-4241228593676676:10)
at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$1$adapted(command-4241228593676676:8)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:38)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:608)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:269)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:219)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:606)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:606)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$4(MicroBatchExecution.scala:244)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:649)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:241)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:210)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:204)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:366)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:341)
at org.apache.spark.sql.execution.streaming.StreamExecution$anon$1.run(StreamExecution.scala:268)
Caused by: java.lang.NullPointerException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.databricks.dbutils_v1.DBUtilsHolder$anon$1.invoke(DBUtilsHolder.scala:17)
at com.sun.proxy.$Proxy28.secrets(Unknown Source)
at com.passionbytes1.container1$.getDataSource1(<notebook>:15)
at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$4(command-4241228593676676:37)
at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$4$adapted(command-4241228593676676:36)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025)
at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2514)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```