#apache-spark #apache-spark-sql #bigdata
Question:
I have a task with Spark SQL. The source data:
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United Kingdom|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/10 8:28|     1.85|     17850|United Kingdom|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/10 8:28|     1.85|     17850|United Kingdom|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/10 8:34|     1.69|     13047|United Kingdom|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/10 8:34|      2.1|     13047|United Kingdom|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/10 8:34|      2.1|     13047|United Kingdom|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/10 8:34|     3.75|     13047|United Kingdom|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/10 8:34|     1.65|     13047|United Kingdom|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/10 8:34|     4.25|     13047|United Kingdom|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/10 8:34|     4.95|     13047|United Kingdom|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/10 8:34|     9.95|     13047|United Kingdom|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/10 8:34|     5.95|     13047|United Kingdom|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/10 8:34|     5.95|     13047|United Kingdom|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/10 8:34|     7.95|     13047|United Kingdom|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
In this task I want to find which word appears most often in the Description field. To do that, I used flatMap to build a new DataFrame from the original one by splitting the Description field on spaces, and then registered the result as a new table. Here is the new table:
+------+-------+---+
|number|   word|lit|
+------+-------+---+
|     0|  WHITE|  1|
|     1|HANGING|  1|
|     2|  HEART|  1|
|     3|T-LIGHT|  1|
|     4| HOLDER|  1|
|     5|  WHITE|  1|
|     6|  METAL|  1|
|     7|LANTERN|  1|
|     8|  CREAM|  1|
|     9|  CUPID|  1|
|    10| HEARTS|  1|
|    11|   COAT|  1|
|    12| HANGER|  1|
|    13|KNITTED|  1|
|    14|  UNION|  1|
|    15|   FLAG|  1|
|    16|    HOT|  1|
|    17|  WATER|  1|
|    18| BOTTLE|  1|
|    19|    RED|  1|
+------+-------+---+
And this is my code:
SparkSession spark = SparkSession.builder().appName("Part-4").master("local").getOrCreate();
Dataset<Row> data = spark.read()
        .option("inferSchema", true)
        .option("header", true)
        .csv("hdfs://localhost:9000/retails.csv");
data.flatMap(new FlatMapFunction<Row, Row>() {
    private static final long serialVersionUID = 1L;
    private int cnt = 0;

    @Override
    public Iterator<Row> call(Row r) throws Exception {
        List<String> listItem = Arrays.asList(r.getString(2).split(" "));
        List<Row> listItemRow = new ArrayList<Row>();
        for (String item : listItem) {
            listItemRow.add(RowFactory.create(cnt, item, 1));
            cnt++;
        }
        return listItemRow.iterator();
    }
}, RowEncoder.apply(new StructType().add("number", "integer").add("word", "string").add("lit", "integer"))).createOrReplaceTempView("data");
spark.sql("select * from data").show();
My problem is that when I use GROUP BY or other more complex SQL operations, the program throws an error.
This is my code with GROUP BY:
spark.sql("select word, count(lit) from data group by word").show();
And this is my error:
java.lang.NullPointerException
    at com.spark.part_4.Main$1.call(Main.java:33)
    at com.spark.part_4.Main$1.call(Main.java:27)
    at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
    at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
21/12/03 00:08:39 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.10 executor driver): java.lang.NullPointerException
    at com.spark.part_4.Main$1.call(Main.java:33)
    at com.spark.part_4.Main$1.call(Main.java:27)
    at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
    at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
21/12/03 00:08:39 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
21/12/03 00:08:39 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
21/12/03 00:08:39 INFO TaskSchedulerImpl: Cancelling stage 2
21/12/03 00:08:39 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage cancelled
21/12/03 00:08:39 INFO DAGScheduler: ShuffleMapStage 2 (show at Main.java:45) failed in 0.298 s due to Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.10 executor driver): java.lang.NullPointerException
    at com.spark.part_4.Main$1.call(Main.java:33)
    at com.spark.part_4.Main$1.call(Main.java:27)
    at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
    at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
21/12/03 00:08:39 INFO DAGScheduler: Job 2 failed: show at Main.java:45, took 0.312624 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.10 executor driver): java.lang.NullPointerException
    at com.spark.part_4.Main$1.call(Main.java:33)
    at com.spark.part_4.Main$1.call(Main.java:27)
    at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
    at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:825)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:793)
    at com.spark.part_4.Main.main(Main.java:45)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$anon$2.doSubmit(SparkSubmit.scala:1030)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
    at com.spark.part_4.Main$1.call(Main.java:33)
    at com.spark.part_4.Main$1.call(Main.java:27)
    at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
    at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
I hope someone can help. Thank you…
Answer #1:
You are getting a java.lang.NullPointerException when the FlatMapFunction is applied, most likely because your dataset contains empty values. In this example you appear to be reading the Description column (index 2).
When a value is empty, Spark may read the column as null, and the following line
List<String> listItem = Arrays.asList(r.getString(2).split(" "));
will then probably throw this exception: r.getString(2) returns null, and you try to call split on a null reference.
You can try to solve this by checking for null values before splitting, for example:
data.flatMap(new FlatMapFunction<Row, Row>() {
    private static final long serialVersionUID = 1L;
    private int cnt = 0;

    public Iterator<Row> call(Row r) throws Exception {
        List<Row> listItemRow = new ArrayList<Row>();
        // check for null before splitting
        if (r.getString(2) != null) {
            List<String> listItem = Arrays.asList(r.getString(2).split(" "));
            for (String item : listItem) {
                listItemRow.add(RowFactory.create(cnt, item, 1));
                cnt++;
            }
        }
        return listItemRow.iterator();
    }
}, RowEncoder.apply(
        new StructType().add("number", "integer")
                .add("word", "string")
                .add("lit", "integer")
)).createOrReplaceTempView("data");
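To see the failure mode in isolation, here is a minimal plain-Java sketch with no Spark involved. The class name NullSafeSplit and the helper splitWords are hypothetical names introduced only for this illustration; the sketch assumes that a missing Description reaches the function as a null String, as described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Spark): the same null guard applied to a single description value.
class NullSafeSplit {

    // Hypothetical helper: returns the space-separated words, or an empty list for null.
    static List<String> splitWords(String description) {
        List<String> words = new ArrayList<>();
        if (description != null) {
            words.addAll(Arrays.asList(description.split(" ")));
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(splitWords("WHITE METAL LANTERN")); // three words
        System.out.println(splitWords(null));                  // empty list, no exception

        // Without the guard, calling split on a null reference throws.
        String missing = null;
        try {
            missing.split(" ");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException, as in the stack trace");
        }
    }
}
```

Returning an empty list for null mirrors what the guarded flatMap does: a row without a description simply contributes no words.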
You can view the rows with null values using
data.where("Description is null").show();
and, similarly, filter those rows out before applying flatMap, for example:
data.where("Description is not null")
    .flatMap(new FlatMapFunction<Row, Row>() {
        // continue with the rest of your code
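The filter-first idea, followed by the grouping from the question, can be checked on a few in-memory strings before running it on the cluster. The following is only a plain-Java sketch using streams, not Spark code; WordCountSketch, countWords and mostFrequent are hypothetical names, and the sample descriptions are taken from the tables in the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java sketch (no Spark) of: drop nulls, split on spaces, group by word, count.
class WordCountSketch {

    // Hypothetical helper: word -> number of occurrences across all non-null descriptions.
    static Map<String, Long> countWords(List<String> descriptions) {
        return descriptions.stream()
                .filter(Objects::nonNull)                   // like: where Description is not null
                .flatMap(d -> Arrays.stream(d.split(" ")))  // like the flatMap step
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); // like: group by word
    }

    // Hypothetical helper: the most frequent word, or null if there are no words at all.
    static String mostFrequent(Map<String, Long> counts) {
        return counts.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElse(null);
    }

    public static void main(String[] args) {
        List<String> descriptions = Arrays.asList(
                "WHITE METAL LANTERN", null, "WHITE HANGING HEART T-LIGHT HOLDER");
        Map<String, Long> counts = countWords(descriptions);
        System.out.println(counts.get("WHITE"));   // 2
        System.out.println(mostFrequent(counts));  // WHITE
    }
}
```

Because the null entry is removed before the split, no per-element null check is needed inside the splitting step, which is the same trade-off as filtering the Dataset before flatMap.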