Spark Mongo Hadoop connector does not display data

#apache-spark #mongodb-hadoop

Question:

I am trying to map data coming from the mongodb-hadoop connector inside a Spark application. There are no errors before this point, so I assume the connection to MongoDB succeeded. I use the following code for the mapping:

 JavaRDD<AppLog> logs = documents.map(
     new Function<Tuple2<Object, BSONObject>, AppLog>() {
         public AppLog call(final Tuple2<Object, BSONObject> tuple) {
             AppLog log = new AppLog();
             BSONObject header = (BSONObject) tuple._2().get("headers");

             log.setTarget((String) header.get("target"));
             log.setAction((String) header.get("action"));

             return log;
         }
     }
 );
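
For context: documents here is a JavaPairRDD<Object, BSONObject> produced by the mongo-hadoop connector. Below is a minimal sketch of how such an RDD is typically constructed, assuming a JavaSparkContext named sc; the URI, database, and collection names are placeholders, not taken from the question:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.bson.BSONObject;
    import com.mongodb.hadoop.MongoInputFormat;

    Configuration mongoConfig = new Configuration();
    // Placeholder URI: replace host, database, and collection with your own.
    mongoConfig.set("mongo.input.uri",
        "mongodb://localhost:27017/mydb.mycollection");

    JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
        mongoConfig,
        MongoInputFormat.class,  // InputFormat from the mongo-hadoop connector
        Object.class,            // key class: the document _id
        BSONObject.class);       // value class: the whole BSON document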
 

The code crashes with the following:

 16/10/12 19:42:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64)
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:57)
    at org.apache.spark.api.java.JavaPairRDD$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027)
    at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:372)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$anonfun$executePartition$1(TungstenAggregate.scala:110)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/10/12 19:42:31 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64)
    ... (same stack trace as in the ERROR above)

16/10/12 19:42:31 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/10/12 19:42:31 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/10/12 19:42:31 INFO TaskSchedulerImpl: Cancelling stage 0
16/10/12 19:42:31 INFO DAGScheduler: ShuffleMapStage 0 (show at DataframeExample.java:83) failed in 1.704 s
16/10/12 19:42:31 INFO DAGScheduler: Job 0 failed: show at DataframeExample.java:83, took 2.370636 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64)
    ... (same stack trace as above)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1280)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
    at org.apache.spark.sql.DataFrame$anonfun$collect$1.apply(DataFrame.scala:1386)
    at org.apache.spark.sql.DataFrame$anonfun$collect$1.apply(DataFrame.scala:1386)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
    at com.hbfinance.DataframeExample.run(DataframeExample.java:83)
    at com.hbfinance.DataframeExample.main(DataframeExample.java:89)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:672)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64)
    ... (same stack trace as above)
16/10/12 19:42:31 INFO SparkContext: Invoking stop() from shutdown hook
16/10/12 19:42:31 INFO SparkUI: Stopped Spark web UI at http://178.62.18.22:4040
16/10/12 19:42:31 INFO DAGScheduler: Stopping DAGScheduler
16/10/12 19:42:31 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/10/12 19:42:31 INFO MemoryStore: MemoryStore cleared
16/10/12 19:42:31 INFO BlockManager: BlockManager stopped
16/10/12 19:42:31 INFO BlockManagerMaster: BlockManagerMaster stopped
16/10/12 19:42:31 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/10/12 19:42:31 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/10/12 19:42:31 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/10/12 19:42:31 INFO SparkContext: Successfully stopped SparkContext
16/10/12 19:42:31 INFO ShutdownHookManager: Shutdown hook called
16/10/12 19:42:31 INFO ShutdownHookManager: Deleting directory /tmp/spark-c9ba1976-0c79-4a43-b261-1e7c98139a6e
 

Answer #1:

One of the variables used in your mapper class is null.

What could be null?

  • the tuple parameter
  • the second element of the tuple (tuple._2())
  • or the second element of the tuple has no headers object inside, so the header variable ends up null

We don't have your test data, so you have to check for yourself which of these is null. The problem is not in Spark but in your own code.

Just debug your application, or add if-else checks, and you will find which value is null. If the problem is the tuple parameter itself, you can do documents.filter(x -> x != null).map(...) to filter the null values out of the RDD; a sketch of these checks follows below.
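
A minimal sketch of those checks, based on the mapper from the question (skipping documents without a headers sub-document is an illustrative choice; your application might prefer defaults or logging instead):

    import org.apache.spark.api.java.JavaRDD;
    import org.bson.BSONObject;

    JavaRDD<AppLog> logs = documents
        // Drop null tuples and null documents before mapping.
        .filter(tuple -> tuple != null && tuple._2() != null)
        .map(tuple -> {
            BSONObject header = (BSONObject) tuple._2().get("headers");
            if (header == null) {
                // No "headers" sub-document: skip the record instead of
                // hitting a NullPointerException on header.get(...).
                return null;
            }
            AppLog log = new AppLog();
            log.setTarget((String) header.get("target"));
            log.setAction((String) header.get("action"));
            return log;
        })
        // Remove the records we could not map.
        .filter(log -> log != null);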

Comments:

1. Thank you sir, you are a legend. I had assumed from the example that a headers object was required; I removed the .get("headers") lookup and it worked.
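
For reference, a sketch of what the commenter describes, reading the fields directly off the root document instead of a headers sub-document (this assumes target and action live at the top level of the documents, which only the commenter's data confirms):

    JavaRDD<AppLog> logs = documents.map(tuple -> {
        BSONObject doc = tuple._2();  // whole document; no "headers" lookup
        AppLog log = new AppLog();
        log.setTarget((String) doc.get("target"));
        log.setAction((String) doc.get("action"));
        return log;
    });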