#apache-spark #mongodb-hadoop
#apache-spark #mongodb-hadoop
Вопрос:
Я пытаюсь преобразовать (map) данные, полученные через коннектор mongodb-hadoop, внутри приложения Spark. До этого места других ошибок не возникает, поэтому я предполагаю, что подключение к MongoDB прошло успешно. Для преобразования я использую следующий код:
// Convert every (key, BSON document) pair of `documents` into an AppLog.
JavaRDD<AppLog> logs = documents.map(
new Function<Tuple2<Object, BSONObject>, AppLog>() {
// Builds one AppLog from the BSON document held in the tuple's second element.
public AppLog call(final Tuple2<Object, BSONObject> tuple) {
AppLog log = new AppLog();
// Look up the "headers" entry of the document and treat it as a nested BSON object.
BSONObject header =
(BSONObject) tuple._2().get("headers");
// NOTE(review): `header` (and `tuple._2()` above) are dereferenced without a null check.
// If the document has no "headers" entry (presumably get() returns null then; confirm),
// these calls are a likely source of the NullPointerException shown in the log below.
log.setTarget((String) header.get("target"));
log.setAction((String) header.get("action"));
return log;
}
}
);
При выполнении этот код завершается со следующей ошибкой:
16/10/12 19:42:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64)
at com.hbfinance.DataframeExample$1.call(DataframeExample.java:57)
at org.apache.spark.api.java.JavaPairRDD$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:372)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$anonfun$executePartition$1(TungstenAggregate.scala:110)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/10/12 19:42:31 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64)
at com.hbfinance.DataframeExample$1.call(DataframeExample.java:57)
at org.apache.spark.api.java.JavaPairRDD$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:372)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$anonfun$executePartition$1(TungstenAggregate.scala:110)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/10/12 19:42:31 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/10/12 19:42:31 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/10/12 19:42:31 INFO TaskSchedulerImpl: Cancelling stage 0
16/10/12 19:42:31 INFO DAGScheduler: ShuffleMapStage 0 (show at DataframeExample.java:83) failed in 1.704 s
16/10/12 19:42:31 INFO DAGScheduler: Job 0 failed: show at DataframeExample.java:83, took 2.370636 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64)
at com.hbfinance.DataframeExample$1.call(DataframeExample.java:57)
at org.apache.spark.api.java.JavaPairRDD$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:372)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$anonfun$executePartition$1(TungstenAggregate.scala:110)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1280)
at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
at org.apache.spark.sql.DataFrame$anonfun$collect$1.apply(DataFrame.scala:1386)
at org.apache.spark.sql.DataFrame$anonfun$collect$1.apply(DataFrame.scala:1386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
at com.hbfinance.DataframeExample.run(DataframeExample.java:83)
at com.hbfinance.DataframeExample.main(DataframeExample.java:89)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64)
at com.hbfinance.DataframeExample$1.call(DataframeExample.java:57)
at org.apache.spark.api.java.JavaPairRDD$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:372)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$anonfun$executePartition$1(TungstenAggregate.scala:110)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$anonfun$doExecute$1$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/10/12 19:42:31 INFO SparkContext: Invoking stop() from shutdown hook
16/10/12 19:42:31 INFO SparkUI: Stopped Spark web UI at http://178.62.18.22:4040
16/10/12 19:42:31 INFO DAGScheduler: Stopping DAGScheduler
16/10/12 19:42:31 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/10/12 19:42:31 INFO MemoryStore: MemoryStore cleared
16/10/12 19:42:31 INFO BlockManager: BlockManager stopped
16/10/12 19:42:31 INFO BlockManagerMaster: BlockManagerMaster stopped
16/10/12 19:42:31 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/10/12 19:42:31 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/10/12 19:42:31 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/10/12 19:42:31 INFO SparkContext: Successfully stopped SparkContext
16/10/12 19:42:31 INFO ShutdownHookManager: Shutdown hook called
16/10/12 19:42:31 INFO ShutdownHookManager: Deleting directory /tmp/spark-c9ba1976-0c79-4a43-b261-1e7c98139a6e
Ответ №1:
Одна из переменных, используемых в вашем классе mapper, имеет значение null.
Что может быть равно null?
- параметр tuple;
- второй элемент кортежа;
- либо во втором элементе кортежа нет объекта headers.
У нас нет ваших тестовых данных, поэтому вам придётся самостоятельно проверить, что именно равно null. Проблема не в Spark, а в вашем собственном коде.
Просто запустите приложение под отладчиком или добавьте проверки if-else, и вы обнаружите, какое значение равно null. Если проблема в параметре tuple, можно написать documents.filter(x -> x != null).map(...), чтобы отфильтровать из RDD все элементы, равные null.
Комментарии:
1. Спасибо, сэр, вы легенда. Глядя на пример, я предположил, что поле headers обязательно; убрал .get("headers"), и всё заработало.