#java #dataframe #apache-spark #foreach #dataset
Вопрос:
отредактированный
Я пытаюсь выполнить итерацию через фрейм данных, чтобы создать еще один. В этом примере я не использую данные из первого, это просто для того, чтобы показать, что я пытаюсь сделать. Однако идея состоит в том, чтобы использовать первый для создания нового, намного большего размера, на основе данных из первого.
Что бы я ни пробовал в функции void, я всегда получаю ошибку в foreach
.
Пример кадра данных для итерации:
Dataset<Row> obtencionRents = spark.createDataFrame(Arrays.asList(
new testRentabilidades("0000A0","PORTAL","4-ANUAL","asdasd","asdasd"),
new testRentabilidades("00A00","PORTAL","","asdasd","sdasd"),
new testRentabilidades("00A","PORTAL","4-ANUAL","asdasd","asdasd")
), testRentabilidades.class);
Функция Foreach для итерации образца фрейма данных:
obtencionRents.toJavaRDD().foreach(new VoidFunction<Row>() {
public void call(Row r) throws Exception {
//add registers to new collection/arraylist/etc.
}
});
Ошибка, которую я получил:
Driver stacktrace:
2021-11-03 17:34:41 INFO DAGScheduler:54 - Job 0 failed: foreach at CargarRentabilidades.java:154, took 0,812094 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:419)
at batchload.proceso.builder.CargarRentabilidades$1.call(CargarRentabilidades.java:157)
at batchload.proceso.builder.CargarRentabilidades$1.call(CargarRentabilidades.java:154)
at org.apache.spark.api.java.JavaRDDLike$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
at org.apache.spark.api.java.JavaRDDLike$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$28.apply(RDD.scala:921)
at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$28.apply(RDD.scala:921)
at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
at org.apache.spark.rdd.RDD$anonfun$foreach$1.apply(RDD.scala:921)
at org.apache.spark.rdd.RDD$anonfun$foreach$1.apply(RDD.scala:919)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:919)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
at batchload.proceso.builder.CargarRentabilidades.transformacionRentabilidades(CargarRentabilidades.java:154)
at batchload.proceso.builder.CargarRentabilidades.coleccionRentabilidades(CargarRentabilidades.java:78)
at batchload.proceso.builder.CargarRentabilidades.coleccionCargaRentabilidades(CargarRentabilidades.java:52)
at batchload.proceso.MainBatch.init(MainBatch.java:59)
at batchload.BatchloadRentabilidades.main(BatchloadRentabilidades.java:24)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:419)
at batchload.proceso.builder.CargarRentabilidades$1.call(CargarRentabilidades.java:157)
at batchload.proceso.builder.CargarRentabilidades$1.call(CargarRentabilidades.java:154)
at org.apache.spark.api.java.JavaRDDLike$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
at org.apache.spark.api.java.JavaRDDLike$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$28.apply(RDD.scala:921)
at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$28.apply(RDD.scala:921)
at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Версии:
mongo-spark-connector_2.11-2.3.0 Java 1.8 IntelliJ 2021 1.2 Библиотека сообщества Spark версии 2.11
другие версии зависимостей, которые я использую:
hadoop 2.7, spark 2.3.0, драйвер java 2.7, катализатор spark,ядро,улей,sql ….все 2.11:2.3.0, scala scala-библиотека:2.11.12
Застрял с этим, любая помощь более чем приветствуется
Спасибо!
Ответ №1:
Это может быть связано с проблемой сериализации. Можете ли вы попробовать преобразовать свою анонимную функцию в статический метод класса?
Комментарии:
1. Спасибо за ответ, я попробую
2. Не могу сделать его статичным, потому что я использую сеанс spark в функции и в классе и создаю исключение, потому что оно не сериализуемо, мне придется попробовать другой способ. Получите эти данные из первого фрейма данных, преобразуйте их во что-то, что я могу читать строку за строкой, а затем используйте эти значения для создания нового фрейма данных большего размера. Все, что я пробовал с функциями «foreachrow», вызывает это странное исключение.
3. не удалось заставить это сработать. наконец, я преобразовал фрейм данных в список, а затем использовал цикл for для получения всех строк списка: List<Строка> newList = df.collectAsList();