Ошибка при повторении кадра данных с использованием функции Java Spark foreach

#java #dataframe #apache-spark #foreach #dataset

Вопрос:

отредактированный

Я пытаюсь выполнить итерацию через фрейм данных, чтобы создать еще один. В этом примере я не использую данные из первого, это просто для того, чтобы показать, что я пытаюсь сделать. Однако идея состоит в том, чтобы использовать первый для создания нового, намного большего размера, на основе данных из первого.

Что бы я ни пробовал в функции void, я всегда получаю ошибку в foreach .

Пример кадра данных для итерации:

 Dataset<Row> obtencionRents = spark.createDataFrame(Arrays.asList(
        new testRentabilidades("0000A0","PORTAL","4-ANUAL","asdasd","asdasd"),
        new testRentabilidades("00A00","PORTAL","","asdasd","sdasd"),
        new testRentabilidades("00A","PORTAL","4-ANUAL","asdasd","asdasd")
), testRentabilidades.class);
 

Функция Foreach для итерации образца фрейма данных:

 obtencionRents.toJavaRDD().foreach(new VoidFunction<Row>() {
        public void call(Row r) throws Exception {

           //add registers to new collection/arraylist/etc.
        }
    });
 

Ошибка, которую я получил:

 Driver stacktrace:
2021-11-03 17:34:41 INFO  DAGScheduler:54 - Job 0 failed: foreach at CargarRentabilidades.java:154, took 0,812094 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:419)
    at batchload.proceso.builder.CargarRentabilidades$1.call(CargarRentabilidades.java:157)
    at batchload.proceso.builder.CargarRentabilidades$1.call(CargarRentabilidades.java:154)
    at org.apache.spark.api.java.JavaRDDLike$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
    at org.apache.spark.api.java.JavaRDDLike$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$28.apply(RDD.scala:921)
    at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$28.apply(RDD.scala:921)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
    at org.apache.spark.rdd.RDD$anonfun$foreach$1.apply(RDD.scala:921)
    at org.apache.spark.rdd.RDD$anonfun$foreach$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:919)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351)
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
    at batchload.proceso.builder.CargarRentabilidades.transformacionRentabilidades(CargarRentabilidades.java:154)
    at batchload.proceso.builder.CargarRentabilidades.coleccionRentabilidades(CargarRentabilidades.java:78)
    at batchload.proceso.builder.CargarRentabilidades.coleccionCargaRentabilidades(CargarRentabilidades.java:52)
    at batchload.proceso.MainBatch.init(MainBatch.java:59)
    at batchload.BatchloadRentabilidades.main(BatchloadRentabilidades.java:24)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:419)
    at batchload.proceso.builder.CargarRentabilidades$1.call(CargarRentabilidades.java:157)
    at batchload.proceso.builder.CargarRentabilidades$1.call(CargarRentabilidades.java:154)
    at org.apache.spark.api.java.JavaRDDLike$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
    at org.apache.spark.api.java.JavaRDDLike$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$28.apply(RDD.scala:921)
    at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$28.apply(RDD.scala:921)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 

Версии:

mongo-spark-connector_2.11-2.3.0 Java 1.8 IntelliJ 2021 1.2 Библиотека сообщества Spark версии 2.11

другие версии зависимостей, которые я использую:

hadoop 2.7, spark 2.3.0, драйвер java 2.7, катализатор spark,ядро,улей,sql ….все 2.11:2.3.0, scala scala-библиотека:2.11.12

Застрял с этим, любая помощь более чем приветствуется

Спасибо!

Ответ №1:

Это может быть связано с проблемой сериализации. Можете ли вы попробовать преобразовать свою анонимную функцию в статический метод класса?

Комментарии:

1. Спасибо за ответ, я попробую

2. Не могу сделать его статичным, потому что я использую сеанс spark в функции и в классе и создаю исключение, потому что оно не сериализуемо, мне придется попробовать другой способ. Получите эти данные из первого фрейма данных, преобразуйте их во что-то, что я могу читать строку за строкой, а затем используйте эти значения для создания нового фрейма данных большего размера. Все, что я пробовал с функциями «foreachrow», вызывает это странное исключение.

3. не удалось заставить это сработать. наконец, я преобразовал фрейм данных в список, а затем использовал цикл for для получения всех строк списка: List<Строка> newList = df.collectAsList();