SparkException: java.lang.NullPointerException

#scala #apache-spark #nullpointerexception #databricks

Question:

The dataframe consists of two columns (s3ObjectName, batchName) and tens of thousands of rows, such as:

s3ObjectName    batchName
a1.json         45
b2.json         45
c3.json         45
d4.json         45
e5.json         46

The goal is to fetch the objects from an S3 bucket and write them to the data lake, using the two columns from each row of the dataframe, via foreachPartition() and foreach():

// S3 connector details defined as an object so it can be serialized and
// made available on all executors in the cluster
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

object container1 {

  def getDataSource1(): AmazonS3 = {
    // Credentials are resolved through dbutils secrets at call time
    val AccessKey = dbutils.secrets.get(scope = "My_Scope", key = "AccessKey-ID")
    val SecretKey = dbutils.secrets.get(scope = "My_Scope", key = "AccessKey-Secret")
    val creds = new BasicAWSCredentials(AccessKey, SecretKey)
    val clientRegion: Regions = Regions.US_EAST_1
    AmazonS3ClientBuilder.standard()
      .withRegion(clientRegion)
      .withCredentials(new AWSStaticCredentialsProvider(creds))
      .build()
  }
}

import java.io.{BufferedWriter, File, FileWriter}
import org.apache.commons.io.IOUtils

dataframe.rdd.foreachPartition(partition => {
  // Initialize the S3 connection once per partition
  val client: AmazonS3 = container1.getDataSource1()
  partition.foreach(row => {
    val s3ObjectName = row.getString(0)
    val batchname = row.getString(1)
    // Read the S3 object into a string
    val inputS3Stream = client.getObject("s3bucketname", s3ObjectName).getObjectContent
    val inputS3String = IOUtils.toString(inputS3Stream, "UTF-8")
    // Write it to the mounted data lake path
    val filePath = s"/dbfs/mnt/test/${batchname}/${s3ObjectName}"
    val file = new File(filePath)
    val fileWriter = new FileWriter(file)
    val bw = new BufferedWriter(fileWriter)
    bw.write(inputS3String)
    bw.close()
    fileWriter.close()
  })
})
 

The above process fails with java.lang.NullPointerException:

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.databricks.dbutils_v1.DBUtilsHolder$anon$1.invoke(DBUtilsHolder.scala:17)
    at com.sun.proxy.$Proxy28.secrets(Unknown Source)
    at com.passionbytes1.container1$.getDataSource1(<notebook>:15)
    at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$4(command-4241228593676676:37)
    at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$4$adapted(command-4241228593676676:36)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2514)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2783)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2730)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2724)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2724)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1260)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1260)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1260)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2991)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2932)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2920)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1033)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2474)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2457)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2495)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2514)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2539)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1025)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:419)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1023)
    at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$3(command-4241228593676676:36)
    at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa45.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$retry$1(command-4241228593676670:4)
    at scala.util.Try$.apply(Try.scala:213)
    at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa45.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.retry(command-4241228593676670:4)
    at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa45.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.retry(command-4241228593676670:37)
    at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$2(command-4241228593676676:10)
    at scala.util.Try$.apply(Try.scala:213)
    at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$1(command-4241228593676676:10)
    at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$1$adapted(command-4241228593676676:8)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:38)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:608)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:269)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:606)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:606)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$4(MicroBatchExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:649)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:241)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:210)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:204)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:366)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:341)
    at org.apache.spark.sql.execution.streaming.StreamExecution$anon$1.run(StreamExecution.scala:268)
Caused by: java.lang.NullPointerException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.databricks.dbutils_v1.DBUtilsHolder$anon$1.invoke(DBUtilsHolder.scala:17)
    at com.sun.proxy.$Proxy28.secrets(Unknown Source)
    at com.passionbytes1.container1$.getDataSource1(<notebook>:15)
    at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$4(command-4241228593676676:37)
    at $line4e7b3acc6aca4dc28ec58b4a8c0d68aa71.$read$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw.$anonfun$tripsummarywriter$4$adapted(command-4241228593676676:36)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2514)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
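
Judging by the frames above (DBUtilsHolder → $Proxy28.secrets → container1.getDataSource1), the NullPointerException is raised when dbutils.secrets is invoked inside foreachPartition: dbutils is only initialized on the driver, so the proxy it resolves through is null on executors. For reference, a minimal sketch of one common workaround, assuming the secret values may be read once on the driver and captured as plain strings in the closure (scope, key, and bucket names are taken from the code above):

import java.io.{BufferedWriter, File, FileWriter}
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import org.apache.commons.io.IOUtils

// Read the secrets on the driver, where dbutils is initialized.
// Only the resulting strings are serialized into the closure below.
val accessKey = dbutils.secrets.get(scope = "My_Scope", key = "AccessKey-ID")
val secretKey = dbutils.secrets.get(scope = "My_Scope", key = "AccessKey-Secret")

dataframe.rdd.foreachPartition(partition => {
  // Build the client from the captured strings; no dbutils call runs on the executor
  val creds = new BasicAWSCredentials(accessKey, secretKey)
  val client: AmazonS3 = AmazonS3ClientBuilder.standard()
    .withRegion(Regions.US_EAST_1)
    .withCredentials(new AWSStaticCredentialsProvider(creds))
    .build()
  partition.foreach(row => {
    val s3ObjectName = row.getString(0)
    val batchname = row.getString(1)
    val inputS3Stream = client.getObject("s3bucketname", s3ObjectName).getObjectContent
    val inputS3String = IOUtils.toString(inputS3Stream, "UTF-8")
    val bw = new BufferedWriter(new FileWriter(new File(s"/dbfs/mnt/test/${batchname}/${s3ObjectName}")))
    try bw.write(inputS3String) finally bw.close()
  })
})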