Cannot group by data after flatMap (Spark SQL)

#apache-spark #apache-spark-sql #bigdata

Question:

I have a Spark SQL task. Here is the source data:

    +---------+---------+--------------------+--------+------------+---------+----------+--------------+
    |InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
    +---------+---------+--------------------+--------+------------+---------+----------+--------------+
    |   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
    |   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
    |   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
    |   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
    |   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
    |   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
    |   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United Kingdom|
    |   536366|    22633|HAND WARMER UNION...|       6|12/1/10 8:28|     1.85|     17850|United Kingdom|
    |   536366|    22632|HAND WARMER RED P...|       6|12/1/10 8:28|     1.85|     17850|United Kingdom|
    |   536367|    84879|ASSORTED COLOUR B...|      32|12/1/10 8:34|     1.69|     13047|United Kingdom|
    |   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/10 8:34|      2.1|     13047|United Kingdom|
    |   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/10 8:34|      2.1|     13047|United Kingdom|
    |   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/10 8:34|     3.75|     13047|United Kingdom|
    |   536367|    22310|IVORY KNITTED MUG...|       6|12/1/10 8:34|     1.65|     13047|United Kingdom|
    |   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/10 8:34|     4.25|     13047|United Kingdom|
    |   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/10 8:34|     4.95|     13047|United Kingdom|
    |   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/10 8:34|     9.95|     13047|United Kingdom|
    |   536367|    21754|HOME BUILDING BLO...|       3|12/1/10 8:34|     5.95|     13047|United Kingdom|
    |   536367|    21755|LOVE BUILDING BLO...|       3|12/1/10 8:34|     5.95|     13047|United Kingdom|
    |   536367|    21777|RECIPE BOX WITH M...|       4|12/1/10 8:34|     7.95|     13047|United Kingdom|
    +---------+---------+--------------------+--------+------------+---------+----------+--------------+

For this task I want to count which word appears most often in the Description field. So I did the following: I used flatMap to build a new DataFrame from the source one by splitting the Description field on spaces, and then registered it as a new table. The new table looks like this:

    +------+-------+---+
    |number|   word|lit|
    +------+-------+---+
    |     0|  WHITE|  1|
    |     1|HANGING|  1|
    |     2|  HEART|  1|
    |     3|T-LIGHT|  1|
    |     4| HOLDER|  1|
    |     5|  WHITE|  1|
    |     6|  METAL|  1|
    |     7|LANTERN|  1|
    |     8|  CREAM|  1|
    |     9|  CUPID|  1|
    |    10| HEARTS|  1|
    |    11|   COAT|  1|
    |    12| HANGER|  1|
    |    13|KNITTED|  1|
    |    14|  UNION|  1|
    |    15|   FLAG|  1|
    |    16|    HOT|  1|
    |    17|  WATER|  1|
    |    18| BOTTLE|  1|
    |    19|    RED|  1|
    +------+-------+---+

And this is my code:

    SparkSession spark = SparkSession.builder().appName("Part-4").master("local").getOrCreate();
    Dataset<Row> data = spark.read()
            .option("inferSchema", true)
            .option("header", true)
            .csv("hdfs://localhost:9000/retails.csv");

    data.flatMap(new FlatMapFunction<Row, Row>() {
        private static final long serialVersionUID = 1L;
        private int cnt = 0;

        @Override
        public Iterator<Row> call(Row r) throws Exception {
            List<String> listItem = Arrays.asList(r.getString(2).split(" "));

            List<Row> listItemRow = new ArrayList<Row>();
            for (String item : listItem) {
                listItemRow.add(RowFactory.create(cnt, item, 1));
                cnt++;
            }

            return listItemRow.iterator();
        }
    }, RowEncoder.apply(new StructType().add("number", "integer").add("word", "string").add("lit", "integer"))).createOrReplaceTempView("data");

    spark.sql("select * from data").show();

The problem is that when I use GROUP BY or perform other complex SQL operations, the program throws an error.

This is my code for the GROUP BY:

    spark.sql("select word, count(lit) from data group by word").show();

And this is my error:

    java.lang.NullPointerException
        at com.spark.part_4.Main$1.call(Main.java:33)
        at com.spark.part_4.Main$1.call(Main.java:27)
        at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    21/12/03 00:08:39 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.10 executor driver): java.lang.NullPointerException
        at com.spark.part_4.Main$1.call(Main.java:33)
        at com.spark.part_4.Main$1.call(Main.java:27)
        at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    21/12/03 00:08:39 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
    21/12/03 00:08:39 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
    21/12/03 00:08:39 INFO TaskSchedulerImpl: Cancelling stage 2
    21/12/03 00:08:39 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage cancelled
    21/12/03 00:08:39 INFO DAGScheduler: ShuffleMapStage 2 (show at Main.java:45) failed in 0.298 s due to Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.10 executor driver): java.lang.NullPointerException
        at com.spark.part_4.Main$1.call(Main.java:33)
        at com.spark.part_4.Main$1.call(Main.java:27)
        at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Driver stacktrace:
    21/12/03 00:08:39 INFO DAGScheduler: Job 2 failed: show at Main.java:45, took 0.312624 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.10 executor driver): java.lang.NullPointerException
        at com.spark.part_4.Main$1.call(Main.java:33)
        at com.spark.part_4.Main$1.call(Main.java:27)
        at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:825)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:793)
        at com.spark.part_4.Main.main(Main.java:45)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.NullPointerException
        at com.spark.part_4.Main$1.call(Main.java:33)
        at com.spark.part_4.Main$1.call(Main.java:27)
        at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

I hope to get help from you all, thanks…

Answer #1:

You are getting a java.lang.NullPointerException when applying the FlatMapFunction because your dataset may contain empty values. It looks like you are using the Description column in this example.

When a value is empty, the column may be read by Spark as null, and the following line

    List<String> listItem = Arrays.asList(r.getString(2).split(" "));

is probably where the exception is raised: r.getString(2) returns null, and you then try to call split on a null reference.

You can try to solve this by checking for null values before splitting, for example

    data.flatMap(new FlatMapFunction<Row, Row>() {
        private static final long serialVersionUID = 1L;
        private int cnt = 0;

        public Iterator<Row> call(Row r) throws Exception {

            List<Row> listItemRow = new ArrayList<Row>();
            // check if null before splitting here
            if (r.getString(2) != null) {
                List<String> listItem = Arrays.asList(r.getString(2).split(" "));
                for (String item : listItem) {
                    listItemRow.add(RowFactory.create(cnt, item, 1));
                    cnt++;
                }
            }

            return listItemRow.iterator();
        }
    }, RowEncoder.apply(
            new StructType().add("number", "integer")
                    .add("word", "string")
                    .add("lit", "integer")
    )).createOrReplaceTempView("data");
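
As a side note, Spark's Row interface also provides an isNullAt(int) accessor, so the same guard can be written without comparing getString(2) to null. A small sketch of just that variant, assuming the same surrounding code as above:

    // Variant of the null guard using Row.isNullAt(int); this replaces
    // the if (r.getString(2) != null) block inside call()
    if (!r.isNullAt(2)) {
        for (String item : r.getString(2).split(" ")) {
            listItemRow.add(RowFactory.create(cnt, item, 1));
            cnt++;
        }
    }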

You can inspect the rows with null values using

    data.where("Description is null").show();
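
If you prefer the typed API over a SQL string, an equivalent check can be written with Column.isNull(); a minimal sketch, assuming the usual static import of org.apache.spark.sql.functions.col:

    import static org.apache.spark.sql.functions.col;

    // Show the rows whose Description column is null
    data.filter(col("Description").isNull()).show();
    // ...or just count how many there are
    long nullDescriptions = data.filter(col("Description").isNull()).count();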

and, in the same way, filter those rows out before applying flatMap, for example

    data.where("Description is not null")
        .flatMap(new FlatMapFunction<Row, Row>() { // continue the rest of your code
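
Finally, if all you need is the word count itself, a common alternative (a sketch using Spark's built-in split and explode functions, not the code from the question) avoids the hand-written FlatMapFunction and its mutable cnt counter entirely:

    import static org.apache.spark.sql.functions.*;

    // Tokenize Description with built-ins and count word frequencies;
    // the where() guard drops the null descriptions before splitting.
    data.where("Description is not null")
        .select(explode(split(col("Description"), " ")).alias("word"))
        .groupBy("word")
        .count()
        .orderBy(desc("count"))
        .show();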