#apache-spark #apache-kafka #spark-structured-streaming #kafka-producer-api
Вопрос:
Я использую структурированную потоковую передачу Spark, чтобы отправить сообщение кафке. Структурированный потоковый поток чтения из другого кафки, я преобразовал запись в список объектов и использовал foreach для отправки новых данных кафке. Но я получил ошибку, когда отправил продюсера кафки:
26/09/2021 19:52:00 ERROR [stream execution thread for [id = 526c7e4b-2c3a-4d14-950a-461f024b2c18, runId = 5e32940e-b648-42e2-b65c-1d2cf5013060]] MicroBatchExecution: Query [id = 526c7e4b-2c3a-4d14-950a-461f024b2c18, runId = 5e32940e-b648-42e2-b65c-1d2cf5013060] terminated with error
java.lang.NullPointerException
at main.MainStreaming$anonfun$1$anonfun$apply$15.apply(MainStreaming.scala:440)
at main.MainStreaming$anonfun$1$anonfun$apply$15.apply(MainStreaming.scala:436)
at scala.collection.immutable.List.foreach(List.scala:392)
at main.MainStreaming$anonfun$1.apply(MainStreaming.scala:436)
at main.MainStreaming$anonfun$1.apply(MainStreaming.scala:136)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$runBatch$5$anonfun$apply$19.apply(MicroBatchExecution.scala:548)
at org.apache.spark.sql.execution.SQLExecution$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$runBatch$5.apply(MicroBatchExecution.scala:546)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$runBatch(MicroBatchExecution.scala:545)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$anonfun$runActivatedStream$1$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$anonfun$runActivatedStream$1$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$anonfun$runActivatedStream$1$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$anon$1.run(StreamExecution.scala:193)
org.apache.spark.sql.streaming.StreamingQueryException: null
=== Streaming Query ===
Identifier: [id = 526c7e4b-2c3a-4d14-950a-461f024b2c18, runId = 5e32940e-b648-42e2-b65c-1d2cf5013060]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[multi-events]]: {"multi-events":{"1":169,"0":173}}}
Комментарии:
1. Не могли бы вы, пожалуйста, показать свой код? Все ли ваши сообщения ненулевые?